00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #define DEFINE_CLUSTER_FUNCTIONS
00031 #include "P_Cluster.h"
00032
00033
00034
00035
00036
// Number of entries in the cluster RPC dispatch table (clusterFunction is
// instantiated via DEFINE_CLUSTER_FUNCTIONS / P_Cluster.h above).
unsigned SIZE_clusterFunction = countof(clusterFunction);

// Test hook: when non-NULL, points at an alternate cluster function entry
// (usage not visible in this chunk).
ClusterFunction *ptest_ClusterFunction = NULL;

// Scratch buffers used as a data sink/source when a descriptor refers to a
// channel that is invalid or not readable/writable (see
// build_initial_vector()).
static char channel_dummy_input[DEFAULT_MAX_BUFFER_SIZE];
char channel_dummy_output[DEFAULT_MAX_BUFFER_SIZE];

// Allocator for outgoing cluster control messages.
ClassAllocator<OutgoingControl> outControlAllocator("outControlAllocator");

// Allocator for incoming cluster control messages.
ClassAllocator<IncomingControl> inControlAllocator("inControlAllocator");

// Debug flag; presumably enables message dumping -- not referenced in this
// chunk.
static int dump_msgs = 0;

// Optional diagnostic: verify that a received buffer holds consecutive
// byte values.  Compiles to nothing unless VERIFY_PETERS_DATA is defined.
#ifdef VERIFY_PETERS_DATA
#define DO_VERIFY_PETERS_DATA(_p,_l) verify_peters_data(_p,_l)
#else
#define DO_VERIFY_PETERS_DATA(_p,_l)
#endif
00062
/**
 * Diagnostic scan of a buffer expected to contain consecutive byte values
 * (each byte one greater, mod 256, than its predecessor).  On the first
 * mismatch the failing index is reported on stderr and scanning stops.
 *
 * @param ap  buffer to check (treated as unsigned bytes)
 * @param l   number of bytes in the buffer; buffers shorter than 2 bytes
 *            are trivially accepted
 */
void
verify_peters_data(char *ap, int l)
{
  const unsigned char *bytes = (const unsigned char *) ap;
  for (int idx = 0; idx + 1 < l; ++idx) {
    // unsigned char arithmetic wraps 255 -> 0, matching the generator.
    unsigned char expected = (unsigned char) (bytes[idx] + 1);
    if (bytes[idx + 1] != expected) {
      fprintf(stderr, "verify peter's data failed at %d\n", idx);
      break;
    }
  }
}
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
// Construct a handler in its initial, unconnected state.  The initializer
// list zeroes identity/state fields and (under CLUSTER_STATS) the
// diagnostic counters; the body installs the event handler and initializes
// the lock-free queues used to hand work between threads.
ClusterHandler::ClusterHandler()
  : net_vc(0),
    thread(0),
    ip(0),
    port(0),
    hostname(NULL),
    machine(NULL),
    ifd(-1),
    id(-1),
    dead(true),
    downing(false),
    active(false),
    on_stolen_thread(false),
    n_channels(0),
    channels(NULL),
    channel_data(NULL),
    connector(false),
    cluster_connect_state(ClusterHandler::CLCON_INITIAL),
    needByteSwap(false),
    configLookupFails(0),
    cluster_periodic_event(0),
    // Per-direction transfer state; the boolean selects read vs. write.
    read(this, true),
    write(this, false),
    current_time(0),
    last(0),
    last_report(0),
    n_since_last_report(0),
    last_cluster_op_enable(0),
    last_trace_dump(0),
    clm(0),
    disable_remote_cluster_ops(0),
    pw_write_descriptors_built(0),
    pw_freespace_descriptors_built(0),
    pw_controldata_descriptors_built(0),
    pw_time_expired(0),
    started_on_stolen_thread(false),
    control_message_write(false)
#ifdef CLUSTER_STATS
    // Diagnostic counters, compiled in only with CLUSTER_STATS.
  , _vc_writes(0),
    _vc_write_bytes(0),
    _control_write_bytes(0),
    _dw_missed_lock(0),
    _dw_not_enabled(0),
    _dw_wait_remote_fill(0),
    _dw_no_active_vio(0),
    _dw_not_enabled_or_no_write(0),
    _dw_set_data_pending(0),
    _dw_no_free_space(0),
    _fw_missed_lock(0),
    _fw_not_enabled(0),
    _fw_wait_remote_fill(0),
    _fw_no_active_vio(0),
    _fw_not_enabled_or_no_read(0),
    _process_read_calls(0),
    _n_read_start(0),
    _n_read_header(0),
    _n_read_await_header(0),
    _n_read_setup_descriptor(0),
    _n_read_descriptor(0),
    _n_read_await_descriptor(0),
    _n_read_setup_data(0),
    _n_read_data(0),
    _n_read_await_data(0),
    _n_read_post_complete(0),
    _n_read_complete(0),
    _process_write_calls(0),
    _n_write_start(0),
    _n_write_setup(0),
    _n_write_initiate(0),
    _n_write_await_completion(0),
    _n_write_post_complete(0),
    _n_write_complete(0)
#endif
{
#ifdef MSG_TRACE
  t_fd = fopen("msgtrace.log", "w");
#endif

  min_priority = 1;
  SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent);

  mutex = new_ProxyMutex();
  // The atomic lists are parameterized by the byte offset of each element
  // type's link field; throwaway stack instances are used only to compute
  // those offsets.
  OutgoingControl oc;
  int n;
  for (n = 0; n < CLUSTER_CMSG_QUEUES; ++n) {
    ink_atomiclist_init(&outgoing_control_al[n], "OutGoingControlQueue", (char *) &oc.link.next - (char *) &oc);
  }

  IncomingControl ic;
  ink_atomiclist_init(&external_incoming_control,
                      "ExternalIncomingControlQueue", (char *) &ic.link.next - (char *) &ic);

  ClusterVConnection ivc;
  ink_atomiclist_init(&external_incoming_open_local,
                      "ExternalIncomingOpenLocalQueue", (char *) &ivc.link.next - (char *) &ivc);
  ink_atomiclist_init(&read_vcs_ready, "ReadVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next));
  ink_atomiclist_init(&write_vcs_ready, "WriteVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next));
  // Callout bookkeeping starts empty.
  memset((char *) &callout_cont[0], 0, sizeof(callout_cont));
  memset((char *) &callout_events[0], 0, sizeof(callout_events));
}
00231
// Tear down a handler: close the underlying network VC, release this
// connection's share of the ClusterMachine (freeing the machine when the
// last connection returns), and free all per-channel storage.
ClusterHandler::~ClusterHandler()
{
  bool free_m = false;
  if (net_vc) {
    net_vc->do_io(VIO::CLOSE);
    net_vc = 0;
  }
  if (machine) {
    // Connection counting on the machine is protected by the global
    // cluster configuration lock.
    MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
    if (++machine->free_connections >= machine->num_connections)
      free_m = true;
    MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
    if (free_m)
      free_ClusterMachine(machine);
  }
  machine = NULL;
  ats_free(hostname);
  hostname = NULL;
  ats_free(channels);
  channels = NULL;
  if (channel_data) {
    // Each channel's data entry was allocated individually.
    for (int i = 0; i < n_channels; ++i) {
      if (channel_data[i]) {
        ats_free(channel_data[i]);
        channel_data[i] = 0;
      }
    }
    ats_free(channel_data);
    channel_data = NULL;
  }
  if (read_vcs)
    delete[] read_vcs;
  read_vcs = NULL;

  if (write_vcs)
    delete[] write_vcs;
  write_vcs = NULL;

  if (clm) {
    delete clm;
    clm = NULL;
  }
#ifdef CLUSTER_STATS
  message_blk = 0;
#endif
}
00278
// Final channel-level close of a ClusterVConnection: cancel its timers,
// remove it from the read/write work queues, release its channel slot,
// drain any byte-bank data, notify the remote side when it still expects
// this channel, record timing statistics and free the VC.
void
ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
{
  if (vc->inactivity_timeout)
    vc->inactivity_timeout->cancel(vc);
  if (vc->active_timeout)
    vc->active_timeout->cancel(vc);
  if (vc->read.queue)
    ClusterVC_remove_read(vc);
  if (vc->write.queue)
    ClusterVC_remove_write(vc);
  vc->read.vio.mutex = NULL;
  vc->write.vio.mutex = NULL;

  ink_assert(!vc->read_locked);
  ink_assert(!vc->write_locked);
  int channel = vc->channel;
  free_channel(vc);

  if (vc->byte_bank_q.head) {
    // Undelivered banked read data: take the VC off the delayed-read list
    // and release every queued descriptor.
    delayed_reads.remove(vc);
    ByteBankDescriptor *d;
    while ((d = vc->byte_bank_q.dequeue())) {
      ByteBankDescriptor::ByteBankDescriptor_free(d);
    }
  }
  vc->read_block = 0;

  ink_assert(!vc->write_list);
  ink_assert(!vc->write_list_tail);
  ink_assert(!vc->write_list_bytes);
  ink_assert(!vc->write_bytes_in_transit);

  // Tell the peer to close its end unless it already closed (or we have
  // all the data), or a forced close on an open channel was requested.
  if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->ch) {
    CloseMessage msg;
    int vers = CloseMessage::protoToVersion(vc->ch->machine->msg_proto_major);
    void *data;
    int len;

    if (vers == CloseMessage::CLOSE_CHAN_MESSAGE_VERSION) {
      msg.channel = channel;
      msg.status = (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL) ? FORCE_CLOSE_ON_OPEN_CHANNEL : vc->closed;
      msg.lerrno = vc->lerrno;
      msg.sequence_number = vc->token.sequence_number;
      data = (void *) &msg;
      len = sizeof(CloseMessage);
    } else {
      // Only the current close-message version is supported here.
      ink_release_assert(!"close_ClusterVConnection() bad msg version");
    }
    clusterProcessor.invoke_remote(vc->ch, CLOSE_CHANNEL_CLUSTER_FUNCTION, data, len);
  }
  ink_hrtime now = ink_get_hrtime();
  CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
  CLUSTER_SUM_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT, now - vc->start_time);
  if (!local_channel(channel)) {
    CLUSTER_SUM_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT, now - vc->start_time);
  } else {
    CLUSTER_SUM_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT, now - vc->start_time);
  }
  clusterVCAllocator_free(vc);
}
00350
00351 inline bool
00352 ClusterHandler::vc_ok_write(ClusterVConnection * vc)
00353 {
00354 return (((vc->closed > 0)
00355 && (vc->write_list || vc->write_bytes_in_transit)) ||
00356 (!vc->closed && vc->write.enabled && vc->write.vio.op == VIO::WRITE && vc->write.vio.buffer.writer()));
00357 }
00358
00359 inline bool
00360 ClusterHandler::vc_ok_read(ClusterVConnection * vc)
00361 {
00362 return (!vc->closed && vc->read.vio.op == VIO::READ && vc->read.vio.buffer.writer());
00363 }
00364
00365 void
00366 ClusterHandler::close_free_lock(ClusterVConnection * vc, ClusterVConnState * s)
00367 {
00368 Ptr<ProxyMutex> m(s->vio.mutex);
00369 if (s == &vc->read) {
00370 if ((ProxyMutex *) vc->read_locked)
00371 MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
00372 vc->read_locked = NULL;
00373 } else {
00374 if ((ProxyMutex *) vc->write_locked)
00375 MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
00376 vc->write_locked = NULL;
00377 }
00378 close_ClusterVConnection(vc);
00379 }
00380
// Describe a single raw buffer [d, d+len) as the complete transfer vector
// for the given direction.  Used for fixed-size exchanges rather than
// full cluster messages.  Always returns true.
bool ClusterHandler::build_data_vector(char *d, int len, bool read_flag)
{
  ClusterState & s = (read_flag ? read : write);
  ink_assert(d);
  ink_assert(len);
  ink_assert(s.iov);

  // One iovec entry wrapping the caller's buffer in a constant block.
  s.msg.count = 1;
  s.iov[0].iov_base = 0;
  s.iov[0].iov_len = len;
  s.block[0] = new_IOBufferBlock();
  s.block[0]->set(new_constant_IOBufferData(d, len));

  if (read_flag) {
    // Reading: extend the block's usable end so len bytes can land in it.
    s.block[0]->_buf_end = s.block[0]->end() + len;
  } else {
    // Writing: the buffer already holds len valid bytes.
    s.block[0]->fill(len);
  }

  s.to_do = len;
  s.did = 0;
  s.n_iov = 1;

  return true;
}
00411
// Build the complete iovec/IOBufferBlock vector for the next message
// transfer in the given direction.
//
// Write side (read_flag == false): entry 0 is the already-built message
// header block (header + descriptors + small control data); subsequent
// entries cover each descriptor's payload (outgoing control data, or
// channel data consumed from the VC's write_list).
//
// Read side (read_flag == true): built incrementally according to
// s.msg.state -- state 0 receives the fixed header, state 1 the
// descriptor/control region, state >= 2 the descriptor payloads
// (allocating IncomingControl buffers and per-VC read blocks as needed).
// Payloads for invalid or non-usable channels are directed at the
// channel_dummy_input/channel_dummy_output scratch buffers.
//
// Updates s.to_do / s.did / s.n_iov; always returns true.
bool ClusterHandler::build_initial_vector(bool read_flag)
{
  int i, n;

  ink_hrtime now = ink_get_hrtime();
  ClusterState & s = (read_flag ? read : write);
  OutgoingControl *oc = s.msg.outgoing_control.head;
  IncomingControl *ic = incoming_control.head;
  int new_n_iov = 0;
  int to_do = 0;
  int len;

  ink_assert(s.iov);

  if (!read_flag) {
    // Write: one entry for header + descriptors + small control bytes,
    // all resident in the message header block.
    len = sizeof(ClusterMsgHeader) + (s.msg.count * sizeof(Descriptor)) + s.msg.control_bytes;
    s.iov[new_n_iov].iov_base = 0;
    s.iov[new_n_iov].iov_len = len;
    s.block[new_n_iov] = s.msg.get_block_header();

    // Block already contains this data; mark it filled.
    s.block[new_n_iov]->fill(len);

    to_do += len;
    ++new_n_iov;

  } else {
    if (s.msg.state == 0) {
      // Read phase 0: just the fixed-size message header.
      len = sizeof(ClusterMsgHeader);
      s.iov[new_n_iov].iov_base = 0;
      s.iov[new_n_iov].iov_len = len;
      s.block[new_n_iov] = s.msg.get_block_header();

      // Extend the block's usable end to make room for the header.
      s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + len;

      to_do += len;
      ++new_n_iov;

    } else if (s.msg.state == 1) {
      // Read phase 1: descriptor array plus small control data.
      len = (s.msg.count * sizeof(Descriptor)) + s.msg.control_bytes;
      s.iov[new_n_iov].iov_base = 0;
      s.iov[new_n_iov].iov_len = len;
      s.block[new_n_iov] = s.msg.get_block_descriptor();

      s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + len;

      to_do += s.iov[new_n_iov].iov_len;
      ++new_n_iov;
    }
  }

  // Payload entries: on the read side only once the descriptors are in
  // (state >= 2); on the write side always.
  for (i = 0; i < (read_flag ? ((s.msg.state >= 2) ? s.msg.count : 0) : s.msg.count); i++) {
    if (s.msg.descriptor[i].type == CLUSTER_SEND_DATA) {
      if (s.msg.descriptor[i].channel == CLUSTER_CONTROL_CHANNEL) {
        if (read_flag) {
          // Incoming control message: allocate a buffer the first time
          // this descriptor is seen and queue it on incoming_control.
          if (!ic) {
            ic = IncomingControl::alloc();
            ic->recognized_time = now;
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_RECVD_STAT);
            ic->len = s.msg.descriptor[i].length;
            ic->alloc_data();
            if (!ic->fast_data()) {
              CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_RECVD_STAT);
            }
            // Seed the function index so a partially received message is
            // recognizable as undefined.
            *((uint32_t *) ic->data) = UNDEFINED_CLUSTER_FUNCTION;
            incoming_control.enqueue(ic);
          }
          s.iov[new_n_iov].iov_base = 0;
          s.iov[new_n_iov].iov_len = ic->len;
          s.block[new_n_iov] = ic->get_block();
          to_do += s.iov[new_n_iov].iov_len;
          ++new_n_iov;
          ic = (IncomingControl *) ic->link.next;
        } else {
          // Outgoing control message data, in queue order.
          ink_assert(oc);
          s.iov[new_n_iov].iov_base = 0;
          s.iov[new_n_iov].iov_len = oc->len;
          s.block[new_n_iov] = oc->get_block();
          to_do += s.iov[new_n_iov].iov_len;
          ++new_n_iov;
          oc = (OutgoingControl *) oc->link.next;
        }
      } else {
        // Channel (user data) descriptor: map it to its local VC and
        // validate the sequence number (channel slots are reused).
        ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];

        if (VALID_CHANNEL(vc) &&
            (s.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
          if (read_flag) {
            ink_release_assert(!vc->initial_data_bytes);

            // Try for the reader's lock now; on failure the data will be
            // byte-banked after the read completes.
            ink_release_assert(!(ProxyMutex *) vc->read_locked);
#ifdef CLUSTER_TOMCAT
            if (!vc->read.vio.mutex ||
                !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT))
#else
            if (!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT))
#endif
            {
              vc->read_locked = 0;
            } else {
              vc->read_locked = vc->read.vio.mutex;
            }

            // Remember which iovec entry carries this VC's data (used by
            // the partial-read bookkeeping).
            if (s.msg.descriptor[i].length) {
              vc->iov_map = new_n_iov;
            } else {
              vc->iov_map = CLUSTER_IOV_NONE;
            }
            if (vc->pending_remote_fill || vc_ok_read(vc)) {
              // Allocate a fresh block sized for the incoming data.
              ink_release_assert(s.msg.descriptor[i].length <= DEFAULT_MAX_BUFFER_SIZE);
              vc->read_block = new_IOBufferBlock();
              int64_t index = buffer_size_to_index(s.msg.descriptor[i].length, MAX_BUFFER_SIZE_INDEX);
              vc->read_block->alloc(index);

              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = vc->read_block->clone();

            } else {
              // Channel not readable: sink the data into scratch space.
              Debug(CL_NOTE, "dumping cluster read data");
              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = new_IOBufferBlock();
              s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_input, DEFAULT_MAX_BUFFER_SIZE));
            }

            // Make room for exactly the descriptor's length.
            s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + s.msg.descriptor[i].length;

          } else {
            bool remote_write_fill = (vc->pending_remote_fill && vc->remote_write_block);

            if (!remote_write_fill) {
              ink_assert((ProxyMutex *) vc->write_locked);
            }
            if (vc_ok_write(vc) || remote_write_fill) {
              if (remote_write_fill) {
                // Send the pre-built remote-fill block as-is.
                s.iov[new_n_iov].iov_base = 0;
                ink_release_assert((int) s.msg.descriptor[i].length == bytes_IOBufferBlockList(vc->remote_write_block, 1));
                s.block[new_n_iov] = vc->remote_write_block;

              } else {
                // Move the described bytes from write_list into transit.
                s.iov[new_n_iov].iov_base = 0;
                ink_release_assert((int) s.msg.descriptor[i].length <= vc->write_list_bytes);
                s.block[new_n_iov] = vc->write_list;
                vc->write_list = consume_IOBufferBlockList(vc->write_list, (int) s.msg.descriptor[i].length);
                vc->write_list_bytes -= (int) s.msg.descriptor[i].length;
                vc->write_bytes_in_transit += (int) s.msg.descriptor[i].length;

                // Re-find the tail after consuming from the head.
                vc->write_list_tail = vc->write_list;
                while (vc->write_list_tail && vc->write_list_tail->next)
                  vc->write_list_tail = vc->write_list_tail->next;
              }
            } else {
              // Channel not writable: source the bytes from scratch space.
              Debug(CL_NOTE, "faking cluster write data");
              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = new_IOBufferBlock();
              s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_output, DEFAULT_MAX_BUFFER_SIZE));

              s.block[new_n_iov]->fill(s.msg.descriptor[i].length);
            }
          }
        } else {
          // Invalid or reused channel: transfer to/from scratch buffers.
          s.iov[new_n_iov].iov_base = 0;
          s.block[new_n_iov] = new_IOBufferBlock();

          if (read_flag) {
            s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_input, DEFAULT_MAX_BUFFER_SIZE));

            s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + s.msg.descriptor[i].length;

          } else {
            s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_output, DEFAULT_MAX_BUFFER_SIZE));

            s.block[new_n_iov]->fill(s.msg.descriptor[i].length);
          }
        }
        s.iov[new_n_iov].iov_len = s.msg.descriptor[i].length;
        to_do += s.iov[new_n_iov].iov_len;
        ++new_n_iov;
      }
    }
  }

  // Clear unused entries (drops any block references left from a prior
  // message).
  for (n = new_n_iov; n < MAX_TCOUNT; ++n) {
    s.block[n] = 0;
  }

  s.to_do = to_do;
  s.did = 0;
  s.n_iov = new_n_iov;
  return true;

#if 0
  // Unreachable: apparently a former lock-miss bailout path, kept
  // disabled.
  for (n = 0; n < MAX_TCOUNT; ++n) {
    s.block[n] = 0;
  }
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_OP_DELAYED_FOR_LOCK_STAT);
  Debug(CL_WARN, "%s delayed for locks", read_flag ? "read" : "write");
  free_locks(read_flag, i);
  return false;
#endif
}
00696
// Called on the read path after bytes have been transferred.  Determine
// how far each iovec entry has progressed, find the first entry touched
// by the most recent transfer, and from that point on try to take the
// read-side vio locks of the affected channel VCs.  VCs that miss the
// lock (or have byte-bank data pending) are left unlocked -- their data
// goes through the byte bank later.  Any data already accumulated in a
// locked VC's read_block is delivered immediately.  Always returns true.
bool ClusterHandler::get_read_locks()
{
  ClusterState & s = read;
  int i, n;
  int bytes_processed;
  int vec_bytes_remainder;
  int iov_done[MAX_TCOUNT];

  memset((char *) iov_done, 0, sizeof(int) * MAX_TCOUNT);

  // Bytes completed by earlier transfers of this message (i.e. everything
  // except the bytes just moved).
  bytes_processed = s.did - s.bytes_xfered;

  // First pass: compute per-iovec completed byte counts (iov_done[]) and
  // locate the first entry (i) the current transfer touched.
  i = -1;
  for (n = 0; n < s.n_iov; ++n) {
    bytes_processed -= s.iov[n].iov_len;
    if (bytes_processed >= 0) {
      iov_done[n] = s.iov[n].iov_len;
    } else {
      iov_done[n] = s.iov[n].iov_len + bytes_processed;
      if (i < 0) {
        i = n;
        // Restrict iov_done[i] to what the *current* transfer moved into
        // this entry.
        vec_bytes_remainder = (s.iov[n].iov_len - iov_done[n]);
        bytes_processed = s.bytes_xfered;

        bytes_processed -= vec_bytes_remainder;
        if (bytes_processed >= 0) {
          iov_done[n] = vec_bytes_remainder;
        } else {
          iov_done[n] = vec_bytes_remainder + bytes_processed;
          break;
        }
      } else {
        break;
      }
    }
  }
  ink_release_assert(i >= 0);

  // Second pass: from the first touched entry onward, attempt the read
  // lock for every channel-data descriptor.
  for (; i < s.n_iov; ++i) {
    if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA)
        && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {
      // Map the descriptor back to its local VC; skip stale channels.
      ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];
      if (!VALID_CHANNEL(vc) ||
          ((s.msg.descriptor[i].sequence_number) !=
           CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) || !vc_ok_read(vc)) {
        // Channel gone, reused or not readable; nothing to lock.
        continue;
      }

      ink_assert(!(ProxyMutex *) vc->read_locked);
      vc->read_locked = vc->read.vio.mutex;
      if (vc->byte_bank_q.head
          || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) {
        // Lock miss, or older banked data must drain first: this VC's
        // data will be byte-banked.
        vc->read_locked = NULL;
        continue;
      }

      // Re-check readability now that we hold the lock.
      if (!vc_ok_read(vc)) {
        MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread);
        vc->read_locked = NULL;
        continue;
      }

      // Deliver any data already accumulated in read_block.
      int64_t read_avail = vc->read_block->read_avail();

      if (!vc->pending_remote_fill && read_avail) {
        Debug("cluster_vc_xfer", "Deferred fill ch %d %p %" PRId64" bytes", vc->channel, vc, read_avail);

        vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
        if (complete_channel_read(read_avail, vc)) {
          vc->read_block->consume(read_avail);
        }
      }
    }
  }
  return true;
}
00798
// Take the write-side vio lock for every channel-data descriptor in the
// outgoing message.  Unlike the read side, writes are all-or-nothing: on
// the first lock miss every lock obtained so far is released
// (free_locks) and false is returned so the caller retries later.
bool ClusterHandler::get_write_locks()
{
  ClusterState & s = write;
  int i;

  for (i = 0; i < s.msg.count; ++i) {
    if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA)
        && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {
      // Map the descriptor back to its local VC; skip stale channels.
      ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];
      if (!VALID_CHANNEL(vc) ||
          (s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        // Channel gone or reused; nothing to lock.
        continue;
      }
      ink_assert(!(ProxyMutex *) vc->write_locked);
      vc->write_locked = vc->write.vio.mutex;
#ifdef CLUSTER_TOMCAT
      if (vc->write_locked &&
          !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->write.vio.mutex, thread, vc->write.vio._cont, WRITE_LOCK_SPIN_COUNT)) {
#else
      if (!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->write.vio.mutex, thread, vc->write.vio._cont, WRITE_LOCK_SPIN_COUNT)) {
#endif
        // Missed a lock: back out and retry the whole message later.
        vc->write_locked = 0;
        free_locks(CLUSTER_WRITE, i);
        return false;
      }
    }
  }
  return true;
}
00840
00841 void
00842 ClusterHandler::swap_descriptor_bytes()
00843 {
00844 for (int i = 0; i < read.msg.count; i++) {
00845 read.msg.descriptor[i].SwapBytes();
00846 }
00847 }
00848
// Process any SET_CHANNEL_DATA control messages at the front of the
// just-received message's control areas.  These must run before anything
// else in the message because later descriptors may depend on the channel
// data they establish.  Processed entries are marked by bitwise-inverting
// their stored function index so the regular control passes skip them.
// Idempotent per message via the did_*_set_data flags.
void
ClusterHandler::process_set_data_msgs()
{
  uint32_t cluster_function_index;

  // ---- Small (inline) control entries ----
  if (!read.msg.did_small_control_set_data) {
    char *p = (char *) &read.msg.descriptor[read.msg.count];
    char *endp = p + read.msg.control_bytes;
    while (p < endp) {
      if (needByteSwap) {
        // Swap the length and function-index words into host order.
        ats_swap32((uint32_t *) p);
        ats_swap32((uint32_t *) (p + sizeof(int32_t)));
      }
      int len = *(int32_t *) p;
      cluster_function_index = *(uint32_t *) (p + sizeof(int32_t));

      if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
          && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t));

        // Mark as processed by inverting the stored function index.
        *((uint32_t *) (p + sizeof(uint32_t))) = ~*((uint32_t *) (p + sizeof(uint32_t)));
        p += (2 * sizeof(uint32_t)) + (len - sizeof(uint32_t));
        p = (char *) DOUBLE_ALIGN(p);
      } else {
        // First non-SET_CHANNEL_DATA entry ends the prefix.  Undo the
        // swap so the regular small-control pass swaps it again.
        if (needByteSwap) {
          ats_swap32((uint32_t *) p);
          ats_swap32((uint32_t *) (p + sizeof(int32_t)));
        }
        break;
      }
    }
    // Remember where the regular small control entries start.
    read.msg.control_data_offset = p - (char *) &read.msg.descriptor[read.msg.count];
    read.msg.did_small_control_set_data = 1;
  }

  // ---- Large (descriptor-carried) control messages ----
  if (!read.msg.did_large_control_set_data) {
    IncomingControl *ic = incoming_control.head;

    while (ic) {
      if (needByteSwap) {
        ats_swap32((uint32_t *) ic->data);
      }
      cluster_function_index = *((uint32_t *) ic->data);

      if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
          && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {

        char *p = ic->data;
        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this,
                                                               (void *) (p + sizeof(int32_t)), ic->len - sizeof(int32_t));

        // NOTE(review): the first two words are swapped again here before
        // marking; presumably this restores wire order for the marker --
        // confirm against the large-control consumer.
        if (needByteSwap) {
          ats_swap32((uint32_t *) p);
          ats_swap32((uint32_t *) (p + sizeof(int32_t)));
        }

        // Mark as processed by inverting the stored function index.
        *((uint32_t *) p) = ~*((uint32_t *) p);

        ic = (IncomingControl *) ic->link.next;
      } else {
        // End of the SET_CHANNEL_DATA prefix; undo the swap for the
        // regular large-control pass.
        if (needByteSwap) {
          ats_swap32((uint32_t *) ic->data);
        }
        break;
      }
    }
    read.msg.did_large_control_set_data = 1;
  }
}
00934
// Dispatch the "small" (inline) control messages carried in the received
// message's control area.  Immediate functions run in-line on this
// thread; others are copied and pushed to the external callout queue.
// Runs at most once per message.
void
ClusterHandler::process_small_control_msgs()
{
  if (read.msg.did_small_control_msgs) {
    return;
  } else {
    read.msg.did_small_control_msgs = 1;
  }

  ink_hrtime now = ink_get_hrtime();
  // Control entries live after the descriptor array; skip the prefix
  // already consumed by process_set_data_msgs() (control_data_offset).
  char *p = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_data_offset;
  char *endp = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_bytes;

  while (p < endp) {
    // Each entry: int32 length, uint32 function index, payload.
    if (needByteSwap) {
      ats_swap32((uint32_t *) p);
      ats_swap32((uint32_t *) (p + sizeof(int32_t)));
    }
    int len = *(int32_t *) p;
    p += sizeof(int32_t);
    uint32_t cluster_function_index = *(uint32_t *) p;
    // SET_CHANNEL_DATA entries must have been consumed earlier by
    // process_set_data_msgs().
    ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION);

    if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) {
      Warning("1Bad cluster function index (small control)");
      p += len;

    } else if (clusterFunction[cluster_function_index].ClusterFunc) {
      // Immediate (fast-path) function: invoke in-line.
      p += sizeof(uint32_t);
      clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
      p += (len - sizeof(int32_t));

    } else {
      // Non-immediate function: copy the message and hand it to the
      // external callout queue; the high bit of len marks it as "small".
      IncomingControl *ic = IncomingControl::alloc();
      ic->recognized_time = now;
      ic->len = len;
      ic->alloc_data();
      memcpy(ic->data, p, ic->len);
      SetHighBit(&ic->len);
      ink_atomiclist_push(&external_incoming_control, (void *) ic);
      p += len;
    }
    // Entries are double-word aligned.
    p = (char *) DOUBLE_ALIGN(p);
  }
}
00990
// Dispatch the "large" control messages (carried as separate data
// descriptors and queued on incoming_control).  Immediate functions run
// in-line; others are pushed to the external callout queue.  Runs at most
// once per received message.
void
ClusterHandler::process_large_control_msgs()
{
  if (read.msg.did_large_control_msgs) {
    return;
  } else {
    read.msg.did_large_control_msgs = 1;
  }

  IncomingControl *ic = NULL;
  uint32_t cluster_function_index;

  while ((ic = incoming_control.dequeue())) {
    if (needByteSwap) {
      ats_swap32((uint32_t *) ic->data);
    }
    cluster_function_index = *((uint32_t *) ic->data);
    ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION);

    if (cluster_function_index == (uint32_t) ~ SET_CHANNEL_DATA_CLUSTER_FUNCTION) {
      // Already consumed by process_set_data_msgs() (inverted index is
      // the "done" marker); release the buffer unless the handler took
      // ownership of malloc'ed data.
      if (!clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].fMalloced)
        ic->freeall();
      continue;
    }

    if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) {
      Warning("Bad cluster function index (large control)");
      ic->freeall();

    } else if (clusterFunction[cluster_function_index].ClusterFunc) {
      // Immediate function: invoke in-line.
      clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
                                                  ic->len - sizeof(int32_t));

      if (!clusterFunction[cluster_function_index].fMalloced)
        ic->freeall();

    } else {
      // Non-immediate: defer to the external callout queue.
      ink_atomiclist_push(&external_incoming_control, (void *) ic);
    }
  }
}
01042
01043 void
01044 ClusterHandler::process_freespace_msgs()
01045 {
01046 if (read.msg.did_freespace_msgs) {
01047 return;
01048 } else {
01049 read.msg.did_freespace_msgs = 1;
01050 }
01051
01052 int i;
01053
01054
01055
01056
01057 for (i = 0; i < read.msg.count; i++) {
01058 if (read.msg.descriptor[i].type == CLUSTER_SEND_FREE && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
01059 int c = read.msg.descriptor[i].channel;
01060 if (c < n_channels && VALID_CHANNEL(channels[c]) &&
01061 (CLUSTER_SEQUENCE_NUMBER(channels[c]->token.sequence_number) == read.msg.descriptor[i].sequence_number)) {
01062
01063
01064
01065
01066
01067 channels[c]->remote_free = read.msg.descriptor[i].length;
01068 vcs_push(channels[c], VC_CLUSTER_WRITE);
01069 }
01070 }
01071 }
01072 }
01073
01074 void
01075 ClusterHandler::add_to_byte_bank(ClusterVConnection * vc)
01076 {
01077 ByteBankDescriptor *bb_desc = ByteBankDescriptor::ByteBankDescriptor_alloc(vc->read_block);
01078 bool pending_byte_bank_completion = vc->byte_bank_q.head ? true : false;
01079
01080
01081 vc->byte_bank_q.enqueue(bb_desc);
01082
01083
01084 if (!pending_byte_bank_completion) {
01085 ClusterVC_remove_read(vc);
01086 delayed_reads.push(vc);
01087 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_LEVEL1_BANK_STAT);
01088 } else {
01089 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MULTILEVEL_BANK_STAT);
01090 }
01091 vc->read_block = 0;
01092 }
01093
// Message-complete read processing: deliver the channel data this message
// carried, then run the control-message passes it contained.
void
ClusterHandler::update_channels_read()
{
  int i;
  int len;

  // SET_CHANNEL_DATA messages first: they may establish data the rest of
  // the message depends on.
  process_set_data_msgs();

  for (i = 0; i < read.msg.count; i++) {
    if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
      ClusterVConnection *vc = channels[read.msg.descriptor[i].channel];
      if (VALID_CHANNEL(vc) &&
          (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        vc->last_activity_time = current_time;

        len = read.msg.descriptor[i].length;
        if (!len) {
          continue;
        }

        if (!vc->pending_remote_fill && vc_ok_read(vc)
            && (!((ProxyMutex *) vc->read_locked) || vc->byte_bank_q.head)) {
          // Reader lock was missed (or older banked data exists): bank
          // the data so delivery order is preserved.
          vc->read_block->fill(len);
          add_to_byte_bank(vc);

        } else {
          if (vc->pending_remote_fill || ((ProxyMutex *) vc->read_locked && vc_ok_read(vc))) {
            // Direct delivery into the reader's buffer (or completion of
            // a pending remote fill).
            vc->read_block->fill(len);
            if (!vc->pending_remote_fill) {
              vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
              vc->read_block->consume(len);
            }
            complete_channel_read(len, vc);
          }
        }
      }
    }
  }

  // Now the control traffic carried by this message.
  process_small_control_msgs();
  process_large_control_msgs();
  process_freespace_msgs();
}
01151
01152
01153
01154
01155
01156
01157
01158 int
01159 ClusterHandler::process_incoming_callouts(ProxyMutex * m)
01160 {
01161 ProxyMutex *mutex = m;
01162 ink_hrtime now;
01163
01164
01165
01166
01167
01168 Queue<IncomingControl> local_incoming_control;
01169 IncomingControl *ic_ext_next;
01170 IncomingControl *ic_ext;
01171
01172 while (true) {
01173 ic_ext = (IncomingControl *)
01174 ink_atomiclist_popall(&external_incoming_control);
01175 if (!ic_ext)
01176 break;
01177
01178 while (ic_ext) {
01179 ic_ext_next = (IncomingControl *) ic_ext->link.next;
01180 ic_ext->link.next = NULL;
01181 local_incoming_control.push(ic_ext);
01182 ic_ext = ic_ext_next;
01183 }
01184
01185
01186 int small_control_msg;
01187 IncomingControl *ic = NULL;
01188
01189 while ((ic = local_incoming_control.pop())) {
01190 LOG_EVENT_TIME(ic->recognized_time, inmsg_time_dist, inmsg_events);
01191
01192
01193 small_control_msg = IsHighBitSet(&ic->len);
01194 ClearHighBit(&ic->len);
01195
01196 if (small_control_msg) {
01197 int len = ic->len;
01198 char *p = ic->data;
01199 uint32_t cluster_function_index = *(uint32_t *) p;
01200 p += sizeof(uint32_t);
01201
01202 if (cluster_function_index < (uint32_t) SIZE_clusterFunction) {
01203
01204
01205
01206 ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
01207 clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
01208 now = ink_get_hrtime();
01209 CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
01210 } else {
01211 Warning("2Bad cluster function index (small control)");
01212 }
01213
01214 if (!clusterFunction[cluster_function_index].fMalloced)
01215 ic->freeall();
01216
01217 } else {
01218 ink_assert(ic->len > 4);
01219 uint32_t cluster_function_index = *(uint32_t *) ic->data;
01220 bool valid_index;
01221
01222 if (cluster_function_index < (uint32_t) SIZE_clusterFunction) {
01223 valid_index = true;
01224
01225
01226
01227 ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
01228 clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
01229 ic->len - sizeof(int32_t));
01230 now = ink_get_hrtime();
01231 CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
01232 } else {
01233 valid_index = false;
01234 Warning("2Bad cluster function index (large control)");
01235 }
01236 if (valid_index && !clusterFunction[cluster_function_index].fMalloced)
01237 ic->freeall();
01238 }
01239 }
01240 }
01241 return EVENT_CONT;
01242 }
01243
void
ClusterHandler::update_channels_partial_read()
{
  // Credit the bytes of a *partially* received cluster message to the
  // per-channel VCs referenced by the read descriptor list.  Uses
  // read.bytes_xfered (bytes moved in the last I/O) together with
  // read.did to work out which iovec entries the new bytes landed in.
  int i;
  int64_t res = read.bytes_xfered;

  if (!res) {
    // Nothing new arrived on the last I/O.
    return;
  }
  ink_assert(res <= read.did);

  // Compute, for each iovec, how many of the newly transferred bytes
  // belong to it.  already_read = bytes accounted for by prior passes.
  int64_t iov_done[MAX_TCOUNT];
  int64_t total = 0;
  int64_t already_read = read.did - read.bytes_xfered;

  for (i = 0; i < read.n_iov; i++) {
    ink_release_assert(already_read >= 0);
    iov_done[i] = read.iov[i].iov_len;

    // Skip over (or trim by) the portion consumed in earlier passes.
    if (already_read) {
      already_read -= iov_done[i];
      if (already_read < 0) {
        iov_done[i] = -already_read;
        already_read = 0;
      } else {
        iov_done[i] = 0;
        continue;
      }
    }

    // Trim to the number of bytes actually transferred this pass.
    res -= iov_done[i];
    if (res < 0) {
      iov_done[i] += res;
      res = 0;
    } else {
      total += iov_done[i];
    }
  }
  ink_assert(total <= read.did);

  int read_all_large_control_msgs = 0;

  // Walk the descriptors and credit each data channel's share of the
  // newly arrived bytes.
  for (i = 0; i < read.msg.count; i++) {
    if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
      ClusterVConnection *vc = channels[read.msg.descriptor[i].channel];
      // Only credit if the channel still maps to the same logical
      // connection (sequence number match).
      if (VALID_CHANNEL(vc) &&
          (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        if (vc->pending_remote_fill || (vc_ok_read(vc) && (vc->iov_map != CLUSTER_IOV_NONE))) {
          vc->last_activity_time = current_time;
          ClusterVConnState *s = &vc->read;
          ink_assert(vc->iov_map < read.n_iov);
          int len = iov_done[vc->iov_map];

          if (len) {
            if (!read_all_large_control_msgs) {
              // Before handing data to users, process any already
              // complete control messages so set_data/freespace state
              // is current.  Done at most once per call.
              process_set_data_msgs();
              process_small_control_msgs();
              process_freespace_msgs();
              read_all_large_control_msgs = 1;
            }
            iov_done[vc->iov_map] = 0;
            vc->read_block->fill(len);

            if (!vc->pending_remote_fill) {
              if ((ProxyMutex *) vc->read_locked) {
                // We hold the user's read lock: credit directly into
                // the user's read VIO buffer.
                Debug("cluster_vc_xfer", "Partial read, credit ch %d %p %d bytes", vc->channel, vc, len);
                s->vio.buffer.writer()->append_block(vc->read_block->clone());
                if (complete_channel_read(len, vc)) {
                  vc->read_block->consume(len);
                }

              } else {
                // No lock: only when the full descriptor has arrived,
                // park the data in the byte bank for a delayed read.
                if (len == (int) read.msg.descriptor[i].length) {
                  Debug("cluster_vc_xfer", "Partial read, byte bank move ch %d %p %d bytes", vc->channel, vc, len);
                  add_to_byte_bank(vc);
                }
              }
            } else {
              // Remote-fill read: bypass the VIO, just account bytes.
              Debug("cluster_vc_xfer", "Partial remote fill read, credit ch %d %p %d bytes", vc->channel, vc, len);
              complete_channel_read(len, vc);
            }
            // Reduce the descriptor by what we consumed this pass.
            read.msg.descriptor[i].length -= len;
            ink_assert(((int) read.msg.descriptor[i].length) >= 0);
          }
          Debug(CL_TRACE, "partial_channel_read chan=%d len=%d", vc->channel, len);
        }
      }
    }
  }
}
01354
01355 bool ClusterHandler::complete_channel_read(int len, ClusterVConnection * vc)
01356 {
01357
01358
01359
01360
01361 ClusterVConnState *s = &vc->read;
01362
01363 if (vc->pending_remote_fill) {
01364 Debug(CL_TRACE, "complete_channel_read chan=%d len=%d", vc->channel, len);
01365 vc->initial_data_bytes += len;
01366 ++vc->pending_remote_fill;
01367 return (vc->closed ? false : true);
01368 }
01369
01370 if (vc->closed)
01371 return false;
01372
01373 ink_assert((ProxyMutex *) s->vio.mutex == (ProxyMutex *) s->vio._cont->mutex);
01374
01375 Debug("cluster_vc_xfer", "Complete read, credit ch %d %p %d bytes", vc->channel, vc, len);
01376 s->vio.ndone += len;
01377
01378 if (s->vio.ntodo() <= 0) {
01379 s->enabled = 0;
01380 if (cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s)
01381 == EVENT_DONE)
01382 return false;
01383 } else {
01384 if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) == EVENT_DONE)
01385 return false;
01386 if (s->vio.ntodo() <= 0)
01387 s->enabled = 0;
01388 }
01389
01390 vcs_push(vc, VC_CLUSTER_READ);
01391 return true;
01392 }
01393
01394 void
01395 ClusterHandler::finish_delayed_reads()
01396 {
01397
01398
01399
01400
01401
01402 ClusterVConnection *vc = NULL;
01403 DLL<ClusterVConnectionBase> l;
01404 while ((vc = (ClusterVConnection *) delayed_reads.pop())) {
01405 MUTEX_TRY_LOCK_SPIN(lock, vc->read.vio.mutex, thread, READ_LOCK_SPIN_COUNT);
01406 if (lock) {
01407 if (vc_ok_read(vc)) {
01408 ink_assert(!vc->read.queue);
01409 ByteBankDescriptor *d;
01410
01411 while ((d = vc->byte_bank_q.dequeue())) {
01412 if (vc->read.queue) {
01413
01414
01415 ClusterVC_remove_read(vc);
01416 }
01417 Debug("cluster_vc_xfer",
01418 "Delayed read, credit ch %d %p %" PRId64" bytes", vc->channel, vc, d->get_block()->read_avail());
01419 vc->read.vio.buffer.writer()->append_block(d->get_block());
01420
01421 if (complete_channel_read(d->get_block()->read_avail(), vc)) {
01422 ByteBankDescriptor::ByteBankDescriptor_free(d);
01423 } else {
01424 ByteBankDescriptor::ByteBankDescriptor_free(d);
01425 break;
01426 }
01427 }
01428 }
01429 } else
01430 l.push(vc);
01431 }
01432 delayed_reads = l;
01433 }
01434
void
ClusterHandler::update_channels_written()
{
  // Post-process a completed cluster write: account sent bytes to each
  // data channel, release control-message buffers, and fire the
  // deferred compound-message callouts.
  ink_hrtime now;
  for (int i = 0; i < write.msg.count; i++) {
    if (write.msg.descriptor[i].type == CLUSTER_SEND_DATA) {
      if (write.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
        ClusterVConnection *vc = channels[write.msg.descriptor[i].channel];
        // Only account if the channel still maps to the same logical
        // connection (sequence number match).
        if (VALID_CHANNEL(vc) &&
            (write.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {

          if (vc->pending_remote_fill) {
            // Remote-fill data is now on the wire; clear the fill state.
            Debug(CL_TRACE,
                  "update_channels_written chan=%d seqno=%d len=%d",
                  write.msg.descriptor[i].channel,
                  write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length);
            vc->pending_remote_fill = 0;
            vc->remote_write_block = 0;
            continue;
          }

          ClusterVConnState *s = &vc->write;
          int len = write.msg.descriptor[i].length;
          vc->write_bytes_in_transit -= len;
          ink_release_assert(vc->write_bytes_in_transit >= 0);
          Debug(CL_PROTO, "(%d) data sent %d %" PRId64, write.msg.descriptor[i].channel, len, s->vio.ndone);

          if (vc_ok_write(vc)) {
            vc->last_activity_time = current_time;
            int64_t ndone = vc->was_closed()? 0 : s->vio.ndone;

            // More remote buffer space available: make the VC eligible
            // for further writes.
            if (ndone < vc->remote_free) {
              vcs_push(vc, VC_CLUSTER_WRITE);
            }
          }
        }
      } else {
        // Control-channel data: the OutgoingControl is fully sent,
        // release it and record timing stats.
        OutgoingControl *oc = write.msg.outgoing_control.dequeue();
        oc->free_data();
        oc->mutex = NULL;
        now = ink_get_hrtime();
        CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - oc->submit_time);
        LOG_EVENT_TIME(oc->submit_time, cluster_send_time_dist, cluster_send_events);
        oc->freeall();
      }
    }
  }

  // Release the compound (header + data) messages deferred by
  // build_controlmsg_descriptors; both parts are now sent.
  invoke_remote_data_args *args;
  OutgoingControl *hdr_oc;
  while ((hdr_oc = write.msg.outgoing_callout.dequeue())) {
    args = (invoke_remote_data_args *) (hdr_oc->data + sizeof(int32_t));
    ink_assert(args->magicno == invoke_remote_data_args::MagicNo);

    // Free data descriptor.
    args->data_oc->free_data();
    args->data_oc->mutex = NULL;
    args->data_oc->freeall();

    // Free descriptor describing control message data.
    hdr_oc->free_data();
    hdr_oc->mutex = NULL;
    now = ink_get_hrtime();
    CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - hdr_oc->submit_time);
    LOG_EVENT_TIME(hdr_oc->submit_time, cluster_send_time_dist, cluster_send_events);
    hdr_oc->freeall();
  }
}
01517
int
ClusterHandler::build_write_descriptors()
{
  // Build message descriptors for VC data writes, drawing first from
  // the atomic write_vcs_ready list (re-bucketed here) and then from
  // the current write_vcs bucket.  Returns the number of data
  // descriptors added to write.msg.
  int count_bucket = cur_vcs;
  int tcount = write.msg.count + 2;     // count + descriptor update entries
  int write_descriptors_built = 0;
  int valid;
  int list_len = 0;
  ClusterVConnection *vc, *vc_next;

  // Move VCs off the lock-free ready list into the current bucket,
  // reaping any that were closed in the meantime.
  vc = (ClusterVConnection *)ink_atomiclist_popall(&write_vcs_ready);
  while (vc) {
    enter_exit(&cls_build_writes_entered, &cls_writes_exited);
    vc_next = (ClusterVConnection *) vc->ready_alink.next;
    vc->ready_alink.next = NULL;
    list_len++;
    if (VC_CLUSTER_CLOSED == vc->type) {
      vc->in_vcs = false;
      vc->type = VC_NULL;
      clusterVCAllocator.free(vc);
      vc = vc_next;
      continue;
    }

    if (tcount >= MAX_TCOUNT) {
      // No descriptor slots left this pass; defer the VC.
      vcs_push(vc, VC_CLUSTER_WRITE);
    } else {
      vc->in_vcs = false;
      cluster_reschedule_offset(this, vc, &vc->write, 0);
      tcount++;
    }
    vc = vc_next;
  }
  if (list_len) {
    CLUSTER_SUM_DYN_STAT(CLUSTER_VC_WRITE_LIST_LEN_STAT, list_len);
  }

  // Now walk the current bucket and emit a descriptor per writable VC.
  tcount = write.msg.count + 2;
  vc_next = (ClusterVConnection *) write_vcs[count_bucket].head;
  while (vc_next) {
    vc = vc_next;
    vc_next = (ClusterVConnection *) vc->write.link.next;

    if (VC_CLUSTER_CLOSED == vc->type) {
      vc->type = VC_NULL;
      clusterVCAllocator.free(vc);
      continue;
    }

    if (tcount >= MAX_TCOUNT)
      break;

    // valid_for_data_write: 1 = writable (and vc->write_locked is
    // held), 0 = skip, -1 = lock miss (requeue).
    valid = valid_for_data_write(vc);
    if (-1 == valid) {
      vcs_push(vc, VC_CLUSTER_WRITE);
    } else if (valid) {
      ink_assert(vc->write_locked);
      if ((vc->remote_free > (vc->write.vio.ndone - vc->write_list_bytes))
          && channels[vc->channel] == vc) {

        ink_assert(vc->write_list && vc->write_list_bytes);

        int d = write.msg.count;
        write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
        write.msg.descriptor[d].channel = vc->channel;
        write.msg.descriptor[d].sequence_number = vc->token.sequence_number;
        int s = vc->write_list_bytes;
        ink_release_assert(s <= MAX_CLUSTER_SEND_LENGTH);

        // Clamp the send length against the VIO total.
        // NOTE(review): ndone already includes buffered-but-unsent
        // bytes (see valid_for_data_write), so (ndone - s) is the
        // byte count previously sent — confirm the intent of these
        // two clamps against the protocol spec.
        if ((vc->write.vio.ndone - s) > vc->write.vio.nbytes)
          s = vc->write.vio.nbytes - (vc->write.vio.ndone - s);

        // Clamp the send length against the remote freespace window.
        if ((vc->write.vio.ndone - s) > vc->remote_free)
          s = vc->remote_free - (vc->write.vio.ndone - s);
        write.msg.descriptor[d].length = s;
        write.msg.count++;
        tcount++;
        write_descriptors_built++;

#ifdef CLUSTER_STATS
        _vc_writes++;
        _vc_write_bytes += s;
#endif

      } else {
        // No remote space (or channel remapped): drop the lock taken
        // by valid_for_data_write.
        MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
        vc->write_locked = NULL;

        if (channels[vc->channel] == vc)
          CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NO_REMOTE_SPACE_STAT);
      }
    }
  }
  return (write_descriptors_built);
}
01622
int
ClusterHandler::build_freespace_descriptors()
{
  // Build message descriptors advertising read-side freespace to the
  // peer, drawing first from the atomic read_vcs_ready list
  // (re-bucketed here) and then from the current read_vcs bucket.
  // Returns the number of freespace descriptors added to write.msg.
  int count_bucket = cur_vcs;
  int tcount = write.msg.count + 2;     // count + descriptor update entries
  int freespace_descriptors_built = 0;
  int s = 0;
  int list_len = 0;
  ClusterVConnection *vc, *vc_next;

  // Move VCs off the lock-free ready list into the current bucket,
  // reaping any that were closed in the meantime.
  vc = (ClusterVConnection *)ink_atomiclist_popall(&read_vcs_ready);
  while (vc) {
    enter_exit(&cls_build_reads_entered, &cls_reads_exited);
    vc_next = (ClusterVConnection *) vc->ready_alink.next;
    vc->ready_alink.next = NULL;
    list_len++;
    if (VC_CLUSTER_CLOSED == vc->type) {
      vc->in_vcs = false;
      vc->type = VC_NULL;
      clusterVCAllocator.free(vc);
      vc = vc_next;
      continue;
    }

    if (tcount >= MAX_TCOUNT) {
      // No descriptor slots left this pass; defer the VC.
      vcs_push(vc, VC_CLUSTER_READ);
    } else {
      vc->in_vcs = false;
      cluster_reschedule_offset(this, vc, &vc->read, 0);
      tcount++;
    }
    vc = vc_next;
  }
  if (list_len) {
    CLUSTER_SUM_DYN_STAT(CLUSTER_VC_READ_LIST_LEN_STAT, list_len);
  }

  // Walk the current bucket and emit a freespace descriptor per VC.
  tcount = write.msg.count + 2;
  vc_next = (ClusterVConnection *) read_vcs[count_bucket].head;
  while (vc_next) {
    vc = vc_next;
    vc_next = (ClusterVConnection *) vc->read.link.next;

    if (VC_CLUSTER_CLOSED == vc->type) {
      vc->type = VC_NULL;
      clusterVCAllocator.free(vc);
      continue;
    }

    if (tcount >= MAX_TCOUNT)
      break;

    // valid_for_freespace_write: >0 = freespace byte count to
    // advertise, 0 = skip, -1 = lock miss (requeue).
    s = valid_for_freespace_write(vc);
    if (-1 == s) {
      vcs_push(vc, VC_CLUSTER_READ);
    } else if (s) {
      if (vc_ok_read(vc) && channels[vc->channel] == vc) {

        int d = write.msg.count;
        write.msg.descriptor[d].type = CLUSTER_SEND_FREE;
        write.msg.descriptor[d].channel = vc->channel;
        write.msg.descriptor[d].sequence_number = vc->token.sequence_number;

        ink_assert(s > 0);
        write.msg.descriptor[d].length = s;
        vc->last_local_free = s;
        Debug(CL_PROTO, "(%d) free space priority %d", vc->channel, vc->read.priority);
        write.msg.count++;
        tcount++;
        freespace_descriptors_built++;
      }
    }
  }
  return (freespace_descriptors_built);
}
01707
int
ClusterHandler::build_controlmsg_descriptors()
{
  // Build message descriptors for queued outgoing control messages.
  // Small messages are batched into the in-band control area; large
  // and compound (header + data) messages get their own descriptors.
  // Returns the number of control messages scheduled.
  int tcount = write.msg.count + 2;     // count + descriptor update entries
  int control_msgs_built = 0;
  bool compound_msg;            // msg + chan data in single control msg

  OutgoingControl *c = NULL;
  int control_bytes = 0;
  int q = 0;

  while (tcount < (MAX_TCOUNT - 1)) {   // leave room for compound messages
    c = outgoing_control[q].pop();
    if (!c) {
      // Local queue empty: pull everything off the matching atomic
      // list (LIFO order) and re-push so the local queue is FIFO.
      OutgoingControl *c_next;
      c = (OutgoingControl *) ink_atomiclist_popall(&outgoing_control_al[q]);
      if (c == 0) {
        // This priority queue is drained; move to the next one.
        if (++q >= CLUSTER_CMSG_QUEUES) {
          break;
        } else {
          continue;
        }
      }
      while (c) {
        c_next = (OutgoingControl *) c->link.next;
        c->link.next = NULL;
        outgoing_control[q].push(c);
        c = c_next;
      }
      continue;

    } else {
      // A leading -1 marks a compound (invoke_remote_data) message.
      compound_msg = (*((int32_t *) c->data) == -1);
    }
    if (!compound_msg && c->len <= SMALL_CONTROL_MESSAGE &&
        // check if the receiving cluster function will want to malloc'ed data
        !clusterFunction[*(int32_t *) c->data].fMalloced && control_bytes + c->len + sizeof(int32_t) * 2 + 7 < CONTROL_DATA) {
      // Small message: pack into the in-band control area (length word
      // + data + alignment padding accounted for).
      write.msg.outgoing_small_control.enqueue(c);
      control_bytes += c->len + sizeof(int32_t) * 2 + 7;        // safe approximation
      control_msgs_built++;

      if (clusterFunction[*(int32_t *) c->data].post_pfn) {
        clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
      }
      continue;
    }

    // Large or compound control message
    if (compound_msg) {
      // Extract OutgoingControl for data and descriptor.
      invoke_remote_data_args *cmhdr = (invoke_remote_data_args *) (c->data + sizeof(int32_t));
      OutgoingControl *oc_header = c;
      OutgoingControl *oc_msg = cmhdr->msg_oc;
      OutgoingControl *oc_data = cmhdr->data_oc;

      ink_assert(cmhdr->magicno == invoke_remote_data_args::MagicNo);

      // First build the data descriptor, targeting the destination
      // channel directly.
      int d;
      d = write.msg.count;
      write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
      write.msg.descriptor[d].channel = cmhdr->dest_channel;
      write.msg.descriptor[d].length = oc_data->len;
      write.msg.descriptor[d].sequence_number = cmhdr->token.sequence_number;

#ifdef CLUSTER_STATS
      _vc_write_bytes += oc_data->len;
#endif

      // The target channel must still be awaiting a remote fill.
      ClusterVConnection *vc = channels[cmhdr->dest_channel];

      if (VALID_CHANNEL(vc) && vc->pending_remote_fill) {
        ink_release_assert(!vc->remote_write_block);
        vc->remote_write_block = oc_data->get_block();

        // Account the data descriptor, then build the control-channel
        // descriptor for the companion message.
        write.msg.count++;
        tcount++;
        control_msgs_built++;
        d = write.msg.count;
        write.msg.outgoing_control.enqueue(oc_msg);
        write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
        write.msg.descriptor[d].channel = CLUSTER_CONTROL_CHANNEL;
        write.msg.descriptor[d].length = oc_msg->len;

#ifdef CLUSTER_STATS
        _control_write_bytes += oc_msg->len;
#endif

        write.msg.count++;
        tcount++;
        control_msgs_built++;

        // Defer the header; it is released by update_channels_written
        // after both parts have been sent.
        write.msg.outgoing_callout.enqueue(oc_header);

      } else {
        // Operation aborted (channel gone or fill no longer pending):
        // release all three OutgoingControls.
        Warning("Pending remote read fill aborted chan=%d len=%d", cmhdr->dest_channel, oc_data->len);

        // Free compound message header
        oc_header->free_data();
        oc_header->mutex = NULL;
        oc_header->freeall();

        // Free response message
        oc_msg->free_data();
        oc_msg->mutex = 0;
        oc_msg->freeall();

        // Free response data
        oc_data->free_data();
        oc_data->mutex = 0;
        oc_data->freeall();
      }

    } else {
      // Large control message: own descriptor on the control channel.
      write.msg.outgoing_control.enqueue(c);

      int d = write.msg.count;
      write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
      write.msg.descriptor[d].channel = CLUSTER_CONTROL_CHANNEL;
      write.msg.descriptor[d].length = c->len;

#ifdef CLUSTER_STATS
      _control_write_bytes += c->len;
#endif

      write.msg.count++;
      tcount++;
      control_msgs_built++;

      if (clusterFunction[*(int32_t *) c->data].post_pfn) {
        clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
      }
    }
  }
  return control_msgs_built;
}
01862
int
ClusterHandler::add_small_controlmsg_descriptors()
{
  // Pack the queued small control messages into the message area
  // immediately following the descriptors: [int32 len][data] per
  // message, each record double-aligned.  Always returns 1.
  char *p = (char *) &write.msg.descriptor[write.msg.count];
  OutgoingControl *c = NULL;

  while ((c = write.msg.outgoing_small_control.dequeue())) {
    *(int32_t *) p = c->len;
    p += sizeof(int32_t);
    memcpy(p, c->data, c->len);
    c->free_data();
    c->mutex = NULL;
    p += c->len;
    // Record send-time stats before releasing the control block.
    ink_hrtime now = ink_get_hrtime();
    CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - c->submit_time);
    LOG_EVENT_TIME(c->submit_time, cluster_send_time_dist, cluster_send_events);
    c->freeall();
    // Align the next record on a double-word boundary.
    p = (char *) DOUBLE_ALIGN(p);
  }
  write.msg.control_bytes = p - (char *) &write.msg.descriptor[write.msg.count];

#ifdef CLUSTER_STATS
  _control_write_bytes += write.msg.control_bytes;
#endif

  return 1;
}
01893
01894 struct DestructorLock
01895 {
01896 DestructorLock(EThread * thread)
01897 {
01898 have_lock = false;
01899 t = thread;
01900 }
01901 ~DestructorLock()
01902 {
01903 if (have_lock && m) {
01904 Mutex_unlock(m, t);
01905 }
01906 m = 0;
01907 }
01908 EThread *t;
01909 Ptr<ProxyMutex> m;
01910 bool have_lock;
01911 };
01912
int
ClusterHandler::valid_for_data_write(ClusterVConnection * vc)
{
  // Determine whether data can be written on this VC, staging data
  // from the user's write VIO into vc->write_list along the way.
  // Returns: 1 = writable (caller now owns vc->write_locked),
  //          0 = not writable this pass, -1 = lock miss (requeue VC).
  ClusterVConnState *s = &vc->write;

  ink_assert(!on_stolen_thread);
  ink_assert((ProxyMutex *) ! vc->write_locked);

  // Attempt to get the user's write-VIO mutex; on success the lock is
  // held by the DestructorLock guard until released or handed off.
  DestructorLock lock(thread);

retry:
  if ((lock.m = s->vio.mutex)) {
    lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, s->vio._cont, WRITE_LOCK_SPIN_COUNT);
    if (!lock.have_lock) {
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT);

#ifdef CLUSTER_STATS
      _dw_missed_lock++;
#endif
      return -1;                // retry later
    }
  }

  if (vc->was_closed()) {
    if (vc->schedule_write()) {
      // Buffered data remains: hand the lock to the caller so the
      // remaining bytes can be pushed out before the close.
#ifdef CLUSTER_TOMCAT
      ink_assert(lock.m);
#endif
      vc->write_locked = lock.m;
      lock.m = 0;
      lock.have_lock = false;
      return 1;
    } else {
      // Defer close until all in-transit bytes have been accounted.
      if (!vc->write_bytes_in_transit) {
        close_ClusterVConnection(vc);
      }
      return 0;
    }
  }

  if (!s->enabled && !vc->was_remote_closed()) {
#ifdef CLUSTER_STATS
    _dw_not_enabled++;
#endif
    return 0;
  }

  if (vc->pending_remote_fill) {
    if (vc->was_remote_closed())
      close_ClusterVConnection(vc);

#ifdef CLUSTER_STATS
    _dw_wait_remote_fill++;
#endif
    return 0;
  }

  if (!lock.have_lock || !s->vio.mutex || !s->vio._cont) {
    if (!lock.have_lock && s->vio.mutex && s->vio._cont) {
      // A VIO appeared after our first lock attempt; try again.
      goto retry;
    } else {
      // No active VIO to write from.
#ifdef CLUSTER_STATS
      _dw_no_active_vio++;
#endif
      return 0;
    }
  }

  // The remote side has closed: finish flushing, then deliver the
  // close/EOS indication to the user.
  if (vc->was_remote_closed()) {
    if (!vc->write_bytes_in_transit && !vc->schedule_write()) {
      remote_close(vc, s);
    }
    return 0;
  }

  // Cannot write if the VIO is disabled or is not a write op.
  if (!s->enabled || s->vio.op != VIO::WRITE) {
    s->enabled = 0;
#ifdef CLUSTER_STATS
    _dw_not_enabled_or_no_write++;
#endif
    return 0;
  }

  // Stall writes while set_data messages are outstanding or the
  // remote freespace window is exhausted (ndone - write_list_bytes ==
  // bytes already sent).
  int set_data_msgs_pending = vc->n_set_data_msgs;
  if (set_data_msgs_pending || (vc->remote_free <= (s->vio.ndone - vc->write_list_bytes))) {
    if (set_data_msgs_pending) {
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_WRITE_STALL_STAT);

#ifdef CLUSTER_STATS
      _dw_set_data_pending++;
#endif

    } else {
#ifdef CLUSTER_STATS
      _dw_no_free_space++;
#endif
    }
    return 0;
  }

  // Work out how much user data is available to stage this pass.
  MIOBufferAccessor & buf = s->vio.buffer;

  int64_t towrite = buf.reader()->read_avail();
  int64_t ntodo = s->vio.ntodo();
  bool write_vc_signal = false;

  if (towrite > ntodo)
    towrite = ntodo;

  ink_assert(ntodo >= 0);
  if (ntodo <= 0) {
    cluster_signal_and_update(VC_EVENT_WRITE_COMPLETE, vc, s);
    return 0;
  }
  if (buf.writer()->write_avail() && towrite != ntodo) {
    // Room in the buffer and the VIO is not fully satisfied: ask the
    // user for more data.
    write_vc_signal = true;
    if (cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s) == EVENT_DONE)
      return 0;
    ink_assert(s->vio.ntodo() >= 0);
    if (s->vio.ntodo() <= 0) {
      cluster_signal_and_update(VC_EVENT_WRITE_COMPLETE, vc, s);
      return 0;
    }
  }

  // Stage (clone) user data into the VC's private write list, up to
  // DEFAULT_MAX_BUFFER_SIZE bytes buffered at a time.
  Ptr<IOBufferBlock> b_list;
  IOBufferBlock *b_tail;
  int bytes_to_fill;
  int consume_bytes;

  bytes_to_fill = DEFAULT_MAX_BUFFER_SIZE - vc->write_list_bytes;

  if (towrite && bytes_to_fill) {
    consume_bytes = (towrite > bytes_to_fill) ? bytes_to_fill : towrite;
    b_list = clone_IOBufferBlockList(s->vio.buffer.reader()->block,
                                     s->vio.buffer.reader()->start_offset, consume_bytes, &b_tail);
    ink_assert(b_tail);

    // Append cloned data to the write list.
    if (vc->write_list_tail) {
      vc->write_list_tail->next = b_list;
    } else {
      vc->write_list = b_list;
    }
    vc->write_list_tail = b_tail;
    vc->write_list_bytes += consume_bytes;
    ink_assert(bytes_IOBufferBlockList(vc->write_list, 1) == vc->write_list_bytes);

    // Account the staged bytes in the user's VIO now; they are
    // considered "done" from the user's point of view.
    (s->vio.buffer.reader())->consume(consume_bytes);
    s->vio.ndone += consume_bytes;
    if (s->vio.ntodo() <= 0) {
      cluster_signal_and_update_locked(VC_EVENT_WRITE_COMPLETE, vc, s);
    }
  }

  if (vc->schedule_write()) {
    // Data is ready to go: hand the VIO lock to the caller via
    // vc->write_locked (released in build_write_descriptors /
    // free_locks).
#ifdef CLUSTER_TOMCAT
    ink_assert(s->vio.mutex);
#endif
    vc->write_locked = lock.m;
    lock.m = 0;
    lock.have_lock = false;
    return 1;
  } else {
    // Nothing scheduled; give the user a WRITE_READY if not already
    // signalled above.
    if (!write_vc_signal && buf.writer()->write_avail() && towrite != ntodo)
      cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s);
    return 0;
  }
}
02102
int
ClusterHandler::valid_for_freespace_write(ClusterVConnection * vc)
{
  // Determine whether a freespace message should be sent for this VC,
  // pushing any initially buffered read data to the user on the way.
  // Returns: >0 = freespace byte count to advertise, 0 = nothing to
  // send this pass, -1 = lock miss (requeue VC).
  ClusterVConnState *s = &vc->read;

  ink_assert(!on_stolen_thread);

  // Attempt to get the user's read-VIO mutex; on success the lock is
  // held by the DestructorLock guard until scope exit.
  DestructorLock lock(thread);

retry:
  if ((lock.m = s->vio.mutex)) {
    lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, s->vio._cont, READ_LOCK_SPIN_COUNT);

    if (!lock.have_lock) {
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_READ_LOCKED_STAT);

#ifdef CLUSTER_STATS
      _fw_missed_lock++;
#endif
      return -1;                // retry later
    }
  }
  if (vc->was_closed()) {
    // Defer close until in-transit and scheduled writes have drained.
    if (!vc->write_bytes_in_transit && !vc->schedule_write()) {
      close_ClusterVConnection(vc);
    }
    return 0;
  }

  if (!s->enabled && !vc->was_remote_closed()) {
#ifdef CLUSTER_STATS
    _fw_not_enabled++;
#endif
    return 0;
  }

  if (vc->pending_remote_fill) {
    if (vc->was_remote_closed())
      close_ClusterVConnection(vc);

#ifdef CLUSTER_STATS
    _fw_wait_remote_fill++;
#endif
    return 0;
  }

  if (!lock.have_lock || !s->vio.mutex || !s->vio._cont) {
    if (!lock.have_lock && s->vio.mutex && s->vio._cont) {
      // A VIO appeared after our first lock attempt; try again.
      goto retry;
    } else {
      // No active VIO to credit.
#ifdef CLUSTER_STATS
      _fw_no_active_vio++;
#endif
      return 0;
    }
  }

  // The remote side has closed: deliver EOS/complete once writes have
  // drained.
  if (vc->was_remote_closed()) {
    if (vc->write_bytes_in_transit || vc->schedule_write()) {
      // Defer close until write data is pushed.
      return 0;
    }
    remote_close(vc, s);
    return 0;
  }

  // Cannot proceed if the VIO is disabled or is not a read op.
  if (!s->enabled || s->vio.op != VIO::READ) {
#ifdef CLUSTER_STATS
    _fw_not_enabled_or_no_read++;
#endif
    return 0;
  }

  int64_t ntodo = s->vio.ntodo();
  ink_assert(ntodo >= 0);

  if (ntodo <= 0) {
    cluster_signal_and_update(VC_EVENT_READ_COMPLETE, vc, s);
    return 0;
  }

  // Push any initially buffered data (from connection setup) into the
  // user's read VIO before advertising freespace.
  int64_t bytes_to_move = vc->initial_data_bytes;
  if (vc->read_block && bytes_to_move) {

    // Push initial read data into VC.
    if (ntodo >= bytes_to_move) {
      // The whole initial block fits in the VIO.
      Debug("cluster_vc_xfer", "finish initial data push ch %d bytes %" PRId64, vc->channel, vc->read_block->read_avail());

      s->vio.buffer.writer()->append_block(vc->read_block->clone());
      vc->read_block = 0;

    } else {
      // Only part fits: clone that many bytes and keep the rest.
      bytes_to_move = ntodo;

      Debug("cluster_vc_xfer", "initial data push ch %d bytes %" PRId64, vc->channel, bytes_to_move);

      // Clone a portion of the data.
      IOBufferBlock *b, *btail;
      b = clone_IOBufferBlockList(vc->read_block, 0, bytes_to_move, &btail);
      s->vio.buffer.writer()->append_block(b);
      vc->read_block->consume(bytes_to_move);
    }
    s->vio.ndone += bytes_to_move;
    vc->initial_data_bytes -= bytes_to_move;

    if (s->vio.ntodo() <= 0) {
      s->enabled = 0;
      cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s);
      return 0;

    } else {
      if (vc->have_all_data) {
        if (!vc->read_block) {
          // All data consumed and no more will arrive: EOS.
          s->enabled = 0;
          cluster_signal_and_update(VC_EVENT_EOS, vc, s);
          return 0;
        }
      }
      if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s)
          == EVENT_DONE)
        return false;           // (false == 0: VC gone, no freespace msg)

      if (s->vio.ntodo() <= 0)
        s->enabled = 0;

      // Hold off freespace until the initial data is fully drained.
      if (vc->initial_data_bytes)
        return 0;
    }
  }

  // Compute the next freespace value to advertise: ndone rounded up
  // to a DEFAULT_MAX_BUFFER_SIZE multiple, bumped ahead of the last
  // advertisement when warranted.
  int64_t nextfree = vc->read.vio.ndone;

  nextfree = (nextfree + DEFAULT_MAX_BUFFER_SIZE - 1) / DEFAULT_MAX_BUFFER_SIZE;
  nextfree *= DEFAULT_MAX_BUFFER_SIZE;

  // NOTE(review): this advances the advertised window by 8 buffers
  // whenever rounded-ndone reaches half the previous advertisement —
  // confirm the windowing policy against the protocol documentation.
  if (nextfree >= (vc->last_local_free / 2)) {
    nextfree = vc->last_local_free + (8 * DEFAULT_MAX_BUFFER_SIZE);
  }

  if ((vc->last_local_free == 0) || (nextfree >= vc->last_local_free)) {
    Debug(CL_PROTO, "(%d) update freespace %" PRId64, vc->channel, nextfree);

    // Advertise this freespace value (caller stores it in
    // vc->last_local_free once the descriptor is built).
    return nextfree;

  } else {
    return 0;
  }
}
02270
void
ClusterHandler::vcs_push(ClusterVConnection * vc, int type)
{
  // Make 'vc' eligible for read or write processing by pushing it onto
  // the handler's lock-free ready list.  The in_vcs CAS guarantees the
  // VC is enqueued at most once; losers of the race simply return.
  if (vc->type <= VC_CLUSTER)
    vc->type = type;

  // The loop retries the CAS only while the VC is still unqueued; a
  // successful CAS claims the enqueue right and returns after pushing.
  while ((vc->type > VC_CLUSTER) && !vc->in_vcs && ink_atomic_cas(pvint32(&vc->in_vcs), 0, 1)) {
    if (vc->type == VC_CLUSTER_READ)
      ink_atomiclist_push(&vc->ch->read_vcs_ready, (void *)vc);
    else
      ink_atomiclist_push(&vc->ch->write_vcs_ready, (void *)vc);
    return;
  }
}
02285
02286 int
02287 ClusterHandler::remote_close(ClusterVConnection * vc, ClusterVConnState * ns)
02288 {
02289 if (ns->vio.op != VIO::NONE && !vc->closed) {
02290 ns->enabled = 0;
02291 if (vc->remote_closed > 0) {
02292 if (ns->vio.op == VIO::READ) {
02293 if (ns->vio.nbytes == ns->vio.ndone) {
02294 return cluster_signal_and_update(VC_EVENT_READ_COMPLETE, vc, ns);
02295 } else {
02296 return cluster_signal_and_update(VC_EVENT_EOS, vc, ns);
02297 }
02298 } else {
02299 return cluster_signal_and_update(VC_EVENT_EOS, vc, ns);
02300 }
02301 } else {
02302 return cluster_signal_error_and_update(vc, ns, vc->remote_lerrno);
02303 }
02304 }
02305 return EVENT_CONT;
02306 }
02307
02308 void
02309 ClusterHandler::steal_thread(EThread * t)
02310 {
02311
02312
02313
02314
02315 if (t != thread &&
02316 write.to_do <= 0 &&
02317
02318 !write.msg.count) {
02319 mainClusterEvent(CLUSTER_EVENT_STEAL_THREAD, (Event *) t);
02320 }
02321 }
02322
void
ClusterHandler::free_locks(bool read_flag, int i)
{
  // Release the per-VC read/write locks taken while building or
  // processing the first 'i' descriptors of the read or write message.
  // i == CLUSTER_FREE_ALL_LOCKS means "all descriptors in the message".
  if (i == CLUSTER_FREE_ALL_LOCKS) {
    if (read_flag) {
      // Only descriptors are valid once the read state has advanced
      // past the header phases.
      i = (read.msg.state >= 2 ? read.msg.count : 0);
    } else {
      i = write.msg.count;
    }
  }
  ClusterState & s = (read_flag ? read : write);
  for (int j = 0; j < i; j++) {
    if (s.msg.descriptor[j].type == CLUSTER_SEND_DATA && s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
      ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
      if (VALID_CHANNEL(vc)) {
        if (read_flag) {
          if ((ProxyMutex *) vc->read_locked) {
            // NOTE(review): this path unlocks vc->read.vio.mutex while
            // the CLUSTER_SEND_FREE path below unlocks vc->read_locked
            // — presumably they reference the same mutex when
            // read_locked is set; confirm before changing.
            MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread);
            vc->read_locked = NULL;
          }
        } else {
          if ((ProxyMutex *) vc->write_locked) {
            MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
            vc->write_locked = NULL;
          }
        }
      }
    } else if (!read_flag &&
               s.msg.descriptor[j].type == CLUSTER_SEND_FREE &&
               s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
      // Freespace descriptors hold the read-side lock taken by
      // valid_for_freespace_write.
      ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
      if (VALID_CHANNEL(vc)) {
        if ((ProxyMutex *) vc->read_locked) {
          MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
          vc->read_locked = NULL;
        }
      }
    }
  }
}
02366
02367 #ifdef CLUSTER_IMMEDIATE_NETIO
02368 void
02369 ClusterHandler::build_poll(bool next)
02370 {
02371 Pollfd *pfd;
02372 if (next) {
02373 pfd = thread->nextPollDescriptor->alloc();
02374 pfd->fd = net_vc->get_socket();
02375 ifd = pfd - thread->nextPollDescriptor->pfd;
02376 } else {
02377 pfd = thread->pollDescriptor->alloc();
02378 pfd->fd = net_vc->get_socket();
02379 ifd = pfd - thread->pollDescriptor->pfd;
02380 }
02381 pfd->events = POLLHUP;
02382 if (next) {
02383 if (read.to_do)
02384 pfd->events |= POLLIN;
02385 if (write.to_do)
02386 pfd->events |= POLLOUT;
02387 } else {
02388
02389 pfd->events = POLLIN | POLLOUT;
02390
02391 pfd->revents = POLLIN | POLLOUT;
02392 }
02393 }
02394 #endif // CLUSTER_IMMEDIATE_NETIO
02395
02396 extern int CacheClusterMonitorEnabled;
02397 extern int CacheClusterMonitorIntervalSecs;
02398
02399
02400
02401
02402
02403 int
02404 ClusterHandler::mainClusterEvent(int event, Event * e)
02405 {
02406
02407 current_time = ink_get_hrtime();
02408
02409 if (CacheClusterMonitorEnabled) {
02410 if ((current_time - last_trace_dump) > HRTIME_SECONDS(CacheClusterMonitorIntervalSecs)) {
02411 last_trace_dump = current_time;
02412 dump_internal_data();
02413 }
02414 }
02415
02416
02417
02418
02419
02420
02421
02422
02423
02424 #ifndef DEBUG
02425 if (clm && ClusterLoadMonitor::cf_monitor_enabled > 0) {
02426 #else
02427 if (0) {
02428 #endif
02429 bool last_state = disable_remote_cluster_ops;
02430 if (clm->is_cluster_overloaded()) {
02431 disable_remote_cluster_ops = true;
02432 } else {
02433 disable_remote_cluster_ops = false;
02434 }
02435 if (last_state != disable_remote_cluster_ops) {
02436 if (disable_remote_cluster_ops) {
02437 Note("Network congestion to [%u.%u.%u.%u] encountered, reverting to proxy only mode", DOT_SEPARATED(ip));
02438 } else {
02439 Note("Network congestion to [%u.%u.%u.%u] cleared, reverting to cache mode", DOT_SEPARATED(ip));
02440 last_cluster_op_enable = current_time;
02441 }
02442 }
02443 }
02444
02445 on_stolen_thread = (event == CLUSTER_EVENT_STEAL_THREAD);
02446 bool io_callback = (event == EVENT_IMMEDIATE);
02447
02448 if (on_stolen_thread) {
02449 thread = (EThread *) e;
02450 } else {
02451 if (io_callback) {
02452 thread = this_ethread();
02453 } else {
02454 thread = e->ethread;
02455 }
02456 }
02457
02458 int io_activity = 1;
02459 bool only_write_control_msgs;
02460 int res;
02461
02462 while (io_activity) {
02463 io_activity = 0;
02464 only_write_control_msgs = 0;
02465
02466 if (downing) {
02467 machine_down();
02468 break;
02469 }
02470
02471
02472
02473
02474 if (!on_stolen_thread) {
02475 if (delayed_reads.head) {
02476 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
02477 finish_delayed_reads();
02478 }
02479 if ((res = process_read(current_time)) < 0) {
02480 break;
02481 }
02482 io_activity += res;
02483
02484 if (delayed_reads.head) {
02485 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
02486 finish_delayed_reads();
02487 }
02488 }
02489
02490
02491
02492 if ((res = process_write(current_time, only_write_control_msgs)) < 0) {
02493 break;
02494 }
02495 io_activity += res;
02496
02497
02498
02499
02500 if (!on_stolen_thread) {
02501 if (do_open_local_requests())
02502 thread->signal_hook(thread);
02503 }
02504 }
02505
02506 #ifdef CLUSTER_IMMEDIATE_NETIO
02507 if (!dead && ((event == EVENT_POLL) || (event == EVENT_INTERVAL))) {
02508 if (res >= 0) {
02509 build_poll(true);
02510 }
02511 }
02512 #endif
02513 return EVENT_CONT;
02514 }
02515
int
ClusterHandler::process_read(ink_hrtime )
{
  // Read-side state machine for inbound cluster messages.
  //
  // A cluster message arrives in three sections, read in order:
  //   1. fixed-size header (count / control_bytes / checksums),
  //   2. descriptor + control-byte section,
  //   3. bulk channel data.
  // Each section goes through a setup -> initiate-I/O -> await-I/O cycle.
  //
  // Return values:
  //    1 -- made progress (data transferred); caller should iterate again
  //    0 -- no further progress possible now (would block, or lock miss)
  //   -1 -- fatal error; machine_down() has been invoked
  //
  // NOTE(review): the unnamed ink_hrtime parameter is unused in this body.
#ifdef CLUSTER_STATS
  _process_read_calls++;
#endif
  if (dead) {
    // Connection already declared down -- ignore residual events.
    return 0;
  }

  // Loop over states until a handler returns (blocked, done, or failed).
  for (;;) {

    switch (read.state) {

    case ClusterState::READ_START:
      // Start a new message: reset message state and build the read
      // vector for the fixed header.
      {
#ifdef CLUSTER_STATS
        _n_read_start++;
#endif
        read.msg.clear();
        read.start_time = ink_get_hrtime();
        if (build_initial_vector(CLUSTER_READ)) {
          read.state = ClusterState::READ_HEADER;
        } else {
          return 0;
        }
        break;
      }

    case ClusterState::READ_HEADER:
      // Initiate the header read.
      {
#ifdef CLUSTER_STATS
        _n_read_header++;
#endif
        read.state = ClusterState::READ_AWAIT_HEADER;
        if (!read.doIO()) {
          // I/O could not be started; stay in READ_HEADER and retry later.
          read.state = ClusterState::READ_HEADER;
          return 0;
        }
        break;
      }

    case ClusterState::READ_AWAIT_HEADER:
      // Wait for header read completion, then validate the header.
      {
#ifdef CLUSTER_STATS
        _n_read_await_header++;
#endif
        if (!read.io_complete) {
          // Still in progress.
          return 0;
        } else {
          if (read.io_complete < 0) {
            // I/O error on the header read -- declare the node down.
            machine_down();
            return -1;
          }
        }
        if (read.to_do) {
          if (read.bytes_xfered) {
            // Partial header read; continue reading the remainder.
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
            read.state = ClusterState::READ_HEADER;
            break;
          } else {
            // Nothing transferred; retry later.
            read.state = ClusterState::READ_HEADER;
            return 0;
          }
        } else {
#ifdef MSG_TRACE
          fprintf(t_fd,
                  "[R] seqno=%d count=%d control_bytes=%d count_check=%d dsum=%d csum=%d\n",
                  read.sequence_number,
                  read.msg.hdr()->count, read.msg.hdr()->control_bytes,
                  read.msg.hdr()->count_check, read.msg.hdr()->descriptor_cksum, read.msg.hdr()->control_bytes_cksum);
          fflush(t_fd);
#endif
          CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did);
          if (needByteSwap) {
            // Peer has opposite endianness; normalize the header fields.
            read.msg.hdr()->SwapBytes();
          }
          // Cache the (now host-order) header fields in the message state.
          read.msg.count = read.msg.hdr()->count;
          read.msg.control_bytes = read.msg.hdr()->control_bytes;
          read.msg.descriptor_cksum = read.msg.hdr()->descriptor_cksum;
          read.msg.control_bytes_cksum = read.msg.hdr()->control_bytes_cksum;
          read.msg.unused = read.msg.hdr()->unused;

          // Sanity check: count_check must match the magic value derived
          // from the header contents, otherwise the stream is corrupt.
          if (MAGIC_COUNT(read) != read.msg.hdr()->count_check) {
            ink_assert(!"Read bad ClusterMsgHeader data");
            Warning("Bad ClusterMsgHeader read on [%d.%d.%d.%d], restarting", DOT_SEPARATED(ip));
            Note("Cluster read from [%u.%u.%u.%u] failed, declaring down", DOT_SEPARATED(ip));
            machine_down();
            return -1;
          }

          if (read.msg.count || read.msg.control_bytes) {
            // Message carries descriptors and/or control bytes; read them.
            read.msg.state++;
            read.state = ClusterState::READ_SETUP_DESCRIPTOR;
          } else {
            // Empty message; complete immediately.
            read.state = ClusterState::READ_COMPLETE;
          }
          break;
        }
      }

    case ClusterState::READ_SETUP_DESCRIPTOR:
      // Build the read vector for the descriptor/control section.
      {
#ifdef CLUSTER_STATS
        _n_read_setup_descriptor++;
#endif
        if (build_initial_vector(CLUSTER_READ)) {
          read.state = ClusterState::READ_DESCRIPTOR;
        } else {
          return 0;
        }
        break;
      }

    case ClusterState::READ_DESCRIPTOR:
      // Initiate the descriptor read.
      {
#ifdef CLUSTER_STATS
        _n_read_descriptor++;
#endif
        read.state = ClusterState::READ_AWAIT_DESCRIPTOR;
        if (!read.doIO()) {
          // I/O could not be started; retry later from the same state.
          read.state = ClusterState::READ_DESCRIPTOR;
          return 0;
        }
        break;
      }

    case ClusterState::READ_AWAIT_DESCRIPTOR:
      // Wait for descriptor read completion, then verify and byte-swap.
      {
#ifdef CLUSTER_STATS
        _n_read_await_descriptor++;
#endif
        if (!read.io_complete) {
          // Still in progress.
          return 0;
        } else {
          if (read.io_complete < 0) {
            // I/O error -- declare the node down.
            machine_down();
            return -1;
          }
        }
        if (read.to_do) {
          if (read.bytes_xfered) {
            // Partial descriptor read; continue reading the remainder.
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
            read.state = ClusterState::READ_DESCRIPTOR;
            break;
          } else {
            // Nothing transferred; retry later.
            read.state = ClusterState::READ_DESCRIPTOR;
            return 0;
          }
        } else {
#ifdef CLUSTER_MESSAGE_CKSUM
          // Verify checksums over the descriptor and control-byte sections.
          ink_release_assert(read.msg.calc_descriptor_cksum() == read.msg.descriptor_cksum);
          ink_release_assert(read.msg.calc_control_bytes_cksum() == read.msg.control_bytes_cksum);
#endif
          CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did);
          if (needByteSwap) {
            // Normalize descriptor fields from the peer's byte order.
            swap_descriptor_bytes();
          }
          if (read.msg.count == 0) {
            // Control-only message: no bulk data section follows.
            read.bytes_xfered = 0;
            read.state = ClusterState::READ_COMPLETE;
          } else {
            read.msg.state++;
            read.state = ClusterState::READ_SETUP_DATA;
          }
          break;
        }
      }

    case ClusterState::READ_SETUP_DATA:
      // Build the read vector for the bulk data section.
      {
#ifdef CLUSTER_STATS
        _n_read_setup_data++;
#endif
        if (build_initial_vector(CLUSTER_READ)) {
          free_locks(CLUSTER_READ);
          if (read.to_do) {
            read.state = ClusterState::READ_DATA;
          } else {
            // No bytes to transfer for this message's data section.
            read.state = ClusterState::READ_COMPLETE;
          }
        } else {
          return 0;
        }
        break;
      }

    case ClusterState::READ_DATA:
      // Initiate the data read.
      {
#ifdef CLUSTER_STATS
        _n_read_data++;
#endif
        ink_release_assert(read.to_do);
        read.state = ClusterState::READ_AWAIT_DATA;
        if (!read.doIO()) {
          // I/O could not be started; retry later from the same state.
          read.state = ClusterState::READ_DATA;
          return 0;
        }
        break;
      }

    case ClusterState::READ_AWAIT_DATA:
      // Wait for the data read to complete (fully or partially).
      {
#ifdef CLUSTER_STATS
        _n_read_await_data++;
#endif
        if (!read.io_complete) {
          // Still in progress.
          return 0;
        } else {
          if (read.io_complete > 0) {
            read.state = ClusterState::READ_POST_COMPLETE;
          } else {
            // I/O error -- declare the node down.
            machine_down();
            return -1;
          }
        }
        break;
      }

    case ClusterState::READ_POST_COMPLETE:
      // Post-process transferred data under the channel locks.
      {
#ifdef CLUSTER_STATS
        _n_read_post_complete++;
#endif
        if (!get_read_locks()) {
          // Could not acquire all required locks; retry on the next event.
          return 0;
        }
        if (read.to_do) {
          if (read.bytes_xfered) {
            // Partial data read: hand off what we have, then continue.
            update_channels_partial_read();
            free_locks(CLUSTER_READ);
            CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.bytes_xfered);
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
            read.state = ClusterState::READ_DATA;
            return 1;
          } else {
            // Nothing transferred; retry later.
            free_locks(CLUSTER_READ);
            read.state = ClusterState::READ_DATA;
            return 0;
          }
        } else {
          CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.bytes_xfered);
          read.state = ClusterState::READ_COMPLETE;
          break;
        }
      }

    case ClusterState::READ_COMPLETE:
      // Whole message received: account assembly time, dispatch it to the
      // channels, and restart for the next message.
      {
#ifdef CLUSTER_STATS
        _n_read_complete++;
#endif
        ink_hrtime rdmsg_end_time = ink_get_hrtime();
        CLUSTER_SUM_DYN_STAT(CLUSTER_RDMSG_ASSEMBLE_TIME_STAT, rdmsg_end_time - read.start_time);
        read.start_time = HRTIME_MSECONDS(0);
        if (dump_msgs)
          dump_read_msg();
        read.sequence_number++;
        update_channels_read();
        free_locks(CLUSTER_READ);

        read.state = ClusterState::READ_START;
        break;
      }

    default:
      // Corrupt state value -- abort.
      {
        ink_release_assert(!"ClusterHandler::process_read invalid state");
      }

    }
  }
}
02816
int
ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
{
  // Write-side state machine for outbound cluster messages.
  //
  // Each message is assembled from up to three descriptor sources --
  // control messages, channel data writes and freespace notifications --
  // then sent as header + descriptors + data via a gathered write.
  //
  //   now                     -- event start time; bounds per-event run time
  //   only_write_control_msgs -- build only control-message descriptors
  //                              (used on stolen threads / I/O callbacks)
  //
  // Return values:
  //    0 -- no further progress possible now (blocked, or nothing to send)
  //   -1 -- allotted run time expired (not an error)
#ifdef CLUSTER_STATS
  _process_write_calls++;
#endif

  // Loop over states until a handler returns.
  for (;;) {

    switch (write.state) {

    case ClusterState::WRITE_START:
      // Reset per-message state before building a new message.
      {
#ifdef CLUSTER_STATS
        _n_write_start++;
#endif
        write.msg.clear();
        write.last_time = ink_get_hrtime();
        // -1 means "not yet attempted" for the descriptor builders below.
        pw_write_descriptors_built = -1;
        pw_freespace_descriptors_built = -1;
        pw_controldata_descriptors_built = -1;
        pw_time_expired = 0;
        write.state = ClusterState::WRITE_SETUP;
        break;
      }

    case ClusterState::WRITE_SETUP:
      // Build the message descriptors and the initial write vector.
      {
#ifdef CLUSTER_STATS
        _n_write_setup++;
#endif
        if (!on_stolen_thread && !only_write_control_msgs) {
          // Normal pass: build all descriptor types.  A builder is only
          // re-invoked while its previous call produced descriptors
          // (nonzero) or has not run yet (-1).
          if (pw_controldata_descriptors_built) {
            pw_controldata_descriptors_built = build_controlmsg_descriptors();
          }

          if (pw_write_descriptors_built) {
            pw_write_descriptors_built = build_write_descriptors();
          }

          if (pw_freespace_descriptors_built) {
            pw_freespace_descriptors_built = build_freespace_descriptors();
          }
          add_small_controlmsg_descriptors();
        } else {
          // Stolen-thread / control-only pass: send control messages only.
          pw_write_descriptors_built = 0;
          pw_freespace_descriptors_built = 0;
          pw_controldata_descriptors_built = build_controlmsg_descriptors();
          add_small_controlmsg_descriptors();
        }

        // Nothing to send?  Skip straight to completion handling.
        if (!pw_controldata_descriptors_built && !pw_write_descriptors_built && !pw_freespace_descriptors_built) {
          write.state = ClusterState::WRITE_COMPLETE;
          break;
        } else {
          started_on_stolen_thread = on_stolen_thread;
          control_message_write = only_write_control_msgs;
        }

#ifdef CLUSTER_MESSAGE_CKSUM
        // Stamp checksums over the descriptor and control-byte sections
        // so the reader can verify them.
        write.msg.descriptor_cksum = write.msg.calc_descriptor_cksum();
        write.msg.hdr()->descriptor_cksum = write.msg.descriptor_cksum;

        write.msg.control_bytes_cksum = write.msg.calc_control_bytes_cksum();
        write.msg.hdr()->control_bytes_cksum = write.msg.control_bytes_cksum;
        write.msg.unused = 0;
#endif
        // Finalize the header; count_check lets the reader validate it.
        write.msg.hdr()->count = write.msg.count;
        write.msg.hdr()->control_bytes = write.msg.control_bytes;
        write.msg.hdr()->count_check = MAGIC_COUNT(write);

        ink_release_assert(build_initial_vector(CLUSTER_WRITE));
        free_locks(CLUSTER_WRITE);
        write.state = ClusterState::WRITE_INITIATE;
        break;
      }

    case ClusterState::WRITE_INITIATE:
      // Initiate the gathered write.
      {
#ifdef CLUSTER_STATS
        _n_write_initiate++;
#endif
        write.state = ClusterState::WRITE_AWAIT_COMPLETION;
        if (!write.doIO()) {
          // I/O could not be started; retry later from the same state.
          write.state = ClusterState::WRITE_INITIATE;
          return 0;
        }
        break;
      }

    case ClusterState::WRITE_AWAIT_COMPLETION:
      // Wait for the write to complete (fully or partially).
      {
#ifdef CLUSTER_STATS
        _n_write_await_completion++;
#endif
        if (!write.io_complete) {
          // Still in progress.
          return 0;
        } else {
          if (write.io_complete < 0) {
            // Write error -- declare the node down.
            machine_down();
            write.state = ClusterState::WRITE_INITIATE;
            break;
          }
          if (write.to_do) {
            if (write.bytes_xfered) {
              // Partial write; continue with the remainder.
              CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_WRITES_STAT);
              write.state = ClusterState::WRITE_INITIATE;
              break;
            } else {
              // Nothing transferred; retry later.
              write.state = ClusterState::WRITE_INITIATE;
              return 0;
            }
          }
          CLUSTER_SUM_DYN_STAT(CLUSTER_WRITE_BYTES_STAT, write.bytes_xfered);
          write.sequence_number++;
          write.state = ClusterState::WRITE_POST_COMPLETE;
        }
        break;
      }

    case ClusterState::WRITE_POST_COMPLETE:
      // Update channel bookkeeping under the channel locks.
      {
#ifdef CLUSTER_STATS
        _n_write_post_complete++;
#endif
        if (!get_write_locks()) {
          // Lock miss; retry on the next event.
          CLUSTER_INCREMENT_DYN_STAT(CLUSTER_WRITE_LOCK_MISSES_STAT);
          return 0;
        }

        update_channels_written();
        free_locks(CLUSTER_WRITE);
        write.state = ClusterState::WRITE_COMPLETE;
        break;
      }

    case ClusterState::WRITE_COMPLETE:
      // Message done: decide whether to build another message or yield.
      {
#ifdef CLUSTER_STATS
        _n_write_complete++;
#endif
        write.state = ClusterState::WRITE_START;
        ink_hrtime curtime = ink_get_hrtime();

        if (!on_stolen_thread) {
          // Bound the time spent per event on the regular thread.
          pw_time_expired = (curtime - now) > CLUSTER_MAX_RUN_TIME;

          if (!control_message_write && !pw_write_descriptors_built
              && !pw_freespace_descriptors_built && !pw_controldata_descriptors_built) {
            // Nothing was sent this pass; advance to the next VC bucket.
            cur_vcs = (cur_vcs + 1) % CLUSTER_BUCKETS;
          }
        } else {
          // Stolen threads have a (shorter) time budget of their own.
          pw_time_expired = (curtime - now) > CLUSTER_MAX_THREAD_STEAL_TIME;
          if (pw_time_expired) {
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_THREAD_STEAL_EXPIRES_STAT);
          }
        }

        // Once per bucket cycle (cur_vcs wrapped to 0), verify this node is
        // still present in the cluster configuration; if not, declare it down.
        if (!on_stolen_thread && !cur_vcs && !dead) {
          MachineList *mc = the_cluster_machines_config();
          if (mc && !mc->find(ip, port)) {
            Note("Cluster [%u.%u.%u.%u:%d] not in config, declaring down", DOT_SEPARATED(ip), port);
            machine_down();
          }
        }
        if (pw_time_expired) {
          // Out of time; -1 tells the caller to stop iterating.
          return -1;
        } else {
          if (pw_write_descriptors_built || pw_freespace_descriptors_built || pw_controldata_descriptors_built) {
            // More data may be pending; build another message.
            break;
          } else {
            return 0;
          }
        }
      }

    default:
      // Corrupt state value -- abort.
      {
        ink_release_assert(!"ClusterHandler::process_write invalid state");
      }

    }
  }
}
03040
03041 int
03042 ClusterHandler::do_open_local_requests()
03043 {
03044
03045
03046
03047
03048
03049
03050 int pending_request = 0;
03051 ClusterVConnection *cvc;
03052 ClusterVConnection *cvc_ext;
03053 ClusterVConnection *cvc_ext_next;
03054 EThread *tt = this_ethread();
03055 Queue<ClusterVConnection> local_incoming_open_local;
03056
03057
03058
03059
03060
03061 while (true) {
03062 cvc_ext = (ClusterVConnection *)
03063 ink_atomiclist_popall(&external_incoming_open_local);
03064 if (cvc_ext == 0)
03065 break;
03066
03067 while (cvc_ext) {
03068 cvc_ext_next = (ClusterVConnection *) cvc_ext->link.next;
03069 cvc_ext->link.next = NULL;
03070 local_incoming_open_local.push(cvc_ext);
03071 cvc_ext = cvc_ext_next;
03072 }
03073
03074
03075
03076 while ((cvc = local_incoming_open_local.pop())) {
03077 MUTEX_TRY_LOCK(lock, cvc->action_.mutex, tt);
03078 if (lock) {
03079 if (cvc->start(tt) < 0) {
03080 cvc->token.clear();
03081 if (cvc->action_.continuation) {
03082 cvc->action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);
03083 clusterVCAllocator.free(cvc);
03084 }
03085 }
03086 MUTEX_RELEASE(lock);
03087
03088 } else {
03089
03090 Debug(CL_TRACE, "do_open_local_requests() unable to acquire mutex (cvc=%p)", cvc);
03091 pending_request = 1;
03092 ink_atomiclist_push(&external_incoming_open_local, (void *) cvc);
03093 }
03094 }
03095 }
03096 return pending_request;
03097 }
03098
03099