00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "P_Cluster.h"
00031 
00032 
00033 
00034 
00035 
00036 void
00037 ping_ClusterFunction(ClusterHandler *ch, void *data, int len)
00038 {
00039   
00040   
00041   
00042   clusterProcessor.invoke_remote(ch, PING_REPLY_CLUSTER_FUNCTION, data, len);
00043 }
00044 
00045 void
00046 ping_reply_ClusterFunction(ClusterHandler *ch, void *data, int len)
00047 {
00048   
00049   
00050   
00051   PingMessage *msg = (PingMessage *) data;
00052   msg->fn(ch, msg->data, (len - msg->sizeof_fixedlen_msg()));
00053 }
00054 
00055 void
00056 machine_list_ClusterFunction(ClusterHandler * from, void *data, int len)
00057 {
00058   (void) from;
00059   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00060   MachineListMessage *m = (MachineListMessage *) data;
00061 
00062   if (mh->GetMsgVersion() != MachineListMessage::MACHINE_LIST_MESSAGE_VERSION) {        
00063     
00064 
00065     ink_release_assert(!"machine_list_ClusterFunction() bad msg version");
00066   }
00067   if (m->NeedByteSwap())
00068     m->SwapBytes();
00069 
00070   ink_assert(m->n_ip == ((len - m->sizeof_fixedlen_msg()) / sizeof(uint32_t)));
00071 
00072   
00073   
00074   
00075   
00076   ClusterConfiguration *cc = this_cluster()->current_configuration();
00077 
00078   for (unsigned int i = 0; i < m->n_ip; i++) {
00079     for (int j = 0; j < cc->n_machines; j++) {
00080       if (cc->machines[j]->ip == m->ip[i])
00081         goto Lfound;
00082     }
00083     
00084     {
00085       int num_connections = this_cluster_machine()->num_connections;
00086       for (int k = 0; k < num_connections; k++) {
00087         clusterProcessor.connect(m->ip[i], k);
00088       }
00089     }
00090   Lfound:
00091     ;
00092   }
00093 }
00094 
00095 void
00096 close_channel_ClusterFunction(ClusterHandler *ch, void *data, int len)
00097 {
00098   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00099   CloseMessage *m = (CloseMessage *) data;
00100 
00101   if (mh->GetMsgVersion() != CloseMessage::CLOSE_CHAN_MESSAGE_VERSION) {        
00102     
00103 
00104     ink_release_assert(!"close_channel_ClusterFunction() bad msg version");
00105   }
00106   if (m->NeedByteSwap())
00107     m->SwapBytes();
00108 
00109   
00110   
00111   
00112   ink_assert(len >= (int) sizeof(CloseMessage));
00113   if (!ch || !ch->channels)
00114     return;
00115   ClusterVConnection *vc = ch->channels[m->channel];
00116   if (VALID_CHANNEL(vc) && vc->token.sequence_number == m->sequence_number) {
00117     vc->remote_closed = m->status;
00118     vc->remote_lerrno = m->lerrno;
00119     ch->vcs_push(vc, vc->type);
00120   }
00121 }
00122 
00123 void
00124 test_ClusterFunction(ClusterHandler *ch, void *data, int len)
00125 {
00126   
00127   
00128   
00129   if (ptest_ClusterFunction)
00130     ptest_ClusterFunction(ch, data, len);
00131 }
00132 
00133 CacheVC *
00134 ChannelToCacheWriteVC(ClusterHandler * ch, int channel, uint32_t channel_seqno, ClusterVConnection ** cluster_vc)
00135 {
00136   EThread *thread = this_ethread();
00137   ProxyMutex *mutex = thread->mutex;
00138 
00139   ClusterVConnection *cvc = ch->channels[channel];
00140   if (!VALID_CHANNEL(cvc)
00141       || (channel_seqno != cvc->token.sequence_number)
00142       || (cvc->read.vio.op != VIO::READ)) {
00143     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00144     return NULL;
00145   }
00146   
00147   
00148 
00149   OneWayTunnel *owt = (OneWayTunnel *) cvc->read.vio._cont;
00150   if (!owt) {
00151     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SETDATA_NO_TUNNEL_STAT);
00152     return NULL;
00153   }
00154   CacheVC *cache_vc = (CacheVC *) owt->vioTarget->vc_server;
00155   if (!cache_vc) {
00156     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SETDATA_NO_CACHEVC_STAT);
00157     return NULL;
00158   }
00159   *cluster_vc = cvc;
00160   return cache_vc;
00161 }
00162 
00163 void
00164 set_channel_data_ClusterFunction(ClusterHandler *ch, void *tdata, int tlen)
00165 {
00166   EThread *thread = this_ethread();
00167   ProxyMutex *mutex = thread->mutex;
00168   
00169 
00170   char *data;
00171   int len;
00172   int res;
00173 
00174   
00175   IncomingControl *ic = IncomingControl::alloc();
00176   ic->len = tlen;
00177   ic->alloc_data();
00178 
00179   data = ic->data + sizeof(int32_t);      
00180   memcpy(data, tdata, tlen);
00181   len = tlen;
00182 
00183   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00184   SetChanDataMessage *m = (SetChanDataMessage *) data;
00185 
00186   if (mh->GetMsgVersion() != SetChanDataMessage::SET_CHANNEL_DATA_MESSAGE_VERSION) {    
00187     
00188 
00189     ink_release_assert(!"set_channel_data_ClusterFunction() bad msg version");
00190   }
00191   if (m->NeedByteSwap())
00192     m->SwapBytes();
00193 
00194   ClusterVConnection *cvc;
00195   CacheVC *cache_vc;
00196 
00197   if (ch) {
00198     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00199     if (!cache_vc) {
00200       ic->freeall();
00201       return;
00202     }
00203     
00204     switch (m->data_type) {
00205     case CACHE_DATA_HTTP_INFO:
00206       {
00207         char *p = (char *) m + SetChanDataMessage::sizeof_fixedlen_msg();
00208 
00209         IOBufferBlock *block_ref = ic->get_block();
00210         res = HTTPInfo::unmarshal(p, len, block_ref);
00211         ink_assert(res > 0);
00212 
00213         CacheHTTPInfo h;
00214         h.get_handle((char *) &m->data[0], len);
00215         h.set_buffer_reference(block_ref);
00216         cache_vc->set_http_info(&h);
00217         ic->freeall();
00218         break;
00219       }
00220     default:
00221       {
00222         ink_release_assert(!"set_channel_data_ClusterFunction bad CacheDataType");
00223       }
00224     }                           
00225     ++cvc->n_recv_set_data_msgs;        
00226 
00227   } else {
00228     ic->freeall();
00229     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00230   }
00231 }
00232 
00233 void
00234 post_setchan_send_ClusterFunction(ClusterHandler *ch, void *data, int )
00235 {
00236   EThread *thread = this_ethread();
00237   ProxyMutex *mutex = thread->mutex;
00238   
00239   
00240   
00241   
00242   
00243 
00244   SetChanDataMessage *m = (SetChanDataMessage *) data;
00245   ClusterVConnection *cvc;
00246 
00247   if (ch) {
00248     cvc = ch->channels[m->channel];
00249     if (VALID_CHANNEL(cvc)) {
00250       ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00251     } else {
00252       CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00253     }
00254   } else {
00255     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00256   }
00257 }
00258 
00259 void
00260 set_channel_pin_ClusterFunction(ClusterHandler *ch, void *data, int )
00261 {
00262   
00263   
00264   
00265 
00266   
00267 
00268   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00269   SetChanPinMessage *m = (SetChanPinMessage *) data;
00270 
00271   if (mh->GetMsgVersion() != SetChanPinMessage::SET_CHANNEL_PIN_MESSAGE_VERSION) {      
00272     
00273 
00274     ink_release_assert(!"set_channel_pin_ClusterFunction() bad msg version");
00275   }
00276 
00277   if (m->NeedByteSwap())
00278     m->SwapBytes();
00279 
00280   ClusterVConnection *cvc = NULL;       
00281   CacheVC *cache_vc;
00282 
00283   if (ch != 0) {
00284     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00285     if (cache_vc) {
00286       cache_vc->set_pin_in_cache(m->pin_time);
00287     }
00288     
00289     ++cvc->n_recv_set_data_msgs;        
00290   }
00291 }
00292 
00293 void
00294 post_setchan_pin_ClusterFunction(ClusterHandler *ch, void *data, int )
00295 {
00296   EThread *thread = this_ethread();
00297   ProxyMutex *mutex = thread->mutex;
00298   
00299   
00300   
00301   
00302   
00303 
00304   SetChanPinMessage *m = (SetChanPinMessage *) data;
00305   ClusterVConnection *cvc;
00306 
00307   if (ch) {
00308     cvc = ch->channels[m->channel];
00309     if (VALID_CHANNEL(cvc)) {
00310       ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00311     } else {
00312       CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00313     }
00314   } else {
00315     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00316   }
00317 }
00318 
00319 void
00320 set_channel_priority_ClusterFunction(ClusterHandler *ch, void *data, int )
00321 {
00322   
00323   
00324   
00325 
00326   
00327 
00328   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00329   SetChanPriorityMessage *m = (SetChanPriorityMessage *) data;
00330 
00331   if (mh->GetMsgVersion() != SetChanPriorityMessage::SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {    
00332     
00333 
00334     ink_release_assert(!"set_channel_priority_ClusterFunction() bad msg version");
00335   }
00336   if (m->NeedByteSwap())
00337     m->SwapBytes();
00338 
00339   ClusterVConnection *cvc = NULL;       
00340   CacheVC *cache_vc;
00341 
00342   if (ch != 0) {
00343     cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00344     if (cache_vc) {
00345       cache_vc->set_disk_io_priority(m->disk_priority);
00346     }
00347     
00348     ++cvc->n_recv_set_data_msgs;        
00349   }
00350 }
00351 
00352 void
00353 post_setchan_priority_ClusterFunction(ClusterHandler *ch, void *data, int )
00354 {
00355   EThread *thread = this_ethread();
00356   ProxyMutex *mutex = thread->mutex;
00357 
00358   
00359   
00360   
00361   
00362   
00363 
00364   SetChanPriorityMessage *m = (SetChanPriorityMessage *) data;
00365   ClusterVConnection *cvc;
00366 
00367   if (ch) {
00368     cvc = ch->channels[m->channel];
00369     if (VALID_CHANNEL(cvc)) {
00370       ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00371     } else {
00372       CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00373     }
00374   } else {
00375     CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00376   }
00377 }
00378 
00379