00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #define DEFINE_CLUSTER_FUNCTIONS
00031 #include "P_Cluster.h"
00032 
00033 
00034 
00035 
00036 
// Number of entries in the cluster RPC dispatch table clusterFunction[]
// (the table itself is instantiated via DEFINE_CLUSTER_FUNCTIONS above).
unsigned SIZE_clusterFunction = countof(clusterFunction);


// Test hook pointer. NOTE(review): not referenced in this chunk —
// presumably assigned by test/instrumentation code elsewhere; confirm.
ClusterFunction *ptest_ClusterFunction = NULL;


// Scratch buffers used to sink inbound bytes for channels with no valid
// reader and to source filler bytes for channels with no valid writer
// (see build_initial_vector()).
static char channel_dummy_input[DEFAULT_MAX_BUFFER_SIZE];
char channel_dummy_output[DEFAULT_MAX_BUFFER_SIZE];


// Allocator for outbound cluster control messages.
ClassAllocator<OutgoingControl> outControlAllocator("outControlAllocator");


// Allocator for inbound cluster control messages.
ClassAllocator<IncomingControl> inControlAllocator("inControlAllocator");

// Debug flag. NOTE(review): not referenced in this chunk — likely gates
// message dumping later in the file; confirm before removing.
static int dump_msgs = 0;
00053 
00054 
00055 
00056 
// DO_VERIFY_PETERS_DATA(_p,_l) calls verify_peters_data() only when the
// VERIFY_PETERS_DATA debug build flag is defined; otherwise it expands to
// nothing, so release builds pay no cost.
#ifdef VERIFY_PETERS_DATA
#define DO_VERIFY_PETERS_DATA(_p,_l) verify_peters_data(_p,_l)
#else
#define DO_VERIFY_PETERS_DATA(_p,_l)
#endif
00062 
void
verify_peters_data(char *ap, int l)
{
  // Debug check for "peter's data": a byte pattern in which every byte is
  // exactly one greater (mod 256) than its predecessor.  Reports the
  // first index whose successor breaks the pattern, then stops.
  unsigned char *bytes = (unsigned char *) ap;
  int idx = 0;
  while (idx + 1 < l) {
    unsigned char expected = bytes[idx];
    expected += 1;
    if (bytes[idx + 1] != expected) {
      fprintf(stderr, "verify peter's data failed at %d\n", idx);
      break;
    }
    ++idx;
  }
}
00077 
00078 
00079 
00080 
00081 
00082 
00083 
00084 
00085 
00086 
00087 
00088 
00089 
00090 
00091 
00092 
00093 
00094 
00095 
00096 
00097 
00098 
00099 
00100 
00101 
00102 
00103 
00104 
00105 
00106 
00107 
00108 
00109 
00110 
00111 
00112 
00113 
00114 
00115 
00116 
00117 
00118 
00119 
00120 
00121 
00122 
00123 
00124 
00125 
00126 
00127 
00128 
00129 
00130 
00131 
00132 
00133 
00134 
00135 
// Construct a ClusterHandler in its pre-connect state: identity and
// connection fields cleared, connect state machine at CLCON_INITIAL,
// read/write ClusterState helpers bound to this handler, and all
// per-message statistics zeroed.
ClusterHandler::ClusterHandler()
  : net_vc(0),
    thread(0),
    ip(0),
    port(0),
    hostname(NULL),
    machine(NULL),
    ifd(-1),
    id(-1),
    dead(true),
    downing(false),
    active(false),
    on_stolen_thread(false),
    n_channels(0),
    channels(NULL),
    channel_data(NULL),
    connector(false),
    cluster_connect_state(ClusterHandler::CLCON_INITIAL),
    needByteSwap(false),
    configLookupFails(0),
    cluster_periodic_event(0),
    read(this, true),
    write(this, false),
    current_time(0),
    last(0),
    last_report(0),
    n_since_last_report(0),
    last_cluster_op_enable(0),
    last_trace_dump(0),
    clm(0),
    disable_remote_cluster_ops(0),
    pw_write_descriptors_built(0),
    pw_freespace_descriptors_built(0),
    pw_controldata_descriptors_built(0), pw_time_expired(0), started_on_stolen_thread(false), control_message_write(false)
#ifdef CLUSTER_STATS
  ,
    _vc_writes(0),
    _vc_write_bytes(0),
    _control_write_bytes(0),
    _dw_missed_lock(0),
    _dw_not_enabled(0),
    _dw_wait_remote_fill(0),
    _dw_no_active_vio(0),
    _dw_not_enabled_or_no_write(0),
    _dw_set_data_pending(0),
    _dw_no_free_space(0),
    _fw_missed_lock(0),
    _fw_not_enabled(0),
    _fw_wait_remote_fill(0),
    _fw_no_active_vio(0),
    _fw_not_enabled_or_no_read(0),
    _process_read_calls(0),
    _n_read_start(0),
    _n_read_header(0),
    _n_read_await_header(0),
    _n_read_setup_descriptor(0),
    _n_read_descriptor(0),
    _n_read_await_descriptor(0),
    _n_read_setup_data(0),
    _n_read_data(0),
    _n_read_await_data(0),
    _n_read_post_complete(0),
    _n_read_complete(0),
    _process_write_calls(0),
    _n_write_start(0),
    _n_write_setup(0), _n_write_initiate(0), _n_write_await_completion(0), _n_write_post_complete(0), _n_write_complete(0)
#endif
{
#ifdef MSG_TRACE
  t_fd = fopen("msgtrace.log", "w");
#endif

  min_priority = 1;
  // First event dispatched to this continuation drives the cluster
  // connect state machine.
  SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent);

  mutex = new_ProxyMutex();
  // Dummy objects are used only to compute the link-field offsets the
  // atomic list initializers need.
  OutgoingControl oc;
  int n;
  for (n = 0; n < CLUSTER_CMSG_QUEUES; ++n) {
    ink_atomiclist_init(&outgoing_control_al[n], "OutGoingControlQueue", (char *) &oc.link.next - (char *) &oc);
  }

  IncomingControl ic;
  ink_atomiclist_init(&external_incoming_control,
                      "ExternalIncomingControlQueue", (char *) &ic.link.next - (char *) &ic);

  ClusterVConnection ivc;
  ink_atomiclist_init(&external_incoming_open_local,
                      "ExternalIncomingOpenLocalQueue", (char *) &ivc.link.next - (char *) &ivc);
  ink_atomiclist_init(&read_vcs_ready, "ReadVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next));
  ink_atomiclist_init(&write_vcs_ready, "WriteVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next));
  // Clear the periodic-callout bookkeeping arrays.
  memset((char *) &callout_cont[0], 0, sizeof(callout_cont));
  memset((char *) &callout_events[0], 0, sizeof(callout_events));
}
00231 
// Release everything the handler owns: the network VC, this handler's
// reference on the ClusterMachine, the channel tables, the per-state VC
// arrays and the cluster load monitor.
ClusterHandler::~ClusterHandler()
{
  bool free_m = false;
  if (net_vc) {
    net_vc->do_io(VIO::CLOSE);
    net_vc = 0;
  }
  if (machine) {
    // Under the cluster config lock, return this connection to the
    // machine; free the machine once the last connection is returned.
    MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
    if (++machine->free_connections >= machine->num_connections)
      free_m = true;
    MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
    if (free_m)
      free_ClusterMachine(machine);
  }
  machine = NULL;
  ats_free(hostname);
  hostname = NULL;
  ats_free(channels);
  channels = NULL;
  if (channel_data) {
    // channel_data is an array of n_channels individually-allocated
    // entries; free each entry before the array itself.
    for (int i = 0; i < n_channels; ++i) {
      if (channel_data[i]) {
        ats_free(channel_data[i]);
        channel_data[i] = 0;
      }
    }
    ats_free(channel_data);
    channel_data = NULL;
  }
  if (read_vcs)
    delete[]read_vcs;
  read_vcs = NULL;

  if (write_vcs)
    delete[]write_vcs;
  write_vcs = NULL;

  if (clm) {
    delete clm;
    clm = NULL;
  }
#ifdef CLUSTER_STATS
  message_blk = 0;
#endif
}
00278 
void
ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
{
  // Tear down a cluster VC: cancel its timeouts, detach it from the
  // handler's read/write queues, free banked read data, notify the remote
  // side when it has not already closed, record stats, and free the VC.
  if (vc->inactivity_timeout)
    vc->inactivity_timeout->cancel(vc);
  if (vc->active_timeout)
    vc->active_timeout->cancel(vc);
  if (vc->read.queue)
    ClusterVC_remove_read(vc);
  if (vc->write.queue)
    ClusterVC_remove_write(vc);
  // Drop the VIO mutex references held by this VC.
  vc->read.vio.mutex = NULL;
  vc->write.vio.mutex = NULL;

  ink_assert(!vc->read_locked);
  ink_assert(!vc->write_locked);
  // Capture the channel before free_channel() releases it; the close
  // message and the stats below still need it.
  int channel = vc->channel;
  free_channel(vc);

  if (vc->byte_bank_q.head) {
    // The VC still has banked (undelivered) read data: unhook it from the
    // delayed-read list and discard the descriptors.
    delayed_reads.remove(vc);

    ByteBankDescriptor *d;
    while ((d = vc->byte_bank_q.dequeue())) {
      ByteBankDescriptor::ByteBankDescriptor_free(d);
    }
  }
  vc->read_block = 0;

  ink_assert(!vc->write_list);
  ink_assert(!vc->write_list_tail);
  ink_assert(!vc->write_list_bytes);
  ink_assert(!vc->write_bytes_in_transit);

  // Send a close to the peer unless it already closed (or we have all the
  // data), or when a forced close on an open channel was requested.
  if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->ch) {

    CloseMessage msg;
    int vers = CloseMessage::protoToVersion(vc->ch->machine->msg_proto_major);
    void *data;
    int len;

    if (vers == CloseMessage::CLOSE_CHAN_MESSAGE_VERSION) {
      msg.channel = channel;
      msg.status = (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL) ? FORCE_CLOSE_ON_OPEN_CHANNEL : vc->closed;
      msg.lerrno = vc->lerrno;
      msg.sequence_number = vc->token.sequence_number;
      data = (void *) &msg;
      len = sizeof(CloseMessage);

    } else {

      // No message-downgrade path exists for other protocol versions.
      ink_release_assert(!"close_ClusterVConnection() bad msg version");
    }
    clusterProcessor.invoke_remote(vc->ch, CLOSE_CHANNEL_CLUSTER_FUNCTION, data, len);
  }
  // Connection-lifetime statistics, split by local vs. remote channel.
  ink_hrtime now = ink_get_hrtime();
  CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
  CLUSTER_SUM_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT, now - vc->start_time);
  if (!local_channel(channel)) {
    CLUSTER_SUM_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT, now - vc->start_time);
  } else {
    CLUSTER_SUM_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT, now - vc->start_time);
  }
  clusterVCAllocator_free(vc);
}
00350 
00351 inline bool
00352 ClusterHandler::vc_ok_write(ClusterVConnection * vc)
00353 {
00354   return (((vc->closed > 0)
00355            && (vc->write_list || vc->write_bytes_in_transit)) ||
00356           (!vc->closed && vc->write.enabled && vc->write.vio.op == VIO::WRITE && vc->write.vio.buffer.writer()));
00357 }
00358 
00359 inline bool
00360 ClusterHandler::vc_ok_read(ClusterVConnection * vc)
00361 {
00362   return (!vc->closed && vc->read.vio.op == VIO::READ && vc->read.vio.buffer.writer());
00363 }
00364 
void
ClusterHandler::close_free_lock(ClusterVConnection * vc, ClusterVConnState * s)
{
  // Release the read- or write-side lock held on vc (if any), then close
  // the connection.  The local Ptr keeps the VIO mutex alive across the
  // unlock even though close_ClusterVConnection() clears vio.mutex.
  Ptr<ProxyMutex> m(s->vio.mutex);
  if (s == &vc->read) {
    if ((ProxyMutex *) vc->read_locked)
      MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
    vc->read_locked = NULL;
  } else {
    if ((ProxyMutex *) vc->write_locked)
      MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
    vc->write_locked = NULL;
  }
  close_ClusterVConnection(vc);
}
00380 
// Build a single-element I/O vector over the caller-supplied buffer d of
// 'len' bytes for the given direction (read_flag true => read state).
// Always returns true.
bool ClusterHandler::build_data_vector(char *d, int len, bool read_flag)
{
  ClusterState & s = (read_flag ? read : write);
  ink_assert(d);
  ink_assert(len);
  ink_assert(s.iov);

  // One descriptor covering the whole buffer; the block wraps d without
  // copying (constant IOBufferData).
  s.msg.count = 1;
  s.iov[0].iov_base = 0;
  s.iov[0].iov_len = len;
  s.block[0] = new_IOBufferBlock();
  s.block[0]->set(new_constant_IOBufferData(d, len));

  if (read_flag) {
    // Receiving: extend the block's buffer end so len bytes can land in it.
    s.block[0]->_buf_end = s.block[0]->end() + len;
  } else {
    // Sending: mark the block as already holding len valid bytes.
    s.block[0]->fill(len);
  }

  s.to_do = len;
  s.did = 0;
  s.n_iov = 1;

  return true;
}
00411 
// Build the scatter/gather vector (s.iov / s.block) for the next cluster
// message exchange in the given direction.
//
// Write side: one contiguous segment holding header + descriptors +
// small control data, followed by one segment per outgoing data
// descriptor (control-channel payloads and per-channel VC data).
//
// Read side: driven by s.msg.state — state 0 receives only the fixed
// header, state 1 receives descriptors + control bytes, and state >= 2
// receives the data segments the descriptors announced.
//
// Always returns true (the delayed-for-locks failure path is compiled
// out under #if 0 below).
bool ClusterHandler::build_initial_vector(bool read_flag)
{
  int i, n;

  ink_hrtime now = ink_get_hrtime();
  ClusterState & s = (read_flag ? read : write);
  // Outbound control messages to send; inbound control messages already
  // allocated by a previous (partial) pass over this message.
  OutgoingControl *oc = s.msg.outgoing_control.head;
  IncomingControl *ic = incoming_control.head;
  int new_n_iov = 0;
  int to_do = 0;
  int len;

  ink_assert(s.iov);

  if (!read_flag) {

    // Write: header, descriptors and small control data go out as one
    // contiguous segment.
    len = sizeof(ClusterMsgHeader) + (s.msg.count * sizeof(Descriptor)) + s.msg.control_bytes;
    s.iov[new_n_iov].iov_base = 0;
    s.iov[new_n_iov].iov_len = len;
    s.block[new_n_iov] = s.msg.get_block_header();

    // Mark the segment as holding len valid bytes to transmit.
    s.block[new_n_iov]->fill(len);

    to_do += len;
    ++new_n_iov;

  } else {
    if (s.msg.state == 0) {

      // Read: start by receiving just the fixed-size message header.
      len = sizeof(ClusterMsgHeader);
      s.iov[new_n_iov].iov_base = 0;
      s.iov[new_n_iov].iov_len = len;
      s.block[new_n_iov] = s.msg.get_block_header();

      // Extend the block's buffer end so len bytes can be received.
      s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + len;

      to_do += len;
      ++new_n_iov;

    } else if (s.msg.state == 1) {

      // Read: header is in; now receive the descriptor array plus the
      // small control data that follows it.
      len = (s.msg.count * sizeof(Descriptor)) + s.msg.control_bytes;
      s.iov[new_n_iov].iov_base = 0;
      s.iov[new_n_iov].iov_len = len;
      s.block[new_n_iov] = s.msg.get_block_descriptor();

      // Extend the block's buffer end so len bytes can be received.
      s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + len;

      to_do += s.iov[new_n_iov].iov_len;
      ++new_n_iov;
    }
  }


  // Add one iovec entry per data descriptor.  On read this only happens
  // once the descriptors themselves have arrived (state >= 2); on write
  // all s.msg.count descriptors are processed.
  for (i = 0; i<(read_flag ? ((s.msg.state>= 2) ? s.msg.count : 0)
                   : s.msg.count); i++) {
    if (s.msg.descriptor[i].type == CLUSTER_SEND_DATA) {

      if (s.msg.descriptor[i].channel == CLUSTER_CONTROL_CHANNEL) {
        if (read_flag) {

          // Inbound control payload: allocate an IncomingControl of the
          // advertised length; on a retry pass, reuse the one already
          // queued (ic walks the existing incoming_control list).
          if (!ic) {
            ic = IncomingControl::alloc();
            ic->recognized_time = now;
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_RECVD_STAT);
            ic->len = s.msg.descriptor[i].length;
            ic->alloc_data();
            if (!ic->fast_data()) {
              CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_RECVD_STAT);
            }
            // Function id is undefined until the payload arrives.
            *((uint32_t *) ic->data) = UNDEFINED_CLUSTER_FUNCTION;
            incoming_control.enqueue(ic);
          }
          s.iov[new_n_iov].iov_base = 0;
          s.iov[new_n_iov].iov_len = ic->len;
          s.block[new_n_iov] = ic->get_block();
          to_do += s.iov[new_n_iov].iov_len;
          ++new_n_iov;
          ic = (IncomingControl *) ic->link.next;
        } else {

          // Outbound control payload: send from the queued OutgoingControl.
          ink_assert(oc);
          s.iov[new_n_iov].iov_base = 0;
          s.iov[new_n_iov].iov_len = oc->len;
          s.block[new_n_iov] = oc->get_block();
          to_do += s.iov[new_n_iov].iov_len;
          ++new_n_iov;
          oc = (OutgoingControl *) oc->link.next;
        }
      } else {

        // Data for a normal (non-control) channel.
        ClusterVConnection *
          vc = channels[s.msg.descriptor[i].channel];

        if (VALID_CHANNEL(vc) &&
            (s.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
          if (read_flag) {
            ink_release_assert(!vc->initial_data_bytes);

            // Opportunistically grab the read VIO lock (bounded spin) so
            // arriving data can be delivered directly; on failure the
            // data is banked later.
            ink_release_assert(!(ProxyMutex *) vc->read_locked);
#ifdef CLUSTER_TOMCAT
            if (!vc->read.vio.mutex ||
                !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT))
#else
            if (!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT))
#endif
            {
              vc->read_locked = 0;
            } else {
              vc->read_locked = vc->read.vio.mutex;
            }


            // Remember which iovec slot this VC's data lands in.
            if (s.msg.descriptor[i].length) {
              vc->iov_map = new_n_iov;
            } else {
              vc->iov_map = CLUSTER_IOV_NONE;
            }
            if (vc->pending_remote_fill || vc_ok_read(vc)) {

              // Allocate a fresh receive block sized for the descriptor;
              // the iovec targets a clone so vc->read_block keeps its own
              // reference.
              ink_release_assert(s.msg.descriptor[i].length <= DEFAULT_MAX_BUFFER_SIZE);
              vc->read_block = new_IOBufferBlock();
              int64_t index = buffer_size_to_index(s.msg.descriptor[i].length, MAX_BUFFER_SIZE_INDEX);
              vc->read_block->alloc(index);

              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = vc->read_block->clone();

            } else {
              // No usable reader: sink the incoming bytes into the shared
              // scratch buffer to keep the stream in sync.
              Debug(CL_NOTE, "dumping cluster read data");
              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = new_IOBufferBlock();
              s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_input, DEFAULT_MAX_BUFFER_SIZE));
            }

            // Make room in the target block for the descriptor's bytes.
            s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + s.msg.descriptor[i].length;

          } else {
            bool remote_write_fill = (vc->pending_remote_fill && vc->remote_write_block);

            if (!remote_write_fill) {
              ink_assert((ProxyMutex *) vc->write_locked);
            }
            if (vc_ok_write(vc) || remote_write_fill) {
              if (remote_write_fill) {
                s.iov[new_n_iov].iov_base = 0;
                ink_release_assert((int) s.msg.descriptor[i].length == bytes_IOBufferBlockList(vc->remote_write_block, 1));
                s.block[new_n_iov] = vc->remote_write_block;

              } else {
                s.iov[new_n_iov].iov_base = 0;
                ink_release_assert((int) s.msg.descriptor[i].length <= vc->write_list_bytes);
                s.block[new_n_iov] = vc->write_list;
                // Move the described bytes from the pending write list
                // into the in-transit count, then re-find the list tail.
                vc->write_list = consume_IOBufferBlockList(vc->write_list, (int) s.msg.descriptor[i].length);
                vc->write_list_bytes -= (int) s.msg.descriptor[i].length;
                vc->write_bytes_in_transit += (int) s.msg.descriptor[i].length;

                vc->write_list_tail = vc->write_list;
                while (vc->write_list_tail && vc->write_list_tail->next)
                  vc->write_list_tail = vc->write_list_tail->next;
              }
            } else {
              // No usable writer: transmit filler bytes from the shared
              // scratch buffer to keep the stream in sync.
              Debug(CL_NOTE, "faking cluster write data");
              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = new_IOBufferBlock();
              s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_output, DEFAULT_MAX_BUFFER_SIZE));

              s.block[new_n_iov]->fill(s.msg.descriptor[i].length);
            }
          }
        } else {
          // Channel invalid or sequence number stale: sink/source the
          // descriptor's bytes via the dummy buffers so framing survives.
          s.iov[new_n_iov].iov_base = 0;
          s.block[new_n_iov] = new_IOBufferBlock();

          if (read_flag) {
            s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_input, DEFAULT_MAX_BUFFER_SIZE));

            s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + s.msg.descriptor[i].length;

          } else {
            s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_output, DEFAULT_MAX_BUFFER_SIZE));

            s.block[new_n_iov]->fill(s.msg.descriptor[i].length);
          }
        }
        s.iov[new_n_iov].iov_len = s.msg.descriptor[i].length;
        to_do += s.iov[new_n_iov].iov_len;
        ++new_n_iov;
      }
    }
  }
  // Drop references held in unused slots from a previous message.
  for (n = new_n_iov; n < MAX_TCOUNT; ++n) {
    s.block[n] = 0;
  }

  s.to_do = to_do;
  s.did = 0;
  s.n_iov = new_n_iov;
  return true;

  // Disabled delayed-for-locks failure path (kept for reference).
#if 0

  for (n = 0; n < MAX_TCOUNT; ++n) {
    s.block[n] = 0;
  }
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_OP_DELAYED_FOR_LOCK_STAT);
  Debug(CL_WARN, "%s delayed for locks", read_flag ? "read" : "write");
  free_locks(read_flag, i);
  return false;
#endif
}
00696 
// Determine which iovec entries the most recent read() advanced into,
// then take the read VIO lock for each corresponding VC so its data can
// be delivered.  Lock failures are tolerated: that VC's data is diverted
// to its byte bank by the caller.  Always returns true.
bool ClusterHandler::get_read_locks()
{

  ClusterState & s = read;
  int i, n;
  int bytes_processed;
  int vec_bytes_remainder;
  int iov_done[MAX_TCOUNT];

  memset((char *) iov_done, 0, sizeof(int) * MAX_TCOUNT);

  // Bytes that were already complete before the last transfer.
  bytes_processed = s.did - s.bytes_xfered;     

  i = -1;
  for (n = 0; n < s.n_iov; ++n) {
    bytes_processed -= s.iov[n].iov_len;
    if (bytes_processed >= 0) {
      // This vector was fully consumed before the last transfer.
      iov_done[n] = s.iov[n].iov_len;
    } else {
      iov_done[n] = s.iov[n].iov_len + bytes_processed;
      if (i < 0) {
        i = n;                  // first vector the last transfer touched

        // Re-account this vector using only the bytes moved by the most
        // recent transfer (s.bytes_xfered).
        vec_bytes_remainder = (s.iov[n].iov_len - iov_done[n]);
        bytes_processed = s.bytes_xfered;

        bytes_processed -= vec_bytes_remainder;
        if (bytes_processed >= 0) {
          iov_done[n] = vec_bytes_remainder;
        } else {
          iov_done[n] = vec_bytes_remainder + bytes_processed;
          break;
        }
      } else {
        break;
      }
    }
  }
  ink_release_assert(i >= 0);

  // From the first active vector onward, try the read lock for every
  // data-channel descriptor.
  for (; i < s.n_iov; ++i) {
    if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA)
        && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {

      ClusterVConnection *
        vc = channels[s.msg.descriptor[i].channel];
      if (!VALID_CHANNEL(vc) ||
          ((s.msg.descriptor[i].sequence_number) !=
           CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) || !vc_ok_read(vc)) {
        // Channel gone or stale; nothing to lock.
        continue;
      }

      ink_assert(!(ProxyMutex *) vc->read_locked);
      vc->read_locked = vc->read.vio.mutex;
      if (vc->byte_bank_q.head
          || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) {
        // Banked data pending (must preserve ordering) or the lock spin
        // failed: leave this VC unlocked.
        vc->read_locked = NULL;
        continue;
      }

      // Re-check under the lock; the reader may have gone away.
      if (!vc_ok_read(vc)) {
        MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread);
        vc->read_locked = NULL;
        continue;
      }

      // Deliver any data already sitting in the VC's read block that a
      // prior pass could not transfer.
      int64_t read_avail = vc->read_block->read_avail();

      if (!vc->pending_remote_fill && read_avail) {
        Debug("cluster_vc_xfer", "Deferred fill ch %d %p %" PRId64" bytes", vc->channel, vc, read_avail);

        vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
        if (complete_channel_read(read_avail, vc)) {
          vc->read_block->consume(read_avail);
        }
      }
    }
  }
  return true;                  
}
00798 
// Take the write VIO lock for every data-channel descriptor in the
// pending write message.  Unlike the read side, any single lock failure
// aborts: all locks taken so far are released and false is returned so
// the whole write is retried later.
bool ClusterHandler::get_write_locks()
{

  ClusterState & s = write;
  int i;

  for (i = 0; i < s.msg.count; ++i) {
    if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA)
        && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {

      ClusterVConnection *
        vc = channels[s.msg.descriptor[i].channel];
      if (!VALID_CHANNEL(vc) ||
          (s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        // Channel gone or stale; nothing to lock for this descriptor.
        continue;
      }
      ink_assert(!(ProxyMutex *) vc->write_locked);
      vc->write_locked = vc->write.vio.mutex;
      // Note: the #ifdef selects the condition of the if below; the body
      // is shared.  CLUSTER_TOMCAT additionally tolerates a null mutex.
#ifdef CLUSTER_TOMCAT
      if (vc->write_locked &&
          !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->write.vio.mutex, thread, vc->write.vio._cont, WRITE_LOCK_SPIN_COUNT)) {
#else
      if (!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->write.vio.mutex, thread, vc->write.vio._cont, WRITE_LOCK_SPIN_COUNT)) {
#endif
        // Lock spin failed: release everything acquired so far and bail.
        vc->write_locked = 0;
        free_locks(CLUSTER_WRITE, i);
        return false;
      }
    }
  }
  return true;
}
00840 
00841 void
00842 ClusterHandler::swap_descriptor_bytes()
00843 {
00844   for (int i = 0; i < read.msg.count; i++) {
00845     read.msg.descriptor[i].SwapBytes();
00846   }
00847 }
00848 
void
ClusterHandler::process_set_data_msgs()
{
  uint32_t cluster_function_index;
  // Process SET_CHANNEL_DATA messages ahead of all other messages in the
  // current inbound cluster message: first the small (inline control)
  // region, then the large (IncomingControl) queue.  A processed entry is
  // marked by inverting its function id so the later control-message
  // passes skip it.  Each pass runs at most once per message (the
  // did_*_set_data flags).

  // Pass 1: small control data, packed after the descriptor array.
  if (!read.msg.did_small_control_set_data) {
    char *p = (char *) &read.msg.descriptor[read.msg.count];
    char *endp = p + read.msg.control_bytes;
    while (p < endp) {
      if (needByteSwap) {
        ats_swap32((uint32_t *) p);   // length
        ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function id
      }
      int len = *(int32_t *) p;
      cluster_function_index = *(uint32_t *) (p + sizeof(int32_t));

      if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
          && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t));
        // Mark as processed by inverting the function id in place.
        *((uint32_t *) (p + sizeof(uint32_t))) = ~*((uint32_t *) (p + sizeof(uint32_t)));
        p += (2 * sizeof(uint32_t)) + (len - sizeof(uint32_t));
        p = (char *) DOUBLE_ALIGN(p);
      } else {
        // Not a SET_CHANNEL_DATA entry: undo the swap so the later
        // small-control pass sees the words in their original order.
        if (needByteSwap) {
          ats_swap32((uint32_t *) p); // length
          ats_swap32((uint32_t *) (p + sizeof(int32_t)));       // function id
        }
        break;                  // SET_CHANNEL_DATA entries come first; done
      }
    }
    // Record where the remaining (non-set-data) control entries start.
    read.msg.control_data_offset = p - (char *) &read.msg.descriptor[read.msg.count];
    read.msg.did_small_control_set_data = 1;
  }

  // Pass 2: large control messages queued as IncomingControl blocks.
  if (!read.msg.did_large_control_set_data) {
    IncomingControl *ic = incoming_control.head;

    while (ic) {
      if (needByteSwap) {
        ats_swap32((uint32_t *) ic->data);    // function id
      }
      cluster_function_index = *((uint32_t *) ic->data);

      if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
          && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {

        char *p = ic->data;
        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this,
                                                               (void *) (p + sizeof(int32_t)), ic->len - sizeof(int32_t));

        // Undo the swap before marking the entry processed.
        if (needByteSwap) {
          ats_swap32((uint32_t *) p); // function id
          ats_swap32((uint32_t *) (p + sizeof(int32_t)));       // first data word
        }
        // Mark as processed by inverting the function id in place.
        *((uint32_t *) p) = ~*((uint32_t *) p);

        ic = (IncomingControl *) ic->link.next;
      } else {
        // Not a SET_CHANNEL_DATA entry: undo the swap for the later
        // large-control pass.
        if (needByteSwap) {
          ats_swap32((uint32_t *) ic->data);  // function id
        }
        break;
      }
    }
    read.msg.did_large_control_set_data = 1;
  }
}
00934 
void
ClusterHandler::process_small_control_msgs()
{
  // Dispatch the small (inline) control messages of the current inbound
  // message; runs at most once per message.
  if (read.msg.did_small_control_msgs) {
    return;
  } else {
    read.msg.did_small_control_msgs = 1;
  }

  ink_hrtime now = ink_get_hrtime();
  // Start after any SET_CHANNEL_DATA entries already consumed by
  // process_set_data_msgs() (control_data_offset was recorded there).
  char *p = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_data_offset;
  char *endp = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_bytes;

  while (p < endp) {

    // Entry layout: int32 length, uint32 function index, then payload.
    if (needByteSwap) {
      ats_swap32((uint32_t *) p);     // length
      ats_swap32((uint32_t *) (p + sizeof(int32_t)));   // function id
    }
    int len = *(int32_t *) p;
    p += sizeof(int32_t);
    uint32_t cluster_function_index = *(uint32_t *) p;
    // Set-data entries must have been handled (and inverted) already.
    ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION);

    if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) {
      Warning("1Bad cluster function index (small control)");
      p += len;

    } else if (clusterFunction[cluster_function_index].ClusterFunc) {

      // Function flagged for in-line invocation on this thread.
      p += sizeof(uint32_t);
      clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
      p += (len - sizeof(int32_t));

    } else {

      // Otherwise copy the entry and queue it for dispatch outside this
      // thread via the external_incoming_control atomic list.
      IncomingControl *ic = IncomingControl::alloc();
      ic->recognized_time = now;
      ic->len = len;
      ic->alloc_data();
      memcpy(ic->data, p, ic->len);
      SetHighBit(&ic->len);     // tag len — presumably marks inline-copied data; confirm with consumer
      ink_atomiclist_push(&external_incoming_control, (void *) ic);
      p += len;
    }
    // Entries are double-word aligned in the control region.
    p = (char *) DOUBLE_ALIGN(p);
  }
}
00990 
void
ClusterHandler::process_large_control_msgs()
{
  // Dispatch the large control messages (received as separate
  // IncomingControl blocks) of the current inbound message; runs at most
  // once per message.
  if (read.msg.did_large_control_msgs) {
    return;
  } else {
    read.msg.did_large_control_msgs = 1;
  }

  IncomingControl *ic = NULL;
  uint32_t cluster_function_index;

  while ((ic = incoming_control.dequeue())) {
    if (needByteSwap) {
      ats_swap32((uint32_t *) ic->data);      // function id
    }
    cluster_function_index = *((uint32_t *) ic->data);
    // Set-data entries must have been handled (and inverted) already.
    ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION);

    if (cluster_function_index == (uint32_t) ~ SET_CHANNEL_DATA_CLUSTER_FUNCTION) {
      // Already consumed by process_set_data_msgs(); just free it unless
      // the handler took ownership of the malloc'ed data.
      if (!clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].fMalloced)
        ic->freeall();
      continue;
    }

    if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) {
      Warning("Bad cluster function index (large control)");
      ic->freeall();

    } else if (clusterFunction[cluster_function_index].ClusterFunc) {
      // Function flagged for in-line invocation on this thread.
      clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
                                                  ic->len - sizeof(int32_t));

      // Free unless the handler took ownership of the malloc'ed data.
      if (!clusterFunction[cluster_function_index].fMalloced)
        ic->freeall();

    } else {
      // Otherwise hand the whole block to the external dispatch queue.
      ink_atomiclist_push(&external_incoming_control, (void *) ic);
    }
  }
}
01042 
01043 void
01044 ClusterHandler::process_freespace_msgs()
01045 {
01046   if (read.msg.did_freespace_msgs) {
01047     return;
01048   } else {
01049     read.msg.did_freespace_msgs = 1;
01050   }
01051 
01052   int i;
01053   
01054   
01055   
01056   
01057   for (i = 0; i < read.msg.count; i++) {
01058     if (read.msg.descriptor[i].type == CLUSTER_SEND_FREE && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
01059       int c = read.msg.descriptor[i].channel;
01060       if (c < n_channels && VALID_CHANNEL(channels[c]) &&
01061           (CLUSTER_SEQUENCE_NUMBER(channels[c]->token.sequence_number) == read.msg.descriptor[i].sequence_number)) {
01062         
01063         
01064         
01065         
01066         
01067         channels[c]->remote_free = read.msg.descriptor[i].length;
01068         vcs_push(channels[c], VC_CLUSTER_WRITE);
01069       }
01070     }
01071   }
01072 }
01073 
void
ClusterHandler::add_to_byte_bank(ClusterVConnection * vc)
{
  // Stash the just-received read block on the VC's byte bank queue
  // because its read lock could not be taken; the data is delivered
  // later via the delayed_reads list.
  ByteBankDescriptor *bb_desc = ByteBankDescriptor::ByteBankDescriptor_alloc(vc->read_block);
  bool pending_byte_bank_completion = vc->byte_bank_q.head ? true : false;

  vc->byte_bank_q.enqueue(bb_desc);

  if (!pending_byte_bank_completion) {
    // First banked block: move the VC off the active read list and onto
    // the delayed-read queue.
    ClusterVC_remove_read(vc);
    delayed_reads.push(vc);
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_LEVEL1_BANK_STAT);
  } else {
    // Additional data banked behind an existing backlog.
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MULTILEVEL_BANK_STAT);
  }
  // Ownership of the block moved to the byte bank descriptor.
  vc->read_block = 0;
}
01093 
void
ClusterHandler::update_channels_read()
{
  //
  // Credit the data received by the last read to the target channels,
  // then dispatch the control messages carried in the same message.
  //
  int i;
  int len;
  // Channel-binding (set-channel-data) messages must be processed first so
  // that data descriptors referencing newly bound channels resolve.

  process_set_data_msgs();

  //
  // Deliver received data to the matching channel VCs.
  //
  for (i = 0; i < read.msg.count; i++) {
    if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
      ClusterVConnection *vc = channels[read.msg.descriptor[i].channel];
      // Ignore stale descriptors whose sequence number no longer matches.
      if (VALID_CHANNEL(vc) &&
          (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        vc->last_activity_time = current_time;  // record channel activity

        len = read.msg.descriptor[i].length;
        if (!len) {
          continue;
        }

        if (!vc->pending_remote_fill && vc_ok_read(vc)
            && (!((ProxyMutex *) vc->read_locked) || vc->byte_bank_q.head)) {
          //
          // Read VIO lock not held (or earlier data already banked):
          // buffer the data in the byte bank for deferred delivery via
          // finish_delayed_reads().
          //
          vc->read_block->fill(len);    // note bytes received in the block
          add_to_byte_bank(vc);

        } else {
          // Lock held (or remote fill in progress): credit directly.
          if (vc->pending_remote_fill || ((ProxyMutex *) vc->read_locked && vc_ok_read(vc))) {
            vc->read_block->fill(len);  // note bytes received in the block
            if (!vc->pending_remote_fill) {
              vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
              vc->read_block->consume(len);     // skip the bytes just credited
            }
            complete_channel_read(len, vc);
          }
        }
      }
    }
  }

  // Now process the remaining control message classes.
  process_small_control_msgs();
  process_large_control_msgs();
  process_freespace_msgs();
}
01151 
01152 
01153 
01154 
01155 
01156 
01157 
01158 int
01159 ClusterHandler::process_incoming_callouts(ProxyMutex * m)
01160 {
01161   ProxyMutex *mutex = m;
01162   ink_hrtime now;
01163   
01164   
01165   
01166   
01167   
01168   Queue<IncomingControl> local_incoming_control;
01169   IncomingControl *ic_ext_next;
01170   IncomingControl *ic_ext;
01171 
01172   while (true) {
01173     ic_ext = (IncomingControl *)
01174       ink_atomiclist_popall(&external_incoming_control);
01175     if (!ic_ext)
01176       break;
01177 
01178     while (ic_ext) {
01179       ic_ext_next = (IncomingControl *) ic_ext->link.next;
01180       ic_ext->link.next = NULL;
01181       local_incoming_control.push(ic_ext);
01182       ic_ext = ic_ext_next;
01183     }
01184 
01185     
01186     int small_control_msg;
01187     IncomingControl *ic = NULL;
01188 
01189     while ((ic = local_incoming_control.pop())) {
01190       LOG_EVENT_TIME(ic->recognized_time, inmsg_time_dist, inmsg_events);
01191 
01192       
01193       small_control_msg = IsHighBitSet(&ic->len);
01194       ClearHighBit(&ic->len);   
01195 
01196       if (small_control_msg) {
01197         int len = ic->len;
01198         char *p = ic->data;
01199         uint32_t cluster_function_index = *(uint32_t *) p;
01200         p += sizeof(uint32_t);
01201 
01202         if (cluster_function_index < (uint32_t) SIZE_clusterFunction) {
01203 
01204           
01205 
01206           ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
01207           clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
01208           now = ink_get_hrtime();
01209           CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
01210         } else {
01211           Warning("2Bad cluster function index (small control)");
01212         }
01213         
01214         if (!clusterFunction[cluster_function_index].fMalloced)
01215           ic->freeall();
01216 
01217       } else {
01218         ink_assert(ic->len > 4);
01219         uint32_t cluster_function_index = *(uint32_t *) ic->data;
01220         bool valid_index;
01221 
01222         if (cluster_function_index < (uint32_t) SIZE_clusterFunction) {
01223           valid_index = true;
01224 
01225           
01226 
01227           ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
01228           clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
01229                                                       ic->len - sizeof(int32_t));
01230           now = ink_get_hrtime();
01231           CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
01232         } else {
01233           valid_index = false;
01234           Warning("2Bad cluster function index (large control)");
01235         }
01236         if (valid_index && !clusterFunction[cluster_function_index].fMalloced)
01237           ic->freeall();
01238       }
01239     }
01240   }
01241   return EVENT_CONT;
01242 }
01243 
void
ClusterHandler::update_channels_partial_read()
{
  //
  // Credit data from a partial (incomplete) message read to the target
  // channels. Only the bytes transferred by the most recent read
  // (read.bytes_xfered) are distributed across the iovec entries.
  //
  int i;
  int64_t res = read.bytes_xfered;

  if (!res) {
    return;
  }
  ink_assert(res <= read.did);

  // Compute how many bytes of this read landed in each iovec entry.

  int64_t iov_done[MAX_TCOUNT];
  int64_t total = 0;
  int64_t already_read = read.did - read.bytes_xfered;

  for (i = 0; i < read.n_iov; i++) {
    ink_release_assert(already_read >= 0);
    iov_done[i] = read.iov[i].iov_len;

    // Skip over bytes credited by previous partial reads.
    if (already_read) {
      already_read -= iov_done[i];
      if (already_read < 0) {
        iov_done[i] = -already_read;    // partial entry: only the tail is new
        already_read = 0;
      } else {
        iov_done[i] = 0;
        continue;
      }
    }
    // Clip to the bytes actually transferred by this read.
    res -= iov_done[i];
    if (res < 0) {
      iov_done[i] += res;
      res = 0;
    } else {
      total += iov_done[i];
    }
  }
  ink_assert(total <= read.did);

  int read_all_large_control_msgs = 0;
  //
  // Credit each channel with the new bytes its iovec entry received.
  //
  for (i = 0; i < read.msg.count; i++) {
    if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
      ClusterVConnection *vc = channels[read.msg.descriptor[i].channel];
      if (VALID_CHANNEL(vc) &&
          (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        if (vc->pending_remote_fill || (vc_ok_read(vc) && (vc->iov_map != CLUSTER_IOV_NONE))) {
          vc->last_activity_time = current_time;        // record channel activity
          ClusterVConnState *s = &vc->read;
          ink_assert(vc->iov_map < read.n_iov);
          int len = iov_done[vc->iov_map];

          if (len) {
            if (!read_all_large_control_msgs) {
              //
              // Before crediting any data, process the control messages
              // already fully received (channel bindings, small controls,
              // freespace), since data descriptors may depend on them.
              // Do this only once per partial read.
              //
              process_set_data_msgs();
              process_small_control_msgs();
              process_freespace_msgs();
              read_all_large_control_msgs = 1;
            }
            iov_done[vc->iov_map] = 0;
            vc->read_block->fill(len);  // note bytes received in the block

            if (!vc->pending_remote_fill) {
              if ((ProxyMutex *) vc->read_locked) {
                Debug("cluster_vc_xfer", "Partial read, credit ch %d %p %d bytes", vc->channel, vc, len);
                s->vio.buffer.writer()->append_block(vc->read_block->clone());
                if (complete_channel_read(len, vc)) {
                  vc->read_block->consume(len); // skip credited bytes
                }

              } else {
                // Lock not held: bank the data only once this descriptor's
                // entire payload has been received; otherwise leave the
                // partial data in read_block for the next pass.

                if (len == (int) read.msg.descriptor[i].length) {
                  Debug("cluster_vc_xfer", "Partial read, byte bank move ch %d %p %d bytes", vc->channel, vc, len);
                  add_to_byte_bank(vc);
                }
              }
            } else {
              Debug("cluster_vc_xfer", "Partial remote fill read, credit ch %d %p %d bytes", vc->channel, vc, len);
              complete_channel_read(len, vc);
            }
            read.msg.descriptor[i].length -= len;
            ink_assert(((int) read.msg.descriptor[i].length) >= 0);
          }
          Debug(CL_TRACE, "partial_channel_read chan=%d len=%d", vc->channel, len);
        }
      }
    }
  }
}
01354 
bool ClusterHandler::complete_channel_read(int len, ClusterVConnection * vc)
{
  //
  // Credit 'len' newly received bytes to the read VIO and signal the
  // reading continuation. Returns false when the VC was closed (or the
  // signaled callback terminated it) and the caller must stop using it.
  //
  ClusterVConnState *s = &vc->read;

  if (vc->pending_remote_fill) {
    Debug(CL_TRACE, "complete_channel_read chan=%d len=%d", vc->channel, len);
    vc->initial_data_bytes += len;
    ++vc->pending_remote_fill;  // advance remote-fill state
    return (vc->closed ? false : true);
  }

  if (vc->closed)
    return false;               // No signal on closed VC

  ink_assert((ProxyMutex *) s->vio.mutex == (ProxyMutex *) s->vio._cont->mutex);

  Debug("cluster_vc_xfer", "Complete read, credit ch %d %p %d bytes", vc->channel, vc, len);
  s->vio.ndone += len;

  if (s->vio.ntodo() <= 0) {
    s->enabled = 0;
    // EVENT_DONE from the callback means the VC is gone; don't touch it.
    if (cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s)
        == EVENT_DONE)
      return false;
  } else {
    if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) == EVENT_DONE)
      return false;
    // The READ_READY callback may have changed the VIO (e.g. reenabled
    // with fewer bytes); re-check completion.
    if (s->vio.ntodo() <= 0)
      s->enabled = 0;
  }

  vcs_push(vc, VC_CLUSTER_READ);
  return true;
}
01393 
01394 void
01395 ClusterHandler::finish_delayed_reads()
01396 {
01397   
01398   
01399   
01400   
01401   
01402   ClusterVConnection *vc = NULL;
01403   DLL<ClusterVConnectionBase> l;
01404   while ((vc = (ClusterVConnection *) delayed_reads.pop())) {
01405     MUTEX_TRY_LOCK_SPIN(lock, vc->read.vio.mutex, thread, READ_LOCK_SPIN_COUNT);
01406     if (lock) {
01407       if (vc_ok_read(vc)) {
01408         ink_assert(!vc->read.queue);
01409         ByteBankDescriptor *d;
01410 
01411         while ((d = vc->byte_bank_q.dequeue())) {
01412           if (vc->read.queue) {
01413             
01414             
01415             ClusterVC_remove_read(vc);
01416           }
01417           Debug("cluster_vc_xfer",
01418                 "Delayed read, credit ch %d %p %" PRId64" bytes", vc->channel, vc, d->get_block()->read_avail());
01419           vc->read.vio.buffer.writer()->append_block(d->get_block());
01420 
01421           if (complete_channel_read(d->get_block()->read_avail(), vc)) {
01422             ByteBankDescriptor::ByteBankDescriptor_free(d);
01423           } else {
01424             ByteBankDescriptor::ByteBankDescriptor_free(d);
01425             break;
01426           }
01427         }
01428       }
01429     } else
01430       l.push(vc);
01431   }
01432   delayed_reads = l;
01433 }
01434 
void
ClusterHandler::update_channels_written()
{
  //
  // Account for data described by the just-completed write: decrement the
  // per-VC bytes-in-transit counters, reschedule writers that still have
  // remote free space, and release the control messages that were sent
  // on the control channel.
  //
  ink_hrtime now;
  for (int i = 0; i < write.msg.count; i++) {
    if (write.msg.descriptor[i].type == CLUSTER_SEND_DATA) {
      if (write.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
        ClusterVConnection *vc = channels[write.msg.descriptor[i].channel];
        // Skip stale descriptors (channel reused, sequence mismatch).
        if (VALID_CHANNEL(vc) &&
            (write.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {

          if (vc->pending_remote_fill) {
            Debug(CL_TRACE,
                  "update_channels_written chan=%d seqno=%d len=%d",
                  write.msg.descriptor[i].channel,
                  write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length);
            vc->pending_remote_fill = 0;
            vc->remote_write_block = 0; // drop reference to the sent block
            continue;           // no VIO accounting for remote fills
          }

          ClusterVConnState *s = &vc->write;
          int len = write.msg.descriptor[i].length;
          vc->write_bytes_in_transit -= len;
          ink_release_assert(vc->write_bytes_in_transit >= 0);
          Debug(CL_PROTO, "(%d) data sent %d %" PRId64, write.msg.descriptor[i].channel, len, s->vio.ndone);

          if (vc_ok_write(vc)) {
            vc->last_activity_time = current_time;      // record channel activity
            int64_t ndone = vc->was_closed()? 0 : s->vio.ndone;

            // Reschedule the writer while the peer still has room.
            if (ndone < vc->remote_free) {
              vcs_push(vc, VC_CLUSTER_WRITE);
            }
          }
        }
      } else {
        //
        // Control-channel descriptor: the corresponding control message
        // (queued in send order) is now fully sent; release it.
        //
        OutgoingControl *oc = write.msg.outgoing_control.dequeue();
        oc->free_data();
        oc->mutex = NULL;
        now = ink_get_hrtime();
        CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - oc->submit_time);
        LOG_EVENT_TIME(oc->submit_time, cluster_send_time_dist, cluster_send_events);
        oc->freeall();
      }
    }
  }
  //
  // Release the compound-message headers queued by
  // build_controlmsg_descriptors(); both the header and its associated
  // data OutgoingControl are freed here after the send completed.
  //
  invoke_remote_data_args *args;
  OutgoingControl *hdr_oc;
  while ((hdr_oc = write.msg.outgoing_callout.dequeue())) {
    args = (invoke_remote_data_args *) (hdr_oc->data + sizeof(int32_t));
    ink_assert(args->magicno == invoke_remote_data_args::MagicNo);

    // Free the data message.
    args->data_oc->free_data(); // invoke memory deallocation callback
    args->data_oc->mutex = NULL;
    args->data_oc->freeall();

    // Free the header message.
    hdr_oc->free_data();
    hdr_oc->mutex = NULL;
    now = ink_get_hrtime();
    CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - hdr_oc->submit_time);
    LOG_EVENT_TIME(hdr_oc->submit_time, cluster_send_time_dist, cluster_send_events);
    hdr_oc->freeall();
  }
}
01517 
int
ClusterHandler::build_write_descriptors()
{
  //
  // Build the data descriptors for the next write message from the VCs
  // in the current write bucket. Returns the number of data descriptors
  // added. Total descriptor count is bounded by MAX_TCOUNT (the +2
  // reserves room for additional descriptors added elsewhere).
  //
  int count_bucket = cur_vcs;
  int tcount = write.msg.count + 2;     // count + descriptor headroom
  int write_descriptors_built = 0;
  int valid;
  int list_len = 0;
  ClusterVConnection *vc, *vc_next;

  //
  // Drain the write-ready atomic list: free closed VCs, move the rest
  // into the write bucket (or defer them when over the descriptor limit).
  //
  vc = (ClusterVConnection *)ink_atomiclist_popall(&write_vcs_ready);
  while (vc) {
    enter_exit(&cls_build_writes_entered, &cls_writes_exited);
    vc_next = (ClusterVConnection *) vc->ready_alink.next;
    vc->ready_alink.next = NULL;
    list_len++;
    if (VC_CLUSTER_CLOSED == vc->type) {
      vc->in_vcs = false;
      vc->type = VC_NULL;
      clusterVCAllocator.free(vc);
      vc = vc_next;
      continue;
    }

    if (tcount >= MAX_TCOUNT) {
      vcs_push(vc, VC_CLUSTER_WRITE);   // over limit: retry next pass
    } else {
      vc->in_vcs = false;
      cluster_reschedule_offset(this, vc, &vc->write, 0);
      tcount++;
    }
    vc = vc_next;
  }
  if (list_len) {
    CLUSTER_SUM_DYN_STAT(CLUSTER_VC_WRITE_LIST_LEN_STAT, list_len);
  }

  // Second pass: walk the current bucket and emit descriptors.
  tcount = write.msg.count + 2;
  vc_next = (ClusterVConnection *) write_vcs[count_bucket].head;
  while (vc_next) {
    vc = vc_next;
    vc_next = (ClusterVConnection *) vc->write.link.next;

    if (VC_CLUSTER_CLOSED == vc->type) {
      vc->type = VC_NULL;
      clusterVCAllocator.free(vc);
      continue;
    }

    if (tcount >= MAX_TCOUNT)
      break;

    // valid_for_data_write(): -1 = lock missed, 0 = nothing to do,
    // 1 = data staged (vc->write_locked held on return).
    valid = valid_for_data_write(vc);
    if (-1 == valid) {
      vcs_push(vc, VC_CLUSTER_WRITE);
    } else if (valid) {
      ink_assert(vc->write_locked);     // held by valid_for_data_write()
      if ((vc->remote_free > (vc->write.vio.ndone - vc->write_list_bytes))
          && channels[vc->channel] == vc) {

        ink_assert(vc->write_list && vc->write_list_bytes);

        int d = write.msg.count;
        write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
        write.msg.descriptor[d].channel = vc->channel;
        write.msg.descriptor[d].sequence_number = vc->token.sequence_number;
        int s = vc->write_list_bytes;
        ink_release_assert(s <= MAX_CLUSTER_SEND_LENGTH);

        // Clip send length to the VIO's remaining bytes.
        if ((vc->write.vio.ndone - s) > vc->write.vio.nbytes)
          s = vc->write.vio.nbytes - (vc->write.vio.ndone - s);

        // Clip send length to the peer's advertised free space.
        if ((vc->write.vio.ndone - s) > vc->remote_free)
          s = vc->remote_free - (vc->write.vio.ndone - s);
        write.msg.descriptor[d].length = s;
        write.msg.count++;
        tcount++;
        write_descriptors_built++;

#ifdef CLUSTER_STATS
        _vc_writes++;
        _vc_write_bytes += s;
#endif

      } else {
        // No remote space (or channel rebound): release the write lock.
        MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
        vc->write_locked = NULL;

        if (channels[vc->channel] == vc)
          CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NO_REMOTE_SPACE_STAT);
      }
    }
  }
  return (write_descriptors_built);
}
01622 
int
ClusterHandler::build_freespace_descriptors()
{
  //
  // Build freespace (flow control) descriptors for the next write message
  // from the VCs in the current read bucket, advertising local buffer
  // space to the peer. Returns the number of descriptors added; total
  // descriptor count is bounded by MAX_TCOUNT.
  //
  int count_bucket = cur_vcs;
  int tcount = write.msg.count + 2;     // count + descriptor headroom
  int freespace_descriptors_built = 0;
  int s = 0;
  int list_len = 0;
  ClusterVConnection *vc, *vc_next;

  //
  // Drain the read-ready atomic list: free closed VCs, move the rest
  // into the read bucket (or defer them when over the descriptor limit).
  //
  vc = (ClusterVConnection *)ink_atomiclist_popall(&read_vcs_ready);
  while (vc) {
    enter_exit(&cls_build_reads_entered, &cls_reads_exited);
    vc_next = (ClusterVConnection *) vc->ready_alink.next;
    vc->ready_alink.next = NULL;
    list_len++;
    if (VC_CLUSTER_CLOSED == vc->type) {
      vc->in_vcs = false;
      vc->type = VC_NULL;
      clusterVCAllocator.free(vc);
      vc = vc_next;
      continue;
    }

    if (tcount >= MAX_TCOUNT) {
      vcs_push(vc, VC_CLUSTER_READ);    // over limit: retry next pass
    } else {
      vc->in_vcs = false;
      cluster_reschedule_offset(this, vc, &vc->read, 0);
      tcount++;
    }
    vc = vc_next;
  }
  if (list_len) {
    CLUSTER_SUM_DYN_STAT(CLUSTER_VC_READ_LIST_LEN_STAT, list_len);
  }

  // Second pass: walk the current bucket and emit descriptors.
  tcount = write.msg.count + 2;
  vc_next = (ClusterVConnection *) read_vcs[count_bucket].head;
  while (vc_next) {
    vc = vc_next;
    vc_next = (ClusterVConnection *) vc->read.link.next;

    if (VC_CLUSTER_CLOSED == vc->type) {
      vc->type = VC_NULL;
      clusterVCAllocator.free(vc);
      continue;
    }

    if (tcount >= MAX_TCOUNT)
      break;

    // valid_for_freespace_write(): -1 = lock missed, 0 = nothing to
    // advertise, >0 = free space in bytes.
    s = valid_for_freespace_write(vc);
    if (-1 == s) {
      vcs_push(vc, VC_CLUSTER_READ);
    } else if (s) {
      if (vc_ok_read(vc) && channels[vc->channel] == vc) {
        // Build the freespace descriptor for this channel.
        int d = write.msg.count;
        write.msg.descriptor[d].type = CLUSTER_SEND_FREE;
        write.msg.descriptor[d].channel = vc->channel;
        write.msg.descriptor[d].sequence_number = vc->token.sequence_number;

        ink_assert(s > 0);
        write.msg.descriptor[d].length = s;
        vc->last_local_free = s;
        Debug(CL_PROTO, "(%d) free space priority %d", vc->channel, vc->read.priority);
        write.msg.count++;
        tcount++;
        freespace_descriptors_built++;
      }
    }
  }
  return (freespace_descriptors_built);
}
01707 
int
ClusterHandler::build_controlmsg_descriptors()
{
  //
  // Move queued outgoing control messages into the next write message.
  // Small messages are batched for inline packing (see
  // add_small_controlmsg_descriptors()); large ones get their own
  // descriptors; compound messages expand to a data descriptor plus a
  // control descriptor. Returns the number of messages staged.
  //
  int tcount = write.msg.count + 2;     // count + descriptor headroom
  bool compound_msg;            // msg + chan data in single message

  //
  // Scan the priority queues (q = 0 is highest priority), refilling each
  // local queue from its atomic list when it runs dry.
  //
  int control_msgs_built = 0;
  OutgoingControl *c = NULL;
  int control_bytes = 0;
  int q = 0;

  while (tcount < (MAX_TCOUNT - 1)) {   // compound messages may need 2 descriptors
    c = outgoing_control[q].pop();
    if (!c) {
      // Local queue empty: drain the atomic list for this priority.
      OutgoingControl *c_next;
      c = (OutgoingControl *) ink_atomiclist_popall(&outgoing_control_al[q]);
      if (c == 0) {
        if (++q >= CLUSTER_CMSG_QUEUES) {
          break;
        } else {
          continue;
        }
      }
      // popall returns LIFO order; re-push to restore submission order.
      while (c) {
        c_next = (OutgoingControl *) c->link.next;
        c->link.next = NULL;
        outgoing_control[q].push(c);
        c = c_next;
      }
      continue;

    } else {
      compound_msg = (*((int32_t *) c->data) == -1);      // (msg+data) message
    }
    // Small, non-compound, non-fMalloced message that still fits in the
    // inline control area: batch it for inline packing.
    if (!compound_msg && c->len <= SMALL_CONTROL_MESSAGE &&
        
        !clusterFunction[*(int32_t *) c->data].fMalloced && control_bytes + c->len + sizeof(int32_t) * 2 + 7 < CONTROL_DATA) {
      write.msg.outgoing_small_control.enqueue(c);
      control_bytes += c->len + sizeof(int32_t) * 2 + 7;  // length + data + alignment overhead
      control_msgs_built++;

      if (clusterFunction[*(int32_t *) c->data].post_pfn) {
        clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
      }
      continue;
    }
    //
    // Large or compound message.
    //
    if (compound_msg) {
      // Deconstruct the compound message: a header, a control message
      // and a channel-data payload.
      invoke_remote_data_args *cmhdr = (invoke_remote_data_args *) (c->data + sizeof(int32_t));
      OutgoingControl *oc_header = c;
      OutgoingControl *oc_msg = cmhdr->msg_oc;
      OutgoingControl *oc_data = cmhdr->data_oc;

      ink_assert(cmhdr->magicno == invoke_remote_data_args::MagicNo);
      //
      // Build the data descriptor targeting the destination channel.
      //
      int d;
      d = write.msg.count;
      write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
      write.msg.descriptor[d].channel = cmhdr->dest_channel;
      write.msg.descriptor[d].length = oc_data->len;
      write.msg.descriptor[d].sequence_number = cmhdr->token.sequence_number;

#ifdef CLUSTER_STATS
      _vc_write_bytes += oc_data->len;
#endif

      // The destination VC must still be awaiting a remote fill.
      ClusterVConnection *vc = channels[cmhdr->dest_channel];

      if (VALID_CHANNEL(vc) && vc->pending_remote_fill) {
        ink_release_assert(!vc->remote_write_block);
        vc->remote_write_block = oc_data->get_block();

        // Build the companion control-message descriptor.
        write.msg.count++;
        tcount++;
        control_msgs_built++;
        d = write.msg.count;
        write.msg.outgoing_control.enqueue(oc_msg);
        write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
        write.msg.descriptor[d].channel = CLUSTER_CONTROL_CHANNEL;
        write.msg.descriptor[d].length = oc_msg->len;

#ifdef CLUSTER_STATS
        _control_write_bytes += oc_msg->len;
#endif

        write.msg.count++;
        tcount++;
        control_msgs_built++;

        // Queue the header for post-send cleanup in update_channels_written().
        write.msg.outgoing_callout.enqueue(oc_header);

      } else {
        // Operation aborted (channel invalid or fill no longer pending).
        Warning("Pending remote read fill aborted chan=%d len=%d", cmhdr->dest_channel, oc_data->len);

        // Free the header message.
        oc_header->free_data();
        oc_header->mutex = NULL;
        oc_header->freeall();

        // Free the control message.
        oc_msg->free_data();
        oc_msg->mutex = 0;
        oc_msg->freeall();

        // Free the data message.
        oc_data->free_data();   // invoke memory deallocation callback
        oc_data->mutex = 0;
        oc_data->freeall();
      }

    } else {
      // Plain large control message: one control-channel descriptor.
      write.msg.outgoing_control.enqueue(c);

      int d = write.msg.count;
      write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
      write.msg.descriptor[d].channel = CLUSTER_CONTROL_CHANNEL;
      write.msg.descriptor[d].length = c->len;

#ifdef CLUSTER_STATS
      _control_write_bytes += c->len;
#endif

      write.msg.count++;
      tcount++;
      control_msgs_built++;

      if (clusterFunction[*(int32_t *) c->data].post_pfn) {
        clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
      }
    }
  }
  return control_msgs_built;
}
01862 
int
ClusterHandler::add_small_controlmsg_descriptors()
{
  //
  // Pack the batched small control messages inline, directly after the
  // last descriptor of the write message: each entry is a 4-byte length
  // followed by the message data, padded to double-word alignment.
  //
  char *p = (char *) &write.msg.descriptor[write.msg.count];
  OutgoingControl *c = NULL;

  while ((c = write.msg.outgoing_small_control.dequeue())) {
    *(int32_t *) p = c->len;
    p += sizeof(int32_t);
    memcpy(p, c->data, c->len);
    c->free_data();
    c->mutex = NULL;
    p += c->len;
    ink_hrtime now = ink_get_hrtime();
    CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - c->submit_time);
    LOG_EVENT_TIME(c->submit_time, cluster_send_time_dist, cluster_send_events);
    c->freeall();
    p = (char *) DOUBLE_ALIGN(p);       // pad next entry to alignment boundary
  }
  // Total bytes of inline control data appended after the descriptors.
  write.msg.control_bytes = p - (char *) &write.msg.descriptor[write.msg.count];

#ifdef CLUSTER_STATS
  _control_write_bytes += write.msg.control_bytes;
#endif

  return 1;
}
01893 
01894 struct DestructorLock
01895 {
01896   DestructorLock(EThread * thread)
01897   {
01898     have_lock = false;
01899     t = thread;
01900   }
01901    ~DestructorLock()
01902   {
01903     if (have_lock && m) {
01904       Mutex_unlock(m, t);
01905     }
01906     m = 0;
01907   }
01908   EThread *t;
01909   Ptr<ProxyMutex> m;
01910   bool have_lock;
01911 };
01912 
int
ClusterHandler::valid_for_data_write(ClusterVConnection * vc)
{
  //
  // Determine whether this VC has data to send in the next write message,
  // staging it onto vc->write_list if so.
  // Returns: 1 = data staged (vc->write_locked holds the VIO mutex),
  //          0 = nothing to write, -1 = lock missed (retry later).
  //
  ClusterVConnState *s = &vc->write;

  ink_assert(!on_stolen_thread);
  ink_assert((ProxyMutex *) ! vc->write_locked);

  //
  // Attempt to obtain the write VIO mutex; the DestructorLock releases it
  // on every return path unless ownership is transferred to
  // vc->write_locked.
  //
  DestructorLock lock(thread);

retry:
  if ((lock.m = s->vio.mutex)) {
    lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, s->vio._cont, WRITE_LOCK_SPIN_COUNT);
    if (!lock.have_lock) {
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT);

#ifdef CLUSTER_STATS
      _dw_missed_lock++;
#endif
      return -1;                // lock miss: caller requeues the VC
    }
  }

  if (vc->was_closed()) {
    if (vc->schedule_write()) {
      // Closed but data still pending: transfer lock ownership and let
      // the remaining bytes drain.
#ifdef CLUSTER_TOMCAT
      ink_assert(lock.m);
#endif
      vc->write_locked = lock.m;
      lock.m = 0;
      lock.have_lock = false;
      return 1;
    } else {
      // Closed with nothing in flight: tear the connection down.
      if (!vc->write_bytes_in_transit) {
        close_ClusterVConnection(vc);
      }
      return 0;
    }
  }

  if (!s->enabled && !vc->was_remote_closed()) {
#ifdef CLUSTER_STATS
    _dw_not_enabled++;
#endif
    return 0;
  }

  if (vc->pending_remote_fill) {
    if (vc->was_remote_closed())
      close_ClusterVConnection(vc);

#ifdef CLUSTER_STATS
    _dw_wait_remote_fill++;
#endif
    return 0;
  }

  if (!lock.have_lock || !s->vio.mutex || !s->vio._cont) {
    if (!lock.have_lock && s->vio.mutex && s->vio._cont) {
      // VIO (re)established while we were checking: retake the lock.
      goto retry;
    } else {
      // No active VIO to service.
#ifdef CLUSTER_STATS
      _dw_no_active_vio++;
#endif
      return 0;
    }
  }
  //
  // Handle the remote side having closed the connection.
  //
  if (vc->was_remote_closed()) {
    if (!vc->write_bytes_in_transit && !vc->schedule_write()) {
      remote_close(vc, s);
    }
    return 0;
  }
  //
  // The VIO must be enabled and configured for write.
  //
  if (!s->enabled || s->vio.op != VIO::WRITE) {
    s->enabled = 0;
#ifdef CLUSTER_STATS
    _dw_not_enabled_or_no_write++;
#endif
    return 0;
  }
  //
  // Stall writes while channel-binding messages are outstanding or the
  // peer has no free space for bytes beyond those already staged.
  //
  int set_data_msgs_pending = vc->n_set_data_msgs;
  if (set_data_msgs_pending || (vc->remote_free <= (s->vio.ndone - vc->write_list_bytes))) {
    if (set_data_msgs_pending) {
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_WRITE_STALL_STAT);

#ifdef CLUSTER_STATS
      _dw_set_data_pending++;
#endif

    } else {
#ifdef CLUSTER_STATS
      _dw_no_free_space++;
#endif
    }
    return 0;
  }
  //
  // Compute how much of the source buffer we may consume.
  //
  MIOBufferAccessor & buf = s->vio.buffer;

  int64_t towrite = buf.reader()->read_avail();
  int64_t ntodo = s->vio.ntodo();
  bool write_vc_signal = false;

  if (towrite > ntodo)
    towrite = ntodo;

  ink_assert(ntodo >= 0);
  if (ntodo <= 0) {
    cluster_signal_and_update(VC_EVENT_WRITE_COMPLETE, vc, s);
    return 0;
  }
  // Ask the producer for more data when the buffer has room and cannot
  // yet satisfy the VIO; the callback may alter the VIO, so re-check.
  if (buf.writer()->write_avail() && towrite != ntodo) {
    write_vc_signal = true;
    if (cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s) == EVENT_DONE)
      return 0;
    ink_assert(s->vio.ntodo() >= 0);
    if (s->vio.ntodo() <= 0) {
      cluster_signal_and_update(VC_EVENT_WRITE_COMPLETE, vc, s);
      return 0;
    }
  }
  //
  // Move data (by cloning blocks, not copying bytes) onto the staging
  // write list, up to DEFAULT_MAX_BUFFER_SIZE staged bytes total.
  //
  Ptr<IOBufferBlock> b_list;
  IOBufferBlock *b_tail;
  int bytes_to_fill;
  int consume_bytes;

  bytes_to_fill = DEFAULT_MAX_BUFFER_SIZE - vc->write_list_bytes;

  if (towrite && bytes_to_fill) {
    consume_bytes = (towrite > bytes_to_fill) ? bytes_to_fill : towrite;
    b_list = clone_IOBufferBlockList(s->vio.buffer.reader()->block,
                                     s->vio.buffer.reader()->start_offset, consume_bytes, &b_tail);
    ink_assert(b_tail);

    // Append the cloned list to the staging write list.

    if (vc->write_list_tail) {
      vc->write_list_tail->next = b_list;
    } else {
      vc->write_list = b_list;
    }
    vc->write_list_tail = b_tail;
    vc->write_list_bytes += consume_bytes;
    ink_assert(bytes_IOBufferBlockList(vc->write_list, 1) == vc->write_list_bytes);

    // Consume the staged bytes from the source VIO.

    (s->vio.buffer.reader())->consume(consume_bytes);
    s->vio.ndone += consume_bytes;
    if (s->vio.ntodo() <= 0) {
      cluster_signal_and_update_locked(VC_EVENT_WRITE_COMPLETE, vc, s);
    }
  }

  if (vc->schedule_write()) {
#ifdef CLUSTER_TOMCAT
    ink_assert(s->vio.mutex);
#endif
    // Transfer lock ownership to the VC; build_write_descriptors()
    // releases it after the descriptor is built.
    vc->write_locked = lock.m;
    lock.m = 0;
    lock.have_lock = false;
    return 1;
  } else {
    // Nothing scheduled: if we skipped the earlier READY signal, give the
    // producer one chance to supply more data.
    if (!write_vc_signal && buf.writer()->write_avail() && towrite != ntodo)
       cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s);
    return 0;
  }
}
02102 
int
ClusterHandler::valid_for_freespace_write(ClusterVConnection * vc)
{
  // Decide whether a freespace (read-side flow control) update should be
  // sent to the peer for this ClusterVConnection.
  // Returns -1 if the read VIO lock could not be acquired (caller retries),
  // 0 if no freespace message is needed, or the freespace byte offset (> 0)
  // to advertise to the remote node.
  ClusterVConnState *s = &vc->read;

  ink_assert(!on_stolen_thread);

  // DestructorLock releases lock.m (when have_lock) as this scope exits,
  // so every early return below drops the VIO mutex automatically.
  DestructorLock lock(thread);

retry:
  if ((lock.m = s->vio.mutex)) {
    lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, s->vio._cont, READ_LOCK_SPIN_COUNT);

    if (!lock.have_lock) {
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_READ_LOCKED_STAT);

#ifdef CLUSTER_STATS
      _fw_missed_lock++;
#endif
      return -1;                // lock miss; caller will retry later
    }
  }
  if (vc->was_closed()) {
    // Locally closed: reclaim the VC once no write data remains in flight
    // or scheduled.
    if (!vc->write_bytes_in_transit && !vc->schedule_write()) {
      close_ClusterVConnection(vc);
    } 
    return 0;
  }

  if (!s->enabled && !vc->was_remote_closed()) {
    // Read side not enabled and remote still open: nothing to advertise.
#ifdef CLUSTER_STATS
    _fw_not_enabled++;
#endif
    return 0;
  }

  if (vc->pending_remote_fill) {
    // Still waiting for the remote node to push the initial data.
    if (vc->was_remote_closed())
      close_ClusterVConnection(vc);

#ifdef CLUSTER_STATS
    _fw_wait_remote_fill++;
#endif
    return 0;
  }

  if (!lock.have_lock || !s->vio.mutex || !s->vio._cont) {
    if (!lock.have_lock && s->vio.mutex && s->vio._cont) {
      // A VIO appeared after the first (lock-less) pass; take its lock now.
      goto retry;
    } else {
      // No active read VIO on this VC.
#ifdef CLUSTER_STATS
      _fw_no_active_vio++;
#endif
      return 0;
    }
  }
  
  // Remote side closed: defer the close callback until all pending
  // write data for this VC has been flushed to the peer.
  if (vc->was_remote_closed()) {
    if (vc->write_bytes_in_transit || vc->schedule_write()) {
      // Writes still outstanding; handle the close on a later pass.
      return 0;
    }
    remote_close(vc, s);
    return 0;
  }
  
  // Freespace only applies to an enabled, active read VIO.
  if (!s->enabled || s->vio.op != VIO::READ) {
#ifdef CLUSTER_STATS
    _fw_not_enabled_or_no_read++;
#endif
    return 0;
  }

  int64_t ntodo = s->vio.ntodo();
  ink_assert(ntodo >= 0);

  if (ntodo <= 0) {
    cluster_signal_and_update(VC_EVENT_READ_COMPLETE, vc, s);
    return 0;
  }

  // Flush any locally buffered initial data (pushed with the open reply)
  // into the user's read buffer before computing freespace.
  int64_t bytes_to_move = vc->initial_data_bytes;
  if (vc->read_block && bytes_to_move) {

    // Move initial data, limited by the VIO's remaining byte count.

    if (ntodo >= bytes_to_move) {
      Debug("cluster_vc_xfer", "finish initial data push ch %d bytes %" PRId64, vc->channel, vc->read_block->read_avail());

      s->vio.buffer.writer()->append_block(vc->read_block->clone());
      vc->read_block = 0;

    } else {
      bytes_to_move = ntodo;

      Debug("cluster_vc_xfer", "initial data push ch %d bytes %" PRId64, vc->channel, bytes_to_move);

      // Partial transfer: clone just the needed prefix and consume it
      // from the retained read_block.

      IOBufferBlock *b, *btail;
      b = clone_IOBufferBlockList(vc->read_block, 0, bytes_to_move, &btail);
      s->vio.buffer.writer()->append_block(b);
      vc->read_block->consume(bytes_to_move);
    }
    s->vio.ndone += bytes_to_move;
    vc->initial_data_bytes -= bytes_to_move;

    if (s->vio.ntodo() <= 0) {
      s->enabled = 0;
      cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s);
      return 0;

    } else {
      if (vc->have_all_data) {
        if (!vc->read_block) {
          // All data delivered and nothing buffered: signal end of stream.
          s->enabled = 0;
          cluster_signal_and_update(VC_EVENT_EOS, vc, s);
          return 0;
        }
      }
      if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s)
          == EVENT_DONE)
        return false;

      if (s->vio.ntodo() <= 0)
        s->enabled = 0;

      if (vc->initial_data_bytes)
        return 0;
    }
  }
  
  // Compute the freespace offset to advertise: round ndone up to a
  // DEFAULT_MAX_BUFFER_SIZE boundary, then open the window further when
  // we have consumed at least half of the previously advertised space.

  int64_t nextfree = vc->read.vio.ndone;

  nextfree = (nextfree + DEFAULT_MAX_BUFFER_SIZE - 1) / DEFAULT_MAX_BUFFER_SIZE;
  nextfree *= DEFAULT_MAX_BUFFER_SIZE;

  if (nextfree >= (vc->last_local_free / 2)) {
    nextfree = vc->last_local_free + (8 * DEFAULT_MAX_BUFFER_SIZE);
  }

  if ((vc->last_local_free == 0) || (nextfree >= vc->last_local_free)) {
    Debug(CL_PROTO, "(%d) update freespace %" PRId64, vc->channel, nextfree);
    // Caller sends the freespace message and records last_local_free.
    
    
    return nextfree;

  } else {
    // Window has not advanced enough to be worth a message.
    return 0;
  }
}
02270 
02271 void
02272 ClusterHandler::vcs_push(ClusterVConnection * vc, int type)
02273 {
02274   if (vc->type <= VC_CLUSTER)
02275     vc->type = type;
02276 
02277   while ((vc->type > VC_CLUSTER) && !vc->in_vcs && ink_atomic_cas(pvint32(&vc->in_vcs), 0, 1)) {
02278     if (vc->type == VC_CLUSTER_READ)
02279       ink_atomiclist_push(&vc->ch->read_vcs_ready, (void *)vc);
02280     else
02281       ink_atomiclist_push(&vc->ch->write_vcs_ready, (void *)vc);
02282     return;
02283   }
02284 }
02285 
02286 int
02287 ClusterHandler::remote_close(ClusterVConnection * vc, ClusterVConnState * ns)
02288 {
02289   if (ns->vio.op != VIO::NONE && !vc->closed) {
02290     ns->enabled = 0;
02291     if (vc->remote_closed > 0) {
02292       if (ns->vio.op == VIO::READ) {
02293         if (ns->vio.nbytes == ns->vio.ndone) {
02294           return cluster_signal_and_update(VC_EVENT_READ_COMPLETE, vc, ns);
02295         } else {
02296           return cluster_signal_and_update(VC_EVENT_EOS, vc, ns);
02297         }
02298       } else {
02299         return cluster_signal_and_update(VC_EVENT_EOS, vc, ns);
02300       }
02301     } else {
02302       return cluster_signal_error_and_update(vc, ns, vc->remote_lerrno);
02303     }
02304   }
02305   return EVENT_CONT;
02306 }
02307 
02308 void
02309 ClusterHandler::steal_thread(EThread * t)
02310 {
02311   
02312   
02313   
02314   
02315   if (t != thread &&            
02316       write.to_do <= 0 &&       
02317       
02318       !write.msg.count) {
02319     mainClusterEvent(CLUSTER_EVENT_STEAL_THREAD, (Event *) t);
02320   }
02321 }
02322 
void
ClusterHandler::free_locks(bool read_flag, int i)
{
  // Release the per-VC VIO locks taken while building/processing the first
  // 'i' descriptors of the current read or write message.
  // i == CLUSTER_FREE_ALL_LOCKS means every descriptor in the message.
  if (i == CLUSTER_FREE_ALL_LOCKS) {
    if (read_flag) {
      // Read locks are only held once descriptor processing started
      // (msg.state >= 2); before that there is nothing to release.
      i = (read.msg.state >= 2 ? read.msg.count : 0);
    } else {
      i = write.msg.count;
    }
  }
  ClusterState & s = (read_flag ? read : write);
  for (int j = 0; j < i; j++) {
    if (s.msg.descriptor[j].type == CLUSTER_SEND_DATA && s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
      ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
      if (VALID_CHANNEL(vc)) {
        if (read_flag) {
          if ((ProxyMutex *) vc->read_locked) {
            MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread);
            vc->read_locked = NULL;
          }
        } else {
          if ((ProxyMutex *) vc->write_locked) {
            MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
            vc->write_locked = NULL;
          }
        }
      }
    } else if (!read_flag &&
               s.msg.descriptor[j].type == CLUSTER_SEND_FREE &&
               s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
      // Freespace descriptors in an outgoing message hold the *read* lock,
      // since they advertise read-side buffer space for the channel.
      ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
      if (VALID_CHANNEL(vc)) {
        if ((ProxyMutex *) vc->read_locked) {
          MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
          vc->read_locked = NULL;
        }
      }
    }
  }
}
02366 
#ifdef CLUSTER_IMMEDIATE_NETIO
void
ClusterHandler::build_poll(bool next)
{
  // Register the cluster socket with the thread's poll descriptor set:
  // the "next" set when queuing for the upcoming poll cycle, otherwise
  // the current set.
  Pollfd *pfd;
  if (next) {
    pfd = thread->nextPollDescriptor->alloc();
    ifd = pfd - thread->nextPollDescriptor->pfd;
  } else {
    pfd = thread->pollDescriptor->alloc();
    ifd = pfd - thread->pollDescriptor->pfd;
  }
  pfd->fd = net_vc->get_socket();

  if (next) {
    // Poll only for the directions with outstanding work, plus hangup.
    pfd->events = POLLHUP;
    if (read.to_do)
      pfd->events |= POLLIN;
    if (write.to_do)
      pfd->events |= POLLOUT;
  } else {
    // Current descriptor: mark both directions requested *and* ready so
    // I/O is attempted immediately without waiting on the poll result.
    pfd->events = POLLIN | POLLOUT;
    pfd->revents = POLLIN | POLLOUT;
  }
}
#endif // CLUSTER_IMMEDIATE_NETIO
02395 
02396 extern int CacheClusterMonitorEnabled;
02397 extern int CacheClusterMonitorIntervalSecs;
02398 
02399 
02400 
02401 
02402 
02403 int
02404 ClusterHandler::mainClusterEvent(int event, Event * e)
02405 {
02406   
02407   current_time = ink_get_hrtime();
02408 
02409   if (CacheClusterMonitorEnabled) {
02410     if ((current_time - last_trace_dump) > HRTIME_SECONDS(CacheClusterMonitorIntervalSecs)) {
02411       last_trace_dump = current_time;
02412       dump_internal_data();
02413     }
02414   }
02415   
02416   
02417   
02418   
02419   
02420 
02421 
02422   
02423 
02424 #ifndef DEBUG
02425   if (clm && ClusterLoadMonitor::cf_monitor_enabled > 0) {
02426 #else
02427   if (0) {
02428 #endif
02429     bool last_state = disable_remote_cluster_ops;
02430     if (clm->is_cluster_overloaded()) {
02431       disable_remote_cluster_ops = true;
02432     } else {
02433       disable_remote_cluster_ops = false;
02434     }
02435     if (last_state != disable_remote_cluster_ops) {
02436       if (disable_remote_cluster_ops) {
02437         Note("Network congestion to [%u.%u.%u.%u] encountered, reverting to proxy only mode", DOT_SEPARATED(ip));
02438       } else {
02439         Note("Network congestion to [%u.%u.%u.%u] cleared, reverting to cache mode", DOT_SEPARATED(ip));
02440         last_cluster_op_enable = current_time;
02441       }
02442     }
02443   }
02444 
02445   on_stolen_thread = (event == CLUSTER_EVENT_STEAL_THREAD);
02446   bool io_callback = (event == EVENT_IMMEDIATE);
02447 
02448   if (on_stolen_thread) {
02449     thread = (EThread *) e;
02450   } else {
02451     if (io_callback) {
02452       thread = this_ethread();
02453     } else {
02454       thread = e->ethread;
02455     }
02456   }
02457 
02458   int io_activity = 1;
02459   bool only_write_control_msgs;
02460   int res;
02461 
02462   while (io_activity) {
02463     io_activity = 0;
02464     only_write_control_msgs = 0;
02465 
02466     if (downing) {
02467       machine_down();
02468       break;
02469     }
02470 
02471 
02472     
02473 
02474     if (!on_stolen_thread) {
02475       if (delayed_reads.head) {
02476         CLUSTER_INCREMENT_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
02477         finish_delayed_reads();
02478       }
02479       if ((res = process_read(current_time)) < 0) {
02480         break;
02481       }
02482       io_activity += res;
02483 
02484       if (delayed_reads.head) {
02485         CLUSTER_INCREMENT_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
02486         finish_delayed_reads();
02487       }
02488     }
02489 
02490     
02491 
02492     if ((res = process_write(current_time, only_write_control_msgs)) < 0) {
02493       break;
02494     }
02495     io_activity += res;
02496 
02497 
02498     
02499 
02500     if (!on_stolen_thread) {
02501       if (do_open_local_requests())
02502                 thread->signal_hook(thread);
02503     }
02504   }
02505 
02506 #ifdef CLUSTER_IMMEDIATE_NETIO
02507   if (!dead && ((event == EVENT_POLL) || (event == EVENT_INTERVAL))) {
02508     if (res >= 0) {
02509       build_poll(true);
02510     }
02511   }
02512 #endif
02513   return EVENT_CONT;
02514 }
02515 
int
ClusterHandler::process_read(ink_hrtime )
{
  // Cluster read-side state machine.  Assembles one inbound cluster
  // message (header, then descriptors, then data) across possibly many
  // partial socket reads.  Returns:
  //   1  : made progress (partial data read); caller may iterate
  //   0  : blocked waiting for I/O or a lock
  //  -1  : fatal error; the peer machine has been declared down
#ifdef CLUSTER_STATS
  _process_read_calls++;
#endif
  if (dead) {
    // Connection already declared down; nothing to read.
    return 0;
  }

  // Run the state machine until it blocks (return 0/1) or fails (-1).

  for (;;) {

    switch (read.state) {

    case ClusterState::READ_START:
      // Start a new message: reset message state and build the iovec
      // for the fixed-size header.
      {
#ifdef CLUSTER_STATS
        _n_read_start++;
#endif
        read.msg.clear();
        read.start_time = ink_get_hrtime();
        if (build_initial_vector(CLUSTER_READ)) {
          read.state = ClusterState::READ_HEADER;
        } else {
          return 0;
        }
        break;
      }

    case ClusterState::READ_HEADER:
      // Issue the header read.
      {
#ifdef CLUSTER_STATS
        _n_read_header++;
#endif
        read.state = ClusterState::READ_AWAIT_HEADER;
        if (!read.doIO()) {
          // I/O could not be initiated; retry from this state later.
          read.state = ClusterState::READ_HEADER;
          return 0;
        }
        break;
      }

    case ClusterState::READ_AWAIT_HEADER:
      // Wait for header read completion, then validate and byte-swap it.
      {
#ifdef CLUSTER_STATS
        _n_read_await_header++;
#endif
        if (!read.io_complete) {
          return 0;
        } else {
          if (read.io_complete < 0) {
            // Header read failed; declare the peer down.
            machine_down();
            return -1;
          }
        }
        if (read.to_do) {
          if (read.bytes_xfered) {
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
            read.state = ClusterState::READ_HEADER;
            break;
          } else {
            // Nothing moved; wait for more data.
            read.state = ClusterState::READ_HEADER;
            return 0;
          }
        } else {
#ifdef MSG_TRACE
          fprintf(t_fd,
                  "[R] seqno=%d count=%d control_bytes=%d count_check=%d dsum=%d csum=%d\n",
                  read.sequence_number,
                  read.msg.hdr()->count, read.msg.hdr()->control_bytes,
                  read.msg.hdr()->count_check, read.msg.hdr()->descriptor_cksum, read.msg.hdr()->control_bytes_cksum);
          fflush(t_fd);
#endif
          CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did);
          if (needByteSwap) {
            read.msg.hdr()->SwapBytes();
          }
          // Latch header fields into the message state.
          read.msg.count = read.msg.hdr()->count;
          read.msg.control_bytes = read.msg.hdr()->control_bytes;
          read.msg.descriptor_cksum = read.msg.hdr()->descriptor_cksum;
          read.msg.control_bytes_cksum = read.msg.hdr()->control_bytes_cksum;
          read.msg.unused = read.msg.hdr()->unused;

          // Sanity check the header with the magic count; a mismatch
          // means a corrupt stream, so take the link down.
          if (MAGIC_COUNT(read) != read.msg.hdr()->count_check) {
            ink_assert(!"Read bad ClusterMsgHeader data");
            Warning("Bad ClusterMsgHeader read on [%d.%d.%d.%d], restarting", DOT_SEPARATED(ip));
            Note("Cluster read from [%u.%u.%u.%u] failed, declaring down", DOT_SEPARATED(ip));
            machine_down();
            return -1;
          }

          if (read.msg.count || read.msg.control_bytes) {
            read.msg.state++;
            read.state = ClusterState::READ_SETUP_DESCRIPTOR;
          } else {
            // Empty message (keep-alive); finish immediately.
            read.state = ClusterState::READ_COMPLETE;
          }
          break;
        }
      }

    case ClusterState::READ_SETUP_DESCRIPTOR:
      // Build the iovec for the descriptor + control-data section.
      {
#ifdef CLUSTER_STATS
        _n_read_setup_descriptor++;
#endif
        if (build_initial_vector(CLUSTER_READ)) {
          read.state = ClusterState::READ_DESCRIPTOR;
        } else {
          return 0;
        }
        break;
      }

    case ClusterState::READ_DESCRIPTOR:
      // Issue the descriptor read.
      {
#ifdef CLUSTER_STATS
        _n_read_descriptor++;
#endif
        read.state = ClusterState::READ_AWAIT_DESCRIPTOR;
        if (!read.doIO()) {
          // I/O could not be initiated; retry from this state later.
          read.state = ClusterState::READ_DESCRIPTOR;
          return 0;
        }
        break;
      }

    case ClusterState::READ_AWAIT_DESCRIPTOR:
      // Wait for descriptor read completion; verify checksums and swap.
      {
#ifdef CLUSTER_STATS
        _n_read_await_descriptor++;
#endif
        if (!read.io_complete) {
          return 0;
        } else {
          if (read.io_complete < 0) {
            // Descriptor read failed; declare the peer down.
            machine_down();
            return -1;
          }
        }
        if (read.to_do) {
          if (read.bytes_xfered) {
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
            read.state = ClusterState::READ_DESCRIPTOR;
            break;
          } else {
            // Nothing moved; wait for more data.
            read.state = ClusterState::READ_DESCRIPTOR;
            return 0;
          }
        } else {
#ifdef CLUSTER_MESSAGE_CKSUM
          ink_release_assert(read.msg.calc_descriptor_cksum() == read.msg.descriptor_cksum);
          ink_release_assert(read.msg.calc_control_bytes_cksum() == read.msg.control_bytes_cksum);
#endif
          CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did);
          if (needByteSwap) {
            // Peer has opposite endianness; swap all descriptor fields.
            swap_descriptor_bytes();
          }
          if (read.msg.count == 0) {
            // Control data only, no channel data follows.
            read.bytes_xfered = 0;
            read.state = ClusterState::READ_COMPLETE;
          } else {
            read.msg.state++;
            read.state = ClusterState::READ_SETUP_DATA;
          }
          break;
        }
      }

    case ClusterState::READ_SETUP_DATA:
      // Build the iovec pointing at each channel's target buffers.
      {
#ifdef CLUSTER_STATS
        _n_read_setup_data++;
#endif
        if (build_initial_vector(CLUSTER_READ)) {
          free_locks(CLUSTER_READ);
          if (read.to_do) {
            read.state = ClusterState::READ_DATA;
          } else {
            // All descriptors resolved to zero bytes of data.
            read.state = ClusterState::READ_COMPLETE;
          }
        } else {
          return 0;
        }
        break;
      }

    case ClusterState::READ_DATA:
      // Issue the data read.
      {
#ifdef CLUSTER_STATS
        _n_read_data++;
#endif
        ink_release_assert(read.to_do);
        read.state = ClusterState::READ_AWAIT_DATA;
        if (!read.doIO()) {
          // I/O could not be initiated; retry from this state later.
          read.state = ClusterState::READ_DATA;
          return 0;
        }
        break;
      }

    case ClusterState::READ_AWAIT_DATA:
      // Wait for data read completion.
      {
#ifdef CLUSTER_STATS
        _n_read_await_data++;
#endif
        if (!read.io_complete) {
          return 0;             
        } else {
          if (read.io_complete > 0) {
            read.state = ClusterState::READ_POST_COMPLETE;
          } else {
            // Data read failed; declare the peer down.
            machine_down();
            return -1;
          }
        }
        break;
      }

    case ClusterState::READ_POST_COMPLETE:
      // Data arrived (possibly partially); take per-channel locks and
      // dispatch the received bytes to their VCs.
      {
#ifdef CLUSTER_STATS
        _n_read_post_complete++;
#endif
        if (!get_read_locks()) {
          return 0;
        }
        if (read.to_do) {
          if (read.bytes_xfered) {
            // Partial data: hand off what we have and keep reading.
            update_channels_partial_read();
            free_locks(CLUSTER_READ);
            CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.bytes_xfered);
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
            read.state = ClusterState::READ_DATA;
            return 1;
          } else {
            // Nothing moved; wait for more data.
            free_locks(CLUSTER_READ);
            read.state = ClusterState::READ_DATA;
            return 0;
          }
        } else {
          CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.bytes_xfered);
          read.state = ClusterState::READ_COMPLETE;
          break;
        }
      }

    case ClusterState::READ_COMPLETE:
      // Whole message received: record timing, dispatch channel data and
      // control messages, then restart for the next message.
      {
#ifdef CLUSTER_STATS
        _n_read_complete++;
#endif
        ink_hrtime rdmsg_end_time = ink_get_hrtime();
        CLUSTER_SUM_DYN_STAT(CLUSTER_RDMSG_ASSEMBLE_TIME_STAT, rdmsg_end_time - read.start_time);
        read.start_time = HRTIME_MSECONDS(0);
        if (dump_msgs)
          dump_read_msg();
        read.sequence_number++;
        update_channels_read();
        free_locks(CLUSTER_READ);

        read.state = ClusterState::READ_START;
        break;                  
      }

    default:

      {
        ink_release_assert(!"ClusterHandler::process_read invalid state");
      }

    }                           
  }                             
}
02816 
int
ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
{
  // Cluster write-side state machine.  Builds one outbound cluster message
  // (control messages, channel data and freespace descriptors), writes it
  // to the socket and completes the per-channel bookkeeping.  Returns:
  //   1  : (via loop) progress was made
  //   0  : blocked waiting for I/O or a lock, or nothing to send
  //  -1  : allotted run time expired; caller must reschedule
#ifdef CLUSTER_STATS
  _process_write_calls++;
#endif

  // Run the state machine until it blocks or exhausts its time slice.

  for (;;) {

    switch (write.state) {

    case ClusterState::WRITE_START:
      // Start a new message: clear state and descriptor-build progress.
      {
#ifdef CLUSTER_STATS
        _n_write_start++;
#endif
        write.msg.clear();
        write.last_time = ink_get_hrtime();
        pw_write_descriptors_built = -1;
        pw_freespace_descriptors_built = -1;
        pw_controldata_descriptors_built = -1;
        pw_time_expired = 0;
        write.state = ClusterState::WRITE_SETUP;
        break;
      }

    case ClusterState::WRITE_SETUP:
      // Gather descriptors for the outgoing message, then freeze the
      // header (counts and checksums) and build the write iovec.
      {
#ifdef CLUSTER_STATS
        _n_write_setup++;
#endif
        if (!on_stolen_thread && !only_write_control_msgs) {

          // Normal path: include control messages, channel data and
          // freespace updates.  Each build step remembers whether it
          // produced anything, so WRITE_COMPLETE can decide to loop.

          if (pw_controldata_descriptors_built) {
            pw_controldata_descriptors_built = build_controlmsg_descriptors();
          }
          
          if (pw_write_descriptors_built) {
            pw_write_descriptors_built = build_write_descriptors();
          }
          
          if (pw_freespace_descriptors_built) {
            pw_freespace_descriptors_built = build_freespace_descriptors();
          }
          add_small_controlmsg_descriptors();   
        } else {

          // Stolen thread / control-only mode: send control messages only.

          pw_write_descriptors_built = 0;
          pw_freespace_descriptors_built = 0;
          pw_controldata_descriptors_built = build_controlmsg_descriptors();
          add_small_controlmsg_descriptors();   
        }

        // Nothing to send at all: finish this (empty) cycle.
        if (!pw_controldata_descriptors_built && !pw_write_descriptors_built && !pw_freespace_descriptors_built) {
          write.state = ClusterState::WRITE_COMPLETE;
          break;
        } else {
          started_on_stolen_thread = on_stolen_thread;
          control_message_write = only_write_control_msgs;
        }

        // Freeze the message header.
#ifdef CLUSTER_MESSAGE_CKSUM
        write.msg.descriptor_cksum = write.msg.calc_descriptor_cksum();
        write.msg.hdr()->descriptor_cksum = write.msg.descriptor_cksum;

        write.msg.control_bytes_cksum = write.msg.calc_control_bytes_cksum();
        write.msg.hdr()->control_bytes_cksum = write.msg.control_bytes_cksum;
        write.msg.unused = 0;
#endif
        write.msg.hdr()->count = write.msg.count;
        write.msg.hdr()->control_bytes = write.msg.control_bytes;
        write.msg.hdr()->count_check = MAGIC_COUNT(write);

        ink_release_assert(build_initial_vector(CLUSTER_WRITE));
        free_locks(CLUSTER_WRITE);
        write.state = ClusterState::WRITE_INITIATE;
        break;
      }

    case ClusterState::WRITE_INITIATE:
      // Issue the socket write.
      {
#ifdef CLUSTER_STATS
        _n_write_initiate++;
#endif
        write.state = ClusterState::WRITE_AWAIT_COMPLETION;
        if (!write.doIO()) {
          // I/O could not be initiated; retry from this state later.
          write.state = ClusterState::WRITE_INITIATE;
          return 0;
        }
        break;
      }

    case ClusterState::WRITE_AWAIT_COMPLETION:
      // Wait for the socket write to complete (possibly partially).
      {
#ifdef CLUSTER_STATS
        _n_write_await_completion++;
#endif
        if (!write.io_complete) {
          // Write still in flight.
          return 0;
        } else {
          if (write.io_complete < 0) {
            // Write failed; declare the peer down.
            machine_down();
            write.state = ClusterState::WRITE_INITIATE;
            break;
          }
          if (write.to_do) {
            if (write.bytes_xfered) {
              CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_WRITES_STAT);
              write.state = ClusterState::WRITE_INITIATE;
              break;
            } else {
              // Nothing moved; wait for socket space.
              write.state = ClusterState::WRITE_INITIATE;
              return 0;
            }
          }
          CLUSTER_SUM_DYN_STAT(CLUSTER_WRITE_BYTES_STAT, write.bytes_xfered);
          write.sequence_number++;
          write.state = ClusterState::WRITE_POST_COMPLETE;
        }
        break;
      }

    case ClusterState::WRITE_POST_COMPLETE:
      // Message on the wire: take per-channel locks and perform the
      // post-write bookkeeping for each channel in the message.
      {
#ifdef CLUSTER_STATS
        _n_write_post_complete++;
#endif
        if (!get_write_locks()) {
          CLUSTER_INCREMENT_DYN_STAT(CLUSTER_WRITE_LOCK_MISSES_STAT);
          return 0;
        }
        
        // Update channel state (ndone, freespace, callbacks) for every
        // descriptor we just sent, then release the locks.
        
        update_channels_written();
        free_locks(CLUSTER_WRITE);
        write.state = ClusterState::WRITE_COMPLETE;
        break;
      }

    case ClusterState::WRITE_COMPLETE:
      // One write cycle done: advance buckets, enforce the time slice and
      // verify this peer is still in the cluster configuration.
      {
#ifdef CLUSTER_STATS
        _n_write_complete++;
#endif
        write.state = ClusterState::WRITE_START;
        ink_hrtime curtime = ink_get_hrtime();

        if (!on_stolen_thread) {
          // Regular thread: bound total run time per invocation, and
          // advance to the next VC bucket when this cycle sent no
          // channel data or freespace (only an empty/control pass).
          pw_time_expired = (curtime - now) > CLUSTER_MAX_RUN_TIME;

          if (!control_message_write && !pw_write_descriptors_built
              && !pw_freespace_descriptors_built && !pw_controldata_descriptors_built) {
            
            cur_vcs = (cur_vcs + 1) % CLUSTER_BUCKETS;
          }
        } else {
          // Stolen thread: tighter time bound, since we are borrowing
          // another continuation's thread.
          pw_time_expired = (curtime - now) > CLUSTER_MAX_THREAD_STEAL_TIME;
          if (pw_time_expired) {
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_THREAD_STEAL_EXPIRES_STAT);
          }
        }
        
        // Once per full bucket sweep, confirm the peer is still present
        // in the cluster configuration; if not, declare it down.
        if (!on_stolen_thread && !cur_vcs && !dead) {
          
          MachineList *mc = the_cluster_machines_config();
          if (mc && !mc->find(ip, port)) {
            Note("Cluster [%u.%u.%u.%u:%d] not in config, declaring down", DOT_SEPARATED(ip), port);
            machine_down();
          }
        }
        if (pw_time_expired) {
          return -1;            // time slice used up; caller reschedules
        } else {
          if (pw_write_descriptors_built || pw_freespace_descriptors_built || pw_controldata_descriptors_built) {
            break;              // more to send; start another cycle
          } else {
            return 0;           // nothing pending
          }
        }
      }

    default:

      {
        ink_release_assert(!"ClusterHandler::process_write invalid state");
      }

    }                           
  }                             
}
03040 
int
ClusterHandler::do_open_local_requests()
{
  // Drain the external_incoming_open_local atomic list and start each
  // locally originated ClusterVConnection open request on this thread.
  // Returns 1 if any request could not acquire its action mutex and was
  // re-queued (the caller then signals the thread to retry), else 0.
  int pending_request = 0;
  ClusterVConnection *cvc;
  ClusterVConnection *cvc_ext;
  ClusterVConnection *cvc_ext_next;
  EThread *tt = this_ethread();
  Queue<ClusterVConnection> local_incoming_open_local;

  // Atomically grab everything queued so far; loop in case more requests
  // arrive while this batch is being processed.
  while (true) {
    cvc_ext = (ClusterVConnection *)
      ink_atomiclist_popall(&external_incoming_open_local);
    if (cvc_ext == 0)
      break;

    // Transfer the popped (singly linked) chain onto a local queue.
    while (cvc_ext) {
      cvc_ext_next = (ClusterVConnection *) cvc_ext->link.next;
      cvc_ext->link.next = NULL;
      local_incoming_open_local.push(cvc_ext);
      cvc_ext = cvc_ext_next;
    }

    // Start each connection while holding its action mutex.

    while ((cvc = local_incoming_open_local.pop())) {
      MUTEX_TRY_LOCK(lock, cvc->action_.mutex, tt);
      if (lock) {
        if (cvc->start(tt) < 0) {
          // Open failed: notify the requester (if still interested) and
          // release the VC.
          cvc->token.clear();
          if (cvc->action_.continuation) {
            cvc->action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);
            clusterVCAllocator.free(cvc);
          }
        }
        MUTEX_RELEASE(lock);

      } else {
        // Lock miss: push the request back onto the atomic list for a
        // later retry.
        Debug(CL_TRACE, "do_open_local_requests() unable to acquire mutex (cvc=%p)", cvc);
        pending_request = 1;
        ink_atomiclist_push(&external_incoming_open_local, (void *) cvc);
      }
    }
  }
  return pending_request;
}
03098 
03099