00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "P_Cluster.h"
00030
00031 extern int cluster_receive_buffer_size;
00032 extern int cluster_send_buffer_size;
00033 extern uint32_t cluster_sockopt_flags;
00034 extern uint32_t cluster_packet_mark;
00035 extern uint32_t cluster_packet_tos;
00036 extern int num_of_cluster_threads;
00037
00038
00039
00040
00041
00042
// Continuation used to drain incoming cluster function callouts for the
// given ClusterHandler on a regular net thread (scheduled periodically
// from startClusterEvent).  Owns its own mutex so callout processing does
// not contend with the handler's main lock.
ClusterCalloutContinuation::ClusterCalloutContinuation(struct ClusterHandler *ch)
  :
  Continuation(0),
  _ch(ch)
{
  mutex = new_ProxyMutex();
  SET_HANDLER((ClstCoutContHandler)
              & ClusterCalloutContinuation::CalloutHandler);
}
00052
ClusterCalloutContinuation::~ClusterCalloutContinuation()
{
  // Drop our reference to the ProxyMutex (Ptr<> assignment releases it).
  mutex = 0;
}
00057
// Periodic event handler: delegate to the ClusterHandler to process any
// queued incoming callouts.  Event/args are unused; our mutex is passed so
// the handler can release/reacquire it around user callouts.
int
ClusterCalloutContinuation::CalloutHandler(int , Event * )
{
  return _ch->process_incoming_callouts(this->mutex);
}
00063
00064
00065
00066
// Base class for cluster control messages.  All fields start empty;
// real_alloc_data() establishes the backing buffer and data pointers.
ClusterControl::ClusterControl():
  Continuation(NULL), len(0), size_index(-1), real_data(0), data(0), free_proc(0), free_proc_arg(0), iob_block(0)
{
}
00071
// Allocate the backing IOBufferBlock for a control message of 'len' bytes.
// Layout of the allocation:
//   [optional alignment pad][DATA_HDR header][len bytes of payload]
// where the DATA_HDR header stores (size_index, magic byte, this-pointer)
// immediately before 'data' so free_data()/remote frees can validate and
// recover the owning ClusterControl.
//
// read_access selects how the IOBufferBlock fill/consume pointers are set
// up: for reads the block is left writable for 'len' bytes; for writes the
// block's _buf_end is extended so the payload region is addressable.
// align_int32_on_non_int64_boundary shifts 'data' by sizeof(int32_t) so the
// first int32 lands off an 8-byte boundary (message framing requirement —
// presumably to line up the cluster function number; confirm against the
// wire format).
void
ClusterControl::real_alloc_data(int read_access, bool align_int32_on_non_int64_boundary)
{
  EThread *thread = this_ethread();
  // 'mutex' is referenced by the CLUSTER_*_DYN_STAT macros below.
  ProxyMutex *mutex = thread->mutex;

  ink_assert(!data);
  if ((len + DATA_HDR + sizeof(int32_t)) <= DEFAULT_MAX_BUFFER_SIZE) {
    // Small message: use a power-of-two fast allocator bucket.
    size_index = buffer_size_to_index(len + DATA_HDR + sizeof(int32_t), MAX_BUFFER_SIZE_INDEX);
    iob_block = new_IOBufferBlock();
    iob_block->alloc(size_index);       // aligns on 8 byte boundary
    real_data = (int64_t *) iob_block->buf();

    if (align_int32_on_non_int64_boundary) {
      data = ((char *) real_data) + sizeof(int32_t) + DATA_HDR;
    } else {
      data = ((char *) real_data) + DATA_HDR;
    }
  } else {
    // Oversize message: fall back to xmalloc-backed buffer, rounded up to
    // a whole number of int64_t slots plus one for manual alignment.
    int size = sizeof(int64_t) * (((len + DATA_HDR + sizeof(int32_t) + sizeof(int64_t) - 1) / sizeof(int64_t)) + 1);
    size_index = -1;                    // marks "not from a size-class bucket"
    iob_block = new_IOBufferBlock();
    iob_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size));
    real_data = (int64_t *) iob_block->buf();

    if (align_int32_on_non_int64_boundary) {
      data = (char *) DOUBLE_ALIGN(real_data) + sizeof(int32_t) + DATA_HDR;
    } else {
      data = (char *) DOUBLE_ALIGN(real_data) + DATA_HDR;
    }
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_ALLOC_DATA_NEWS_STAT);
  }

  // Position the block's start/end pointers so 'data'..'data+len' is the
  // active region.
  if (read_access) {
    // Reserve the header/pad area, then expose 'len' bytes as filled.
    iob_block->fill((char *) data - (char *) real_data);
    iob_block->consume((char *) data - (char *) real_data);
    iob_block->fill(len);
  } else {
    // Write access: skip header/pad and extend the buffer end by 'len'.
    iob_block->fill((char *) data - (char *) real_data);
    iob_block->consume((char *) data - (char *) real_data);
    iob_block->_buf_end = iob_block->end() + len;
  }

  // Stamp the hidden DATA_HDR header: [size_index][magic][this pointer].
  char *size_index_ptr = (char *) data - DATA_HDR;
  *size_index_ptr = size_index;
  ++size_index_ptr;

  *size_index_ptr = (char) ALLOC_DATA_MAGIC;
  ++size_index_ptr;

  void *val = (void *) this;
  memcpy(size_index_ptr, (char *) &val, sizeof(void *));
}
00129
// Release the message buffer.  If a caller-supplied free procedure was
// registered, it owns the payload and is invoked instead of validating the
// header.  Otherwise, verify the hidden DATA_HDR header written by
// real_alloc_data() (magic byte and size_index) before letting the
// IOBufferBlock reference go; the magic byte is inverted first so a
// double free trips the release assert.
void
ClusterControl::free_data()
{
  if (data && iob_block) {
    if (free_proc) {
      // Payload owned externally; just run the hook and drop the block ref.
      (*free_proc) (free_proc_arg);
      iob_block = 0;
      return;
    }
    if (real_data) {
      ink_release_assert(*(((uint8_t *) data) - DATA_HDR + 1) == (uint8_t) ALLOC_DATA_MAGIC);
      // Clobber the magic so a second free_data() on this buffer asserts.
      *(((uint8_t *) data) - DATA_HDR + 1) = (uint8_t) ~ ALLOC_DATA_MAGIC;

      ink_release_assert(*(((char *) data) - DATA_HDR) == size_index);
    } else {
      // No real_data: nothing to validate (buffer never fully established).
    }
    // Drop the Ptr<> reference; the block frees the storage when unreferenced.
    iob_block = 0;
  }
}
00152
00153
00154
00155
00156 IncomingControl *
00157 IncomingControl::alloc()
00158 {
00159 return inControlAllocator.alloc();
00160 }
00161
// Incoming (received) control message; recognized_time is stamped later
// when the message is pulled off the wire.
IncomingControl::IncomingControl()
  :recognized_time(0)
{
}
00166
// Release the payload buffer and return this object to the allocator pool.
// The object must not be referenced after this call.
void
IncomingControl::freeall()
{
  free_data();
  inControlAllocator.free(this);
}
00173
00174
00175
00176
00177 OutgoingControl *
00178 OutgoingControl::alloc()
00179 {
00180 return outControlAllocator.alloc();
00181 }
00182
// Outgoing (to-be-sent) control message; 'ch' and submit_time are filled
// in when the message is queued for a specific cluster handler.
OutgoingControl::OutgoingControl()
  :ch(NULL), submit_time(0)
{
}
00187
// Queue this control message onto the owning handler's outgoing atomic
// list, bucketed by the priority derived from the cluster function number
// stored in the first int32 of the payload.  If the handler (or its
// thread) is gone the message is silently dropped — the connection is
// down and the data is no longer deliverable.
int
OutgoingControl::startEvent(int event, Event * e)
{
  (void) event;
  (void) e;

  if (!ch || !ch->thread)
    return EVENT_DONE;

  // First int32 of the payload is the cluster function id (wire framing
  // established by real_alloc_data()/build paths).
  int32_t cluster_fn = *(int32_t *) this->data;
  int32_t pri = ClusterFuncToQpri(cluster_fn);
  // Lock-free handoff to the cluster thread.
  ink_atomiclist_push(&ch->outgoing_control_al[pri], (void *) this);

  return EVENT_DONE;
}
00207
// Release the payload buffer and return this object to the allocator pool.
// The object must not be referenced after this call.
void
OutgoingControl::freeall()
{
  free_data();
  outControlAllocator.free(this);
}
00214
00215
00216
00217
// Per-direction (read or write) I/O state for a cluster connection.
// Sets the event handler according to direction and pre-allocates two
// page-aligned scratch areas:
//   1) the iovec array used to build gathered I/O (iob_iov), and
//   2) the descriptor/message area (msg.iob_descriptor_block).
// Both are over-allocated by 2 pages so the useful region can be advanced
// to a page boundary plus one guard page.
ClusterState::ClusterState(ClusterHandler * c, bool read_chan):
  Continuation(0),
  ch(c),
  read_channel(read_chan),
  do_iodone_event(false),
  n_descriptors(0),
  sequence_number(0),
  to_do(0),
  did(0),
  n_iov(0),
  io_complete(1),
  io_complete_event(0),
  v(0),
  bytes_xfered(0),
  last_ndone(0),
  total_bytes_xfered(0),
  iov(NULL),
  iob_iov(NULL),
  byte_bank(NULL),
  n_byte_bank(0), byte_bank_size(0), missed(0), missed_msg(false), read_state_t(READ_START), write_state_t(WRITE_START)
{
  mutex = new_ProxyMutex();
  if (read_channel) {
    state = ClusterState::READ_START;
    SET_HANDLER(&ClusterState::doIO_read_event);
  } else {
    state = ClusterState::WRITE_START;
    SET_HANDLER(&ClusterState::doIO_write_event);
  }
  last_time = HRTIME_SECONDS(0);
  start_time = HRTIME_SECONDS(0);
  int size;

  // Scratch iovec array: room for MAX_TCOUNT+1 entries, advanced past a
  // page-aligned guard page.
  size_t pagesize = ats_pagesize();
  size = ((MAX_TCOUNT + 1) * sizeof(IOVec)) + (2 * pagesize);
  iob_iov = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(size));
  char *addr = (char *) align_pointer_forward(iob_iov->data(), pagesize);

  iov = (IOVec *) (addr + pagesize);

  // Message header + descriptor array + control data region, zeroed, with
  // the same page-align-plus-guard-page treatment.
  size = sizeof(ClusterMsgHeader) + (MAX_TCOUNT + 1) * sizeof(Descriptor)
    + CONTROL_DATA + (2 * pagesize);
  msg.iob_descriptor_block = new_IOBufferBlock();
  msg.iob_descriptor_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size));

  addr = (char *) align_pointer_forward(msg.iob_descriptor_block->data->data(), pagesize);

  addr = addr + pagesize;
  memset(addr, 0, size - (2 * pagesize));
  msg.descriptor = (Descriptor *) (addr + sizeof(ClusterMsgHeader));

  // MIOBuffer used to hand block chains to do_io_read/do_io_write.
  mbuf = new_empty_MIOBuffer();
}
00281
// Tear down I/O state: drop references (Ptr<> assignment to 0 releases)
// for the mutex, the iovec scratch buffer, the descriptor block, and all
// pending data blocks, then free the MIOBuffer shell.
ClusterState::~ClusterState()
{
  mutex = 0;
  if (iov) {
    iob_iov = 0;                // frees the buffer backing 'iov'
  }

  if (msg.descriptor) {
    msg.iob_descriptor_block = 0;   // frees the buffer backing the descriptors
  }

  // Release all block references held for in-flight transfers.
  int n;
  for (n = 0; n < MAX_TCOUNT; ++n) {
    block[n] = 0;
  }
  free_empty_MIOBuffer(mbuf);
  mbuf = 0;
}
00300
// Chain the per-descriptor IOBufferBlock lists (block[0..n_iov-1]) into a
// single list rooted at mbuf->_writer, so one do_io_read/do_io_write can
// move all 'to_do' bytes.  The iov[] lengths must account for exactly
// 'to_do' bytes (release-asserted).
void
ClusterState::build_do_io_vector()
{
  int bytes_to_xfer = 0;
  int n;
  IOBufferBlock *last_block = 0;

  mbuf->clear();

  // Link each descriptor's block chain onto the tail of the previous one.
  for (n = 0; n < n_iov; ++n) {
    bytes_to_xfer += iov[n].iov_len;

    if (last_block) {
      last_block->next = block[n];
    }
    last_block = block[n];
    while (last_block->next) {
      last_block = last_block->next;
    }
  }
  mbuf->_writer = block[0];
  ink_release_assert(bytes_to_xfer == to_do);
  // Cross-check the block list itself accounts for the same byte count.
  ink_assert(bytes_to_xfer == bytes_IOBufferBlockList(mbuf->_writer, !read_channel));
}
00331
// REENABLE_IO(): restart the pending VIO after (re)issuing I/O, with a
// variant per build configuration.  Comments must stay outside the macro
// bodies to preserve the line continuations.
#ifdef CLUSTER_TOMCAT
// Tomcat build: only reenable when not running on a stolen thread.
#define REENABLE_IO() \
  if (!ch->on_stolen_thread && !io_complete) { \
    v->reenable_re(); \
  }

#else // !CLUSTER_TOMCAT

#ifdef CLUSTER_IMMEDIATE_NETIO
// Immediate-netio build: drive the net VC synchronously right now.
#define REENABLE_IO() \
  if (!io_complete) { \
    ((NetVConnection *) v->vc_server)->reenable_re_now(v); \
  }

#else // !CLUSTER_IMMEDIATE_NETIO

// Default: standard re-entrant reenable on the VIO.
#define REENABLE_IO() \
  if (!io_complete) { \
    v->reenable_re(); \
  }
#endif // !CLUSTER_IMMEDIATE_NETIO

#endif // !CLUSTER_TOMCAT
00355
// Start (or continue) the I/O described by to_do/iov/block[] on this
// channel.  Returns 0 only when the state lock could not be acquired
// (caller must retry); returns 1 once the I/O has been issued, resumed,
// or trivially completed.
int
ClusterState::doIO()
{
  ink_release_assert(io_complete);
#if !defined(CLUSTER_IMMEDIATE_NETIO)
  MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
  if (!lock) {
    return 0;                   // caller retries later
  }
#endif

  if (!ch->net_vc) {
    // Connection is gone: pretend the transfer completed so the state
    // machine can unwind.
    io_complete = 1;
    bytes_xfered += to_do;
    to_do = 0;
    return 1;
  }

  // If the previous I/O ended with a *_READY event, the VIO is still
  // active — just extend nbytes and reenable rather than issuing a new
  // do_io call.
  if ((to_do && (io_complete_event == VC_EVENT_READ_READY)) || (io_complete_event == VC_EVENT_WRITE_READY)) {

    if (read_channel) {
      ink_assert(v->buffer.writer()->current_write_avail() == to_do);
    } else {
      ink_assert(v->buffer.reader()->read_avail() == to_do);
    }

    v->nbytes = to_do + did;
    ink_release_assert(v->nbytes > v->ndone);

    io_complete = false;
    io_complete_event = 0;
    REENABLE_IO();

  } else {
    // Fresh I/O: reset per-transfer progress counters and hand the
    // assembled block chain to the net processor.
    io_complete = false;
    io_complete_event = 0;
    bytes_xfered = 0;
    last_ndone = 0;

    build_do_io_vector();

    if (read_channel) {
      ink_assert(mbuf->current_write_avail() == to_do);
#ifdef CLUSTER_IMMEDIATE_NETIO
      v = ch->net_vc->do_io_read_now(this, to_do, mbuf);
#else
      v = ch->net_vc->do_io_read(this, to_do, mbuf);
#endif
      REENABLE_IO();

    } else {
      IOBufferReader *r = mbuf->alloc_reader();
      r->block = mbuf->_writer;
      ink_assert(r->read_avail() == to_do);
#ifdef CLUSTER_IMMEDIATE_NETIO
      v = ch->net_vc->do_io_write_now(this, to_do, r);
#else
      v = ch->net_vc->do_io_write(this, to_do, r);
#endif
      REENABLE_IO();
    }
  }
  return 1;
}
00430
// VIO callback for the read channel.  Accounts transferred bytes, records
// the terminating event, and signals completion to the ClusterHandler.
// io_complete is set (with a write barrier first) to 1 on success and -1
// on error/EOS/timeout; the handler polls it from another thread.
int
ClusterState::doIO_read_event(int event, void *d)
{
  ink_release_assert(!io_complete);
  if (!v) {
    v = (VIO *) d;              // immediate-netio may call back before doIO returns
  }
  ink_assert((VIO *) d == v);

  switch (event) {
  case VC_EVENT_READ_READY:
    {
      // Freeze the VIO at what has been read so far, then account it as a
      // (partial) completion below.
      v->nbytes = v->ndone;
    }
    // FALLTHROUGH — READY is handled as a completion of the bytes so far.
  case VC_EVENT_READ_COMPLETE:
    {
      bytes_xfered = v->ndone - last_ndone;
      if (bytes_xfered) {
        total_bytes_xfered += bytes_xfered;
        did += bytes_xfered;
        to_do -= bytes_xfered;
      }
      last_ndone = v->ndone;
      io_complete_event = event;
      // Ensure io_complete_event is visible before io_complete flips.
      INK_WRITE_MEMORY_BARRIER;

      io_complete = 1;
      IOComplete();

      break;
    }
  case VC_EVENT_EOS:
  case VC_EVENT_ERROR:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case VC_EVENT_ACTIVE_TIMEOUT:
  default:
    {
      // Any failure terminates the I/O with io_complete == -1.
      io_complete_event = event;
      INK_WRITE_MEMORY_BARRIER;

      io_complete = -1;
      IOComplete();
      break;
    }
  }

  return EVENT_DONE;
}
00481
// VIO callback for the write channel.  Mirrors doIO_read_event: accounts
// transferred bytes and flags io_complete (1 on success, -1 on failure)
// behind a write barrier.  Under CLUSTER_IMMEDIATE_NETIO, WRITE_READY is
// treated as a completion of the bytes so far; otherwise a WRITE_READY
// just reenables the VIO and waits for WRITE_COMPLETE.
int
ClusterState::doIO_write_event(int event, void *d)
{
  ink_release_assert(!io_complete);
  if (!v) {
    v = (VIO *) d;              // immediate-netio may call back before doIO returns
  }
  ink_assert((VIO *) d == v);

  switch (event) {
  case VC_EVENT_WRITE_READY:
#ifdef CLUSTER_IMMEDIATE_NETIO
    {
      // Freeze the VIO at what has been written so far.
      v->nbytes = v->ndone;
    }
#endif
    // FALLTHROUGH into the completion accounting.
  case VC_EVENT_WRITE_COMPLETE:
    {
      bytes_xfered = v->ndone - last_ndone;
      if (bytes_xfered) {
        total_bytes_xfered += bytes_xfered;
        did += bytes_xfered;
        to_do -= bytes_xfered;
      }
      last_ndone = v->ndone;
#ifdef CLUSTER_IMMEDIATE_NETIO
      io_complete_event = event;
      INK_WRITE_MEMORY_BARRIER;

      io_complete = 1;
      IOComplete();
#else
      if (event == VC_EVENT_WRITE_COMPLETE) {
        io_complete_event = event;
        // Publish the event code before flipping io_complete.
        INK_WRITE_MEMORY_BARRIER;

        io_complete = 1;
        IOComplete();
      } else {
        // Partial write: keep the VIO going.
        if (bytes_xfered) {
          v->reenable_re();     // made progress — try again immediately
        } else {
          v->reenable();
        }
        return EVENT_DONE;
      }
#endif
      break;
    }
  case VC_EVENT_EOS:
  case VC_EVENT_ERROR:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case VC_EVENT_ACTIVE_TIMEOUT:
  default:
    {
      // Any failure terminates the I/O with io_complete == -1.
      io_complete_event = event;
      INK_WRITE_MEMORY_BARRIER;

      io_complete = -1;
      IOComplete();
      break;
    }
  }

  return EVENT_DONE;
}
00550
// Notify the owning ClusterHandler that this channel's I/O finished.
// Only fires when iodone events were requested and the handler is not
// already running under its own lock on this thread; tries a direct
// handleEvent under a try-lock, else schedules the handler on ET_CLUSTER.
void
ClusterState::IOComplete()
{
  if (do_iodone_event && !ch->mutex->thread_holding) {
    MUTEX_TRY_LOCK(lock, ch->mutex, this_ethread());
    if (lock) {
      ch->handleEvent(EVENT_IMMEDIATE, (void *) 0);
    } else {
      eventProcessor.schedule_imm_signal(ch, ET_CLUSTER);
    }
  }
}
00568
// Deliver 'event' to the user continuation on the given VC side.  The
// callback may close the VC; if it did (and no write data remains in
// flight) the VC is reclaimed here and EVENT_DONE tells the caller to stop
// touching it.  EVENT_CONT means the VC is still live.
int
ClusterHandler::cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s)
{
  s->vio._cont->handleEvent(event, &s->vio);

  if (vc->closed) {
    if (!vc->write_list && !vc->write_bytes_in_transit) {
      close_ClusterVConnection(vc);
    }
    return EVENT_DONE;
  } else {
    // An ERROR signal is expected to make the user close the VC.
    ink_assert((event != VC_EVENT_ERROR) || ((event == VC_EVENT_ERROR) && vc->closed));
    return EVENT_CONT;
  }
}
00584
// Same contract as cluster_signal_and_update(), for callers that already
// hold the side's lock: cleanup goes through close_free_lock() so the lock
// is released as part of reclaiming the VC.
int
ClusterHandler::cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s)
{
  s->vio._cont->handleEvent(event, &s->vio);

  if (vc->closed) {
    if (!vc->write_list && !vc->write_bytes_in_transit) {
      close_free_lock(vc, s);
    }
    return EVENT_DONE;
  } else
    return EVENT_CONT;
}
00599
00600 int
00601 ClusterHandler::cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno)
00602 {
00603 s->enabled = 0;
00604 vc->lerrno = lerrno;
00605 return cluster_signal_and_update(VC_EVENT_ERROR, vc, s);
00606 }
00607
// Ensure channel index 'c' is addressable, growing the channel tables by
// doubling (starting at MIN_CHANNELS) up to MAX_CHANNELS.  Newly created
// locally-owned channels above the dedicated range get a ChannelData
// record and are placed on the free_local_channels queue; the sentinel
// value (ClusterVConnection *) 1 marks a local channel that is allocated
// to the free pool but not bound to a VC.  Returns false only when growth
// would exceed MAX_CHANNELS.
bool ClusterHandler::check_channel(int c)
{
  while (n_channels <= c) {
    int
      old_channels = n_channels;
    if (!n_channels) {
      n_channels = MIN_CHANNELS;
    } else {
      if ((n_channels * 2) <= MAX_CHANNELS) {
        n_channels = n_channels * 2;
      } else {
        return false;           // table full — cannot grow past MAX_CHANNELS
      }
    }

    // Grow both parallel tables; ats_realloc preserves existing entries.
    channels = (ClusterVConnection **)ats_realloc(channels, n_channels * sizeof(ClusterVConnection *));

    channel_data = (struct ChannelData **)ats_realloc(channel_data, n_channels * sizeof(struct ChannelData *));

    for (int i = old_channels; i < n_channels; i++) {
      if (local_channel(i)) {
        if (i > LAST_DEDICATED_CHANNEL) {
          channels[i] = (ClusterVConnection *) 1;   // sentinel: free local channel
          channel_data[i] = (struct ChannelData *)ats_malloc(sizeof(struct ChannelData));
          memset(channel_data[i], 0, sizeof(struct ChannelData));
          channel_data[i]->channel_number = i;
          free_local_channels.enqueue(channel_data[i]);
        } else {
          channels[i] = NULL;
          channel_data[i] = NULL;
        }
      } else {
        channels[i] = NULL;
        channel_data[i] = NULL;
      }
    }
  }
  return true;
}
00651
// Bind 'vc' to a channel.  requested == 0 means allocate a locally-owned
// channel from the free pool (growing the tables once if empty);
// otherwise bind the remotely-chosen channel 'requested'.  Returns the
// channel number, -1 if the requested remote channel is already in use,
// or -2 if the channel tables cannot grow.
int
ClusterHandler::alloc_channel(ClusterVConnection * vc, int requested)
{
  struct ChannelData *cdp = 0;
  int i = requested;

  if (!i) {
    // Local allocation: one dequeue attempt, with a single retry after
    // growing the tables (loops == 1 permits exactly one extra pass).
    int loops = 1;
    do {
      cdp = free_local_channels.dequeue();
      if (!cdp) {
        if (!check_channel(n_channels)) {
          return -2;            // table growth failed
        }
      } else {
        ink_assert(cdp == channel_data[cdp->channel_number]);
        i = cdp->channel_number;
        break;
      }
    } while (loops--);

    ink_release_assert(i != 0);
    // Free local channels carry the (ClusterVConnection *) 1 sentinel.
    ink_release_assert(channels[i] == (ClusterVConnection *) 1);
    Debug(CL_TRACE, "alloc_channel local chan=%d VC=%p", i, vc);

  } else {
    // Remote-requested channel: make sure the table covers it and it is
    // not already occupied.
    if (!check_channel(i)) {
      return -2;
    }
    if (channels[i]) {
      Debug(CL_TRACE, "alloc_channel remote inuse chan=%d VC=%p", i, vc);
      return -1;
    } else {
      Debug(CL_TRACE, "alloc_channel remote chan=%d VC=%p", i, vc);
    }
  }
  channels[i] = vc;
  vc->channel = i;
  return i;
}
00695
// Unbind 'vc' from its channel.  Locally-owned channels revert to the
// free-pool sentinel and their ChannelData returns to free_local_channels;
// remote channels are simply cleared.  Dedicated channels and stale
// bindings (channels[i] != vc) are left untouched.
void
ClusterHandler::free_channel(ClusterVConnection * vc)
{
  int i = vc->channel;
  if (i > LAST_DEDICATED_CHANNEL && channels[i] == vc) {
    if (local_channel(i)) {
      channels[i] = (ClusterVConnection *) 1;   // back to free-pool sentinel
      free_local_channels.enqueue(channel_data[i]);
      Debug(CL_TRACE, "free_channel local chan=%d VC=%p", i, vc);
    } else {
      channels[i] = 0;
      Debug(CL_TRACE, "free_channel remote chan=%d VC=%p", i, vc);
    }
  }
  vc->channel = 0;
}
00715
// Handle loss of the peer node on this connection: notify plugins and the
// manager, close the net VC, force both channels' I/O to the error state,
// and (under the cluster config lock) detach this handler from the
// machine — removing the machine from the cluster configuration when its
// last connection drops.  If the machine is still configured and we were
// the connecting side, kick off a reconnect.  Always ends by zombifying
// this handler.
int
ClusterHandler::machine_down()
{
  char textbuf[sizeof("255.255.255.255:65535")];

  if (dead) {
    return EVENT_DONE;          // already processed
  }

#ifdef LOCAL_CLUSTER_TEST_MODE
  Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), port);
#else
  Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), id);
#endif
  machine_offline_APIcallout(ip);
  snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port);
  RecSignalManager(REC_SIGNAL_MACHINE_DOWN, textbuf);
  if (net_vc) {
    net_vc->do_io(VIO::CLOSE);
    net_vc = 0;
  }

  // Mark both channels' pending I/O as failed so the state machines unwind.
  read.io_complete = -1;
  write.io_complete = -1;

  MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
  ClusterConfiguration *c = this_cluster()->current_configuration();
  machine->clusterHandlers[id] = NULL;
  // Last connection to this machine gone: drop it from the configuration.
  if ((--machine->now_connections == 0) && c->find(ip, port)) {
    ClusterConfiguration *cc = configuration_remove_machine(c, machine);
    CLUSTER_DECREMENT_DYN_STAT(CLUSTER_NODES_STAT);
    this_cluster()->configurations.push(cc);
    machine->dead = true;
  }
  MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
  // If the node is still configured and we initiated the connection,
  // retry it.
  MachineList *cc = the_cluster_config();
  if (cc && cc->find(ip, port) && connector) {
    Debug(CL_NOTE, "cluster connect retry for %hhu.%hhu.%hhu.%hhu", DOT_SEPARATED(ip));
    clusterProcessor.connect(ip, port, id);
  }
  return zombify();
}
00764
// Transition this handler into the zombie state: stop periodic events and
// load monitoring, then let protoZombieEvent drain outstanding work before
// the handler is finally deleted (scheduled with a 1 second delay).
int
ClusterHandler::zombify(Event * )
{
  dead = true;
  if (cluster_periodic_event) {
    cluster_periodic_event->cancel(this);
    cluster_periodic_event = NULL;
  }
  clm->cancel_monitor();

  SET_HANDLER((ClusterContHandler) & ClusterHandler::protoZombieEvent);

  eventProcessor.schedule_in(this, HRTIME_SECONDS(1), ET_CLUSTER);
  return EVENT_DONE;
}
00787
// Outbound connection establishment.  On EVENT_IMMEDIATE/EVENT_INTERVAL:
// validate that the target machine is (still) in the cluster config and
// is not ourselves, then issue a non-blocking connect on ET_CLUSTER.  On
// NET_EVENT_OPEN: hand off to startClusterEvent for the clustering
// handshake; any other net event retries after CLUSTER_MEMBER_DELAY.
int
ClusterHandler::connectClusterEvent(int event, Event * e)
{
  if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) {

    MachineList *cc = the_cluster_config();
    if (!machine)
      machine = new ClusterMachine(hostname, ip, port);
#ifdef LOCAL_CLUSTER_TEST_MODE
    if (!(cc && cc->find(ip, port))) {
#else
    // Never connect to ourselves; abort if the target left the config.
    if (this_cluster_machine()->ip == machine->ip || !(cc && cc->find(ip, port))) {
#endif
      if (this_cluster_machine()->ip != machine->ip)
        Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u not in cluster", DOT_SEPARATED(machine->ip));
      delete machine;
      machine = NULL;
      delete this;
      return EVENT_DONE;
    }

    Debug(CL_NOTE, "connect_re from %u.%u.%u.%u to %u.%u.%u.%u",
          DOT_SEPARATED(this_cluster_machine()->ip), DOT_SEPARATED(machine->ip));
    ip = machine->ip;

    // Cluster socket options come from the cluster config globals.
    NetVCOptions opt;
    opt.socket_send_bufsize = cluster_send_buffer_size;
    opt.socket_recv_bufsize = cluster_receive_buffer_size;
    opt.sockopt_flags = cluster_sockopt_flags;
    opt.packet_mark = cluster_packet_mark;
    opt.packet_tos = cluster_packet_tos;
    opt.etype = ET_CLUSTER;
    opt.addr_binding = NetVCOptions::INTF_ADDR;
    opt.local_ip = this_cluster_machine()->ip;

    struct sockaddr_in addr;
    ats_ip4_set(&addr, machine->ip,
                htons(machine->cluster_port ? machine->cluster_port : cluster_port));

    netProcessor.connect_re(this, ats_ip_sa_cast(&addr), &opt);
    return EVENT_DONE;
  } else {
    if (event == NET_EVENT_OPEN) {
      // TCP connection is up — begin the clustering version handshake.
      net_vc = (NetVConnection *) e;
      SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent);
      eventProcessor.schedule_imm(this, ET_CLUSTER);
      return EVENT_DONE;

    } else {
      // Connect failed; retry after a delay.
      eventProcessor.schedule_in(this, CLUSTER_MEMBER_DELAY);
      return EVENT_CONT;
    }
  }
}
00848
// Clustering handshake and connection bring-up state machine, driven by
// cluster_connect_state:
//   CLCON_SEND_MSG[_COMPLETE]   — send our ClusteringVersion to the peer
//   CLCON_READ_MSG[_COMPLETE]   — read the peer's ClusteringVersion
//   CLCON_VALIDATE_MSG          — negotiate a common protocol version
//   CLCON_CONN_BIND_CLEAR/BIND  — migrate the net VC onto the cluster
//                                 thread chosen by connection id
//   CLCON_CONN_BIND_OK          — register with the cluster config and
//                                 start periodic processing
//   CLCON_ABORT/DELETE_CONNECT  — failure paths
// Several cases intentionally fall through to the next state.  Incomplete
// I/O or missed try-locks reschedule this continuation and return.
int
ClusterHandler::startClusterEvent(int event, Event * e)
{
  char textbuf[sizeof("255.255.255.255:65535")];

  (void) event;
  ink_assert(!read_vcs);
  ink_assert(!write_vcs);

  // EVENT_IMMEDIATE is only valid as the initial kick from
  // connectClusterEvent; all reschedules arrive as EVENT_INTERVAL.
  if (event == EVENT_IMMEDIATE) {
    if (cluster_connect_state == ClusterHandler::CLCON_INITIAL) {
      cluster_connect_state = ClusterHandler::CLCON_SEND_MSG;
    } else {
      ink_release_assert(!"startClusterEvent, EVENT_IMMEDIATE not expected");
    }
  } else {
    ink_release_assert(event == EVENT_INTERVAL);
  }

  for (;;) {

    switch (cluster_connect_state) {

    case ClusterHandler::CLCON_INITIAL:
      // Should have been consumed by the EVENT_IMMEDIATE handling above.
      {
        ink_release_assert(!"Invalid state [CLCON_INITIAL]");
      }
      // FALLTHROUGH (unreachable — release assert above)

    case ClusterHandler::CLCON_SEND_MSG:
      // Send our clustering version to the peer node.
      {
#ifdef LOCAL_CLUSTER_TEST_MODE
        nodeClusteringVersion._port = cluster_port;
#endif
        cluster_connect_state = ClusterHandler::CLCON_SEND_MSG_COMPLETE;
        if (connector)
          nodeClusteringVersion._id = id;
        build_data_vector((char *) &nodeClusteringVersion, sizeof(nodeClusteringVersion), false);
        if (!write.doIO()) {
          // Lock miss in doIO(); retry this state later.
          cluster_connect_state = ClusterHandler::CLCON_SEND_MSG;
          eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
          return EVENT_DONE;
        }
        break;
      }

    case ClusterHandler::CLCON_SEND_MSG_COMPLETE:
      // Wait for the version message write to finish, then start reading
      // the peer's version.
      {
        if (write.io_complete) {
          if ((write.io_complete < 0)
              || ((size_t) write.did < sizeof(nodeClusteringVersion))) {
            Debug(CL_NOTE, "unable to write to cluster node %u.%u.%u.%u: %d",
                  DOT_SEPARATED(ip), write.io_complete_event);
            cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
            break;
          }

          build_data_vector((char *) &clusteringVersion, sizeof(clusteringVersion), true);
          cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
          break;
        } else {
          // Write still in progress; poll again later.
          eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
          return EVENT_DONE;
        }
      }

    case ClusterHandler::CLCON_READ_MSG:
      // Issue the read for the peer's clustering version.
      {
        cluster_connect_state = ClusterHandler::CLCON_READ_MSG_COMPLETE;
        if (!read.doIO()) {
          // Lock miss in doIO(); retry this state later.
          cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
          eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
          return EVENT_DONE;
        }
        break;
      }

    case ClusterHandler::CLCON_READ_MSG_COMPLETE:
      // Wait for the full version message; short reads re-issue the read.
      {
        if (read.io_complete) {
          if (read.io_complete < 0) {
            cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
            break;
          }
          if ((size_t) read.did < sizeof(clusteringVersion)) {
            // Partial read — continue reading the remainder.
            cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
            break;
          }
          cluster_connect_state = ClusterHandler::CLCON_VALIDATE_MSG;
          break;
        } else {
          // Read still in progress; poll again later.
          eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
          return EVENT_DONE;
        }
      }

    case ClusterHandler::CLCON_VALIDATE_MSG:
      // Negotiate the protocol version with the peer.
      {
        int proto_major = -1;
        int proto_minor = -1;

        clusteringVersion.AdjustByteOrder();

        // Highest major version within both sides' [min_major, major]
        // ranges wins (loop keeps the last match, i.e. the lowest common
        // major — NOTE(review): looks like it scans downward and keeps
        // overwriting; verify intended selection order).
        for (int major = clusteringVersion._major; major >= clusteringVersion._min_major; --major) {
          if ((major >= nodeClusteringVersion._min_major) && (major <= nodeClusteringVersion._major)) {
            proto_major = major;
          }
        }
        if (proto_major > 0) {
          // Minor versions only need to match when using the peer's major.
          if (proto_major == clusteringVersion._major) {
            proto_minor = clusteringVersion._minor;

            if (proto_minor != nodeClusteringVersion._minor)
              Warning("Different clustering minor versions (%d,%d) for node %u.%u.%u.%u, continuing",
                      proto_minor, nodeClusteringVersion._minor, DOT_SEPARATED(ip));
          } else {
            proto_minor = 0;
          }

        } else {
          Warning("Bad cluster major version range (%d-%d) for node %u.%u.%u.%u connect failed",
                  clusteringVersion._min_major, clusteringVersion._major, DOT_SEPARATED(ip));
          cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
          break;
        }

#ifdef LOCAL_CLUSTER_TEST_MODE
        port = clusteringVersion._port & 0xffff;
#endif
        // The accepting side learns the connection id from the peer.
        if (!connector)
          id = clusteringVersion._id & 0xffff;

        machine->msg_proto_major = proto_major;
        machine->msg_proto_minor = proto_minor;

        // Cluster thread count must match the configured value.
        if (eventProcessor.n_threads_for_type[ET_CLUSTER] != num_of_cluster_threads) {
          cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
          break;
        }

        // Pin this connection to a cluster thread by connection id; if the
        // VC already lives there, skip the rebind dance.
        thread = eventProcessor.eventthread[ET_CLUSTER][id % num_of_cluster_threads];
        if (net_vc->thread == thread) {
          cluster_connect_state = CLCON_CONN_BIND_OK;
          break;
        } else {
          cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_CLEAR;
        }
      }
      // FALLTHROUGH into CLCON_CONN_BIND_CLEAR

    case ClusterHandler::CLCON_CONN_BIND_CLEAR:
      // Detach the net VC from its current thread's NetHandler (requires
      // both the NetHandler and VC locks).
      {
        UnixNetVConnection *vc = (UnixNetVConnection *)net_vc;
        MUTEX_TRY_LOCK(lock, vc->nh->mutex, e->ethread);
        MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread);
        if (lock && lock1) {
          vc->ep.stop();                    // remove from the polling set
          vc->nh->open_list.remove(vc);
          vc->thread = NULL;
          if (vc->nh->read_ready_list.in(vc))
            vc->nh->read_ready_list.remove(vc);
          if (vc->nh->write_ready_list.in(vc))
            vc->nh->write_ready_list.remove(vc);
          if (vc->read.in_enabled_list)
            vc->nh->read_enable_list.remove(vc);
          if (vc->write.in_enabled_list)
            vc->nh->write_enable_list.remove(vc);

          // Continue the rebind on the target cluster thread.
          cluster_connect_state = ClusterHandler::CLCON_CONN_BIND;
          thread->schedule_in(this, CLUSTER_PERIOD);
          return EVENT_DONE;
        } else {
          // Lock miss: retry on the VC's current thread.
          vc->thread->schedule_in(this, CLUSTER_PERIOD);
          return EVENT_DONE;
        }
      }

    case ClusterHandler::CLCON_CONN_BIND:
      // Attach the net VC to the target thread's NetHandler.
      {
        NetHandler *nh = get_NetHandler(e->ethread);
        UnixNetVConnection *vc = (UnixNetVConnection *)net_vc;
        MUTEX_TRY_LOCK(lock, nh->mutex, e->ethread);
        MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread);
        if (lock && lock1) {
          if (vc->read.in_enabled_list)
            nh->read_enable_list.push(vc);
          if (vc->write.in_enabled_list)
            nh->write_enable_list.push(vc);

          vc->nh = nh;
          vc->thread = e->ethread;
          PollDescriptor *pd = get_PollDescriptor(e->ethread);
          if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) {
            // Could not register with the poll descriptor — give up.
            cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
            break;
          }

          nh->open_list.enqueue(vc);
          cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_OK;
        } else {
          // Lock miss: retry on the target cluster thread.
          thread->schedule_in(this, CLUSTER_PERIOD);
          return EVENT_DONE;
        }
      }
      // FALLTHROUGH into CLCON_CONN_BIND_OK

    case ClusterHandler::CLCON_CONN_BIND_OK:
      // Register this connection with the cluster configuration and start
      // normal cluster processing.
      {
        int failed = 0;

        MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
        MachineList *cc = the_cluster_config();
        if (cc && cc->find(ip, port)) {
          ClusterConfiguration *c = this_cluster()->current_configuration();
          ClusterMachine *m = c->find(ip, port);

          if (!m) {
            // First connection to this machine: add it to the config.
            ClusterConfiguration *cconf = configuration_add_machine(c, machine);
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT);
            this_cluster()->configurations.push(cconf);
          } else {
            // Machine known: validate the connection slot is free.
            if (id >= m->num_connections || m->clusterHandlers[id]) {
              failed = -2;
              MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
              goto failed;
            }
            machine = m;
          }
          machine->now_connections++;
          machine->clusterHandlers[id] = this;
          machine->dead = false;
          dead = false;
        } else {
          Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u:%d not in cluster", DOT_SEPARATED(ip), port);
          failed = -1;
        }
        MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
      failed:
        if (failed) {
          // Transient config-lookup failure gets a bounded retry.
          if (failed == -1) {
            if (++configLookupFails <= CONFIG_LOOKUP_RETRIES) {
              thread->schedule_in(this, CLUSTER_PERIOD);
              return EVENT_DONE;
            }
          }
          cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
          break;
        }

        this->needByteSwap = !clusteringVersion.NativeByteOrder();
        machine_online_APIcallout(ip);

        snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port);
        RecSignalManager(REC_SIGNAL_MACHINE_UP, textbuf);
#ifdef LOCAL_CLUSTER_TEST_MODE
        Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
             DOT_SEPARATED(ip), port, clusteringVersion._major, clusteringVersion._minor);
#else
        Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
             DOT_SEPARATED(ip), id, clusteringVersion._major, clusteringVersion._minor);
#endif

        // Connection established: set up VC buckets, periodic events and
        // callout continuations, then switch to the steady-state handler.
        read_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS];
        write_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link>[CLUSTER_BUCKETS];
        SET_HANDLER((ClusterContHandler) & ClusterHandler::beginClusterEvent);

        read.do_iodone_event = true;
        write.do_iodone_event = true;

        cluster_periodic_event = thread->schedule_every(this, -CLUSTER_PERIOD);

        // One callout continuation per CPU (bounded) to process incoming
        // cluster function calls on ET_NET threads.
        int procs_online = ink_number_of_processors();
        int total_callbacks = min(procs_online, MAX_COMPLETION_CALLBACK_EVENTS);
        for (int n = 0; n < total_callbacks; ++n) {
          callout_cont[n] = new ClusterCalloutContinuation(this);
          callout_events[n] = eventProcessor.schedule_every(callout_cont[n], COMPLETION_CALLBACK_PERIOD, ET_NET);
        }

        if (!clm) {
          clm = new ClusterLoadMonitor(this);
          clm->init();
        }
        return EVENT_DONE;
      }

    case ClusterHandler::CLCON_ABORT_CONNECT:
      // Handshake failed: if we initiated, schedule a fresh connect
      // attempt, then tear this handler down.
      {
        if (connector) {
          Debug(CL_NOTE, "cluster connect retry for %u.%u.%u.%u", DOT_SEPARATED(ip));
          clusterProcessor.connect(ip, port, id, true);
        }
        cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
        break;
      }

    case ClusterHandler::CLCON_DELETE_CONNECT:
      // Terminal failure state: free the machine record and this handler.
      {
        delete machine;
        machine = NULL;
        delete this;
        Debug(CL_NOTE, "Failed cluster connect, deleting");
        return EVENT_DONE;
      }

    default:
      {
        Warning("startClusterEvent invalid state %d", cluster_connect_state);
        ink_release_assert(!"ClusterHandler::startClusterEvent invalid state");
        return EVENT_DONE;
      }

    }
  }
  return EVENT_DONE;
}
01202
// First steady-state tick after connection bring-up: prime polling (in
// immediate-netio builds) and switch to mainClusterEvent, invoking it
// once immediately.
int
ClusterHandler::beginClusterEvent(int , Event * e)
{
#ifdef CLUSTER_IMMEDIATE_NETIO
  build_poll(false);
#endif
  SET_HANDLER((ClusterContHandler) & ClusterHandler::mainClusterEvent);
  return handleEvent(EVENT_INTERVAL, e);
}
01213
// Final zombie-state handler: all outstanding work has drained, so the
// handler deletes itself.
int
ClusterHandler::zombieClusterEvent(int event, Event * e)
{
  (void) event;
  (void) e;
  delete this;                  // all done, self destruct
  return EVENT_DONE;
}
01227
int
ClusterHandler::protoZombieEvent(int , Event * e)
{
  // Shutdown protocol for a ClusterHandler: repeatedly invoked until all
  // in-flight work is drained, then hands off to zombieClusterEvent which
  // deletes this handler.  `failed` accumulates "something is still busy";
  // any set bit causes this handler to reschedule itself and retry later.
  bool failed = false;
  ink_hrtime delay = CLUSTER_MEMBER_DELAY * 5;
  // Prefer the event's thread for trylocks; fall back to the current
  // thread when called without an Event.
  EThread *t = e ? e->ethread : this_ethread();
  head_p item;

  // Run one pass of the main event machinery so pending reads/writes make
  // progress before we test the "all drained" conditions below.
  mainClusterEvent(EVENT_INTERVAL, e);

  // If any local-open requests, delayed reads, or partially-built write
  // descriptors remain, we cannot tear down yet — reschedule and retry.
  item.data = external_incoming_open_local.head.data;
  if (TO_PTR(FREELIST_POINTER(item)) ||
      delayed_reads.head || pw_write_descriptors_built
      || pw_freespace_descriptors_built || pw_controldata_descriptors_built) {
    if (e) {
      e->schedule_in(delay);
      return EVENT_CONT;
    } else {
      eventProcessor.schedule_in(this, delay, ET_CLUSTER);
      return EVENT_DONE;
    }
  }

  // Drop any queued incoming control messages; their presence forces
  // another pass (failed = true) so the queue is observed empty once.
  IncomingControl *ic;
  while ((ic = incoming_control.dequeue())) {
    failed = true;
    ic->mutex = NULL;
    ic->freeall();
  }

  // Walk every channel: signal an error to any open read/write VIO we can
  // trylock (a missed lock defers to the next pass), then close channels
  // whose VC is already marked closed.  Note channels[i] is re-read after
  // each trylock because the signal callback may have changed it.
  for (int i = 0; i < n_channels; i++) {
    ClusterVConnection *vc = channels[i];
    if (VALID_CHANNEL(vc)) {
      if (!vc->closed && vc->read.vio.op == VIO::READ) {
        MUTEX_TRY_LOCK(lock, vc->read.vio.mutex, t);
        if (lock) {
          cluster_signal_error_and_update(vc, &vc->read, 0);
        } else {
          failed = true;
        }
      }
      vc = channels[i];   // re-read: signal above may have altered the slot
      if (VALID_CHANNEL(vc)
          && !vc->closed && vc->write.vio.op == VIO::WRITE) {
        MUTEX_TRY_LOCK(lock, vc->write.vio.mutex, t);
        if (lock) {
          cluster_signal_error_and_update(vc, &vc->write, 0);
        } else {
          failed = true;
        }
      }
      vc = channels[i];   // re-read again before the close decision
      if (VALID_CHANNEL(vc)) {
        if (vc->closed) {
          // Detach the VC from this handler and discard any buffered
          // write state before closing.
          vc->ch = 0;
          vc->write_list = 0;
          vc->write_list_tail = 0;
          vc->write_list_bytes = 0;
          vc->write_bytes_in_transit = 0;
          close_ClusterVConnection(vc);
        } else {
          failed = true;  // still open and not errored out — retry later
        }
      }
    }
  }

  // Cancel the periodic callout continuations, but only once the external
  // incoming-control queue is empty (a callout may still be feeding it).
  // Each cancel requires the continuation's mutex; a missed trylock
  // defers to the next pass.
  item.data = external_incoming_control.head.data;
  if (TO_PTR(FREELIST_POINTER(item)) == NULL) {
    for (int n = 0; n < MAX_COMPLETION_CALLBACK_EVENTS; ++n) {
      if (callout_cont[n]) {
        MUTEX_TRY_LOCK(lock, callout_cont[n]->mutex, t);
        if (lock) {
          callout_events[n]->cancel(callout_cont[n]);
          callout_events[n] = 0;
          delete callout_cont[n];
          callout_cont[n] = 0;
        } else {
          failed = true;
        }
      }
    }
  } else {
    failed = true;
  }

  // Everything drained: switch to the terminal zombie handler which will
  // delete this object on its next (short-delay) dispatch.
  if (!failed) {
    Debug("cluster_down", "ClusterHandler zombie [%u.%u.%u.%u]", DOT_SEPARATED(ip));
    SET_HANDLER((ClusterContHandler) & ClusterHandler::zombieClusterEvent);
    delay = NO_RACE_DELAY;
  }
  if (e) {
    e->schedule_in(delay);
    return EVENT_CONT;
  } else {
    eventProcessor.schedule_in(this, delay, ET_CLUSTER);
    return EVENT_DONE;
  }
}
01346
// Debug switch: when non-zero, compute_active_channels() also prints
// per-channel VC state to stdout for each active channel.
int dump_verbose = 0;
01348
01349 int
01350 ClusterHandler::compute_active_channels()
01351 {
01352 ClusterHandler *ch = this;
01353 int active_chans = 0;
01354
01355 for (int i = LAST_DEDICATED_CHANNEL + 1; i < ch->n_channels; i++) {
01356 ClusterVConnection *vc = ch->channels[i];
01357 if (VALID_CHANNEL(vc) && (vc->iov_map != CLUSTER_IOV_NOT_OPEN)) {
01358 ++active_chans;
01359 if (dump_verbose) {
01360 printf("ch[%d] vc=0x%p remote_free=%d last_local_free=%d\n", i, vc,
01361 vc->remote_free, vc->last_local_free);
01362 printf(" r_bytes=%d r_done=%d w_bytes=%d w_done=%d\n",
01363 (int)vc->read.vio.nbytes, (int)vc->read.vio.ndone,
01364 (int)vc->write.vio.nbytes, (int)vc->write.vio.ndone);
01365 }
01366 }
01367 }
01368 return active_chans;
01369 }
01370
01371 void
01372 ClusterHandler::dump_internal_data()
01373 {
01374 if (!message_blk) {
01375 message_blk = new_IOBufferBlock();
01376 message_blk->alloc(MAX_IOBUFFER_SIZE);
01377 }
01378 int r;
01379 int n = 0;
01380 char *b = message_blk->data->data();
01381 unsigned int b_size = message_blk->data->block_size();
01382
01383 r = snprintf(&b[n], b_size - n, "Host: %hhu.%hhu.%hhu.%hhu\n", DOT_SEPARATED(ip));
01384 n += r;
01385
01386 r = snprintf(&b[n], b_size - n,
01387 "chans: %d vc_writes: %" PRId64 " write_bytes: %" PRId64 "(d)+%" PRId64 "(c)=%" PRId64 "\n",
01388 compute_active_channels(),
01389 _vc_writes, _vc_write_bytes, _control_write_bytes, _vc_write_bytes + _control_write_bytes);
01390
01391 n += r;
01392 r = snprintf(&b[n], b_size - n,
01393 "dw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n",
01394 _dw_missed_lock, _dw_not_enabled, _dw_wait_remote_fill, _dw_no_active_vio);
01395
01396 n += r;
01397 r = snprintf(&b[n], b_size - n,
01398 "dw: not_enabled_or_no_write: %d set_data_pending: %d no_free_space: %d\n",
01399 _dw_not_enabled_or_no_write, _dw_set_data_pending, _dw_no_free_space);
01400
01401 n += r;
01402 r = snprintf(&b[n], b_size - n,
01403 "fw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n",
01404 _fw_missed_lock, _fw_not_enabled, _fw_wait_remote_fill, _fw_no_active_vio);
01405
01406 n += r;
01407 r = snprintf(&b[n], b_size - n, "fw: not_enabled_or_no_read: %d\n", _fw_not_enabled_or_no_read);
01408
01409 n += r;
01410 r = snprintf(&b[n], b_size - n,
01411 "rd(%d): st:%d rh:%d ahd:%d sd:%d rd:%d ad:%d sda:%d rda:%d awd:%d p:%d c:%d\n",
01412 _process_read_calls, _n_read_start, _n_read_header, _n_read_await_header,
01413 _n_read_setup_descriptor, _n_read_descriptor, _n_read_await_descriptor,
01414 _n_read_setup_data, _n_read_data, _n_read_await_data, _n_read_post_complete, _n_read_complete);
01415
01416 n += r;
01417 r = snprintf(&b[n], b_size - n,
01418 "wr(%d): st:%d set:%d ini:%d wait:%d post:%d comp:%d\n",
01419 _process_write_calls, _n_write_start, _n_write_setup, _n_write_initiate,
01420 _n_write_await_completion, _n_write_post_complete, _n_write_complete);
01421
01422 n += r;
01423 ink_release_assert((n + 1) <= BUFFER_SIZE_FOR_INDEX(MAX_IOBUFFER_SIZE));
01424 Note("%s", b);
01425 clear_cluster_stats();
01426 }
01427
01428 void
01429 ClusterHandler::dump_write_msg(int res)
01430 {
01431
01432 Alias32 x;
01433 x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr;
01434
01435 fprintf(stderr,
01436 "[W] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d Todo=%d, Res=%d\n",
01437 x.byte[0], x.byte[1], x.byte[2], x.byte[3], write.sequence_number, write.msg.count, write.msg.control_bytes, write.to_do, res);
01438 for (int i = 0; i < write.msg.count; ++i) {
01439 fprintf(stderr, " d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n",
01440 i, (write.msg.descriptor[i].type ? 1 : 0),
01441 (int) write.msg.descriptor[i].channel,
01442 (int) write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length);
01443 }
01444 }
01445
01446 void
01447 ClusterHandler::dump_read_msg()
01448 {
01449
01450 Alias32 x;
01451 x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr;
01452
01453 fprintf(stderr, "[R] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d\n",
01454 x.byte[0], x.byte[1], x.byte[2], x.byte[3], read.sequence_number, read.msg.count, read.msg.control_bytes);
01455 for (int i = 0; i < read.msg.count; ++i) {
01456 fprintf(stderr, " d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n",
01457 i, (read.msg.descriptor[i].type ? 1 : 0),
01458 (int) read.msg.descriptor[i].channel,
01459 (int) read.msg.descriptor[i].sequence_number, read.msg.descriptor[i].length);
01460 }
01461 }
01462
01463