00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #include "P_Cluster.h"
00030 
// Cluster-wide tunables; the definitions live in another translation unit.
//
// The socket-related values are copied into the NetVCOptions used when this file
// opens an outbound cluster connection.
extern int cluster_send_buffer_size;
extern int cluster_receive_buffer_size;
extern uint32_t cluster_sockopt_flags;
extern uint32_t cluster_packet_mark;
extern uint32_t cluster_packet_tos;

// Compared against the number of ET_CLUSTER event threads and used to pick the
// thread a connection is bound to during connection setup.
extern int num_of_cluster_threads;
00037 
00038 
00039 
00040 
00041 
00042 
00043 ClusterCalloutContinuation::ClusterCalloutContinuation(struct ClusterHandler *ch)
00044   :
00045 Continuation(0),
00046 _ch(ch)
00047 {
00048   mutex = new_ProxyMutex();
00049   SET_HANDLER((ClstCoutContHandler)
00050               & ClusterCalloutContinuation::CalloutHandler);
00051 }
00052 
00053 ClusterCalloutContinuation::~ClusterCalloutContinuation()
00054 {
00055   mutex = 0;
00056 }
00057 
00058 int
00059 ClusterCalloutContinuation::CalloutHandler(int , Event * )
00060 {
00061   return _ch->process_incoming_callouts(this->mutex);
00062 }
00063 
00064 
00065 
00066 
00067 ClusterControl::ClusterControl():
00068 Continuation(NULL), len(0), size_index(-1), real_data(0), data(0), free_proc(0), free_proc_arg(0), iob_block(0)
00069 {
00070 }
00071 
00072 void
00073 ClusterControl::real_alloc_data(int read_access, bool align_int32_on_non_int64_boundary)
00074 {
00075   EThread *thread = this_ethread();
00076   ProxyMutex *mutex = thread->mutex;
00077 
00078   ink_assert(!data);
00079   if ((len + DATA_HDR + sizeof(int32_t)) <= DEFAULT_MAX_BUFFER_SIZE) {
00080     size_index = buffer_size_to_index(len + DATA_HDR + sizeof(int32_t), MAX_BUFFER_SIZE_INDEX);
00081     iob_block = new_IOBufferBlock();
00082     iob_block->alloc(size_index);       
00083     real_data = (int64_t *) iob_block->buf();
00084 
00085     if (align_int32_on_non_int64_boundary) {
00086       data = ((char *) real_data) + sizeof(int32_t) + DATA_HDR;
00087     } else {
00088       data = ((char *) real_data) + DATA_HDR;
00089     }
00090   } else {
00091     int size = sizeof(int64_t) * (((len + DATA_HDR + sizeof(int32_t) + sizeof(int64_t) - 1) / sizeof(int64_t)) + 1);
00092     size_index = -1;
00093     iob_block = new_IOBufferBlock();
00094     iob_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size));
00095     real_data = (int64_t *) iob_block->buf();
00096 
00097     if (align_int32_on_non_int64_boundary) {
00098       data = (char *) DOUBLE_ALIGN(real_data) + sizeof(int32_t) + DATA_HDR;
00099     } else {
00100       data = (char *) DOUBLE_ALIGN(real_data) + DATA_HDR;
00101     }
00102     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_ALLOC_DATA_NEWS_STAT);
00103   }
00104 
00105   
00106   if (read_access) {
00107     
00108     iob_block->fill((char *) data - (char *) real_data);        
00109     iob_block->consume((char *) data - (char *) real_data);     
00110     iob_block->fill(len);
00111   } else {
00112     
00113     iob_block->fill((char *) data - (char *) real_data);        
00114     iob_block->consume((char *) data - (char *) real_data);     
00115     iob_block->_buf_end = iob_block->end() + len;
00116   }
00117 
00118   
00119   char *size_index_ptr = (char *) data - DATA_HDR;
00120   *size_index_ptr = size_index;
00121   ++size_index_ptr;
00122 
00123   *size_index_ptr = (char) ALLOC_DATA_MAGIC;
00124   ++size_index_ptr;
00125 
00126   void *val = (void *) this;
00127   memcpy(size_index_ptr, (char *) &val, sizeof(void *));
00128 }
00129 
00130 void
00131 ClusterControl::free_data()
00132 {
00133   if (data && iob_block) {
00134     if (free_proc) {
00135       
00136       (*free_proc) (free_proc_arg);
00137       iob_block = 0;            
00138       return;
00139     }
00140     if (real_data) {
00141       ink_release_assert(*(((uint8_t *) data) - DATA_HDR + 1) == (uint8_t) ALLOC_DATA_MAGIC);
00142       *(((uint8_t *) data) - DATA_HDR + 1) = (uint8_t) ~ ALLOC_DATA_MAGIC;
00143 
00144       ink_release_assert(*(((char *) data) - DATA_HDR) == size_index);         
00145     } else {
00146       
00147       
00148     }
00149     iob_block = 0;              
00150   }
00151 }
00152 
00153 
00154 
00155 
00156 IncomingControl *
00157 IncomingControl::alloc()
00158 {
00159   return inControlAllocator.alloc();
00160 }
00161 
00162 IncomingControl::IncomingControl()
00163 :recognized_time(0)
00164 {
00165 }
00166 
00167 void
00168 IncomingControl::freeall()
00169 {
00170   free_data();
00171   inControlAllocator.free(this);
00172 }
00173 
00174 
00175 
00176 
00177 OutgoingControl *
00178 OutgoingControl::alloc()
00179 {
00180   return outControlAllocator.alloc();
00181 }
00182 
00183 OutgoingControl::OutgoingControl()
00184 :ch(NULL), submit_time(0)
00185 {
00186 }
00187 
00188 int
00189 OutgoingControl::startEvent(int event, Event * e)
00190 {
00191   
00192   
00193   
00194   
00195   (void) event;
00196   (void) e;
00197   
00198   if (!ch || !ch->thread)
00199     return EVENT_DONE;
00200 
00201   int32_t cluster_fn = *(int32_t *) this->data;
00202   int32_t pri = ClusterFuncToQpri(cluster_fn);
00203   ink_atomiclist_push(&ch->outgoing_control_al[pri], (void *) this);
00204 
00205   return EVENT_DONE;
00206 }
00207 
00208 void
00209 OutgoingControl::freeall()
00210 {
00211   free_data();
00212   outControlAllocator.free(this);
00213 }
00214 
00215 
00216 
00217 
00218 ClusterState::ClusterState(ClusterHandler * c, bool read_chan):
00219 Continuation(0),
00220 ch(c),
00221 read_channel(read_chan),
00222 do_iodone_event(false),
00223 n_descriptors(0),
00224 sequence_number(0),
00225 to_do(0),
00226 did(0),
00227 n_iov(0),
00228 io_complete(1),
00229 io_complete_event(0),
00230 v(0),
00231 bytes_xfered(0),
00232 last_ndone(0),
00233 total_bytes_xfered(0),
00234 iov(NULL),
00235 iob_iov(NULL),
00236 byte_bank(NULL),
00237 n_byte_bank(0), byte_bank_size(0), missed(0), missed_msg(false), read_state_t(READ_START), write_state_t(WRITE_START)
00238 {
00239   mutex = new_ProxyMutex();
00240   if (read_channel) {
00241     state = ClusterState::READ_START;
00242     SET_HANDLER(&ClusterState::doIO_read_event);
00243   } else {
00244     state = ClusterState::WRITE_START;
00245     SET_HANDLER(&ClusterState::doIO_write_event);
00246   }
00247   last_time = HRTIME_SECONDS(0);
00248   start_time = HRTIME_SECONDS(0);
00249   int size;
00250   
00251   
00252   
00253   
00254 
00255 
00256   
00257 
00258   size_t pagesize = ats_pagesize();
00259   size = ((MAX_TCOUNT + 1) * sizeof(IOVec)) + (2 * pagesize);
00260   iob_iov = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(size));
00261   char *addr = (char *) align_pointer_forward(iob_iov->data(), pagesize);
00262 
00263   iov = (IOVec *) (addr + pagesize);
00264 
00265 
00266   
00267 
00268   size = sizeof(ClusterMsgHeader) + (MAX_TCOUNT + 1) * sizeof(Descriptor)
00269     + CONTROL_DATA + (2 * pagesize);
00270   msg.iob_descriptor_block = new_IOBufferBlock();
00271   msg.iob_descriptor_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size));
00272 
00273   addr = (char *) align_pointer_forward(msg.iob_descriptor_block->data->data(), pagesize);
00274 
00275   addr = addr + pagesize;
00276   memset(addr, 0, size - (2 * pagesize));
00277   msg.descriptor = (Descriptor *) (addr + sizeof(ClusterMsgHeader));
00278 
00279   mbuf = new_empty_MIOBuffer();
00280 }
00281 
00282 ClusterState::~ClusterState()
00283 {
00284   mutex = 0;
00285   if (iov) {
00286     iob_iov = 0;                
00287   }
00288 
00289   if (msg.descriptor) {
00290     msg.iob_descriptor_block = 0;       
00291   }
00292   
00293   int n;
00294   for (n = 0; n < MAX_TCOUNT; ++n) {
00295     block[n] = 0;
00296   }
00297   free_empty_MIOBuffer(mbuf);
00298   mbuf = 0;
00299 }
00300 
00301 void
00302 ClusterState::build_do_io_vector()
00303 {
00304   
00305   
00306   
00307   
00308   int bytes_to_xfer = 0;
00309   int n;
00310   IOBufferBlock *last_block = 0;
00311 
00312   mbuf->clear();
00313 
00314   
00315 
00316   for (n = 0; n < n_iov; ++n) {
00317     bytes_to_xfer += iov[n].iov_len;
00318 
00319     if (last_block) {
00320       last_block->next = block[n];
00321     }
00322     last_block = block[n];
00323     while (last_block->next) {
00324       last_block = last_block->next;
00325     }
00326   }
00327   mbuf->_writer = block[0];
00328   ink_release_assert(bytes_to_xfer == to_do);
00329   ink_assert(bytes_to_xfer == bytes_IOBufferBlockList(mbuf->_writer, !read_channel));
00330 }
00331 
// REENABLE_IO(): re-arms the VIO `v` of the enclosing ClusterState while an I/O is
// still outstanding (io_complete == 0). Exactly one variant is compiled in:
//   CLUSTER_TOMCAT          - as the default, but skipped on a stolen thread
//   CLUSTER_IMMEDIATE_NETIO - re-enable through the NetVConnection immediately
//   (neither)               - plain v->reenable_re()
// The macro expands to a bare `if` statement; call it as `REENABLE_IO();`.
#if defined(CLUSTER_TOMCAT)

#define REENABLE_IO() \
  if (!ch->on_stolen_thread && !io_complete) { \
    v->reenable_re(); \
  }

#elif defined(CLUSTER_IMMEDIATE_NETIO)

#define REENABLE_IO() \
  if (!io_complete) { \
    ((NetVConnection *) v->vc_server)->reenable_re_now(v); \
  }

#else // !CLUSTER_TOMCAT && !CLUSTER_IMMEDIATE_NETIO

#define REENABLE_IO() \
  if (!io_complete) { \
    v->reenable_re(); \
  }

#endif
00355 
00356 int
00357 ClusterState::doIO()
00358 {
00359   ink_release_assert(io_complete);
00360 #if !defined(CLUSTER_IMMEDIATE_NETIO)
00361   MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00362   if (!lock) {
00363     return 0;                   
00364   }
00365 #endif
00366 
00367   if (!ch->net_vc) {
00368     
00369     io_complete = 1;
00370     bytes_xfered += to_do;
00371     to_do = 0;
00372     return 1;
00373   }
00374   
00375   
00376   
00377   if ((to_do && (io_complete_event == VC_EVENT_READ_READY)) || (io_complete_event == VC_EVENT_WRITE_READY)) {
00378 
00379     if (read_channel) {
00380       
00381       ink_assert(v->buffer.writer()->current_write_avail() == to_do);
00382 
00383     } else {
00384       
00385       ink_assert(v->buffer.reader()->read_avail() == to_do);
00386     }
00387 
00388     
00389     v->nbytes = to_do + did;
00390     ink_release_assert(v->nbytes > v->ndone);
00391 
00392     io_complete = false;
00393     io_complete_event = 0;
00394     REENABLE_IO();
00395 
00396   } else {
00397     
00398     
00399 
00400     io_complete = false;
00401     io_complete_event = 0;
00402     bytes_xfered = 0;
00403     last_ndone = 0;
00404 
00405     build_do_io_vector();
00406 
00407     if (read_channel) {
00408       ink_assert(mbuf->current_write_avail() == to_do);
00409 #ifdef CLUSTER_IMMEDIATE_NETIO
00410       v = ch->net_vc->do_io_read_now(this, to_do, mbuf);
00411 #else
00412       v = ch->net_vc->do_io_read(this, to_do, mbuf);
00413 #endif
00414       REENABLE_IO();
00415 
00416     } else {
00417       IOBufferReader *r = mbuf->alloc_reader();
00418       r->block = mbuf->_writer;
00419       ink_assert(r->read_avail() == to_do);
00420 #ifdef CLUSTER_IMMEDIATE_NETIO
00421       v = ch->net_vc->do_io_write_now(this, to_do, r);
00422 #else
00423       v = ch->net_vc->do_io_write(this, to_do, r);
00424 #endif
00425       REENABLE_IO();
00426     }
00427   }
00428   return 1;                     
00429 }
00430 
00431 int
00432 ClusterState::doIO_read_event(int event, void *d)
00433 {
00434   ink_release_assert(!io_complete);
00435   if (!v) {
00436     v = (VIO *) d;              
00437   }
00438   ink_assert((VIO *) d == v);
00439 
00440   switch (event) {
00441   case VC_EVENT_READ_READY:
00442     {
00443       
00444       v->nbytes = v->ndone;
00445       
00446     }
00447   case VC_EVENT_READ_COMPLETE:
00448     {
00449       bytes_xfered = v->ndone - last_ndone;
00450       if (bytes_xfered) {
00451         total_bytes_xfered += bytes_xfered;
00452         did += bytes_xfered;
00453         to_do -= bytes_xfered;
00454       }
00455       last_ndone = v->ndone;
00456       io_complete_event = event;
00457       INK_WRITE_MEMORY_BARRIER;
00458 
00459       io_complete = 1;
00460       IOComplete();
00461 
00462       break;
00463     }
00464   case VC_EVENT_EOS:
00465   case VC_EVENT_ERROR:
00466   case VC_EVENT_INACTIVITY_TIMEOUT:
00467   case VC_EVENT_ACTIVE_TIMEOUT:
00468   default:
00469     {
00470       io_complete_event = event;
00471       INK_WRITE_MEMORY_BARRIER;
00472 
00473       io_complete = -1;
00474       IOComplete();
00475       break;
00476     }
00477   }                             
00478 
00479   return EVENT_DONE;
00480 }
00481 
00482 int
00483 ClusterState::doIO_write_event(int event, void *d)
00484 {
00485   ink_release_assert(!io_complete);
00486   if (!v) {
00487     v = (VIO *) d;              
00488   }
00489   ink_assert((VIO *) d == v);
00490 
00491   switch (event) {
00492   case VC_EVENT_WRITE_READY:
00493 #ifdef CLUSTER_IMMEDIATE_NETIO
00494     {
00495       
00496       v->nbytes = v->ndone;
00497       
00498     }
00499 #endif
00500   case VC_EVENT_WRITE_COMPLETE:
00501     {
00502       bytes_xfered = v->ndone - last_ndone;
00503       if (bytes_xfered) {
00504         total_bytes_xfered += bytes_xfered;
00505         did += bytes_xfered;
00506         to_do -= bytes_xfered;
00507       }
00508       last_ndone = v->ndone;
00509 #ifdef CLUSTER_IMMEDIATE_NETIO
00510       io_complete_event = event;
00511       INK_WRITE_MEMORY_BARRIER;
00512 
00513       io_complete = 1;
00514       IOComplete();
00515 #else
00516       if (event == VC_EVENT_WRITE_COMPLETE) {
00517         io_complete_event = event;
00518         INK_WRITE_MEMORY_BARRIER;
00519 
00520         io_complete = 1;
00521         IOComplete();
00522       } else {
00523         if (bytes_xfered) {
00524           v->reenable_re();     
00525         } else {
00526           v->reenable();
00527         }
00528         return EVENT_DONE;
00529       }
00530 #endif
00531       break;
00532     }
00533   case VC_EVENT_EOS:
00534   case VC_EVENT_ERROR:
00535   case VC_EVENT_INACTIVITY_TIMEOUT:
00536   case VC_EVENT_ACTIVE_TIMEOUT:
00537   default:
00538     {
00539       io_complete_event = event;
00540       INK_WRITE_MEMORY_BARRIER;
00541 
00542       io_complete = -1;
00543       IOComplete();
00544       break;
00545     }
00546   }                             
00547 
00548   return EVENT_DONE;
00549 }
00550 
00551 void
00552 ClusterState::IOComplete()
00553 {
00554   
00555   
00556   
00557   
00558 
00559   if (do_iodone_event && !ch->mutex->thread_holding) {
00560     MUTEX_TRY_LOCK(lock, ch->mutex, this_ethread());
00561     if (lock) {
00562       ch->handleEvent(EVENT_IMMEDIATE, (void *) 0);
00563     } else {
00564       eventProcessor.schedule_imm_signal(ch, ET_CLUSTER);
00565     }
00566   }
00567 }
00568 
00569 int
00570 ClusterHandler::cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s)
00571 {
00572   s->vio._cont->handleEvent(event, &s->vio);
00573 
00574   if (vc->closed) {
00575     if (!vc->write_list && !vc->write_bytes_in_transit) {
00576       close_ClusterVConnection(vc);
00577     }
00578     return EVENT_DONE;
00579   } else {
00580     ink_assert((event != VC_EVENT_ERROR) || ((event == VC_EVENT_ERROR) && vc->closed));
00581     return EVENT_CONT;
00582   }
00583 }
00584 
00585 int
00586 ClusterHandler::cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s)
00587 {
00588   
00589   s->vio._cont->handleEvent(event, &s->vio);
00590 
00591   if (vc->closed) {
00592     if (!vc->write_list && !vc->write_bytes_in_transit) {
00593       close_free_lock(vc, s);
00594     }
00595     return EVENT_DONE;
00596   } else
00597     return EVENT_CONT;
00598 }
00599 
00600 int
00601 ClusterHandler::cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno)
00602 {
00603   s->enabled = 0;
00604   vc->lerrno = lerrno;
00605   return cluster_signal_and_update(VC_EVENT_ERROR, vc, s);
00606 }
00607 
00608 bool ClusterHandler::check_channel(int c)
00609 {
00610   
00611   
00612   
00613   while (n_channels <= c) {
00614     int
00615       old_channels = n_channels;
00616     if (!n_channels) {
00617       n_channels = MIN_CHANNELS;
00618     } else {
00619       if ((n_channels * 2) <= MAX_CHANNELS) {
00620         n_channels = n_channels * 2;
00621       } else {
00622         return false;           
00623       }
00624     }
00625     
00626     channels = (ClusterVConnection **)ats_realloc(channels, n_channels * sizeof(ClusterVConnection *));
00627 
00628     
00629     channel_data = (struct ChannelData **)ats_realloc(channel_data, n_channels * sizeof(struct ChannelData *));
00630 
00631     for (int i = old_channels; i < n_channels; i++) {
00632       if (local_channel(i)) {
00633         if (i > LAST_DEDICATED_CHANNEL) {
00634           channels[i] = (ClusterVConnection *) 1;       
00635           channel_data[i] = (struct ChannelData *)ats_malloc(sizeof(struct ChannelData));
00636           memset(channel_data[i], 0, sizeof(struct ChannelData));
00637           channel_data[i]->channel_number = i;
00638           free_local_channels.enqueue(channel_data[i]);
00639         } else {
00640           channels[i] = NULL;
00641           channel_data[i] = NULL;
00642         }
00643       } else {
00644         channels[i] = NULL;
00645         channel_data[i] = NULL;
00646       }
00647     }
00648   }
00649   return true;                  
00650 }
00651 
00652 int
00653 ClusterHandler::alloc_channel(ClusterVConnection * vc, int requested)
00654 {
00655   
00656   
00657   
00658   struct ChannelData *cdp = 0;
00659   int i = requested;
00660 
00661   if (!i) {
00662     int loops = 1;
00663     do {
00664       cdp = free_local_channels.dequeue();
00665       if (!cdp) {
00666         if (!check_channel(n_channels)) {
00667           return -2;            
00668         }
00669       } else {
00670         ink_assert(cdp == channel_data[cdp->channel_number]);
00671         i = cdp->channel_number;
00672         break;
00673       }
00674     } while (loops--);
00675 
00676     ink_release_assert(i != 0); 
00677     ink_release_assert(channels[i] == (ClusterVConnection *) 1);        
00678     Debug(CL_TRACE, "alloc_channel local chan=%d VC=%p", i, vc);
00679 
00680   } else {
00681     if (!check_channel(i)) {
00682       return -2;                
00683     }
00684     if (channels[i]) {
00685       Debug(CL_TRACE, "alloc_channel remote inuse chan=%d VC=%p", i, vc);
00686       return -1;                
00687     } else {
00688       Debug(CL_TRACE, "alloc_channel remote chan=%d VC=%p", i, vc);
00689     }
00690   }
00691   channels[i] = vc;
00692   vc->channel = i;
00693   return i;
00694 }
00695 
00696 void
00697 ClusterHandler::free_channel(ClusterVConnection * vc)
00698 {
00699   
00700   
00701   
00702   int i = vc->channel;
00703   if (i > LAST_DEDICATED_CHANNEL && channels[i] == vc) {
00704     if (local_channel(i)) {
00705       channels[i] = (ClusterVConnection *) 1;
00706       free_local_channels.enqueue(channel_data[i]);
00707       Debug(CL_TRACE, "free_channel local chan=%d VC=%p", i, vc);
00708     } else {
00709       channels[i] = 0;
00710       Debug(CL_TRACE, "free_channel remote chan=%d VC=%p", i, vc);
00711     }
00712   }
00713   vc->channel = 0;
00714 }
00715 
00716 int
00717 ClusterHandler::machine_down()
00718 {
00719   char textbuf[sizeof("255.255.255.255:65535")];
00720 
00721   if (dead) {
00722     return EVENT_DONE;
00723   }
00724   
00725   
00726   
00727   
00728   
00729   
00730   
00731 #ifdef LOCAL_CLUSTER_TEST_MODE
00732   Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), port);
00733 #else
00734   Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), id);
00735 #endif
00736   machine_offline_APIcallout(ip);
00737   snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port);
00738   RecSignalManager(REC_SIGNAL_MACHINE_DOWN, textbuf);
00739   if (net_vc) {
00740     net_vc->do_io(VIO::CLOSE);
00741     net_vc = 0;
00742   }
00743   
00744   read.io_complete = -1;
00745   write.io_complete = -1;
00746 
00747   MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
00748   ClusterConfiguration *c = this_cluster()->current_configuration();
00749   machine->clusterHandlers[id] = NULL;
00750   if ((--machine->now_connections == 0) && c->find(ip, port)) {
00751     ClusterConfiguration *cc = configuration_remove_machine(c, machine);
00752     CLUSTER_DECREMENT_DYN_STAT(CLUSTER_NODES_STAT);
00753     this_cluster()->configurations.push(cc);
00754     machine->dead = true;
00755   }
00756   MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
00757   MachineList *cc = the_cluster_config();
00758   if (cc && cc->find(ip, port) && connector) {
00759     Debug(CL_NOTE, "cluster connect retry for %hhu.%hhu.%hhu.%hhu", DOT_SEPARATED(ip));
00760     clusterProcessor.connect(ip, port, id);
00761   }
00762   return zombify();             
00763 }
00764 
00765 int
00766 ClusterHandler::zombify(Event * )
00767 {
00768   
00769   
00770   
00771   
00772   dead = true;
00773   if (cluster_periodic_event) {
00774     cluster_periodic_event->cancel(this);
00775     cluster_periodic_event = NULL;
00776   }
00777   clm->cancel_monitor();
00778 
00779   SET_HANDLER((ClusterContHandler) & ClusterHandler::protoZombieEvent);
00780   
00781   
00782   
00783   
00784   eventProcessor.schedule_in(this, HRTIME_SECONDS(1), ET_CLUSTER);
00785   return EVENT_DONE;
00786 }
00787 
00788 int
00789 ClusterHandler::connectClusterEvent(int event, Event * e)
00790 {
00791 
00792   if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) {
00793     
00794     
00795     
00796     
00797     
00798     MachineList *cc = the_cluster_config();
00799     if (!machine)
00800       machine = new ClusterMachine(hostname, ip, port);
00801 #ifdef LOCAL_CLUSTER_TEST_MODE
00802     if (!(cc && cc->find(ip, port))) {
00803 #else
00804     if (this_cluster_machine()->ip == machine->ip || !(cc && cc->find(ip, port))) {
00805 #endif
00806       if (this_cluster_machine()->ip != machine->ip)
00807         Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u not in cluster", DOT_SEPARATED(machine->ip));
00808       delete machine;
00809       machine = NULL;
00810       delete this;
00811       return EVENT_DONE;
00812     }
00813     
00814     Debug(CL_NOTE, "connect_re from %u.%u.%u.%u to %u.%u.%u.%u",
00815           DOT_SEPARATED(this_cluster_machine()->ip), DOT_SEPARATED(machine->ip));
00816     ip = machine->ip;
00817 
00818     NetVCOptions opt;
00819     opt.socket_send_bufsize = cluster_send_buffer_size;
00820     opt.socket_recv_bufsize = cluster_receive_buffer_size;
00821     opt.sockopt_flags = cluster_sockopt_flags;
00822     opt.packet_mark = cluster_packet_mark;
00823     opt.packet_tos = cluster_packet_tos;
00824     opt.etype = ET_CLUSTER;
00825     opt.addr_binding = NetVCOptions::INTF_ADDR;
00826     opt.local_ip = this_cluster_machine()->ip;
00827 
00828     struct sockaddr_in addr;
00829     ats_ip4_set(&addr, machine->ip,
00830         htons(machine->cluster_port ? machine->cluster_port : cluster_port));
00831 
00832     
00833     netProcessor.connect_re(this, ats_ip_sa_cast(&addr), &opt);
00834     return EVENT_DONE;
00835   } else {
00836     if (event == NET_EVENT_OPEN) {
00837       net_vc = (NetVConnection *) e;
00838       SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent);
00839       eventProcessor.schedule_imm(this, ET_CLUSTER);
00840       return EVENT_DONE;
00841 
00842     } else {
00843       eventProcessor.schedule_in(this, CLUSTER_MEMBER_DELAY);
00844       return EVENT_CONT;
00845     }
00846   }
00847 }
00848 
00849 int
00850 ClusterHandler::startClusterEvent(int event, Event * e)
00851 {
00852   char textbuf[sizeof("255.255.255.255:65535")];
00853 
00854   
00855 
00856   (void) event;
00857   ink_assert(!read_vcs);
00858   ink_assert(!write_vcs);
00859 
00860   if (event == EVENT_IMMEDIATE) {
00861     if (cluster_connect_state == ClusterHandler::CLCON_INITIAL) {
00862       cluster_connect_state = ClusterHandler::CLCON_SEND_MSG;
00863     } else {
00864       ink_release_assert(!"startClusterEvent, EVENT_IMMEDIATE not expected");
00865     }
00866   } else {
00867     ink_release_assert(event == EVENT_INTERVAL);
00868   }
00869 
00870   for (;;) {
00871 
00872     switch (cluster_connect_state) {
00873 
00874     case ClusterHandler::CLCON_INITIAL:
00875 
00876       {
00877         ink_release_assert(!"Invalid state [CLCON_INITIAL]");
00878       }
00879 
00880     case ClusterHandler::CLCON_SEND_MSG:
00881 
00882       {
00883         
00884 #ifdef LOCAL_CLUSTER_TEST_MODE
00885         nodeClusteringVersion._port = cluster_port;
00886 #endif
00887         cluster_connect_state = ClusterHandler::CLCON_SEND_MSG_COMPLETE;
00888         if (connector)
00889           nodeClusteringVersion._id = id;
00890         build_data_vector((char *) &nodeClusteringVersion, sizeof(nodeClusteringVersion), false);
00891         if (!write.doIO()) {
00892           
00893           cluster_connect_state = ClusterHandler::CLCON_SEND_MSG;
00894           eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
00895           return EVENT_DONE;
00896         }
00897         break;
00898       }
00899 
00900     case ClusterHandler::CLCON_SEND_MSG_COMPLETE:
00901 
00902       {
00903         if (write.io_complete) {
00904           if ((write.io_complete < 0)
00905               || ((size_t) write.did < sizeof(nodeClusteringVersion))) {
00906             Debug(CL_NOTE, "unable to write to cluster node %u.%u.%u.%u: %d",
00907                   DOT_SEPARATED(ip), write.io_complete_event);
00908             cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
00909             break;              
00910           }
00911           
00912           build_data_vector((char *) &clusteringVersion, sizeof(clusteringVersion), true);
00913           cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
00914           break;
00915         } else {
00916           
00917           eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
00918           return EVENT_DONE;
00919         }
00920       }
00921 
00922     case ClusterHandler::CLCON_READ_MSG:
00923 
00924       {
00925         cluster_connect_state = ClusterHandler::CLCON_READ_MSG_COMPLETE;
00926         if (!read.doIO()) {
00927           
00928           cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
00929           eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
00930           return EVENT_DONE;
00931         }
00932         break;
00933       }
00934 
00935     case ClusterHandler::CLCON_READ_MSG_COMPLETE:
00936 
00937       {
00938         if (read.io_complete) {
00939           if (read.io_complete < 0) {
00940             
00941             cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
00942             break;              
00943           }
00944           if ((size_t) read.did < sizeof(clusteringVersion)) {
00945             
00946             cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
00947             break;
00948           }
00949           cluster_connect_state = ClusterHandler::CLCON_VALIDATE_MSG;
00950           break;
00951         } else {
00952           
00953           eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
00954           return EVENT_DONE;
00955         }
00956       }
00957 
00958     case ClusterHandler::CLCON_VALIDATE_MSG:
00959 
00960       {
00961         int proto_major = -1;
00962         int proto_minor = -1;
00963 
00964         clusteringVersion.AdjustByteOrder();
00965 
00966         
00967         
00968         
00969         
00970         
00971 
00972         for (int major = clusteringVersion._major; major >= clusteringVersion._min_major; --major) {
00973           if ((major >= nodeClusteringVersion._min_major) && (major <= nodeClusteringVersion._major)) {
00974             proto_major = major;
00975           }
00976         }
00977         if (proto_major > 0) {
00978 
00979           
00980 
00981           if (proto_major == clusteringVersion._major) {
00982             proto_minor = clusteringVersion._minor;
00983 
00984             if (proto_minor != nodeClusteringVersion._minor)
00985               Warning("Different clustering minor versions (%d,%d) for node %u.%u.%u.%u, continuing",
00986                       proto_minor, nodeClusteringVersion._minor, DOT_SEPARATED(ip));
00987           } else {
00988             proto_minor = 0;
00989           }
00990 
00991         } else {
00992           Warning("Bad cluster major version range (%d-%d) for node %u.%u.%u.%u connect failed",
00993                   clusteringVersion._min_major, clusteringVersion._major, DOT_SEPARATED(ip));
00994           cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
00995           break;                
00996         }
00997 
00998 #ifdef LOCAL_CLUSTER_TEST_MODE
00999         port = clusteringVersion._port & 0xffff;
01000 #endif
01001         if (!connector)
01002           id = clusteringVersion._id & 0xffff;
01003 
01004         machine->msg_proto_major = proto_major;
01005         machine->msg_proto_minor = proto_minor;
01006 
01007         if (eventProcessor.n_threads_for_type[ET_CLUSTER] != num_of_cluster_threads) {
01008           cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
01009           break;
01010         }
01011 
01012         thread = eventProcessor.eventthread[ET_CLUSTER][id % num_of_cluster_threads];
01013         if (net_vc->thread == thread) {
01014           cluster_connect_state = CLCON_CONN_BIND_OK;
01015           break;
01016         } else { 
01017           cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_CLEAR;
01018         }
01019       }
01020 
01021     case ClusterHandler::CLCON_CONN_BIND_CLEAR:
01022       {
01023         UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; 
01024         MUTEX_TRY_LOCK(lock, vc->nh->mutex, e->ethread);
01025         MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread);
01026         if (lock && lock1) {
01027           vc->ep.stop();
01028           vc->nh->open_list.remove(vc);
01029           vc->thread = NULL;
01030           if (vc->nh->read_ready_list.in(vc))
01031             vc->nh->read_ready_list.remove(vc);
01032           if (vc->nh->write_ready_list.in(vc))
01033             vc->nh->write_ready_list.remove(vc);
01034           if (vc->read.in_enabled_list)
01035             vc->nh->read_enable_list.remove(vc);
01036           if (vc->write.in_enabled_list)
01037             vc->nh->write_enable_list.remove(vc);
01038 
01039           
01040           cluster_connect_state = ClusterHandler::CLCON_CONN_BIND;
01041           thread->schedule_in(this, CLUSTER_PERIOD);
01042           return EVENT_DONE;
01043         } else {
01044           
01045           vc->thread->schedule_in(this, CLUSTER_PERIOD);
01046           return EVENT_DONE;
01047         }
01048       }
01049 
01050     case ClusterHandler::CLCON_CONN_BIND:
01051       {
01052         
01053         NetHandler *nh = get_NetHandler(e->ethread);
01054         UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; 
01055         MUTEX_TRY_LOCK(lock, nh->mutex, e->ethread);
01056         MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread);
01057         if (lock && lock1) {
01058           if (vc->read.in_enabled_list)
01059             nh->read_enable_list.push(vc);
01060           if (vc->write.in_enabled_list)
01061             nh->write_enable_list.push(vc);
01062 
01063           vc->nh = nh;
01064           vc->thread = e->ethread;
01065           PollDescriptor *pd = get_PollDescriptor(e->ethread);
01066           if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) {
01067             cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
01068             break;                
01069           }
01070 
01071           nh->open_list.enqueue(vc);
01072           cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_OK;
01073         } else {
01074           thread->schedule_in(this, CLUSTER_PERIOD);
01075           return EVENT_DONE;
01076         }
01077       }
01078 
01079     case ClusterHandler::CLCON_CONN_BIND_OK:
01080       {
01081         int failed = 0;
01082 
01083         
01084         MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
01085         MachineList *cc = the_cluster_config();
01086         if (cc && cc->find(ip, port)) {
01087           ClusterConfiguration *c = this_cluster()->current_configuration();
01088           ClusterMachine *m = c->find(ip, port);
01089           
01090           if (!m) { 
01091             ClusterConfiguration *cconf = configuration_add_machine(c, machine);
01092             CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT);
01093             this_cluster()->configurations.push(cconf);
01094           } else {
01095             
01096             if (id >= m->num_connections || m->clusterHandlers[id]) {
01097               failed = -2;
01098               MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
01099               goto failed;
01100             }
01101             machine = m;
01102           }
01103           machine->now_connections++;
01104           machine->clusterHandlers[id] = this;
01105           machine->dead = false;
01106           dead = false;
01107         } else {
01108           Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u:%d not in cluster", DOT_SEPARATED(ip), port);
01109           failed = -1;
01110         }
01111         MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
01112 failed:
01113         if (failed) {
01114           if (failed == -1) {
01115             if (++configLookupFails <= CONFIG_LOOKUP_RETRIES) {
01116               thread->schedule_in(this, CLUSTER_PERIOD);
01117               return EVENT_DONE;
01118             }
01119           }
01120           cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
01121           break;                
01122         }
01123 
01124         this->needByteSwap = !clusteringVersion.NativeByteOrder();
01125         machine_online_APIcallout(ip);
01126 
01127         
01128         snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port);
01129         RecSignalManager(REC_SIGNAL_MACHINE_UP, textbuf);
01130 #ifdef LOCAL_CLUSTER_TEST_MODE
01131         Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
01132              DOT_SEPARATED(ip), port, clusteringVersion._major, clusteringVersion._minor);
01133 #else
01134         Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
01135              DOT_SEPARATED(ip), id, clusteringVersion._major, clusteringVersion._minor);
01136 #endif
01137 
01138         read_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS];
01139         write_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link>[CLUSTER_BUCKETS];
01140         SET_HANDLER((ClusterContHandler) & ClusterHandler::beginClusterEvent);
01141 
01142         
01143         read.do_iodone_event = true;
01144         write.do_iodone_event = true;
01145 
01146         cluster_periodic_event = thread->schedule_every(this, -CLUSTER_PERIOD);
01147 
01148         
01149         
01150 
01151         int procs_online = ink_number_of_processors();
01152         int total_callbacks = min(procs_online, MAX_COMPLETION_CALLBACK_EVENTS);
01153         for (int n = 0; n < total_callbacks; ++n) {
01154           callout_cont[n] = new ClusterCalloutContinuation(this);
01155           callout_events[n] = eventProcessor.schedule_every(callout_cont[n], COMPLETION_CALLBACK_PERIOD, ET_NET);
01156         }
01157 
01158         
01159 
01160         if (!clm) {
01161           clm = new ClusterLoadMonitor(this);
01162           clm->init();
01163         }
01164         return EVENT_DONE;
01165       }
01166 
01167     case ClusterHandler::CLCON_ABORT_CONNECT:
01168 
01169       {
01170         if (connector) {
01171           Debug(CL_NOTE, "cluster connect retry for %u.%u.%u.%u", DOT_SEPARATED(ip));
01172           
01173           clusterProcessor.connect(ip, port, id, true);
01174         }
01175         cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
01176         break;                  
01177       }
01178 
01179     case ClusterHandler::CLCON_DELETE_CONNECT:
01180 
01181       {
01182         
01183         delete machine;
01184         machine = NULL;
01185         delete this;
01186         Debug(CL_NOTE, "Failed cluster connect, deleting");
01187         return EVENT_DONE;
01188       }
01189 
01190     default:
01191 
01192       {
01193         Warning("startClusterEvent invalid state %d", cluster_connect_state);
01194         ink_release_assert(!"ClusterHandler::startClusterEvent invalid state");
01195         return EVENT_DONE;
01196       }
01197 
01198     }                           
01199   }                             
01200   return EVENT_DONE;
01201 }
01202 
01203 int
01204 ClusterHandler::beginClusterEvent(int , Event * e)
01205 {
01206   
01207 #ifdef CLUSTER_IMMEDIATE_NETIO
01208   build_poll(false);
01209 #endif
01210   SET_HANDLER((ClusterContHandler) & ClusterHandler::mainClusterEvent);
01211   return handleEvent(EVENT_INTERVAL, e);
01212 }
01213 
01214 int
01215 ClusterHandler::zombieClusterEvent(int event, Event * e)
01216 {
01217   
01218   
01219   
01220   
01221   
01222   (void) event;
01223   (void) e;
01224   delete this;                  
01225   return EVENT_DONE;
01226 }
01227 
01228 int
01229 ClusterHandler::protoZombieEvent(int , Event * e)
01230 {
01231   
01232   
01233   
01234   
01235   
01236   bool failed = false;
01237   ink_hrtime delay = CLUSTER_MEMBER_DELAY * 5;
01238   EThread *t = e ? e->ethread : this_ethread();
01239   head_p item;
01240 
01241 
01242   
01243 
01244   mainClusterEvent(EVENT_INTERVAL, e);
01245 
01246   item.data = external_incoming_open_local.head.data;
01247   if (TO_PTR(FREELIST_POINTER(item)) ||
01248       delayed_reads.head || pw_write_descriptors_built
01249       || pw_freespace_descriptors_built || pw_controldata_descriptors_built) {
01250     
01251     if (e) {
01252       e->schedule_in(delay);
01253       return EVENT_CONT;
01254     } else {
01255       eventProcessor.schedule_in(this, delay, ET_CLUSTER);
01256       return EVENT_DONE;
01257     }
01258   }
01259 
01260   
01261 
01262   IncomingControl *ic;
01263   while ((ic = incoming_control.dequeue())) {
01264     failed = true;
01265     ic->mutex = NULL;
01266     ic->freeall();
01267   }
01268 
01269 
01270   
01271   
01272 
01273   for (int i = 0; i < n_channels; i++) {
01274     ClusterVConnection *vc = channels[i];
01275     if (VALID_CHANNEL(vc)) {
01276       if (!vc->closed && vc->read.vio.op == VIO::READ) {
01277         MUTEX_TRY_LOCK(lock, vc->read.vio.mutex, t);
01278         if (lock) {
01279           cluster_signal_error_and_update(vc, &vc->read, 0);
01280         } else {
01281           failed = true;
01282         }
01283       }
01284       vc = channels[i];
01285       if (VALID_CHANNEL(vc)
01286           && !vc->closed && vc->write.vio.op == VIO::WRITE) {
01287         MUTEX_TRY_LOCK(lock, vc->write.vio.mutex, t);
01288         if (lock) {
01289           cluster_signal_error_and_update(vc, &vc->write, 0);
01290         } else {
01291           failed = true;
01292         }
01293       }
01294       vc = channels[i];
01295       if (VALID_CHANNEL(vc)) {
01296         if (vc->closed) {
01297           vc->ch = 0;
01298           vc->write_list = 0;
01299           vc->write_list_tail = 0;
01300           vc->write_list_bytes = 0;
01301           vc->write_bytes_in_transit = 0;
01302           close_ClusterVConnection(vc);
01303         } else {
01304           failed = true;
01305         }
01306       }
01307     }
01308   }
01309 
01310 
01311   
01312   
01313 
01314   item.data = external_incoming_control.head.data;
01315   if (TO_PTR(FREELIST_POINTER(item)) == NULL) {
01316     for (int n = 0; n < MAX_COMPLETION_CALLBACK_EVENTS; ++n) {
01317       if (callout_cont[n]) {
01318         MUTEX_TRY_LOCK(lock, callout_cont[n]->mutex, t);
01319         if (lock) {
01320           callout_events[n]->cancel(callout_cont[n]);
01321           callout_events[n] = 0;
01322           delete callout_cont[n];
01323           callout_cont[n] = 0;
01324         } else {
01325           failed = true;
01326         }
01327       }
01328     }
01329   } else {
01330     failed = true;
01331   }
01332 
01333   if (!failed) {
01334     Debug("cluster_down", "ClusterHandler zombie [%u.%u.%u.%u]", DOT_SEPARATED(ip));
01335     SET_HANDLER((ClusterContHandler) & ClusterHandler::zombieClusterEvent);
01336     delay = NO_RACE_DELAY;
01337   }
01338   if (e) {
01339     e->schedule_in(delay);
01340     return EVENT_CONT;
01341   } else {
01342     eventProcessor.schedule_in(this, delay, ET_CLUSTER);
01343     return EVENT_DONE;
01344   }
01345 }
01346 
01347 int dump_verbose = 0;
01348 
01349 int
01350 ClusterHandler::compute_active_channels()
01351 {
01352   ClusterHandler *ch = this;
01353   int active_chans = 0;
01354 
01355   for (int i = LAST_DEDICATED_CHANNEL + 1; i < ch->n_channels; i++) {
01356     ClusterVConnection *vc = ch->channels[i];
01357     if (VALID_CHANNEL(vc) && (vc->iov_map != CLUSTER_IOV_NOT_OPEN)) {
01358       ++active_chans;
01359       if (dump_verbose) {
01360         printf("ch[%d] vc=0x%p remote_free=%d last_local_free=%d\n", i, vc,
01361                vc->remote_free, vc->last_local_free);
01362         printf("  r_bytes=%d r_done=%d w_bytes=%d w_done=%d\n",
01363                (int)vc->read.vio.nbytes, (int)vc->read.vio.ndone,
01364                (int)vc->write.vio.nbytes, (int)vc->write.vio.ndone);
01365       }
01366     }
01367   }
01368   return active_chans;
01369 }
01370 
01371 void
01372 ClusterHandler::dump_internal_data()
01373 {
01374   if (!message_blk) {
01375     message_blk = new_IOBufferBlock();
01376     message_blk->alloc(MAX_IOBUFFER_SIZE);
01377   }
01378   int r;
01379   int n = 0;
01380   char *b = message_blk->data->data();
01381   unsigned int b_size = message_blk->data->block_size();
01382 
01383   r = snprintf(&b[n], b_size - n, "Host: %hhu.%hhu.%hhu.%hhu\n", DOT_SEPARATED(ip));
01384   n += r;
01385 
01386   r = snprintf(&b[n], b_size - n,
01387                "chans: %d vc_writes: %" PRId64 " write_bytes: %" PRId64 "(d)+%" PRId64 "(c)=%" PRId64 "\n",
01388                compute_active_channels(),
01389                _vc_writes, _vc_write_bytes, _control_write_bytes, _vc_write_bytes + _control_write_bytes);
01390 
01391   n += r;
01392   r = snprintf(&b[n], b_size - n,
01393                "dw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n",
01394                _dw_missed_lock, _dw_not_enabled, _dw_wait_remote_fill, _dw_no_active_vio);
01395 
01396   n += r;
01397   r = snprintf(&b[n], b_size - n,
01398                "dw: not_enabled_or_no_write: %d set_data_pending: %d no_free_space: %d\n",
01399                _dw_not_enabled_or_no_write, _dw_set_data_pending, _dw_no_free_space);
01400 
01401   n += r;
01402   r = snprintf(&b[n], b_size - n,
01403                "fw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n",
01404                _fw_missed_lock, _fw_not_enabled, _fw_wait_remote_fill, _fw_no_active_vio);
01405 
01406   n += r;
01407   r = snprintf(&b[n], b_size - n, "fw: not_enabled_or_no_read: %d\n", _fw_not_enabled_or_no_read);
01408 
01409   n += r;
01410   r = snprintf(&b[n], b_size - n,
01411                "rd(%d): st:%d rh:%d ahd:%d sd:%d rd:%d ad:%d sda:%d rda:%d awd:%d p:%d c:%d\n",
01412                _process_read_calls, _n_read_start, _n_read_header, _n_read_await_header,
01413                _n_read_setup_descriptor, _n_read_descriptor, _n_read_await_descriptor,
01414                _n_read_setup_data, _n_read_data, _n_read_await_data, _n_read_post_complete, _n_read_complete);
01415 
01416   n += r;
01417   r = snprintf(&b[n], b_size - n,
01418                "wr(%d): st:%d set:%d ini:%d wait:%d post:%d comp:%d\n",
01419                _process_write_calls, _n_write_start, _n_write_setup, _n_write_initiate,
01420                _n_write_await_completion, _n_write_post_complete, _n_write_complete);
01421 
01422   n += r;
01423   ink_release_assert((n + 1) <= BUFFER_SIZE_FOR_INDEX(MAX_IOBUFFER_SIZE));
01424   Note("%s", b);
01425   clear_cluster_stats();
01426 }
01427 
01428 void
01429 ClusterHandler::dump_write_msg(int res)
01430 {
01431   
01432   Alias32 x;
01433   x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr;
01434 
01435   fprintf(stderr,
01436           "[W] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d Todo=%d, Res=%d\n",
01437           x.byte[0], x.byte[1], x.byte[2], x.byte[3], write.sequence_number, write.msg.count, write.msg.control_bytes, write.to_do, res);
01438   for (int i = 0; i < write.msg.count; ++i) {
01439     fprintf(stderr, "   d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n",
01440             i, (write.msg.descriptor[i].type ? 1 : 0),
01441             (int) write.msg.descriptor[i].channel,
01442             (int) write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length);
01443   }
01444 }
01445 
01446 void
01447 ClusterHandler::dump_read_msg()
01448 {
01449   
01450   Alias32 x;
01451   x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr;
01452 
01453   fprintf(stderr, "[R] %hhu.%hhu.%hhu.%hhu  SeqNo=%u, Cnt=%d, CntlCnt=%d\n",
01454           x.byte[0], x.byte[1], x.byte[2], x.byte[3], read.sequence_number, read.msg.count, read.msg.control_bytes);
01455   for (int i = 0; i < read.msg.count; ++i) {
01456     fprintf(stderr, "   d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n",
01457             i, (read.msg.descriptor[i].type ? 1 : 0),
01458             (int) read.msg.descriptor[i].channel,
01459             (int) read.msg.descriptor[i].sequence_number, read.msg.descriptor[i].length);
01460   }
01461 }
01462 
01463