• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ClusterRPC.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   RPC function handlers for cluster control messages (ping, machine list, close channel, and set channel data/pin/priority).
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 /****************************************************************************
00026 
00027   ClusterRPC.cc
00028 ****************************************************************************/
00029 
00030 #include "P_Cluster.h"
00031 /////////////////////////////////////////////////////////////////////////
00032 // All RPC function handlers (xxx_ClusterFunction() ) are invoked from
00033 // ClusterHandler::update_channels_read().
00034 /////////////////////////////////////////////////////////////////////////
00035 
00036 void
00037 ping_ClusterFunction(ClusterHandler *ch, void *data, int len)
00038 {
00039   //
00040   // Just return the data back
00041   //
00042   clusterProcessor.invoke_remote(ch, PING_REPLY_CLUSTER_FUNCTION, data, len);
00043 }
00044 
00045 void
00046 ping_reply_ClusterFunction(ClusterHandler *ch, void *data, int len)
00047 {
00048   //
00049   // Pass back the data.
00050   //
00051   PingMessage *msg = (PingMessage *) data;
00052   msg->fn(ch, msg->data, (len - msg->sizeof_fixedlen_msg()));
00053 }
00054 
00055 void
00056 machine_list_ClusterFunction(ClusterHandler * from, void *data, int len)
00057 {
00058   (void) from;
00059   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00060   MachineListMessage *m = (MachineListMessage *) data;
00061 
00062   if (mh->GetMsgVersion() != MachineListMessage::MACHINE_LIST_MESSAGE_VERSION) {        ////////////////////////////////////////////////
00063     // Convert from old to current message format
00064     ////////////////////////////////////////////////
00065     ink_release_assert(!"machine_list_ClusterFunction() bad msg version");
00066   }
00067   if (m->NeedByteSwap())
00068     m->SwapBytes();
00069 
00070   ink_assert(m->n_ip == ((len - m->sizeof_fixedlen_msg()) / sizeof(uint32_t)));
00071 
00072   //
00073   // The machine list is a vector of ip's stored in network byte order.
00074   // This list is exchanged whenever a new Cluster Connection is formed.
00075   //
00076   ClusterConfiguration *cc = this_cluster()->current_configuration();
00077 
00078   for (unsigned int i = 0; i < m->n_ip; i++) {
00079     for (int j = 0; j < cc->n_machines; j++) {
00080       if (cc->machines[j]->ip == m->ip[i])
00081         goto Lfound;
00082     }
00083     // not found, must be a new machine
00084     {
00085       int num_connections = this_cluster_machine()->num_connections;
00086       for (int k = 0; k < num_connections; k++) {
00087         clusterProcessor.connect(m->ip[i], k);
00088       }
00089     }
00090   Lfound:
00091     ;
00092   }
00093 }
00094 
00095 void
00096 close_channel_ClusterFunction(ClusterHandler *ch, void *data, int len)
00097 {
00098   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00099   CloseMessage *m = (CloseMessage *) data;
00100 
00101   if (mh->GetMsgVersion() != CloseMessage::CLOSE_CHAN_MESSAGE_VERSION) {        ////////////////////////////////////////////////
00102     // Convert from old to current message format
00103     ////////////////////////////////////////////////
00104     ink_release_assert(!"close_channel_ClusterFunction() bad msg version");
00105   }
00106   if (m->NeedByteSwap())
00107     m->SwapBytes();
00108 
00109   //
00110   // Close the remote side of a VC connection (remote node is originator)
00111   //
00112   ink_assert(len >= (int) sizeof(CloseMessage));
00113   if (!ch || !ch->channels)
00114     return;
00115   ClusterVConnection *vc = ch->channels[m->channel];
00116   if (VALID_CHANNEL(vc) && vc->token.sequence_number == m->sequence_number) {
00117     vc->remote_closed = m->status;
00118     vc->remote_lerrno = m->lerrno;
00119     ch->vcs_push(vc, vc->type);
00120   }
00121 }
00122 
00123 void
00124 test_ClusterFunction(ClusterHandler *ch, void *data, int len)
00125 {
00126   //
00127   // Note: Only for testing.
00128   //
00129   if (ptest_ClusterFunction)
00130     ptest_ClusterFunction(ch, data, len);
00131 }
00132 
00133 CacheVC *
00134 ChannelToCacheWriteVC(ClusterHandler * ch, int channel, uint32_t channel_seqno, ClusterVConnection ** cluster_vc)
00135 {
00136   EThread *thread = this_ethread();
00137   ProxyMutex *mutex = thread->mutex;
00138 
00139   ClusterVConnection *cvc = ch->channels[channel];
00140   if (!VALID_CHANNEL(cvc)
00141       || (channel_seqno != cvc->token.sequence_number)
00142       || (cvc->read.vio.op != VIO::READ)) {
00143     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00144     return NULL;
00145   }
00146   // Tunneling from cluster to cache (remote write).
00147   // Get cache VC pointer.
00148 
00149   OneWayTunnel *owt = (OneWayTunnel *) cvc->read.vio._cont;
00150   if (!owt) {
00151     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SETDATA_NO_TUNNEL_STAT);
00152     return NULL;
00153   }
00154   CacheVC *cache_vc = (CacheVC *) owt->vioTarget->vc_server;
00155   if (!cache_vc) {
00156     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SETDATA_NO_CACHEVC_STAT);
00157     return NULL;
00158   }
00159   *cluster_vc = cvc;
00160   return cache_vc;
00161 }
00162 
00163 void
00164 set_channel_data_ClusterFunction(ClusterHandler *ch, void *tdata, int tlen)
00165 {
00166   EThread *thread = this_ethread();
00167   ProxyMutex *mutex = thread->mutex;
00168   // We are called on the ET_CLUSTER thread.
00169 
00170   char *data;
00171   int len;
00172   int res;
00173 
00174   // Allocate memory for set channel data and pass it to the cache
00175   IncomingControl *ic = IncomingControl::alloc();
00176   ic->len = tlen;
00177   ic->alloc_data();
00178 
00179   data = ic->data + sizeof(int32_t);      // free_remote_data expects d+sizeof(int32_t)
00180   memcpy(data, tdata, tlen);
00181   len = tlen;
00182 
00183   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00184   SetChanDataMessage *m = (SetChanDataMessage *) data;
00185 
00186   if (mh->GetMsgVersion() != SetChanDataMessage::SET_CHANNEL_DATA_MESSAGE_VERSION) {    ////////////////////////////////////////////////
00187     // Convert from old to current message format
00188     ////////////////////////////////////////////////
00189     ink_release_assert(!"set_channel_data_ClusterFunction() bad msg version");
00190   }
00191   if (m->NeedByteSwap())
00192     m->SwapBytes();
00193 
00194   ClusterVConnection *cvc;
00195   CacheVC *cache_vc;
00196 
00197   if (ch) {
00198     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00199     if (!cache_vc) {
00200       ic->freeall();
00201       return;
00202     }
00203     // Unmarshal data.
00204     switch (m->data_type) {
00205     case CACHE_DATA_HTTP_INFO:
00206       {
00207         char *p = (char *) m + SetChanDataMessage::sizeof_fixedlen_msg();
00208 
00209         IOBufferBlock *block_ref = ic->get_block();
00210         res = HTTPInfo::unmarshal(p, len, block_ref);
00211         ink_assert(res > 0);
00212 
00213         CacheHTTPInfo h;
00214         h.get_handle((char *) &m->data[0], len);
00215         h.set_buffer_reference(block_ref);
00216         cache_vc->set_http_info(&h);
00217         ic->freeall();
00218         break;
00219       }
00220     default:
00221       {
00222         ink_release_assert(!"set_channel_data_ClusterFunction bad CacheDataType");
00223       }
00224     }                           // End of switch
00225     ++cvc->n_recv_set_data_msgs;        // note received messages
00226 
00227   } else {
00228     ic->freeall();
00229     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00230   }
00231 }
00232 
00233 void
00234 post_setchan_send_ClusterFunction(ClusterHandler *ch, void *data, int /* len ATS_UNUSED */)
00235 {
00236   EThread *thread = this_ethread();
00237   ProxyMutex *mutex = thread->mutex;
00238   // We are called on the ET_CLUSTER thread.
00239   // set_data() control message has been queued into cluster transfer message.
00240   // This allows us to assume that it has been sent.
00241   // Decrement Cluster VC n_set_data_msgs to allow transmission of
00242   // initial open_write data after (n_set_data_msgs == 0).
00243 
00244   SetChanDataMessage *m = (SetChanDataMessage *) data;
00245   ClusterVConnection *cvc;
00246 
00247   if (ch) {
00248     cvc = ch->channels[m->channel];
00249     if (VALID_CHANNEL(cvc)) {
00250       ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00251     } else {
00252       CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00253     }
00254   } else {
00255     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00256   }
00257 }
00258 
00259 void
00260 set_channel_pin_ClusterFunction(ClusterHandler *ch, void *data, int /* len ATS_UNUSED */)
00261 {
00262   // This isn't used. /leif
00263   //EThread *thread = this_ethread();
00264   //ProxyMutex *mutex = thread->mutex;
00265 
00266   // We are called on the ET_CLUSTER thread.
00267 
00268   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00269   SetChanPinMessage *m = (SetChanPinMessage *) data;
00270 
00271   if (mh->GetMsgVersion() != SetChanPinMessage::SET_CHANNEL_PIN_MESSAGE_VERSION) {      ////////////////////////////////////////////////
00272     // Convert from old to current message format
00273     ////////////////////////////////////////////////
00274     ink_release_assert(!"set_channel_pin_ClusterFunction() bad msg version");
00275   }
00276 
00277   if (m->NeedByteSwap())
00278     m->SwapBytes();
00279 
00280   ClusterVConnection *cvc = NULL;       // Just to make GCC happy
00281   CacheVC *cache_vc;
00282 
00283   if (ch != 0) {
00284     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00285     if (cache_vc) {
00286       cache_vc->set_pin_in_cache(m->pin_time);
00287     }
00288     // cvc is always set in ChannelToCacheWriteVC, so need to check it
00289     ++cvc->n_recv_set_data_msgs;        // note received messages
00290   }
00291 }
00292 
00293 void
00294 post_setchan_pin_ClusterFunction(ClusterHandler *ch, void *data, int /* len ATS_UNUSED */)
00295 {
00296   EThread *thread = this_ethread();
00297   ProxyMutex *mutex = thread->mutex;
00298   // We are called on the ET_CLUSTER thread.
00299   // Control message has been queued into cluster transfer message.
00300   // This allows us to assume that it has been sent.
00301   // Decrement Cluster VC n_set_data_msgs to allow transmission of
00302   // initial open_write data after (n_set_data_msgs == 0).
00303 
00304   SetChanPinMessage *m = (SetChanPinMessage *) data;
00305   ClusterVConnection *cvc;
00306 
00307   if (ch) {
00308     cvc = ch->channels[m->channel];
00309     if (VALID_CHANNEL(cvc)) {
00310       ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00311     } else {
00312       CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00313     }
00314   } else {
00315     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00316   }
00317 }
00318 
00319 void
00320 set_channel_priority_ClusterFunction(ClusterHandler *ch, void *data, int /* len ATS_UNUSED */)
00321 {
00322   // This isn't used.
00323   //EThread *thread = this_ethread();
00324   //ProxyMutex *mutex = thread->mutex;
00325 
00326   // We are called on the ET_CLUSTER thread.
00327 
00328   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00329   SetChanPriorityMessage *m = (SetChanPriorityMessage *) data;
00330 
00331   if (mh->GetMsgVersion() != SetChanPriorityMessage::SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {    ////////////////////////////////////////////////
00332     // Convert from old to current message format
00333     ////////////////////////////////////////////////
00334     ink_release_assert(!"set_channel_priority_ClusterFunction() bad msg version");
00335   }
00336   if (m->NeedByteSwap())
00337     m->SwapBytes();
00338 
00339   ClusterVConnection *cvc = NULL;       // Just to make GCC happy
00340   CacheVC *cache_vc;
00341 
00342   if (ch != 0) {
00343     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00344     if (cache_vc) {
00345       cache_vc->set_disk_io_priority(m->disk_priority);
00346     }
00347     // cvc is always set in ChannelToCacheWriteVC, so need to check it
00348     ++cvc->n_recv_set_data_msgs;        // note received messages
00349   }
00350 }
00351 
00352 void
00353 post_setchan_priority_ClusterFunction(ClusterHandler *ch, void *data, int /* len ATS_UNUSED */)
00354 {
00355   EThread *thread = this_ethread();
00356   ProxyMutex *mutex = thread->mutex;
00357 
00358   // We are called on the ET_CLUSTER thread.
00359   // Control message has been queued into cluster transfer message.
00360   // This allows us to assume that it has been sent.
00361   // Decrement Cluster VC n_set_data_msgs to allow transmission of
00362   // initial open_write data after (n_set_data_msgs == 0).
00363 
00364   SetChanPriorityMessage *m = (SetChanPriorityMessage *) data;
00365   ClusterVConnection *cvc;
00366 
00367   if (ch) {
00368     cvc = ch->channels[m->channel];
00369     if (VALID_CHANNEL(cvc)) {
00370       ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00371     } else {
00372       CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00373     }
00374   } else {
00375     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00376   }
00377 }
00378 
00379 // End of ClusterRPC.cc

Generated by  doxygen 1.7.1