00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "P_Cluster.h"
00031
00032
00033
00034
00035
00036 void
00037 ping_ClusterFunction(ClusterHandler *ch, void *data, int len)
00038 {
00039
00040
00041
00042 clusterProcessor.invoke_remote(ch, PING_REPLY_CLUSTER_FUNCTION, data, len);
00043 }
00044
00045 void
00046 ping_reply_ClusterFunction(ClusterHandler *ch, void *data, int len)
00047 {
00048
00049
00050
00051 PingMessage *msg = (PingMessage *) data;
00052 msg->fn(ch, msg->data, (len - msg->sizeof_fixedlen_msg()));
00053 }
00054
00055 void
00056 machine_list_ClusterFunction(ClusterHandler * from, void *data, int len)
00057 {
00058 (void) from;
00059 ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00060 MachineListMessage *m = (MachineListMessage *) data;
00061
00062 if (mh->GetMsgVersion() != MachineListMessage::MACHINE_LIST_MESSAGE_VERSION) {
00063
00064
00065 ink_release_assert(!"machine_list_ClusterFunction() bad msg version");
00066 }
00067 if (m->NeedByteSwap())
00068 m->SwapBytes();
00069
00070 ink_assert(m->n_ip == ((len - m->sizeof_fixedlen_msg()) / sizeof(uint32_t)));
00071
00072
00073
00074
00075
00076 ClusterConfiguration *cc = this_cluster()->current_configuration();
00077
00078 for (unsigned int i = 0; i < m->n_ip; i++) {
00079 for (int j = 0; j < cc->n_machines; j++) {
00080 if (cc->machines[j]->ip == m->ip[i])
00081 goto Lfound;
00082 }
00083
00084 {
00085 int num_connections = this_cluster_machine()->num_connections;
00086 for (int k = 0; k < num_connections; k++) {
00087 clusterProcessor.connect(m->ip[i], k);
00088 }
00089 }
00090 Lfound:
00091 ;
00092 }
00093 }
00094
00095 void
00096 close_channel_ClusterFunction(ClusterHandler *ch, void *data, int len)
00097 {
00098 ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00099 CloseMessage *m = (CloseMessage *) data;
00100
00101 if (mh->GetMsgVersion() != CloseMessage::CLOSE_CHAN_MESSAGE_VERSION) {
00102
00103
00104 ink_release_assert(!"close_channel_ClusterFunction() bad msg version");
00105 }
00106 if (m->NeedByteSwap())
00107 m->SwapBytes();
00108
00109
00110
00111
00112 ink_assert(len >= (int) sizeof(CloseMessage));
00113 if (!ch || !ch->channels)
00114 return;
00115 ClusterVConnection *vc = ch->channels[m->channel];
00116 if (VALID_CHANNEL(vc) && vc->token.sequence_number == m->sequence_number) {
00117 vc->remote_closed = m->status;
00118 vc->remote_lerrno = m->lerrno;
00119 ch->vcs_push(vc, vc->type);
00120 }
00121 }
00122
00123 void
00124 test_ClusterFunction(ClusterHandler *ch, void *data, int len)
00125 {
00126
00127
00128
00129 if (ptest_ClusterFunction)
00130 ptest_ClusterFunction(ch, data, len);
00131 }
00132
00133 CacheVC *
00134 ChannelToCacheWriteVC(ClusterHandler * ch, int channel, uint32_t channel_seqno, ClusterVConnection ** cluster_vc)
00135 {
00136 EThread *thread = this_ethread();
00137 ProxyMutex *mutex = thread->mutex;
00138
00139 ClusterVConnection *cvc = ch->channels[channel];
00140 if (!VALID_CHANNEL(cvc)
00141 || (channel_seqno != cvc->token.sequence_number)
00142 || (cvc->read.vio.op != VIO::READ)) {
00143 CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00144 return NULL;
00145 }
00146
00147
00148
00149 OneWayTunnel *owt = (OneWayTunnel *) cvc->read.vio._cont;
00150 if (!owt) {
00151 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SETDATA_NO_TUNNEL_STAT);
00152 return NULL;
00153 }
00154 CacheVC *cache_vc = (CacheVC *) owt->vioTarget->vc_server;
00155 if (!cache_vc) {
00156 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SETDATA_NO_CACHEVC_STAT);
00157 return NULL;
00158 }
00159 *cluster_vc = cvc;
00160 return cache_vc;
00161 }
00162
00163 void
00164 set_channel_data_ClusterFunction(ClusterHandler *ch, void *tdata, int tlen)
00165 {
00166 EThread *thread = this_ethread();
00167 ProxyMutex *mutex = thread->mutex;
00168
00169
00170 char *data;
00171 int len;
00172 int res;
00173
00174
00175 IncomingControl *ic = IncomingControl::alloc();
00176 ic->len = tlen;
00177 ic->alloc_data();
00178
00179 data = ic->data + sizeof(int32_t);
00180 memcpy(data, tdata, tlen);
00181 len = tlen;
00182
00183 ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00184 SetChanDataMessage *m = (SetChanDataMessage *) data;
00185
00186 if (mh->GetMsgVersion() != SetChanDataMessage::SET_CHANNEL_DATA_MESSAGE_VERSION) {
00187
00188
00189 ink_release_assert(!"set_channel_data_ClusterFunction() bad msg version");
00190 }
00191 if (m->NeedByteSwap())
00192 m->SwapBytes();
00193
00194 ClusterVConnection *cvc;
00195 CacheVC *cache_vc;
00196
00197 if (ch) {
00198 cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00199 if (!cache_vc) {
00200 ic->freeall();
00201 return;
00202 }
00203
00204 switch (m->data_type) {
00205 case CACHE_DATA_HTTP_INFO:
00206 {
00207 char *p = (char *) m + SetChanDataMessage::sizeof_fixedlen_msg();
00208
00209 IOBufferBlock *block_ref = ic->get_block();
00210 res = HTTPInfo::unmarshal(p, len, block_ref);
00211 ink_assert(res > 0);
00212
00213 CacheHTTPInfo h;
00214 h.get_handle((char *) &m->data[0], len);
00215 h.set_buffer_reference(block_ref);
00216 cache_vc->set_http_info(&h);
00217 ic->freeall();
00218 break;
00219 }
00220 default:
00221 {
00222 ink_release_assert(!"set_channel_data_ClusterFunction bad CacheDataType");
00223 }
00224 }
00225 ++cvc->n_recv_set_data_msgs;
00226
00227 } else {
00228 ic->freeall();
00229 CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00230 }
00231 }
00232
00233 void
00234 post_setchan_send_ClusterFunction(ClusterHandler *ch, void *data, int )
00235 {
00236 EThread *thread = this_ethread();
00237 ProxyMutex *mutex = thread->mutex;
00238
00239
00240
00241
00242
00243
00244 SetChanDataMessage *m = (SetChanDataMessage *) data;
00245 ClusterVConnection *cvc;
00246
00247 if (ch) {
00248 cvc = ch->channels[m->channel];
00249 if (VALID_CHANNEL(cvc)) {
00250 ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00251 } else {
00252 CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00253 }
00254 } else {
00255 CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00256 }
00257 }
00258
00259 void
00260 set_channel_pin_ClusterFunction(ClusterHandler *ch, void *data, int )
00261 {
00262
00263
00264
00265
00266
00267
00268 ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00269 SetChanPinMessage *m = (SetChanPinMessage *) data;
00270
00271 if (mh->GetMsgVersion() != SetChanPinMessage::SET_CHANNEL_PIN_MESSAGE_VERSION) {
00272
00273
00274 ink_release_assert(!"set_channel_pin_ClusterFunction() bad msg version");
00275 }
00276
00277 if (m->NeedByteSwap())
00278 m->SwapBytes();
00279
00280 ClusterVConnection *cvc = NULL;
00281 CacheVC *cache_vc;
00282
00283 if (ch != 0) {
00284 cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00285 if (cache_vc) {
00286 cache_vc->set_pin_in_cache(m->pin_time);
00287 }
00288
00289 ++cvc->n_recv_set_data_msgs;
00290 }
00291 }
00292
00293 void
00294 post_setchan_pin_ClusterFunction(ClusterHandler *ch, void *data, int )
00295 {
00296 EThread *thread = this_ethread();
00297 ProxyMutex *mutex = thread->mutex;
00298
00299
00300
00301
00302
00303
00304 SetChanPinMessage *m = (SetChanPinMessage *) data;
00305 ClusterVConnection *cvc;
00306
00307 if (ch) {
00308 cvc = ch->channels[m->channel];
00309 if (VALID_CHANNEL(cvc)) {
00310 ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00311 } else {
00312 CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00313 }
00314 } else {
00315 CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00316 }
00317 }
00318
00319 void
00320 set_channel_priority_ClusterFunction(ClusterHandler *ch, void *data, int )
00321 {
00322
00323
00324
00325
00326
00327
00328 ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
00329 SetChanPriorityMessage *m = (SetChanPriorityMessage *) data;
00330
00331 if (mh->GetMsgVersion() != SetChanPriorityMessage::SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
00332
00333
00334 ink_release_assert(!"set_channel_priority_ClusterFunction() bad msg version");
00335 }
00336 if (m->NeedByteSwap())
00337 m->SwapBytes();
00338
00339 ClusterVConnection *cvc = NULL;
00340 CacheVC *cache_vc;
00341
00342 if (ch != 0) {
00343 cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
00344 if (cache_vc) {
00345 cache_vc->set_disk_io_priority(m->disk_priority);
00346 }
00347
00348 ++cvc->n_recv_set_data_msgs;
00349 }
00350 }
00351
00352 void
00353 post_setchan_priority_ClusterFunction(ClusterHandler *ch, void *data, int )
00354 {
00355 EThread *thread = this_ethread();
00356 ProxyMutex *mutex = thread->mutex;
00357
00358
00359
00360
00361
00362
00363
00364 SetChanPriorityMessage *m = (SetChanPriorityMessage *) data;
00365 ClusterVConnection *cvc;
00366
00367 if (ch) {
00368 cvc = ch->channels[m->channel];
00369 if (VALID_CHANNEL(cvc)) {
00370 ink_atomic_increment(&cvc->n_set_data_msgs, -1);
00371 } else {
00372 CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00373 }
00374 } else {
00375 CLUSTER_INCREMENT_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00376 }
00377 }
00378
00379