ClusterHandler.cc

/** @file

  ClusterHandler member functions: node-to-node cluster message
  read/write handling.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */


/****************************************************************************

  ClusterHandler.cc
****************************************************************************/

#define DEFINE_CLUSTER_FUNCTIONS
#include "P_Cluster.h"

/*************************************************************************/
// Global Data
/*************************************************************************/
// Initialize clusterFunction[] size
unsigned SIZE_clusterFunction = countof(clusterFunction);

// hook for testing
ClusterFunction *ptest_ClusterFunction = NULL;

// global bit buckets for closed channels
static char channel_dummy_input[DEFAULT_MAX_BUFFER_SIZE];
char channel_dummy_output[DEFAULT_MAX_BUFFER_SIZE];

// outgoing control continuations
ClassAllocator<OutgoingControl> outControlAllocator("outControlAllocator");

// incoming control descriptors
ClassAllocator<IncomingControl> inControlAllocator("inControlAllocator");

static int dump_msgs = 0;

/////////////////////////////////////////
// VERIFY_PETERS_DATA support code
/////////////////////////////////////////
#ifdef VERIFY_PETERS_DATA
#define DO_VERIFY_PETERS_DATA(_p,_l) verify_peters_data(_p,_l)
#else
#define DO_VERIFY_PETERS_DATA(_p,_l)
#endif

void
verify_peters_data(char *ap, int l)
{
  unsigned char *p = (unsigned char *) ap;
  for (int i = 0; i < l - 1; i++) {
    unsigned char x1 = p[i];
    unsigned char x2 = p[i + 1];
    x1 += 1;
    if (x1 != x2) {
      fprintf(stderr, "verify peter's data failed at %d\n", i);
      break;
    }
  }
}

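// A minimal sketch (illustrative, not part of this file) of a generator
// for the pattern verify_peters_data() checks: each byte is its
// predecessor plus one, wrapping with unsigned char arithmetic.
#if 0
static void
fill_peters_data(char *ap, int l, unsigned char seed)
{
  unsigned char *p = (unsigned char *) ap;
  for (int i = 0; i < l; i++)
    p[i] = (unsigned char) (seed + i);  // ensures p[i] + 1 == p[i + 1] (mod 256)
}
#endif
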
/*************************************************************************/
// ClusterHandler member functions (Internal Class)
/*************************************************************************/
//
// Overview:
//  In a steady state cluster environment, all cluster nodes have an
//  established TCP socket connection to each node in the cluster.
//  An instance of the class ClusterHandler exists for each known node
//  in the cluster.  All node-node specific data/state is encapsulated
//  by this class.
//
//  ClusterHandler::mainClusterEvent() is the key periodic event which
//  drives the read/write action over the node-node socket connection.
//  A high level overview of ClusterHandler::mainClusterEvent() action is
//  as follows:
//      1) Perform cluster interconnect load monitoring functions.
//         If the interconnect is overloaded, convert all remote cluster
//         operations to proxy only.
//      2) Process delayed reads.  A delayed read refers to data associated
//         with a VC (Virtual Connection) which resides in an intermediate
//         buffer and is unknown to the VC.  This is required in cases
//         where we are unable to acquire the VC mutex at the time of the
//         read from the node-node socket.  Delayed read processing
//         consists of acquiring the VC mutex, moving the data into the
//         VC and posting read completion.
//      3) Process pending read data on the node-node TCP socket.  In the
//         typical case, read processing is performed using three read
//         operations.  The actions are as follows:
//              a) Read the fixed size message header
//                 (struct ClusterMsgHeader) consisting of the
//                 number of data descriptors and the size of the inline
//                 control messages following the data descriptors.
//              b) Set up a buffer for the data descriptors and inline
//                 control messages and issue the read.
//              c) Set up read buffers and acquire applicable locks for
//                 VC/Control data described by the data descriptors and
//                 issue the read.
//              d) Perform read completion actions on control and VC data.
//              e) Free VC locks.
//      4) Process write bank data.  Write bank data is outstanding data
//         which we were unable to push out in the last write over the
//         node-node TCP socket.  Write bank data must be successfully pushed
//         before performing any additional write processing.
//      5) Build a write message consisting of the following data:
//          1) Write data for a Virtual Connection in the current write data
//             bucket (write_vcs)
//          2) Virtual Connection free space for VCs in the current read
//             data bucket (read_vcs)
//          3) Control message data (outgoing_control)
//      6) Push write data.
//
//  (A sketch of the read phases in step 3 follows this comment block.)
//
//  Thread stealing refers to executing the control message processing
//  portion of mainClusterEvent() by a thread not associated with the
//  periodic event.  This is a mechanism to avoid latency on control
//  messages by allowing them to be pushed immediately.
//
/*************************************************************************/

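//
// A minimal sketch of the three read phases from step 3 above.  This is
// illustrative only: read_exact(), issue_descriptor_reads() and
// post_read_completions() are hypothetical helpers, and the header
// fields mirror the msg.count / msg.control_bytes usage seen later in
// this file rather than the actual ClusterMsgHeader definition.
//
#if 0
void
sketch_read_cycle(ClusterHandler *ch)
{
  // Phase a: fixed size header -> descriptor count and control bytes.
  ClusterMsgHeader hdr;
  read_exact(ch, &hdr, sizeof(hdr));

  // Phase b: descriptor array plus inline small control messages.
  int len = hdr.count * sizeof(Descriptor) + hdr.control_bytes;
  read_exact(ch, ch->read.msg.descriptor, len);

  // Phase c: per-descriptor VC/control payloads, with VC locks held;
  // then completion actions (phase d) and lock release (phase e).
  issue_descriptor_reads(ch);
  post_read_completions(ch);
}
#endif
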
ClusterHandler::ClusterHandler()
  : net_vc(0),
    thread(0),
    ip(0),
    port(0),
    hostname(NULL),
    machine(NULL),
    ifd(-1),
    id(-1),
    dead(true),
    downing(false),
    active(false),
    on_stolen_thread(false),
    n_channels(0),
    channels(NULL),
    channel_data(NULL),
    connector(false),
    cluster_connect_state(ClusterHandler::CLCON_INITIAL),
    needByteSwap(false),
    configLookupFails(0),
    cluster_periodic_event(0),
    read(this, true),
    write(this, false),
    current_time(0),
    last(0),
    last_report(0),
    n_since_last_report(0),
    last_cluster_op_enable(0),
    last_trace_dump(0),
    clm(0),
    disable_remote_cluster_ops(0),
    pw_write_descriptors_built(0),
    pw_freespace_descriptors_built(0),
    pw_controldata_descriptors_built(0),
    pw_time_expired(0),
    started_on_stolen_thread(false),
    control_message_write(false)
#ifdef CLUSTER_STATS
  ,
    _vc_writes(0),
    _vc_write_bytes(0),
    _control_write_bytes(0),
    _dw_missed_lock(0),
    _dw_not_enabled(0),
    _dw_wait_remote_fill(0),
    _dw_no_active_vio(0),
    _dw_not_enabled_or_no_write(0),
    _dw_set_data_pending(0),
    _dw_no_free_space(0),
    _fw_missed_lock(0),
    _fw_not_enabled(0),
    _fw_wait_remote_fill(0),
    _fw_no_active_vio(0),
    _fw_not_enabled_or_no_read(0),
    _process_read_calls(0),
    _n_read_start(0),
    _n_read_header(0),
    _n_read_await_header(0),
    _n_read_setup_descriptor(0),
    _n_read_descriptor(0),
    _n_read_await_descriptor(0),
    _n_read_setup_data(0),
    _n_read_data(0),
    _n_read_await_data(0),
    _n_read_post_complete(0),
    _n_read_complete(0),
    _process_write_calls(0),
    _n_write_start(0),
    _n_write_setup(0),
    _n_write_initiate(0),
    _n_write_await_completion(0),
    _n_write_post_complete(0),
    _n_write_complete(0)
#endif
{
#ifdef MSG_TRACE
  t_fd = fopen("msgtrace.log", "w");
#endif
  // we need to lead by at least 1

  min_priority = 1;
  SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent);

  mutex = new_ProxyMutex();
  OutgoingControl oc;
  int n;
  for (n = 0; n < CLUSTER_CMSG_QUEUES; ++n) {
    ink_atomiclist_init(&outgoing_control_al[n], "OutGoingControlQueue", (char *) &oc.link.next - (char *) &oc);
  }

  IncomingControl ic;
  ink_atomiclist_init(&external_incoming_control,
                      "ExternalIncomingControlQueue", (char *) &ic.link.next - (char *) &ic);

  ClusterVConnection ivc;
  ink_atomiclist_init(&external_incoming_open_local,
                      "ExternalIncomingOpenLocalQueue", (char *) &ivc.link.next - (char *) &ivc);
  ink_atomiclist_init(&read_vcs_ready, "ReadVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next));
  ink_atomiclist_init(&write_vcs_ready, "WriteVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next));
  memset((char *) &callout_cont[0], 0, sizeof(callout_cont));
  memset((char *) &callout_events[0], 0, sizeof(callout_events));
}

ClusterHandler::~ClusterHandler()
{
  bool free_m = false;
  if (net_vc) {
    net_vc->do_io(VIO::CLOSE);
    net_vc = 0;
  }
  if (machine) {
    MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
    if (++machine->free_connections >= machine->num_connections)
      free_m = true;
    MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
    if (free_m)
      free_ClusterMachine(machine);
  }
  machine = NULL;
  ats_free(hostname);
  hostname = NULL;
  ats_free(channels);
  channels = NULL;
  if (channel_data) {
    for (int i = 0; i < n_channels; ++i) {
      if (channel_data[i]) {
        ats_free(channel_data[i]);
        channel_data[i] = 0;
      }
    }
    ats_free(channel_data);
    channel_data = NULL;
  }
  if (read_vcs)
    delete[] read_vcs;
  read_vcs = NULL;

  if (write_vcs)
    delete[] write_vcs;
  write_vcs = NULL;

  if (clm) {
    delete clm;
    clm = NULL;
  }
#ifdef CLUSTER_STATS
  message_blk = 0;
#endif
}

void
ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
{
  //
  // Close down a ClusterVConnection
  //
  if (vc->inactivity_timeout)
    vc->inactivity_timeout->cancel(vc);
  if (vc->active_timeout)
    vc->active_timeout->cancel(vc);
  if (vc->read.queue)
    ClusterVC_remove_read(vc);
  if (vc->write.queue)
    ClusterVC_remove_write(vc);
  vc->read.vio.mutex = NULL;
  vc->write.vio.mutex = NULL;

  ink_assert(!vc->read_locked);
  ink_assert(!vc->write_locked);
  int channel = vc->channel;
  free_channel(vc);

  if (vc->byte_bank_q.head) {
    delayed_reads.remove(vc);

    // Deallocate byte bank descriptors
    ByteBankDescriptor *d;
    while ((d = vc->byte_bank_q.dequeue())) {
      ByteBankDescriptor::ByteBankDescriptor_free(d);
    }
  }
  vc->read_block = 0;

  ink_assert(!vc->write_list);
  ink_assert(!vc->write_list_tail);
  ink_assert(!vc->write_list_bytes);
  ink_assert(!vc->write_bytes_in_transit);

  if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->ch) {

    CloseMessage msg;
    int vers = CloseMessage::protoToVersion(vc->ch->machine->msg_proto_major);
    void *data;
    int len;

    if (vers == CloseMessage::CLOSE_CHAN_MESSAGE_VERSION) {
      msg.channel = channel;
      msg.status = (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL) ? FORCE_CLOSE_ON_OPEN_CHANNEL : vc->closed;
      msg.lerrno = vc->lerrno;
      msg.sequence_number = vc->token.sequence_number;
      data = (void *) &msg;
      len = sizeof(CloseMessage);

    } else {
      //////////////////////////////////////////////////////////////
      // Create the specified down rev version of this message
      //////////////////////////////////////////////////////////////
      ink_release_assert(!"close_ClusterVConnection() bad msg version");
    }
    clusterProcessor.invoke_remote(vc->ch, CLOSE_CHANNEL_CLUSTER_FUNCTION, data, len);
  }
  ink_hrtime now = ink_get_hrtime();
  CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
  CLUSTER_SUM_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT, now - vc->start_time);
  if (!local_channel(channel)) {
    CLUSTER_SUM_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT, now - vc->start_time);
  } else {
    CLUSTER_SUM_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT, now - vc->start_time);
  }
  clusterVCAllocator_free(vc);
}

inline bool
ClusterHandler::vc_ok_write(ClusterVConnection * vc)
{
  return (((vc->closed > 0)
           && (vc->write_list || vc->write_bytes_in_transit)) ||
          (!vc->closed && vc->write.enabled && vc->write.vio.op == VIO::WRITE && vc->write.vio.buffer.writer()));
}

inline bool
ClusterHandler::vc_ok_read(ClusterVConnection * vc)
{
  return (!vc->closed && vc->read.vio.op == VIO::READ && vc->read.vio.buffer.writer());
}

void
ClusterHandler::close_free_lock(ClusterVConnection * vc, ClusterVConnState * s)
{
  Ptr<ProxyMutex> m(s->vio.mutex);
  if (s == &vc->read) {
    if ((ProxyMutex *) vc->read_locked)
      MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
    vc->read_locked = NULL;
  } else {
    if ((ProxyMutex *) vc->write_locked)
      MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
    vc->write_locked = NULL;
  }
  close_ClusterVConnection(vc);
}

bool ClusterHandler::build_data_vector(char *d, int len, bool read_flag)
{
  // Internal interface to the general network i/o facility allowing a
  // single vector read/write to a static data buffer.

  ClusterState & s = (read_flag ? read : write);
  ink_assert(d);
  ink_assert(len);
  ink_assert(s.iov);

  s.msg.count = 1;
  s.iov[0].iov_base = 0;
  s.iov[0].iov_len = len;
  s.block[0] = new_IOBufferBlock();
  s.block[0]->set(new_constant_IOBufferData(d, len));

  if (read_flag) {
    // Make block write_avail == len
    s.block[0]->_buf_end = s.block[0]->end() + len;
  } else {
    // Make block read_avail == len
    s.block[0]->fill(len);
  }

  s.to_do = len;
  s.did = 0;
  s.n_iov = 1;

  return true;
}

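// A minimal usage sketch (hypothetical, not from this file): point the
// next i/o at a single static buffer, e.g. to receive a fixed-size
// handshake structure.  HandshakeData and drive_read() are illustrative
// names only.
#if 0
static char handshake_buf[sizeof(HandshakeData)];

void
sketch_read_handshake(ClusterHandler *ch)
{
  ch->build_data_vector(handshake_buf, sizeof(handshake_buf), true);
  drive_read(ch);  // on completion, handshake_buf holds exactly len bytes
}
#endif
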
bool ClusterHandler::build_initial_vector(bool read_flag)
{
  //
  // Build the initial read/write struct iovec and corresponding
  // IOBufferData structures from the given struct descriptor(s).
  // Vector adjustments required for partial i/o conditions are handled
  // by adjust_vector().
  //
  ///////////////////////////////////////////////////////////////////
  // Descriptor to struct iovec layout
  ///////////////////////////////////////////////////////////////////
  // Write iovec[] layout
  //    iov[0] ----> struct ClusterMsgHeader
  //    iov[1] ----> struct descriptor [count]
  //                 char short_control_messages[control_bytes]
  //
  //    iov[2] ----> struct descriptor data (element #1)
  //    ......
  //    iov[2+count] ----> struct descriptor data (element #count)
  //
  ///////////////////////////////////////////////////////////////////
  // Read iovec[] layout phase #1 read
  //    iov[0] ----> struct ClusterMsgHeader
  ///////////////////////////////////////////////////////////////////
  // Read iovec[] layout phase #2 read
  //    iov[0] ----> struct descriptor[count]
  //                 char short_control_messages[control_bytes]
  ///////////////////////////////////////////////////////////////////
  // Read iovec[] layout phase #3 read
  //    iov[0] ----> struct descriptor data (element #1)
  //    ......
  //    iov[count-1] ----> struct descriptor data (element #count)
  ///////////////////////////////////////////////////////////////////
  int i, n;
  // This isn't used.
  // MIOBuffer      *w;

  ink_hrtime now = ink_get_hrtime();
  ClusterState & s = (read_flag ? read : write);
  OutgoingControl *oc = s.msg.outgoing_control.head;
  IncomingControl *ic = incoming_control.head;
  int new_n_iov = 0;
  int to_do = 0;
  int len;

  ink_assert(s.iov);

  if (!read_flag) {
    //////////////////////////////////////////////////////////////////////
    // Setup vector for write of header, descriptors and control data
    //////////////////////////////////////////////////////////////////////
    len = sizeof(ClusterMsgHeader) + (s.msg.count * sizeof(Descriptor)) + s.msg.control_bytes;
    s.iov[new_n_iov].iov_base = 0;
    s.iov[new_n_iov].iov_len = len;
    s.block[new_n_iov] = s.msg.get_block_header();

    // Make read_avail == len
    s.block[new_n_iov]->fill(len);

    to_do += len;
    ++new_n_iov;

  } else {
    if (s.msg.state == 0) {
      ////////////////////////////////////
      // Setup vector for read of header
      ////////////////////////////////////
      len = sizeof(ClusterMsgHeader);
      s.iov[new_n_iov].iov_base = 0;
      s.iov[new_n_iov].iov_len = len;
      s.block[new_n_iov] = s.msg.get_block_header();

      // Make write_avail == len
      s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + len;

      to_do += len;
      ++new_n_iov;

    } else if (s.msg.state == 1) {
      /////////////////////////////////////////////////////////
      // Setup vector for read of Descriptors+control data
      /////////////////////////////////////////////////////////
      len = (s.msg.count * sizeof(Descriptor)) + s.msg.control_bytes;
      s.iov[new_n_iov].iov_base = 0;
      s.iov[new_n_iov].iov_len = len;
      s.block[new_n_iov] = s.msg.get_block_descriptor();

      // Make write_avail == len
      s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + len;

      to_do += s.iov[new_n_iov].iov_len;
      ++new_n_iov;
    }
  }

  ////////////////////////////////////////////////////////////
  // Build vector for data section of the cluster message.
  // For read, we only do this if we are in data phase
  // of the read (msg.state == 2)
  //////////////////////////////////////////////////////////////
  //  Note: We are assuming that free space descriptors follow
  //        the data descriptors.
  //////////////////////////////////////////////////////////////
  for (i = 0; i < (read_flag ? ((s.msg.state >= 2) ? s.msg.count : 0) : s.msg.count); i++) {
    if (s.msg.descriptor[i].type == CLUSTER_SEND_DATA) {
      ///////////////////////////////////
      // Control channel data
      ///////////////////////////////////
      if (s.msg.descriptor[i].channel == CLUSTER_CONTROL_CHANNEL) {
        if (read_flag) {
          ///////////////////////
          // Incoming Control
          ///////////////////////
          if (!ic) {
            ic = IncomingControl::alloc();
            ic->recognized_time = now;
            CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_RECVD_STAT);
            ic->len = s.msg.descriptor[i].length;
            ic->alloc_data();
            if (!ic->fast_data()) {
              CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_RECVD_STAT);
            }
            // Mark message data as invalid
            *((uint32_t *) ic->data) = UNDEFINED_CLUSTER_FUNCTION;
            incoming_control.enqueue(ic);
          }
          s.iov[new_n_iov].iov_base = 0;
          s.iov[new_n_iov].iov_len = ic->len;
          s.block[new_n_iov] = ic->get_block();
          to_do += s.iov[new_n_iov].iov_len;
          ++new_n_iov;
          ic = (IncomingControl *) ic->link.next;
        } else {
          ///////////////////////
          // Outgoing Control
          ///////////////////////
          ink_assert(oc);
          s.iov[new_n_iov].iov_base = 0;
          s.iov[new_n_iov].iov_len = oc->len;
          s.block[new_n_iov] = oc->get_block();
          to_do += s.iov[new_n_iov].iov_len;
          ++new_n_iov;
          oc = (OutgoingControl *) oc->link.next;
        }
      } else {
        ///////////////////////////////
        // User channel data
        ///////////////////////////////
        ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];

        if (VALID_CHANNEL(vc) &&
            (s.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
          if (read_flag) {
            ink_release_assert(!vc->initial_data_bytes);
            /////////////////////////////////////
            // Try to get the read VIO mutex
            /////////////////////////////////////
            ink_release_assert(!(ProxyMutex *) vc->read_locked);
#ifdef CLUSTER_TOMCAT
            if (!vc->read.vio.mutex ||
                !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT))
#else
            if (!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT))
#endif
            {
              vc->read_locked = 0;
            } else {
              vc->read_locked = vc->read.vio.mutex;
            }

            ///////////////////////////////////////
            // Allocate read data block
            ///////////////////////////////////////
            if (s.msg.descriptor[i].length) {
              vc->iov_map = new_n_iov;
            } else {
              vc->iov_map = CLUSTER_IOV_NONE;
            }
            if (vc->pending_remote_fill || vc_ok_read(vc)) {
              //////////////////////////////////////////////////////////
              // Initial and subsequent data on open read channel.
              // Allocate IOBufferBlock.
              //////////////////////////////////////////////////////////
              ink_release_assert(s.msg.descriptor[i].length <= DEFAULT_MAX_BUFFER_SIZE);
              vc->read_block = new_IOBufferBlock();
              int64_t index = buffer_size_to_index(s.msg.descriptor[i].length, MAX_BUFFER_SIZE_INDEX);
              vc->read_block->alloc(index);

              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = vc->read_block->clone();

            } else {
              Debug(CL_NOTE, "dumping cluster read data");
              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = new_IOBufferBlock();
              s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_input, DEFAULT_MAX_BUFFER_SIZE));
            }

            // Make block write_avail == descriptor[].length
            s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + s.msg.descriptor[i].length;

          } else {
            bool remote_write_fill = (vc->pending_remote_fill && vc->remote_write_block);
            // Sanity check, assert we have the lock
            if (!remote_write_fill) {
              ink_assert((ProxyMutex *) vc->write_locked);
            }
            if (vc_ok_write(vc) || remote_write_fill) {
              if (remote_write_fill) {
                s.iov[new_n_iov].iov_base = 0;
                ink_release_assert((int) s.msg.descriptor[i].length == bytes_IOBufferBlockList(vc->remote_write_block, 1));
                s.block[new_n_iov] = vc->remote_write_block;

              } else {
                s.iov[new_n_iov].iov_base = 0;
                ink_release_assert((int) s.msg.descriptor[i].length <= vc->write_list_bytes);
                s.block[new_n_iov] = vc->write_list;
                vc->write_list = consume_IOBufferBlockList(vc->write_list, (int) s.msg.descriptor[i].length);
                vc->write_list_bytes -= (int) s.msg.descriptor[i].length;
                vc->write_bytes_in_transit += (int) s.msg.descriptor[i].length;

                vc->write_list_tail = vc->write_list;
                while (vc->write_list_tail && vc->write_list_tail->next)
                  vc->write_list_tail = vc->write_list_tail->next;
              }
            } else {
              Debug(CL_NOTE, "faking cluster write data");
              s.iov[new_n_iov].iov_base = 0;
              s.block[new_n_iov] = new_IOBufferBlock();
              s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_output, DEFAULT_MAX_BUFFER_SIZE));
              // Make block read_avail == descriptor[].length
              s.block[new_n_iov]->fill(s.msg.descriptor[i].length);
            }
          }
        } else {
          // VC has been deleted, need to dump the bits...
          s.iov[new_n_iov].iov_base = 0;
          s.block[new_n_iov] = new_IOBufferBlock();

          if (read_flag) {
            s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_input, DEFAULT_MAX_BUFFER_SIZE));

            // Make block write_avail == descriptor[].length
            s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + s.msg.descriptor[i].length;

          } else {
            s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_output, DEFAULT_MAX_BUFFER_SIZE));

            // Make block read_avail == descriptor[].length
            s.block[new_n_iov]->fill(s.msg.descriptor[i].length);
          }
        }
        s.iov[new_n_iov].iov_len = s.msg.descriptor[i].length;
        to_do += s.iov[new_n_iov].iov_len;
        ++new_n_iov;
      }
    }
  }
  // Release IOBufferBlock references used in previous i/o operation
  for (n = new_n_iov; n < MAX_TCOUNT; ++n) {
    s.block[n] = 0;
  }

  // Initialize i/o state variables
  s.to_do = to_do;
  s.did = 0;
  s.n_iov = new_n_iov;
  return true;

  // TODO: This is apparently dead code, I added the #if 0 to avoid compiler
  // warnings, but is this really intentional??
#if 0
  // Release all IOBufferBlock references.
  for (n = 0; n < MAX_TCOUNT; ++n) {
    s.block[n] = 0;
  }
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_OP_DELAYED_FOR_LOCK_STAT);
  Debug(CL_WARN, "%s delayed for locks", read_flag ? "read" : "write");
  free_locks(read_flag, i);
  return false;
#endif
}

bool ClusterHandler::get_read_locks()
{
  ///////////////////////////////////////////////////////////////////////
  // Reacquire locks for the request setup by build_initial_vector().
  // We are called after each read completion prior to posting completion
  ///////////////////////////////////////////////////////////////////////
  ClusterState & s = read;
  int i, n;
  int bytes_processed;
  int vec_bytes_remainder;
  int iov_done[MAX_TCOUNT];

  memset((char *) iov_done, 0, sizeof(int) * MAX_TCOUNT);

  // Compute bytes transferred on a per vector basis
  bytes_processed = s.did - s.bytes_xfered;     // not including bytes in this xfer

  i = -1;
  for (n = 0; n < s.n_iov; ++n) {
    bytes_processed -= s.iov[n].iov_len;
    if (bytes_processed >= 0) {
      iov_done[n] = s.iov[n].iov_len;
    } else {
      iov_done[n] = s.iov[n].iov_len + bytes_processed;
      if (i < 0) {
        i = n;                  // note i/o start vector

        // Now at vector where last transfer started,
        // make considerations for the last transfer on this vector.

        vec_bytes_remainder = (s.iov[n].iov_len - iov_done[n]);
        bytes_processed = s.bytes_xfered;

        bytes_processed -= vec_bytes_remainder;
        if (bytes_processed >= 0) {
          iov_done[n] = vec_bytes_remainder;
        } else {
          iov_done[n] = vec_bytes_remainder + bytes_processed;
          break;
        }
      } else {
        break;
      }
    }
  }
  ink_release_assert(i >= 0);

  // Start lock acquisition at the first vector where we started
  //  the last read.
  //
  //  Note: We are assuming that free space descriptors follow
  //        the data descriptors.

  for (; i < s.n_iov; ++i) {
    if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA)
        && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {

      // Only user channels require locks

      ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];
      if (!VALID_CHANNEL(vc) ||
          ((s.msg.descriptor[i].sequence_number) !=
           CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) || !vc_ok_read(vc)) {
        // Channel no longer valid, lock not needed since we
        //  already have a reference to the buffer
        continue;
      }

      ink_assert(!(ProxyMutex *) vc->read_locked);
      vc->read_locked = vc->read.vio.mutex;
      if (vc->byte_bank_q.head
          || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) {
        // Pending byte bank completions or lock acquire failure.

        vc->read_locked = NULL;
        continue;
      }
      // Since we now have the mutex, really see if reads are allowed.

      if (!vc_ok_read(vc)) {
        MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread);
        vc->read_locked = NULL;
        continue;
      }
      // Lock acquire success, move read bytes into VC

      int64_t read_avail = vc->read_block->read_avail();

      if (!vc->pending_remote_fill && read_avail) {
        Debug("cluster_vc_xfer", "Deferred fill ch %d %p %" PRId64 " bytes", vc->channel, vc, read_avail);

        vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
        if (complete_channel_read(read_avail, vc)) {
          vc->read_block->consume(read_avail);
        }
      }
    }
  }
  return true;                  // success
}

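//
// Worked example (illustrative numbers) for the per-vector accounting in
// get_read_locks() above.  Suppose n_iov == 3 with iov_len == {16, 32, 64},
// s.did == 70 and s.bytes_xfered == 30, so 40 bytes predate this transfer:
//   n=0: 40 - 16 >= 0  -> iov_done[0] = 16 (fully read earlier)
//   n=1: 24 - 32 <  0  -> the last transfer started here (i = 1);
//        8 bytes of this vector remained, and 30 - 8 >= 0, so
//        iov_done[1] = 8 bytes completed in this transfer
//   n=2: 22 - 64 <  0  -> iov_done[2] = 22, and the loop stops
// Lock reacquisition then begins at vector i == 1.
//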
bool ClusterHandler::get_write_locks()
{
  ///////////////////////////////////////////////////////////////////////
  // Reacquire locks for the request setup by build_initial_vector().
  // We are called after the entire write completes prior to
  // posting completion.
  ///////////////////////////////////////////////////////////////////////
  ClusterState & s = write;
  int i;

  for (i = 0; i < s.msg.count; ++i) {
    if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA)
        && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {

      // Only user channels require locks

      ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];
      if (!VALID_CHANNEL(vc) ||
          (s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        // Channel no longer valid, lock not needed since we
        //  already have a reference to the buffer
        continue;
      }
      ink_assert(!(ProxyMutex *) vc->write_locked);
      vc->write_locked = vc->write.vio.mutex;
#ifdef CLUSTER_TOMCAT
      if (vc->write_locked &&
          !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->write.vio.mutex, thread, vc->write.vio._cont, WRITE_LOCK_SPIN_COUNT)) {
#else
      if (!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->write.vio.mutex, thread, vc->write.vio._cont, WRITE_LOCK_SPIN_COUNT)) {
#endif
        // write lock acquire failed, free all acquired locks and retry later
        vc->write_locked = 0;
        free_locks(CLUSTER_WRITE, i);
        return false;
      }
    }
  }
  return true;
}

void
ClusterHandler::swap_descriptor_bytes()
{
  for (int i = 0; i < read.msg.count; i++) {
    read.msg.descriptor[i].SwapBytes();
  }
}

void
ClusterHandler::process_set_data_msgs()
{
  uint32_t cluster_function_index;
  //
  // Cluster set_data messages must always be processed ahead of all
  // messages and data.  By convention, set_data messages (highest priority
  // messages) always reside in the beginning of the descriptor
  // and small control message structures.
  //

  /////////////////////////////////////////////
  // Process small control set_data messages.
  /////////////////////////////////////////////
  if (!read.msg.did_small_control_set_data) {
    char *p = (char *) &read.msg.descriptor[read.msg.count];
    char *endp = p + read.msg.control_bytes;
    while (p < endp) {
      if (needByteSwap) {
        ats_swap32((uint32_t *) p);   // length
        ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code
      }
      int len = *(int32_t *) p;
      cluster_function_index = *(uint32_t *) (p + sizeof(int32_t));

      if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
          && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t));
        // Mark message as processed.
        *((uint32_t *) (p + sizeof(uint32_t))) = ~*((uint32_t *) (p + sizeof(uint32_t)));
        p += (2 * sizeof(uint32_t)) + (len - sizeof(uint32_t));
        p = (char *) DOUBLE_ALIGN(p);
      } else {
        // Reverse swap since this message will be reprocessed.

        if (needByteSwap) {
          ats_swap32((uint32_t *) p); // length
          ats_swap32((uint32_t *) (p + sizeof(int32_t)));       // function code
        }
        break;                  // End of set_data messages
      }
    }
    read.msg.control_data_offset = p - (char *) &read.msg.descriptor[read.msg.count];
    read.msg.did_small_control_set_data = 1;
  }
  /////////////////////////////////////////////
  // Process large control set_data messages.
  /////////////////////////////////////////////
  if (!read.msg.did_large_control_set_data) {
    IncomingControl *ic = incoming_control.head;

    while (ic) {
      if (needByteSwap) {
        ats_swap32((uint32_t *) ic->data);    // function code
      }
      cluster_function_index = *((uint32_t *) ic->data);

      if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
          && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {

        char *p = ic->data;
        clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this,
                                                               (void *) (p + sizeof(int32_t)), ic->len - sizeof(int32_t));

        // Reverse swap since this will be processed again for deallocation.
        if (needByteSwap) {
          ats_swap32((uint32_t *) p); // length
          ats_swap32((uint32_t *) (p + sizeof(int32_t)));       // function code
        }
        // Mark message as processed.
        // Defer deallocation until the entire read is complete.
        *((uint32_t *) p) = ~*((uint32_t *) p);

        ic = (IncomingControl *) ic->link.next;
      } else {
        // Reverse swap since this message will be reprocessed.
        if (needByteSwap) {
          ats_swap32((uint32_t *) ic->data);  // function code
        }
        break;
      }
    }
    read.msg.did_large_control_set_data = 1;
  }
}

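//
// Wire image of one inline (small) control message, as unpacked by
// process_set_data_msgs() above and process_small_control_msgs() below
// (a summary of the parsing code, not a struct from a header):
//
//   int32_t  length;         // bytes from the function code onward
//   uint32_t function_code;  // index into clusterFunction[]
//   char     payload[length - sizeof(int32_t)];
//   <pad to the next DOUBLE_ALIGN boundary>
//
// The convention that set_data messages come first is what allows
// process_set_data_msgs() to stop at the first non-set_data entry and
// record control_data_offset for the remaining messages.
//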
void
ClusterHandler::process_small_control_msgs()
{
  if (read.msg.did_small_control_msgs) {
    return;
  } else {
    read.msg.did_small_control_msgs = 1;
  }

  ink_hrtime now = ink_get_hrtime();
  char *p = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_data_offset;
  char *endp = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_bytes;

  while (p < endp) {
    /////////////////////////////////////////////////////////////////
    // Place non cluster small incoming messages on external
    // incoming queue for processing by callout threads.
    /////////////////////////////////////////////////////////////////
    if (needByteSwap) {
      ats_swap32((uint32_t *) p);     // length
      ats_swap32((uint32_t *) (p + sizeof(int32_t)));   // function code
    }
    int len = *(int32_t *) p;
    p += sizeof(int32_t);
    uint32_t cluster_function_index = *(uint32_t *) p;
    ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION);

    if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) {
      Warning("1Bad cluster function index (small control)");
      p += len;

    } else if (clusterFunction[cluster_function_index].ClusterFunc) {
      //////////////////////////////////////////////////////////////////////
      // Cluster function, can only be processed in ET_CLUSTER thread
      //////////////////////////////////////////////////////////////////////
      p += sizeof(uint32_t);
      clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
      p += (len - sizeof(int32_t));

    } else {
      ///////////////////////////////////////////////////////
      // Non Cluster function, defer to callout threads
      ///////////////////////////////////////////////////////
      IncomingControl *ic = IncomingControl::alloc();
      ic->recognized_time = now;
      ic->len = len;
      ic->alloc_data();
      memcpy(ic->data, p, ic->len);
      SetHighBit(&ic->len);     // mark as small control message
      ink_atomiclist_push(&external_incoming_control, (void *) ic);
      p += len;
    }
    p = (char *) DOUBLE_ALIGN(p);
  }
}

void
ClusterHandler::process_large_control_msgs()
{
  if (read.msg.did_large_control_msgs) {
    return;
  } else {
    read.msg.did_large_control_msgs = 1;
  }

  ////////////////////////////////////////////////////////////////
  // Place non cluster large incoming messages on external
  // incoming queue for processing by callout threads.
  ////////////////////////////////////////////////////////////////
  IncomingControl *ic = NULL;
  uint32_t cluster_function_index;

  while ((ic = incoming_control.dequeue())) {
    if (needByteSwap) {
      ats_swap32((uint32_t *) ic->data);      // function code
    }
    cluster_function_index = *((uint32_t *) ic->data);
    ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION);

    if (cluster_function_index == (uint32_t) ~SET_CHANNEL_DATA_CLUSTER_FUNCTION) {
      // SET_CHANNEL_DATA_CLUSTER_FUNCTION already processed.
      // Just do memory deallocation.

      if (!clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].fMalloced)
        ic->freeall();
      continue;
    }

    if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) {
      Warning("Bad cluster function index (large control)");
      ic->freeall();

    } else if (clusterFunction[cluster_function_index].ClusterFunc) {
      // Cluster message, process in ET_CLUSTER thread
      clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
                                                  ic->len - sizeof(int32_t));

      // Deallocate memory
      if (!clusterFunction[cluster_function_index].fMalloced)
        ic->freeall();

    } else {
      // Non Cluster message, process in non ET_CLUSTER thread
      ink_atomiclist_push(&external_incoming_control, (void *) ic);
    }
  }
}

void
ClusterHandler::process_freespace_msgs()
{
  if (read.msg.did_freespace_msgs) {
    return;
  } else {
    read.msg.did_freespace_msgs = 1;
  }

  int i;
  //
  // unpack CLUSTER_SEND_FREE (VC free space) messages and update
  // the free space in the target VC(s).
  //
  for (i = 0; i < read.msg.count; i++) {
    if (read.msg.descriptor[i].type == CLUSTER_SEND_FREE && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
      int c = read.msg.descriptor[i].channel;
      if (c < n_channels && VALID_CHANNEL(channels[c]) &&
          (CLUSTER_SEQUENCE_NUMBER(channels[c]->token.sequence_number) == read.msg.descriptor[i].sequence_number)) {
        //
        // VC received freespace message, move it to the
        // current bucket, since it may have data to
        // write (WRITE_VC_PRIORITY).
        //
        channels[c]->remote_free = read.msg.descriptor[i].length;
        vcs_push(channels[c], VC_CLUSTER_WRITE);
      }
    }
  }
}

void
ClusterHandler::add_to_byte_bank(ClusterVConnection * vc)
{
  ByteBankDescriptor *bb_desc = ByteBankDescriptor::ByteBankDescriptor_alloc(vc->read_block);
  bool pending_byte_bank_completion = vc->byte_bank_q.head ? true : false;

  // Put current byte bank descriptor on completion list
  vc->byte_bank_q.enqueue(bb_desc);

  // Start byte bank completion action if not active
  if (!pending_byte_bank_completion) {
    ClusterVC_remove_read(vc);
    delayed_reads.push(vc);
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_LEVEL1_BANK_STAT);
  } else {
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MULTILEVEL_BANK_STAT);
  }
  vc->read_block = 0;
}

void
ClusterHandler::update_channels_read()
{
  //
  // Update channels from which data has been read.
  //
  int i;
  int len;
  // This isn't used.
  // int nread = read.bytes_xfered;

  process_set_data_msgs();

  //
  // update the ClusterVConnections
  //
  for (i = 0; i < read.msg.count; i++) {
    if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
      ClusterVConnection *vc = channels[read.msg.descriptor[i].channel];
      if (VALID_CHANNEL(vc) &&
          (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        vc->last_activity_time = current_time;  // note activity time

        len = read.msg.descriptor[i].length;
        if (!len) {
          continue;
        }

        if (!vc->pending_remote_fill && vc_ok_read(vc)
            && (!((ProxyMutex *) vc->read_locked) || vc->byte_bank_q.head)) {
          //
          // Byte bank active or unable to acquire lock on VC.
          // Move data into the byte bank and attempt delivery
          // at the next periodic event.
          //
          vc->read_block->fill(len);    // note bytes received
          add_to_byte_bank(vc);

        } else {
          if (vc->pending_remote_fill || ((ProxyMutex *) vc->read_locked && vc_ok_read(vc))) {
            vc->read_block->fill(len);  // note bytes received
            if (!vc->pending_remote_fill) {
              vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
              vc->read_block->consume(len);     // note bytes moved to user
            }
            complete_channel_read(len, vc);
          }
        }
      }
    }
  }

  // Process control and freespace messages
  process_small_control_msgs();
  process_large_control_msgs();
  process_freespace_msgs();
}

//
// This member function is run in a non ET_CLUSTER thread, which
// performs the input message processing on behalf of ET_CLUSTER.
// The primary motivation is to allow blocking and unbounded runtime
// for message processing, which cannot be done in an ET_CLUSTER thread.
//
int
ClusterHandler::process_incoming_callouts(ProxyMutex * m)
{
  ProxyMutex *mutex = m;
  ink_hrtime now;
  //
  // Atomically dequeue all active requests from the external queue and
  // move them to the local working queue.  Insertion queue order is
  // maintained.
  //
  Queue<IncomingControl> local_incoming_control;
  IncomingControl *ic_ext_next;
  IncomingControl *ic_ext;

  while (true) {
    ic_ext = (IncomingControl *)
      ink_atomiclist_popall(&external_incoming_control);
    if (!ic_ext)
      break;

    while (ic_ext) {
      ic_ext_next = (IncomingControl *) ic_ext->link.next;
      ic_ext->link.next = NULL;
      local_incoming_control.push(ic_ext);
      ic_ext = ic_ext_next;
    }

    // Perform callout actions for each message.
    int small_control_msg;
    IncomingControl *ic = NULL;

    while ((ic = local_incoming_control.pop())) {
      LOG_EVENT_TIME(ic->recognized_time, inmsg_time_dist, inmsg_events);

      // Determine if this is a small control message
      small_control_msg = IsHighBitSet(&ic->len);
      ClearHighBit(&ic->len);   // Clear small msg flag bit

      if (small_control_msg) {
        int len = ic->len;
        char *p = ic->data;
        uint32_t cluster_function_index = *(uint32_t *) p;
        p += sizeof(uint32_t);

        if (cluster_function_index < (uint32_t) SIZE_clusterFunction) {
          ////////////////////////////////
          // Invoke processing function
          ////////////////////////////////
          ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
          clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
          now = ink_get_hrtime();
          CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
        } else {
          Warning("2Bad cluster function index (small control)");
        }
        // Deallocate memory
        if (!clusterFunction[cluster_function_index].fMalloced)
          ic->freeall();

      } else {
        ink_assert(ic->len > 4);
        uint32_t cluster_function_index = *(uint32_t *) ic->data;
        bool valid_index;

        if (cluster_function_index < (uint32_t) SIZE_clusterFunction) {
          valid_index = true;
          ////////////////////////////////
          // Invoke processing function
          ////////////////////////////////
          ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
          clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
                                                      ic->len - sizeof(int32_t));
          now = ink_get_hrtime();
          CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
        } else {
          valid_index = false;
          Warning("2Bad cluster function index (large control)");
        }
        if (valid_index && !clusterFunction[cluster_function_index].fMalloced)
          ic->freeall();
      }
    }
  }
  return EVENT_CONT;
}

void
ClusterHandler::update_channels_partial_read()
{
  //
  // We were unable to read the computed amount.  Reflect the partial
  // amount read in the associated VC read buffer data structures.
  //
  int i;
  int64_t res = read.bytes_xfered;

  if (!res) {
    return;
  }
  ink_assert(res <= read.did);

  // how much of the iov was done

  int64_t iov_done[MAX_TCOUNT];
  int64_t total = 0;
  int64_t already_read = read.did - read.bytes_xfered;

  for (i = 0; i < read.n_iov; i++) {
    ink_release_assert(already_read >= 0);
    iov_done[i] = read.iov[i].iov_len;

    // Skip over bytes already processed
    if (already_read) {
      already_read -= iov_done[i];
      if (already_read < 0) {
        iov_done[i] = -already_read;    // bytes remaining
        already_read = 0;
      } else {
        iov_done[i] = 0;
        continue;
      }
    }
    // Adjustments for partial read for the current transfer
    res -= iov_done[i];
    if (res < 0) {
      iov_done[i] += res;
      res = 0;
    } else {
      total += iov_done[i];
    }
  }
  ink_assert(total <= read.did);

  int read_all_large_control_msgs = 0;
  //
  // update the ClusterVConnections buffer pointers
  //
  for (i = 0; i < read.msg.count; i++) {
    if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
      ClusterVConnection *vc = channels[read.msg.descriptor[i].channel];
      if (VALID_CHANNEL(vc) &&
          (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
        if (vc->pending_remote_fill || (vc_ok_read(vc) && (vc->iov_map != CLUSTER_IOV_NONE))) {
          vc->last_activity_time = current_time;        // note activity time
          ClusterVConnState *s = &vc->read;
          ink_assert(vc->iov_map < read.n_iov);
          int len = iov_done[vc->iov_map];

          if (len) {
            if (!read_all_large_control_msgs) {
              //
              // Since all large set_data control messages reside at the
              // beginning, all have been read if the first non-control
              // descriptor contains > 0 bytes.
              // Process them ahead of any VC data completion actions
              // followed by small control and freespace message processing.
              //
              process_set_data_msgs();
              process_small_control_msgs();
              process_freespace_msgs();
              read_all_large_control_msgs = 1;
            }
            iov_done[vc->iov_map] = 0;
            vc->read_block->fill(len);  // note bytes received

            if (!vc->pending_remote_fill) {
              if ((ProxyMutex *) vc->read_locked) {
                Debug("cluster_vc_xfer", "Partial read, credit ch %d %p %d bytes", vc->channel, vc, len);
                s->vio.buffer.writer()->append_block(vc->read_block->clone());
                if (complete_channel_read(len, vc)) {
                  vc->read_block->consume(len); // note bytes moved to user
                }

              } else {
                // If we have all the data for the VC, move it
                // into the byte bank.  Otherwise, do nothing since
                // we will resume the read at this VC.

                if (len == (int) read.msg.descriptor[i].length) {
                  Debug("cluster_vc_xfer", "Partial read, byte bank move ch %d %p %d bytes", vc->channel, vc, len);
                  add_to_byte_bank(vc);
                }
              }
            } else {
              Debug("cluster_vc_xfer", "Partial remote fill read, credit ch %d %p %d bytes", vc->channel, vc, len);
              complete_channel_read(len, vc);
            }
            read.msg.descriptor[i].length -= len;
            ink_assert(((int) read.msg.descriptor[i].length) >= 0);
          }
          Debug(CL_TRACE, "partial_channel_read chan=%d len=%d", vc->channel, len);
        }
      }
    }
  }
}

01355 bool ClusterHandler::complete_channel_read(int len, ClusterVConnection * vc)
01356 {
01357   //
01358   // We have processed a complete VC read request message for a channel,
01359   // perform completion actions.
01360   //
01361   ClusterVConnState *s = &vc->read;
01362 
01363   if (vc->pending_remote_fill) {
01364     Debug(CL_TRACE, "complete_channel_read chan=%d len=%d", vc->channel, len);
01365     vc->initial_data_bytes += len;
01366     ++vc->pending_remote_fill;  // Note completion
01367     return (vc->closed ? false : true);
01368   }
01369 
01370   if (vc->closed)
01371     return false;               // No action if already closed
01372 
01373   ink_assert((ProxyMutex *) s->vio.mutex == (ProxyMutex *) s->vio._cont->mutex);
01374 
01375   Debug("cluster_vc_xfer", "Complete read, credit ch %d %p %d bytes", vc->channel, vc, len);
01376   s->vio.ndone += len;
01377 
01378   if (s->vio.ntodo() <= 0) {
01379     s->enabled = 0;
01380     if (cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s)
01381         == EVENT_DONE)
01382       return false;
01383   } else {
01384     if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) == EVENT_DONE)
01385       return false;
01386     if (s->vio.ntodo() <= 0)
01387       s->enabled = 0;
01388   }
01389 
01390   vcs_push(vc, VC_CLUSTER_READ);
01391   return true;
01392 }
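
// A minimal sketch of the VIO bookkeeping that complete_channel_read()
// performs above, using a hypothetical SketchVIO type rather than the
// real VIO class: credit the received bytes and pick the event to signal.
#if 0
struct SketchVIO {
  int64_t nbytes;                               // total bytes the user asked for
  int64_t ndone;                                // bytes delivered so far
  int64_t ntodo() const { return nbytes - ndone; }
};

static int
credit_read(SketchVIO &vio, int len)
{
  vio.ndone += len;
  // Signal completion once the request is satisfied; otherwise just
  // tell the consumer more data is ready.
  return (vio.ntodo() <= 0) ? VC_EVENT_READ_COMPLETE : VC_EVENT_READ_READY;
}
#endif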
01393 
01394 void
01395 ClusterHandler::finish_delayed_reads()
01396 {
01397   //
01398   // Process pending VC delayed reads generated in the last read from
01399   // the node to node connection. For explanation of "delayed read" see
01400   // comments at the beginning of the member functions for ClusterHandler.
01401   //
01402   ClusterVConnection *vc = NULL;
01403   DLL<ClusterVConnectionBase> l;
01404   while ((vc = (ClusterVConnection *) delayed_reads.pop())) {
01405     MUTEX_TRY_LOCK_SPIN(lock, vc->read.vio.mutex, thread, READ_LOCK_SPIN_COUNT);
01406     if (lock) {
01407       if (vc_ok_read(vc)) {
01408         ink_assert(!vc->read.queue);
01409         ByteBankDescriptor *d;
01410 
01411         while ((d = vc->byte_bank_q.dequeue())) {
01412           if (vc->read.queue) {
01413             // Previous complete_channel_read() put us back on the list,
01414             //  remove ourselves to process the next byte bank completion
01415             ClusterVC_remove_read(vc);
01416           }
01417           Debug("cluster_vc_xfer",
01418                 "Delayed read, credit ch %d %p %" PRId64" bytes", vc->channel, vc, d->get_block()->read_avail());
01419           vc->read.vio.buffer.writer()->append_block(d->get_block());
01420 
01421           bool ok = complete_channel_read(d->get_block()->read_avail(), vc);
01422           ByteBankDescriptor::ByteBankDescriptor_free(d);
01423           if (!ok) {
01424             // consumer completed or VC closed; stop draining this VC
01425             break;
01426           }
01427         }
01428       }
01429     } else
01430       l.push(vc);
01431   }
01432   delayed_reads = l;
01433 }
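
// A sketch of the try-lock-or-requeue pattern finish_delayed_reads() uses
// above, with hypothetical Item/try_lock/work stand-ins: entries whose
// lock cannot be taken are parked on a local list, which becomes the new
// delayed_reads queue for the next pass.
#if 0
struct Item { Item *next; };

static Item *
drain_or_requeue(Item *head, bool (*try_lock)(Item *), void (*work)(Item *))
{
  Item *requeue = NULL;
  while (head) {
    Item *next = head->next;
    if (try_lock(head)) {
      work(head);                       // safe: lock held
    } else {
      head->next = requeue;             // missed the lock; retry later
      requeue = head;
    }
    head = next;
  }
  return requeue;                       // caller: delayed_reads = requeue
}
#endif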
01434 
01435 void
01436 ClusterHandler::update_channels_written()
01437 {
01438   //
01439   // We have successfully pushed the write data for the VC(s) described
01440   // by the descriptors.
01441   // Move the channels in this bucket to a new bucket.
01442   // Lower the priority of those with too little data and raise that of
01443   // those with too much data.
01444   //
01445   ink_hrtime now;
01446   for (int i = 0; i < write.msg.count; i++) {
01447     if (write.msg.descriptor[i].type == CLUSTER_SEND_DATA) {
01448       if (write.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
01449         ClusterVConnection *vc = channels[write.msg.descriptor[i].channel];
01450         if (VALID_CHANNEL(vc) &&
01451             (write.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
01452 
01453           if (vc->pending_remote_fill) {
01454             Debug(CL_TRACE,
01455                   "update_channels_written chan=%d seqno=%d len=%d",
01456                   write.msg.descriptor[i].channel,
01457                   write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length);
01458             vc->pending_remote_fill = 0;
01459             vc->remote_write_block = 0; // free data block
01460             continue;           // ignore remote write fill VC(s)
01461           }
01462 
01463           ClusterVConnState *s = &vc->write;
01464           int len = write.msg.descriptor[i].length;
01465           vc->write_bytes_in_transit -= len;
01466           ink_release_assert(vc->write_bytes_in_transit >= 0);
01467           Debug(CL_PROTO, "(%d) data sent %d %" PRId64, write.msg.descriptor[i].channel, len, s->vio.ndone);
01468 
01469           if (vc_ok_write(vc)) {
01470             vc->last_activity_time = current_time;      // note activity time
01471             int64_t ndone = vc->was_closed() ? 0 : s->vio.ndone;
01472 
01473             if (ndone < vc->remote_free) {
01474               vcs_push(vc, VC_CLUSTER_WRITE);
01475             }
01476           }
01477         }
01478       } else {
01479         //
01480         // Free up outgoing control message space
01481         //
01482         OutgoingControl *oc = write.msg.outgoing_control.dequeue();
01483         oc->free_data();
01484         oc->mutex = NULL;
01485         now = ink_get_hrtime();
01486         CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - oc->submit_time);
01487         LOG_EVENT_TIME(oc->submit_time, cluster_send_time_dist, cluster_send_events);
01488         oc->freeall();
01489       }
01490     }
01491   }
01492   //
01493   // For compound messages, deallocate the data and header descriptors.
01494   // The deallocation of the data descriptor will indirectly invoke
01495   // the free memory proc described in set_data.
01496   //
01497   invoke_remote_data_args *args;
01498   OutgoingControl *hdr_oc;
01499   while ((hdr_oc = write.msg.outgoing_callout.dequeue())) {
01500     args = (invoke_remote_data_args *) (hdr_oc->data + sizeof(int32_t));
01501     ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
01502 
01503     // Free data descriptor
01504     args->data_oc->free_data(); // invoke memory free callback
01505     args->data_oc->mutex = NULL;
01506     args->data_oc->freeall();
01507 
01508     // Free descriptor
01509     hdr_oc->free_data();
01510     hdr_oc->mutex = NULL;
01511     now = ink_get_hrtime();
01512     CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - hdr_oc->submit_time);
01513     LOG_EVENT_TIME(hdr_oc->submit_time, cluster_send_time_dist, cluster_send_events);
01514     hdr_oc->freeall();
01515   }
01516 }
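
// A sketch of the in-transit accounting settled above (hypothetical
// SketchVC stand-in): bytes are presumably charged when a data descriptor
// is queued and credited back here once the socket write completes, which
// is why write_bytes_in_transit must never go negative.
#if 0
struct SketchVC { int64_t write_bytes_in_transit; };

static void
on_descriptor_queued(SketchVC &vc, int len)
{
  vc.write_bytes_in_transit += len;     // charge when queued for send
}

static void
on_write_completed(SketchVC &vc, int len)
{
  vc.write_bytes_in_transit -= len;     // credit on send completion
  ink_release_assert(vc.write_bytes_in_transit >= 0);
}
#endif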
01517 
01518 int
01519 ClusterHandler::build_write_descriptors()
01520 {
01521   //
01522   // Construct the write descriptors for VC write data in the current
01523   // write_vcs bucket with considerations for maximum elements per
01524   // write (struct iovec system maximum).
01525   //
01526   int count_bucket = cur_vcs;
01527   int tcount = write.msg.count + 2;     // count + descriptor require 2 iovec(s)
01528   int write_descriptors_built = 0;
01529   int valid;
01530   int list_len = 0;
01531   ClusterVConnection *vc, *vc_next;
01532 
01533   //
01534   // Build descriptors for connections with stuff to send.
01535   //
01536   vc = (ClusterVConnection *)ink_atomiclist_popall(&write_vcs_ready);
01537   while (vc) {
01538     enter_exit(&cls_build_writes_entered, &cls_writes_exited);
01539     vc_next = (ClusterVConnection *) vc->ready_alink.next;
01540     vc->ready_alink.next = NULL;
01541     list_len++;
01542     if (VC_CLUSTER_CLOSED == vc->type) {
01543       vc->in_vcs = false;
01544       vc->type = VC_NULL;
01545       clusterVCAllocator.free(vc);
01546       vc = vc_next;
01547       continue;
01548     }
01549 
01550     if (tcount >= MAX_TCOUNT) {
01551       vcs_push(vc, VC_CLUSTER_WRITE);
01552     } else {
01553       vc->in_vcs = false;
01554       cluster_reschedule_offset(this, vc, &vc->write, 0);
01555       tcount++;
01556     }
01557     vc = vc_next;
01558   }
01559   if (list_len) {
01560     CLUSTER_SUM_DYN_STAT(CLUSTER_VC_WRITE_LIST_LEN_STAT, list_len);
01561   }
01562 
01563   tcount = write.msg.count + 2;
01564   vc_next = (ClusterVConnection *) write_vcs[count_bucket].head;
01565   while (vc_next) {
01566     vc = vc_next;
01567     vc_next = (ClusterVConnection *) vc->write.link.next;
01568 
01569     if (VC_CLUSTER_CLOSED == vc->type) {
01570       vc->type = VC_NULL;
01571       clusterVCAllocator.free(vc);
01572       continue;
01573     }
01574 
01575     if (tcount >= MAX_TCOUNT)
01576       break;
01577 
01578     valid = valid_for_data_write(vc);
01579     if (-1 == valid) {
01580       vcs_push(vc, VC_CLUSTER_WRITE);
01581     } else if (valid) {
01582       ink_assert(vc->write_locked);     // Acquired in valid_for_data_write()
01583       if ((vc->remote_free > (vc->write.vio.ndone - vc->write_list_bytes))
01584           && channels[vc->channel] == vc) {
01585 
01586         ink_assert(vc->write_list && vc->write_list_bytes);
01587 
01588         int d = write.msg.count;
01589         write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
01590         write.msg.descriptor[d].channel = vc->channel;
01591         write.msg.descriptor[d].sequence_number = vc->token.sequence_number;
01592         int s = vc->write_list_bytes;
01593         ink_release_assert(s <= MAX_CLUSTER_SEND_LENGTH);
01594 
01595         // Transfer no more than nbytes
01596         if ((vc->write.vio.ndone - s) > vc->write.vio.nbytes)
01597           s = vc->write.vio.nbytes - (vc->write.vio.ndone - s);
01598 
01599         if ((vc->write.vio.ndone - s) > vc->remote_free)
01600           s = vc->remote_free - (vc->write.vio.ndone - s);
01601         write.msg.descriptor[d].length = s;
01602         write.msg.count++;
01603         tcount++;
01604         write_descriptors_built++;
01605 
01606 #ifdef CLUSTER_STATS
01607         _vc_writes++;
01608         _vc_write_bytes += s;
01609 #endif
01610 
01611       } else {
01612         MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
01613         vc->write_locked = NULL;
01614 
01615         if (channels[vc->channel] == vc)
01616           CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NO_REMOTE_SPACE_STAT);
01617       }
01618     }
01619   }
01620   return (write_descriptors_built);
01621 }
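
// A sketch of the iovec budgeting used by the descriptor builders above:
// every message starts with two iovecs (count word + descriptor block),
// each accepted element costs one more, and anything past MAX_TCOUNT is
// requeued via vcs_push() for a later message.
#if 0
static bool
accept_element(int &tcount)
{
  if (tcount >= MAX_TCOUNT)
    return false;                       // caller requeues for the next pass
  ++tcount;
  return true;
}
#endif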
01622 
01623 int
01624 ClusterHandler::build_freespace_descriptors()
01625 {
01626   //
01627   // Construct the write descriptors for VC freespace data in the current
01628   // read_vcs bucket with considerations for maximum elements per
01629   // write (struct iovec system maximum) and for pending elements already
01630   // in the list.
01631   //
01632   int count_bucket = cur_vcs;
01633   int tcount = write.msg.count + 2;     // count + descriptor require 2 iovec(s)
01634   int freespace_descriptors_built = 0;
01635   int s = 0;
01636   int list_len = 0;
01637   ClusterVConnection *vc, *vc_next;
01638 
01639   //
01640   // Build descriptors for available space
01641   //
01642   vc = (ClusterVConnection *)ink_atomiclist_popall(&read_vcs_ready);
01643   while (vc) {
01644     enter_exit(&cls_build_reads_entered, &cls_reads_exited);
01645     vc_next = (ClusterVConnection *) vc->ready_alink.next;
01646     vc->ready_alink.next = NULL;
01647     list_len++;
01648     if (VC_CLUSTER_CLOSED == vc->type) {
01649       vc->in_vcs = false;
01650       vc->type = VC_NULL;
01651       clusterVCAllocator.free(vc);
01652       vc = vc_next;
01653       continue;
01654     }
01655 
01656     if (tcount >= MAX_TCOUNT) {
01657       vcs_push(vc, VC_CLUSTER_READ);
01658     } else {
01659       vc->in_vcs = false;
01660       cluster_reschedule_offset(this, vc, &vc->read, 0);
01661       tcount++;
01662     }
01663     vc = vc_next;
01664   }
01665   if (list_len) {
01666     CLUSTER_SUM_DYN_STAT(CLUSTER_VC_READ_LIST_LEN_STAT, list_len);
01667   }
01668 
01669   tcount = write.msg.count + 2;
01670   vc_next = (ClusterVConnection *) read_vcs[count_bucket].head;
01671   while (vc_next) {
01672     vc = vc_next;
01673     vc_next = (ClusterVConnection *) vc->read.link.next;
01674 
01675     if (VC_CLUSTER_CLOSED == vc->type) {
01676       vc->type = VC_NULL;
01677       clusterVCAllocator.free(vc);
01678       continue;
01679     }
01680 
01681     if (tcount >= MAX_TCOUNT)
01682       break;
01683 
01684     s = valid_for_freespace_write(vc);
01685     if (-1 == s) {
01686       vcs_push(vc, VC_CLUSTER_READ);
01687     } else if (s) {
01688       if (vc_ok_read(vc) && channels[vc->channel] == vc) {
01689         // Send free space only if changed
01690         int d = write.msg.count;
01691         write.msg.descriptor[d].type = CLUSTER_SEND_FREE;
01692         write.msg.descriptor[d].channel = vc->channel;
01693         write.msg.descriptor[d].sequence_number = vc->token.sequence_number;
01694 
01695         ink_assert(s > 0);
01696         write.msg.descriptor[d].length = s;
01697         vc->last_local_free = s;
01698         Debug(CL_PROTO, "(%d) free space priority %d", vc->channel, vc->read.priority);
01699         write.msg.count++;
01700         tcount++;
01701         freespace_descriptors_built++;
01702       }
01703     }
01704   }
01705   return (freespace_descriptors_built);
01706 }
01707 
01708 int
01709 ClusterHandler::build_controlmsg_descriptors()
01710 {
01711   //
01712   // Construct the write descriptors for control message data in the
01713   // outgoing_control queue with considerations for maximum elements per
01714   // write (struct iovec system maximum) and for elements already
01715   // in the list.
01716   //
01717   int tcount = write.msg.count + 2;     // count + descriptor require 2 iovec(s)
01718   int control_msgs_built = 0;
01719   bool compound_msg;            // msg + chan data
01720   //
01721   // Build descriptors for control messages
01722   //
01723   OutgoingControl *c = NULL;
01724   int control_bytes = 0;
01725   int q = 0;
01726 
01727   while (tcount < (MAX_TCOUNT - 1)) {   // -1 to allow for compound messages
01728     c = outgoing_control[q].pop();
01729     if (!c) {
01730       // Move elements from global outgoing_control to local queue
01731       OutgoingControl *c_next;
01732       c = (OutgoingControl *) ink_atomiclist_popall(&outgoing_control_al[q]);
01733       if (c == 0) {
01734         if (++q >= CLUSTER_CMSG_QUEUES) {
01735           break;
01736         } else {
01737           continue;
01738         }
01739       }
01740       while (c) {
01741         c_next = (OutgoingControl *) c->link.next;
01742         c->link.next = NULL;
01743         outgoing_control[q].push(c);
01744         c = c_next;
01745       }
01746       continue;
01747 
01748     } else {
01749       compound_msg = (*((int32_t *) c->data) == -1);      // (msg+chan data)?
01750     }
01751     if (!compound_msg && c->len <= SMALL_CONTROL_MESSAGE &&
01752         // check whether the receiving cluster function will want malloc'ed data
01753         !clusterFunction[*(int32_t *) c->data].fMalloced && control_bytes + c->len + sizeof(int32_t) * 2 + 7 < CONTROL_DATA) {
01754       write.msg.outgoing_small_control.enqueue(c);
01755       control_bytes += c->len + sizeof(int32_t) * 2 + 7;  // safe approximation
01756       control_msgs_built++;
01757 
01758       if (clusterFunction[*(int32_t *) c->data].post_pfn) {
01759         clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
01760       }
01761       continue;
01762     }
01763     //
01764     // Build large control message descriptor
01765     //
01766     if (compound_msg) {
01767       // Extract out components of compound message.
01768       invoke_remote_data_args *cmhdr = (invoke_remote_data_args *) (c->data + sizeof(int32_t));
01769       OutgoingControl *oc_header = c;
01770       OutgoingControl *oc_msg = cmhdr->msg_oc;
01771       OutgoingControl *oc_data = cmhdr->data_oc;
01772 
01773       ink_assert(cmhdr->magicno == invoke_remote_data_args::MagicNo);
01774       //
01775       // Build descriptors and order the data before the reply message.
01776       // Reply message processing assumes data completion action performed
01777       // prior to processing completion message.
01778       // Not an issue today since channel data is always processed first.
01779       //
01780       int d;
01781       d = write.msg.count;
01782       write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
01783       write.msg.descriptor[d].channel = cmhdr->dest_channel;
01784       write.msg.descriptor[d].length = oc_data->len;
01785       write.msg.descriptor[d].sequence_number = cmhdr->token.sequence_number;
01786 
01787 #ifdef CLUSTER_STATS
01788       _vc_write_bytes += oc_data->len;
01789 #endif
01790 
01791       // Setup remote write fill iovec.  Remote write fills have no VIO.
01792       ClusterVConnection *vc = channels[cmhdr->dest_channel];
01793 
01794       if (VALID_CHANNEL(vc) && vc->pending_remote_fill) {
01795         ink_release_assert(!vc->remote_write_block);
01796         vc->remote_write_block = oc_data->get_block();
01797 
01798         // Note: No array overrun since we are bounded by (MAX_TCOUNT-1).
01799         write.msg.count++;
01800         tcount++;
01801         control_msgs_built++;
01802         d = write.msg.count;
01803         write.msg.outgoing_control.enqueue(oc_msg);
01804         write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
01805         write.msg.descriptor[d].channel = CLUSTER_CONTROL_CHANNEL;
01806         write.msg.descriptor[d].length = oc_msg->len;
01807 
01808 #ifdef CLUSTER_STATS
01809         _control_write_bytes += oc_msg->len;
01810 #endif
01811 
01812         write.msg.count++;
01813         tcount++;
01814         control_msgs_built++;
01815 
01816         // Queue header to process buffer free memory callbacks after send.
01817         write.msg.outgoing_callout.enqueue(oc_header);
01818 
01819       } else {
01820         // Operation cancelled; free memory.
01821         Warning("Pending remote read fill aborted chan=%d len=%d", cmhdr->dest_channel, oc_data->len);
01822 
01823         // Free compound message
01824         oc_header->free_data();
01825         oc_header->mutex = NULL;
01826         oc_header->freeall();
01827 
01828         // Free response message
01829         oc_msg->free_data();
01830         oc_msg->mutex = 0;
01831         oc_msg->freeall();
01832 
01833         // Free data descriptor
01834         oc_data->free_data();   // invoke memory free callback
01835         oc_data->mutex = 0;
01836         oc_data->freeall();
01837       }
01838 
01839     } else {
01840       write.msg.outgoing_control.enqueue(c);
01841 
01842       int d = write.msg.count;
01843       write.msg.descriptor[d].type = CLUSTER_SEND_DATA;
01844       write.msg.descriptor[d].channel = CLUSTER_CONTROL_CHANNEL;
01845       write.msg.descriptor[d].length = c->len;
01846 
01847 #ifdef CLUSTER_STATS
01848       _control_write_bytes += c->len;
01849 #endif
01850 
01851       write.msg.count++;
01852       tcount++;
01853       control_msgs_built++;
01854 
01855       if (clusterFunction[*(int32_t *) c->data].post_pfn) {
01856         clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
01857       }
01858     }
01859   }
01860   return control_msgs_built;
01861 }
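
// A sketch of the small-versus-large classification applied above: a
// control message rides inline with the descriptors only if it is small,
// its handler does not require a malloc'ed copy, and it still fits in the
// CONTROL_DATA region; each inline message costs its payload plus a
// 4-byte length, a 4-byte function id and up to 7 bytes of padding.
#if 0
static bool
is_inline_msg(int len, bool handler_wants_malloc, int control_bytes)
{
  const int overhead = (int) (2 * sizeof(int32_t)) + 7;   // safe approximation
  return !handler_wants_malloc &&
         len <= SMALL_CONTROL_MESSAGE &&
         (control_bytes + len + overhead) < CONTROL_DATA;
}
#endif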
01862 
01863 int
01864 ClusterHandler::add_small_controlmsg_descriptors()
01865 {
01866   //
01867   // Move small control message data to free space after descriptors
01868   //
01869   char *p = (char *) &write.msg.descriptor[write.msg.count];
01870   OutgoingControl *c = NULL;
01871 
01872   while ((c = write.msg.outgoing_small_control.dequeue())) {
01873     *(int32_t *) p = c->len;
01874     p += sizeof(int32_t);
01875     memcpy(p, c->data, c->len);
01876     c->free_data();
01877     c->mutex = NULL;
01878     p += c->len;
01879     ink_hrtime now = ink_get_hrtime();
01880     CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - c->submit_time);
01881     LOG_EVENT_TIME(c->submit_time, cluster_send_time_dist, cluster_send_events);
01882     c->freeall();
01883     p = (char *) DOUBLE_ALIGN(p);
01884   }
01885   write.msg.control_bytes = p - (char *) &write.msg.descriptor[write.msg.count];
01886 
01887 #ifdef CLUSTER_STATS
01888   _control_write_bytes += write.msg.control_bytes;
01889 #endif
01890 
01891   return 1;
01892 }
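
// A sketch of the wire packing performed above, assuming DOUBLE_ALIGN
// rounds up to an 8-byte boundary: each small message is serialized as a
// 4-byte length followed by its payload, and the cursor is then realigned
// before the next message.
#if 0
static char *
pack_small_msg(char *p, const char *data, int32_t len)
{
  *(int32_t *) p = len;                 // length prefix
  p += sizeof(int32_t);
  memcpy(p, data, len);                 // payload
  p += len;
  uintptr_t a = (uintptr_t) p;
  return (char *) ((a + 7) & ~(uintptr_t) 7);   // 8-byte alignment
}
#endif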
01893 
01894 struct DestructorLock
01895 {
01896   DestructorLock(EThread * thread)
01897   {
01898     have_lock = false;
01899     t = thread;
01900   }
01901    ~DestructorLock()
01902   {
01903     if (have_lock && m) {
01904       Mutex_unlock(m, t);
01905     }
01906     m = 0;
01907   }
01908   EThread *t;
01909   Ptr<ProxyMutex> m;
01910   bool have_lock;
01911 };
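
// A usage sketch for the DestructorLock guard above (hypothetical mutex,
// continuation and work): the destructor releases the mutex on every
// early return, and clearing lock.m/lock.have_lock transfers ownership
// out of the guard, exactly as valid_for_data_write() does when it hands
// the lock to vc->write_locked.
#if 0
static int
guarded_work(EThread *thread, Ptr<ProxyMutex> &mutex, Continuation *cont)
{
  DestructorLock lock(thread);
  lock.m = mutex;
  lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, cont, WRITE_LOCK_SPIN_COUNT);
  if (!lock.have_lock)
    return -1;                          // destructor: nothing to unlock
  // ... work under the lock ...
  return 0;                             // destructor unlocks here
}
#endif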
01912 
01913 int
01914 ClusterHandler::valid_for_data_write(ClusterVConnection * vc)
01915 {
01916   //
01917   // Determine if writes are allowed on this VC
01918   //
01919   ClusterVConnState *s = &vc->write;
01920 
01921   ink_assert(!on_stolen_thread);
01922   ink_assert((ProxyMutex *) ! vc->write_locked);
01923 
01924   //
01925   // Attempt to get the lock, if we miss, push vc into the future
01926   //
01927   DestructorLock lock(thread);
01928 
01929 retry:
01930   if ((lock.m = s->vio.mutex)) {
01931     lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, s->vio._cont, WRITE_LOCK_SPIN_COUNT);
01932     if (!lock.have_lock) {
01933       CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT);
01934 
01935 #ifdef CLUSTER_STATS
01936       _dw_missed_lock++;
01937 #endif
01938       return -1;
01939     }
01940   }
01941 
01942   if (vc->was_closed()) {
01943     if (vc->schedule_write()) {
01944 #ifdef CLUSTER_TOMCAT
01945       ink_assert(lock.m);
01946 #endif
01947       vc->write_locked = lock.m;
01948       lock.m = 0;
01949       lock.have_lock = false;
01950       return 1;
01951     } else {
01952       if (!vc->write_bytes_in_transit) {
01953         close_ClusterVConnection(vc);
01954       }
01955       return 0;
01956     }
01957   }
01958 
01959   if (!s->enabled && !vc->was_remote_closed()) {
01960 #ifdef CLUSTER_STATS
01961     _dw_not_enabled++;
01962 #endif
01963     return 0;
01964   }
01965 
01966   if (vc->pending_remote_fill) {
01967     if (vc->was_remote_closed())
01968       close_ClusterVConnection(vc);
01969 
01970 #ifdef CLUSTER_STATS
01971     _dw_wait_remote_fill++;
01972 #endif
01973     return 0;
01974   }
01975 
01976   if (!lock.have_lock || !s->vio.mutex || !s->vio._cont) {
01977     if (!lock.have_lock && s->vio.mutex && s->vio._cont) {
01978       goto retry;
01979     } else {
01980       // No active VIO
01981 #ifdef CLUSTER_STATS
01982       _dw_no_active_vio++;
01983 #endif
01984       return 0;
01985     }
01986   }
01987   //
01988   // If this connection has been closed remotely, send EOS
01989   //
01990   if (vc->was_remote_closed()) {
01991     if (!vc->write_bytes_in_transit && !vc->schedule_write()) {
01992       remote_close(vc, s);
01993     }
01994     return 0;
01995   }
01996   //
01997   // If not enabled or not WRITE
01998   //
01999   if (!s->enabled || s->vio.op != VIO::WRITE) {
02000     s->enabled = 0;
02001 #ifdef CLUSTER_STATS
02002     _dw_not_enabled_or_no_write++;
02003 #endif
02004     return 0;
02005   }
02006   //
02007   // If no room on the remote side or set_data() messages pending
02008   //
02009   int set_data_msgs_pending = vc->n_set_data_msgs;
02010   if (set_data_msgs_pending || (vc->remote_free <= (s->vio.ndone - vc->write_list_bytes))) {
02011     if (set_data_msgs_pending) {
02012       CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_WRITE_STALL_STAT);
02013 
02014 #ifdef CLUSTER_STATS
02015       _dw_set_data_pending++;
02016 #endif
02017 
02018     } else {
02019 #ifdef CLUSTER_STATS
02020       _dw_no_free_space++;
02021 #endif
02022     }
02023     return 0;
02024   }
02025   //
02026   // Calculate amount writable
02027   //
02028   MIOBufferAccessor & buf = s->vio.buffer;
02029 
02030   int64_t towrite = buf.reader()->read_avail();
02031   int64_t ntodo = s->vio.ntodo();
02032   bool write_vc_signal = false;
02033 
02034   if (towrite > ntodo)
02035     towrite = ntodo;
02036 
02037   ink_assert(ntodo >= 0);
02038   if (ntodo <= 0) {
02039     cluster_signal_and_update(VC_EVENT_WRITE_COMPLETE, vc, s);
02040     return 0;
02041   }
02042   if (buf.writer()->write_avail() && towrite != ntodo) {
02043     write_vc_signal = true;
02044     if (cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s) == EVENT_DONE)
02045       return 0;
02046     ink_assert(s->vio.ntodo() >= 0);
02047     if (s->vio.ntodo() <= 0) {
02048       cluster_signal_and_update(VC_EVENT_WRITE_COMPLETE, vc, s);
02049       return 0;
02050     }
02051   }
02052   // Clone nbytes of vio.buffer.reader IOBufferBlock list allowing
02053   // write_list to contain no more than DEFAULT_MAX_BUFFER_SIZE bytes.
02054 
02055   Ptr<IOBufferBlock> b_list;
02056   IOBufferBlock *b_tail;
02057   int bytes_to_fill;
02058   int consume_bytes;
02059 
02060   bytes_to_fill = DEFAULT_MAX_BUFFER_SIZE - vc->write_list_bytes;
02061 
02062   if (towrite && bytes_to_fill) {
02063     consume_bytes = (towrite > bytes_to_fill) ? bytes_to_fill : towrite;
02064     b_list = clone_IOBufferBlockList(s->vio.buffer.reader()->block,
02065                                      s->vio.buffer.reader()->start_offset, consume_bytes, &b_tail);
02066     ink_assert(b_tail);
02067 
02068     // Append cloned IOBufferBlock list to VC write_list.
02069 
02070     if (vc->write_list_tail) {
02071       vc->write_list_tail->next = b_list;
02072     } else {
02073       vc->write_list = b_list;
02074     }
02075     vc->write_list_tail = b_tail;
02076     vc->write_list_bytes += consume_bytes;
02077     ink_assert(bytes_IOBufferBlockList(vc->write_list, 1) == vc->write_list_bytes);
02078 
02079     // We may defer the write, but tell the user we have consumed the data.
02080 
02081     (s->vio.buffer.reader())->consume(consume_bytes);
02082     s->vio.ndone += consume_bytes;
02083     if (s->vio.ntodo() <= 0) {
02084       cluster_signal_and_update_locked(VC_EVENT_WRITE_COMPLETE, vc, s);
02085     }
02086   }
02087 
02088   if (vc->schedule_write()) {
02089 #ifdef CLUSTER_TOMCAT
02090     ink_assert(s->vio.mutex);
02091 #endif
02092     vc->write_locked = lock.m;
02093     lock.m = 0;
02094     lock.have_lock = false;
02095     return 1;
02096   } else {
02097     if (!write_vc_signal && buf.writer()->write_avail() && towrite != ntodo)
02098        cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s);
02099     return 0;
02100   }
02101 }
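
// A sketch of the write staging arithmetic above: at most
// DEFAULT_MAX_BUFFER_SIZE bytes are staged on a VC's write_list, so the
// amount cloned from the user's buffer is the smaller of what is readable
// and the space remaining in the staging list.
#if 0
static int64_t
bytes_to_stage(int64_t towrite, int64_t write_list_bytes)
{
  int64_t bytes_to_fill = DEFAULT_MAX_BUFFER_SIZE - write_list_bytes;
  return (towrite > bytes_to_fill) ? bytes_to_fill : towrite;
}
#endif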
02102 
02103 int
02104 ClusterHandler::valid_for_freespace_write(ClusterVConnection * vc)
02105 {
02106   //
02107   // Determine if freespace messages are allowed on this VC
02108   //
02109   ClusterVConnState *s = &vc->read;
02110 
02111   ink_assert(!on_stolen_thread);
02112 
02113   //
02114   // Attempt to get the lock, if we miss, push vc into the future
02115   //
02116   DestructorLock lock(thread);
02117 
02118 retry:
02119   if ((lock.m = s->vio.mutex)) {
02120     lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, s->vio._cont, READ_LOCK_SPIN_COUNT);
02121 
02122     if (!lock.have_lock) {
02123       CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_READ_LOCKED_STAT);
02124 
02125 #ifdef CLUSTER_STATS
02126       _fw_missed_lock++;
02127 #endif
02128       return -1;
02129     }
02130   }
02131   if (vc->was_closed()) {
02132     if (!vc->write_bytes_in_transit && !vc->schedule_write()) {
02133       close_ClusterVConnection(vc);
02134     } 
02135     return 0;
02136   }
02137 
02138   if (!s->enabled && !vc->was_remote_closed()) {
02139 #ifdef CLUSTER_STATS
02140     _fw_not_enabled++;
02141 #endif
02142     return 0;
02143   }
02144 
02145   if (vc->pending_remote_fill) {
02146     if (vc->was_remote_closed())
02147       close_ClusterVConnection(vc);
02148 
02149 #ifdef CLUSTER_STATS
02150     _fw_wait_remote_fill++;
02151 #endif
02152     return 0;
02153   }
02154 
02155   if (!lock.have_lock || !s->vio.mutex || !s->vio._cont) {
02156     if (!lock.have_lock && s->vio.mutex && s->vio._cont) {
02157       goto retry;
02158     } else {
02159       // No active VIO
02160 #ifdef CLUSTER_STATS
02161       _fw_no_active_vio++;
02162 #endif
02163       return 0;
02164     }
02165   }
02166   //
02167   // If this connection has been closed remotely, send EOS
02168   //
02169   if (vc->was_remote_closed()) {
02170     if (vc->write_bytes_in_transit || vc->schedule_write()) {
02171       // Defer close until write data is pushed
02172       return 0;
02173     }
02174     remote_close(vc, s);
02175     return 0;
02176   }
02177   //
02178   // If not enabled or not WRITE
02179   //
02180   if (!s->enabled || s->vio.op != VIO::READ) {
02181 #ifdef CLUSTER_STATS
02182     _fw_not_enabled_or_no_read++;
02183 #endif
02184     return 0;
02185   }
02186 
02187   int64_t ntodo = s->vio.ntodo();
02188   ink_assert(ntodo >= 0);
02189 
02190   if (ntodo <= 0) {
02191     cluster_signal_and_update(VC_EVENT_READ_COMPLETE, vc, s);
02192     return 0;
02193   }
02194 
02195   int64_t bytes_to_move = vc->initial_data_bytes;
02196   if (vc->read_block && bytes_to_move) {
02197 
02198     // Push initial read data into VC
02199 
02200     if (ntodo >= bytes_to_move) {
02201       Debug("cluster_vc_xfer", "finish initial data push ch %d bytes %" PRId64, vc->channel, vc->read_block->read_avail());
02202 
02203       s->vio.buffer.writer()->append_block(vc->read_block->clone());
02204       vc->read_block = 0;
02205 
02206     } else {
02207       bytes_to_move = ntodo;
02208 
02209       Debug("cluster_vc_xfer", "initial data push ch %d bytes %" PRId64, vc->channel, bytes_to_move);
02210 
02211       // Clone a portion of the data
02212 
02213       IOBufferBlock *b, *btail;
02214       b = clone_IOBufferBlockList(vc->read_block, 0, bytes_to_move, &btail);
02215       s->vio.buffer.writer()->append_block(b);
02216       vc->read_block->consume(bytes_to_move);
02217     }
02218     s->vio.ndone += bytes_to_move;
02219     vc->initial_data_bytes -= bytes_to_move;
02220 
02221     if (s->vio.ntodo() <= 0) {
02222       s->enabled = 0;
02223       cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s);
02224       return 0;
02225 
02226     } else {
02227       if (vc->have_all_data) {
02228         if (!vc->read_block) {
02229           s->enabled = 0;
02230           cluster_signal_and_update(VC_EVENT_EOS, vc, s);
02231           return 0;
02232         }
02233       }
02234       if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s)
02235           == EVENT_DONE)
01236         return 0;
02237 
02238       if (s->vio.ntodo() <= 0)
02239         s->enabled = 0;
02240 
02241       if (vc->initial_data_bytes)
02242         return 0;
02243     }
02244   }
02245   // At this point, all initial read data passed in the open_read reply
02246   // has been moved into the user VC.
02247   // Now allow send of freespace to receive additional data.
02248 
02249   int64_t nextfree = vc->read.vio.ndone;
02250 
02251   nextfree = (nextfree + DEFAULT_MAX_BUFFER_SIZE - 1) / DEFAULT_MAX_BUFFER_SIZE;
02252   nextfree *= DEFAULT_MAX_BUFFER_SIZE;
02253 
02254   if (nextfree >= (vc->last_local_free / 2)) {
02255     nextfree = vc->last_local_free + (8 * DEFAULT_MAX_BUFFER_SIZE);
02256   }
02257 
02258   if ((vc->last_local_free == 0) || (nextfree >= vc->last_local_free)) {
02259     Debug(CL_PROTO, "(%d) update freespace %" PRId64, vc->channel, nextfree);
02260     //
02261     // Have good VC candidate locked for freespace write
02262     //
02263     return nextfree;
02264 
02265   } else {
02266     // No free space update required
02267     return 0;
02268   }
02269 }
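
// A worked sketch of the freespace advertisement above: ndone is rounded
// up to a DEFAULT_MAX_BUFFER_SIZE boundary, and once that candidate
// reaches half of the previously advertised window it is bumped a further
// eight buffers, batching freespace messages instead of emitting one per
// small read. Returning 0 means no update is required.
#if 0
static int64_t
next_freespace(int64_t ndone, int64_t last_local_free)
{
  int64_t nextfree = ((ndone + DEFAULT_MAX_BUFFER_SIZE - 1) / DEFAULT_MAX_BUFFER_SIZE)
                     * DEFAULT_MAX_BUFFER_SIZE;         // round up
  if (nextfree >= (last_local_free / 2))
    nextfree = last_local_free + (8 * DEFAULT_MAX_BUFFER_SIZE);
  return ((last_local_free == 0) || (nextfree >= last_local_free)) ? nextfree : 0;
}
#endif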
02270 
02271 void
02272 ClusterHandler::vcs_push(ClusterVConnection * vc, int type)
02273 {
02274   if (vc->type <= VC_CLUSTER)
02275     vc->type = type;
02276 
02277   while ((vc->type > VC_CLUSTER) && !vc->in_vcs && ink_atomic_cas(pvint32(&vc->in_vcs), 0, 1)) {
02278     if (vc->type == VC_CLUSTER_READ)
02279       ink_atomiclist_push(&vc->ch->read_vcs_ready, (void *)vc);
02280     else
02281       ink_atomiclist_push(&vc->ch->write_vcs_ready, (void *)vc);
02282     return;
02283   }
02284 }
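
// A sketch of the enqueue-once gate above (hypothetical SketchVC): the
// compare-and-swap of in_vcs from 0 to 1 guarantees that however many
// threads race through vcs_push(), a VC lands on a ready list at most
// once until a descriptor builder clears in_vcs again.
#if 0
struct SketchVC { int32_t in_vcs; };

static bool
try_claim(SketchVC *vc)
{
  return !vc->in_vcs && ink_atomic_cas(pvint32(&vc->in_vcs), 0, 1);
}
#endif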
02285 
02286 int
02287 ClusterHandler::remote_close(ClusterVConnection * vc, ClusterVConnState * ns)
02288 {
02289   if (ns->vio.op != VIO::NONE && !vc->closed) {
02290     ns->enabled = 0;
02291     if (vc->remote_closed > 0) {
02292       if (ns->vio.op == VIO::READ) {
02293         if (ns->vio.nbytes == ns->vio.ndone) {
02294           return cluster_signal_and_update(VC_EVENT_READ_COMPLETE, vc, ns);
02295         } else {
02296           return cluster_signal_and_update(VC_EVENT_EOS, vc, ns);
02297         }
02298       } else {
02299         return cluster_signal_and_update(VC_EVENT_EOS, vc, ns);
02300       }
02301     } else {
02302       return cluster_signal_error_and_update(vc, ns, vc->remote_lerrno);
02303     }
02304   }
02305   return EVENT_CONT;
02306 }
02307 
02308 void
02309 ClusterHandler::steal_thread(EThread * t)
02310 {
02311   //
02312   // Attempt to push the control message now instead of waiting
02313   // for the periodic event to process it.
02314   //
02315   if (t != thread &&            // different thread to steal
02316       write.to_do <= 0 &&       // currently not trying to send data
02317       // nothing big outstanding
02318       !write.msg.count) {
02319     mainClusterEvent(CLUSTER_EVENT_STEAL_THREAD, (Event *) t);
02320   }
02321 }
02322 
02323 void
02324 ClusterHandler::free_locks(bool read_flag, int i)
02325 {
02326   //
02327   // Free VC locks.  Handle partial acquires up to i
02328   //
02329   if (i == CLUSTER_FREE_ALL_LOCKS) {
02330     if (read_flag) {
02331       i = (read.msg.state >= 2 ? read.msg.count : 0);
02332     } else {
02333       i = write.msg.count;
02334     }
02335   }
02336   ClusterState & s = (read_flag ? read : write);
02337   for (int j = 0; j < i; j++) {
02338     if (s.msg.descriptor[j].type == CLUSTER_SEND_DATA && s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
02339       ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
02340       if (VALID_CHANNEL(vc)) {
02341         if (read_flag) {
02342           if ((ProxyMutex *) vc->read_locked) {
02343             MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread);
02344             vc->read_locked = NULL;
02345           }
02346         } else {
02347           if ((ProxyMutex *) vc->write_locked) {
02348             MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
02349             vc->write_locked = NULL;
02350           }
02351         }
02352       }
02353     } else if (!read_flag &&
02354                s.msg.descriptor[j].type == CLUSTER_SEND_FREE &&
02355                s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
02356       ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
02357       if (VALID_CHANNEL(vc)) {
02358         if ((ProxyMutex *) vc->read_locked) {
02359           MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
02360           vc->read_locked = NULL;
02361         }
02362       }
02363     }
02364   }
02365 }
02366 
02367 #ifdef CLUSTER_IMMEDIATE_NETIO
02368 void
02369 ClusterHandler::build_poll(bool next)
02370 {
02371   Pollfd *pfd;
02372   if (next) {
02373     pfd = thread->nextPollDescriptor->alloc();
02374     pfd->fd = net_vc->get_socket();
02375     ifd = pfd - thread->nextPollDescriptor->pfd;
02376   } else {
02377     pfd = thread->pollDescriptor->alloc();
02378     pfd->fd = net_vc->get_socket();
02379     ifd = pfd - thread->pollDescriptor->pfd;
02380   }
02381   pfd->events = POLLHUP;
02382   if (next) {
02383     if (read.to_do)
02384       pfd->events |= POLLIN;
02385     if (write.to_do)
02386       pfd->events |= POLLOUT;
02387   } else {
02388     // we have to lie since we are in the same cycle
02389     pfd->events = POLLIN | POLLOUT;
02390     // reads/writes are non-blocking anyway
02391     pfd->revents = POLLIN | POLLOUT;
02392   }
02393 }
02394 #endif // CLUSTER_IMMEDIATE_NETIO
02395 
02396 extern int CacheClusterMonitorEnabled;
02397 extern int CacheClusterMonitorIntervalSecs;
02398 
02399 
02400 //
02401 // The main event for machine-machine link
02402 //
02403 int
02404 ClusterHandler::mainClusterEvent(int event, Event * e)
02405 {
02406   // Set global time
02407   current_time = ink_get_hrtime();
02408 
02409   if (CacheClusterMonitorEnabled) {
02410     if ((current_time - last_trace_dump) > HRTIME_SECONDS(CacheClusterMonitorIntervalSecs)) {
02411       last_trace_dump = current_time;
02412       dump_internal_data();
02413     }
02414   }
02415   //
02416   // Note: The caller always acquires the ClusterHandler mutex prior
02417   //       to the call.  This guarantees single threaded access in
02418   //       mainClusterEvent()
02419   //
02420 
02421   /////////////////////////////////////////////////////////////////////////
02422   // If cluster interconnect is overloaded, disable remote cluster ops.
02423   /////////////////////////////////////////////////////////////////////////
02424 #ifndef DEBUG
02425   if (clm && ClusterLoadMonitor::cf_monitor_enabled > 0) {
02426 #else
02427   if (0) {
02428 #endif
02429     bool last_state = disable_remote_cluster_ops;
02430     if (clm->is_cluster_overloaded()) {
02431       disable_remote_cluster_ops = true;
02432     } else {
02433       disable_remote_cluster_ops = false;
02434     }
02435     if (last_state != disable_remote_cluster_ops) {
02436       if (disable_remote_cluster_ops) {
02437         Note("Network congestion to [%u.%u.%u.%u] encountered, reverting to proxy only mode", DOT_SEPARATED(ip));
02438       } else {
02439         Note("Network congestion to [%u.%u.%u.%u] cleared, reverting to cache mode", DOT_SEPARATED(ip));
02440         last_cluster_op_enable = current_time;
02441       }
02442     }
02443   }
02444 
02445   on_stolen_thread = (event == CLUSTER_EVENT_STEAL_THREAD);
02446   bool io_callback = (event == EVENT_IMMEDIATE);
02447 
02448   if (on_stolen_thread) {
02449     thread = (EThread *) e;
02450   } else {
02451     if (io_callback) {
02452       thread = this_ethread();
02453     } else {
02454       thread = e->ethread;
02455     }
02456   }
02457 
02458   int io_activity = 1;
02459   bool only_write_control_msgs;
02460   int res;
02461 
02462   while (io_activity) {
02463     io_activity = 0;
02464     only_write_control_msgs = 0;
02465 
02466     if (downing) {
02467       machine_down();
02468       break;
02469     }
02470 
02471     //////////////////////////
02472     // Read Processing
02473     //////////////////////////
02474     if (!on_stolen_thread) {
02475       if (delayed_reads.head) {
02476         CLUSTER_INCREMENT_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
02477         finish_delayed_reads();
02478       }
02479       if ((res = process_read(current_time)) < 0) {
02480         break;
02481       }
02482       io_activity += res;
02483 
02484       if (delayed_reads.head) {
02485         CLUSTER_INCREMENT_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
02486         finish_delayed_reads();
02487       }
02488     }
02489     /////////////////////////
02490     // Write Processing
02491     /////////////////////////
02492     if ((res = process_write(current_time, only_write_control_msgs)) < 0) {
02493       break;
02494     }
02495     io_activity += res;
02496 
02497     /////////////////////////////////////////
02498     // Process deferred open_local requests
02499     /////////////////////////////////////////
02500     if (!on_stolen_thread) {
02501       if (do_open_local_requests())
02502         thread->signal_hook(thread);
02503     }
02504   }
02505 
02506 #ifdef CLUSTER_IMMEDIATE_NETIO
02507   if (!dead && ((event == EVENT_POLL) || (event == EVENT_INTERVAL))) {
02508     if (res >= 0) {
02509       build_poll(true);
02510     }
02511   }
02512 #endif
02513   return EVENT_CONT;
02514 }
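
// A sketch of the event-loop shape above: read and write processing are
// repeated while either side reports progress, so a single callback
// drains as much pending work as possible before yielding back to the
// EThread; a negative return means the node was declared down.
#if 0
static void
sketch_main_event(int (*do_read)(), int (*do_write)())
{
  int io_activity = 1;
  while (io_activity) {
    io_activity = 0;
    int res = do_read();                // may queue writes
    if (res < 0)
      break;                            // node down
    io_activity += res;
    res = do_write();                   // may free space for reads
    if (res < 0)
      break;
    io_activity += res;
  }
}
#endif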
02515 
02516 int
02517 ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
02518 {
02519 #ifdef CLUSTER_STATS
02520   _process_read_calls++;
02521 #endif
02522   if (dead) {
02523     // Node is down
02524     return 0;
02525   }
02526   ///////////////////////////////
02527   // Cluster read state machine
02528   ///////////////////////////////
02529 
02530   for (;;) {
02531 
02532     switch (read.state) {
02533       ///////////////////////////////////////////////
02534     case ClusterState::READ_START:
02535       ///////////////////////////////////////////////
02536       {
02537 #ifdef CLUSTER_STATS
02538         _n_read_start++;
02539 #endif
02540         read.msg.clear();
02541         read.start_time = ink_get_hrtime();
02542         if (build_initial_vector(CLUSTER_READ)) {
02543           read.state = ClusterState::READ_HEADER;
02544         } else {
02545           return 0;
02546         }
02547         break;
02548       }
02549       ///////////////////////////////////////////////
02550     case ClusterState::READ_HEADER:
02551       ///////////////////////////////////////////////
02552       {
02553 #ifdef CLUSTER_STATS
02554         _n_read_header++;
02555 #endif
02556         read.state = ClusterState::READ_AWAIT_HEADER;
02557         if (!read.doIO()) {
02558           // i/o not initiated, retry later
02559           read.state = ClusterState::READ_HEADER;
02560           return 0;
02561         }
02562         break;
02563       }
02564       ///////////////////////////////////////////////
02565     case ClusterState::READ_AWAIT_HEADER:
02566       ///////////////////////////////////////////////
02567       {
02568 #ifdef CLUSTER_STATS
02569         _n_read_await_header++;
02570 #endif
02571         if (!read.io_complete) {
02572           return 0;
02573         } else {
02574           if (read.io_complete < 0) {
02575             // read error, declare node down
02576             machine_down();
02577             return -1;
02578           }
02579         }
02580         if (read.to_do) {
02581           if (read.bytes_xfered) {
02582             CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
02583             read.state = ClusterState::READ_HEADER;
02584             break;
02585           } else {
02586             // Zero byte read
02587             read.state = ClusterState::READ_HEADER;
02588             return 0;
02589           }
02590         } else {
02591 #ifdef MSG_TRACE
02592           fprintf(t_fd,
02593                   "[R] seqno=%d count=%d control_bytes=%d count_check=%d dsum=%d csum=%d\n",
02594                   read.sequence_number,
02595                   read.msg.hdr()->count, read.msg.hdr()->control_bytes,
02596                   read.msg.hdr()->count_check, read.msg.hdr()->descriptor_cksum, read.msg.hdr()->control_bytes_cksum);
02597           fflush(t_fd);
02598 #endif
02599           CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did);
02600           if (needByteSwap) {
02601             read.msg.hdr()->SwapBytes();
02602           }
02603           read.msg.count = read.msg.hdr()->count;
02604           read.msg.control_bytes = read.msg.hdr()->control_bytes;
02605           read.msg.descriptor_cksum = read.msg.hdr()->descriptor_cksum;
02606           read.msg.control_bytes_cksum = read.msg.hdr()->control_bytes_cksum;
02607           read.msg.unused = read.msg.hdr()->unused;
02608 
02609           if (MAGIC_COUNT(read) != read.msg.hdr()->count_check) {
02610             ink_assert(!"Read bad ClusterMsgHeader data");
02611             Warning("Bad ClusterMsgHeader read on [%u.%u.%u.%u], restarting", DOT_SEPARATED(ip));
02612             Note("Cluster read from [%u.%u.%u.%u] failed, declaring down", DOT_SEPARATED(ip));
02613             machine_down();
02614             return -1;
02615           }
02616 
02617           if (read.msg.count || read.msg.control_bytes) {
02618             read.msg.state++;
02619             read.state = ClusterState::READ_SETUP_DESCRIPTOR;
02620           } else {
02621             read.state = ClusterState::READ_COMPLETE;
02622           }
02623           break;
02624         }
02625       }
02626       ///////////////////////////////////////////////
02627     case ClusterState::READ_SETUP_DESCRIPTOR:
02628       ///////////////////////////////////////////////
02629       {
02630 #ifdef CLUSTER_STATS
02631         _n_read_setup_descriptor++;
02632 #endif
02633         if (build_initial_vector(CLUSTER_READ)) {
02634           read.state = ClusterState::READ_DESCRIPTOR;
02635         } else {
02636           return 0;
02637         }
02638         break;
02639       }
02640       ///////////////////////////////////////////////
02641     case ClusterState::READ_DESCRIPTOR:
02642       ///////////////////////////////////////////////
02643       {
02644 #ifdef CLUSTER_STATS
02645         _n_read_descriptor++;
02646 #endif
02647         read.state = ClusterState::READ_AWAIT_DESCRIPTOR;
02648         if (!read.doIO()) {
02649           // i/o not initiated, retry later
02650           read.state = ClusterState::READ_DESCRIPTOR;
02651           return 0;
02652         }
02653         break;
02654       }
02655       ///////////////////////////////////////////////
02656     case ClusterState::READ_AWAIT_DESCRIPTOR:
02657       ///////////////////////////////////////////////
02658       {
02659 #ifdef CLUSTER_STATS
02660         _n_read_await_descriptor++;
02661 #endif
02662         if (!read.io_complete) {
02663           return 0;
02664         } else {
02665           if (read.io_complete < 0) {
02666             // read error, declare node down
02667             machine_down();
02668             return -1;
02669           }
02670         }
02671         if (read.to_do) {
02672           if (read.bytes_xfered) {
02673             CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
02674             read.state = ClusterState::READ_DESCRIPTOR;
02675             break;
02676           } else {
02677             // Zero byte read
02678             read.state = ClusterState::READ_DESCRIPTOR;
02679             return 0;
02680           }
02681         } else {
02682 #ifdef CLUSTER_MESSAGE_CKSUM
02683           ink_release_assert(read.msg.calc_descriptor_cksum() == read.msg.descriptor_cksum);
02684           ink_release_assert(read.msg.calc_control_bytes_cksum() == read.msg.control_bytes_cksum);
02685 #endif
02686           CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did);
02687           if (needByteSwap) {
02688             // Descriptors need byte swap
02689             swap_descriptor_bytes();
02690           }
02691           if (read.msg.count == 0) {
02692             read.bytes_xfered = 0;
02693             read.state = ClusterState::READ_COMPLETE;
02694           } else {
02695             read.msg.state++;
02696             read.state = ClusterState::READ_SETUP_DATA;
02697           }
02698           break;
02699         }
02700       }
02701       ///////////////////////////////////////////////
02702     case ClusterState::READ_SETUP_DATA:
02703       ///////////////////////////////////////////////
02704       {
02705 #ifdef CLUSTER_STATS
02706         _n_read_setup_data++;
02707 #endif
02708         if (build_initial_vector(CLUSTER_READ)) {
02709           free_locks(CLUSTER_READ);
02710           if (read.to_do) {
02711             read.state = ClusterState::READ_DATA;
02712           } else {
02713             // Descriptor contains no VC data
02714             read.state = ClusterState::READ_COMPLETE;
02715           }
02716         } else {
02717           return 0;
02718         }
02719         break;
02720       }
02721       ///////////////////////////////////////////////
02722     case ClusterState::READ_DATA:
02723       ///////////////////////////////////////////////
02724       {
02725 #ifdef CLUSTER_STATS
02726         _n_read_data++;
02727 #endif
02728         ink_release_assert(read.to_do);
02729         read.state = ClusterState::READ_AWAIT_DATA;
02730         if (!read.doIO()) {
02731           // i/o not initiated, retry later
02732           read.state = ClusterState::READ_DATA;
02733           return 0;
02734         }
02735         break;
02736       }
02737       ///////////////////////////////////////////////
02738     case ClusterState::READ_AWAIT_DATA:
02739       ///////////////////////////////////////////////
02740       {
02741 #ifdef CLUSTER_STATS
02742         _n_read_await_data++;
02743 #endif
02744         if (!read.io_complete) {
02745           return 0;             // awaiting i/o complete
02746         } else {
02747           if (read.io_complete > 0) {
02748             read.state = ClusterState::READ_POST_COMPLETE;
02749           } else {
02750             // read error, declare node down
02751             machine_down();
02752             return -1;
02753           }
02754         }
02755         break;
02756       }
02757       ///////////////////////////////////////////////
02758     case ClusterState::READ_POST_COMPLETE:
02759       ///////////////////////////////////////////////
02760       {
02761 #ifdef CLUSTER_STATS
02762         _n_read_post_complete++;
02763 #endif
02764         if (!get_read_locks()) {
02765           return 0;
02766         }
02767         if (read.to_do) {
02768           if (read.bytes_xfered) {
02769             update_channels_partial_read();
02770             free_locks(CLUSTER_READ);
02771             CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.bytes_xfered);
02772             CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
02773             read.state = ClusterState::READ_DATA;
02774             return 1;
02775           } else {
02776             // Zero byte read
02777             free_locks(CLUSTER_READ);
02778             read.state = ClusterState::READ_DATA;
02779             return 0;
02780           }
02781         } else {
02782           CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.bytes_xfered);
02783           read.state = ClusterState::READ_COMPLETE;
02784           break;
02785         }
02786       }
02787       ///////////////////////////////////////////////
02788     case ClusterState::READ_COMPLETE:
02789       ///////////////////////////////////////////////
02790       {
02791 #ifdef CLUSTER_STATS
02792         _n_read_complete++;
02793 #endif
02794         ink_hrtime rdmsg_end_time = ink_get_hrtime();
02795         CLUSTER_SUM_DYN_STAT(CLUSTER_RDMSG_ASSEMBLE_TIME_STAT, rdmsg_end_time - read.start_time);
02796         read.start_time = HRTIME_MSECONDS(0);
02797         if (dump_msgs)
02798           dump_read_msg();
02799         read.sequence_number++;
02800         update_channels_read();
02801         free_locks(CLUSTER_READ);
02802 
02803         read.state = ClusterState::READ_START;
02804         break;                  // setup next read
02805       }
02806       //////////////////
02807     default:
02808       //////////////////
02809       {
02810         ink_release_assert(!"ClusterHandler::process_read invalid state");
02811       }
02812 
02813     }                           // end of switch
02814   }                             // end of for
02815 }
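
// A compact sketch of the read state machine above (hypothetical names):
// a message is consumed in three bounded I/O phases, and each AWAIT state
// returns 0 to yield whenever its I/O has not completed yet.
#if 0
enum SketchReadState { S_HEADER, S_DESCRIPTORS, S_DATA, S_COMPLETE };

static int
sketch_read_step(SketchReadState &st, bool io_done)
{
  if (!io_done)
    return 0;                           // yield; re-enter on next event
  switch (st) {
  case S_HEADER:      st = S_DESCRIPTORS; break;  // header carries the counts
  case S_DESCRIPTORS: st = S_DATA;        break;  // descriptors build the iovecs
  case S_DATA:        st = S_COMPLETE;    break;  // VC data + control payloads
  case S_COMPLETE:    st = S_HEADER;      break;  // start the next message
  }
  return 1;
}
#endif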
02816 
02817 int
02818 ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
02819 {
02820 #ifdef CLUSTER_STATS
02821   _process_write_calls++;
02822 #endif
02823   /////////////////////////////////
02824   // Cluster write state machine
02825   /////////////////////////////////
02826   for (;;) {
02827 
02828     switch (write.state) {
02829       ///////////////////////////////////////////////
02830     case ClusterState::WRITE_START:
02831       ///////////////////////////////////////////////
02832       {
02833 #ifdef CLUSTER_STATS
02834         _n_write_start++;
02835 #endif
02836         write.msg.clear();
02837         write.last_time = ink_get_hrtime();
02838         pw_write_descriptors_built = -1;
02839         pw_freespace_descriptors_built = -1;
02840         pw_controldata_descriptors_built = -1;
02841         pw_time_expired = 0;
02842         write.state = ClusterState::WRITE_SETUP;
02843         break;
02844       }
02845       ///////////////////////////////////////////////
02846     case ClusterState::WRITE_SETUP:
02847       ///////////////////////////////////////////////
02848       {
02849 #ifdef CLUSTER_STATS
02850         _n_write_setup++;
02851 #endif
02852         if (!on_stolen_thread && !only_write_control_msgs) {
02853           /////////////////////////////////////////////////////////////
02854           // Build a complete write descriptor containing control,
02855           // data and freespace message data.
02856           /////////////////////////////////////////////////////////////
02857 
02858           // Control message descriptors
02859           if (pw_controldata_descriptors_built) {
02860             pw_controldata_descriptors_built = build_controlmsg_descriptors();
02861           }
02862           // Write data descriptors
02863           if (pw_write_descriptors_built) {
02864             pw_write_descriptors_built = build_write_descriptors();
02865           }
02866           // Free space descriptors
02867           if (pw_freespace_descriptors_built) {
02868             pw_freespace_descriptors_built = build_freespace_descriptors();
02869           }
02870           add_small_controlmsg_descriptors();   // always last
02871         } else {
02872           /////////////////////////////////////////////////////////////
02873           // Build a write descriptor only containing control data.
02874           /////////////////////////////////////////////////////////////
02875           pw_write_descriptors_built = 0;
02876           pw_freespace_descriptors_built = 0;
02877           pw_controldata_descriptors_built = build_controlmsg_descriptors();
02878           add_small_controlmsg_descriptors();   // always last
02879         }
02880 
02881         // If nothing to write, post write completion
02882         if (!pw_controldata_descriptors_built && !pw_write_descriptors_built && !pw_freespace_descriptors_built) {
02883           write.state = ClusterState::WRITE_COMPLETE;
02884           break;
02885         } else {
02886           started_on_stolen_thread = on_stolen_thread;
02887           control_message_write = only_write_control_msgs;
02888         }
02889 
02890         // Move required data into the message header
02891 #ifdef CLUSTER_MESSAGE_CKSUM
02892         write.msg.descriptor_cksum = write.msg.calc_descriptor_cksum();
02893         write.msg.hdr()->descriptor_cksum = write.msg.descriptor_cksum;
02894 
02895         write.msg.control_bytes_cksum = write.msg.calc_control_bytes_cksum();
02896         write.msg.hdr()->control_bytes_cksum = write.msg.control_bytes_cksum;
02897         write.msg.unused = 0;
02898 #endif
02899         write.msg.hdr()->count = write.msg.count;
02900         write.msg.hdr()->control_bytes = write.msg.control_bytes;
02901         write.msg.hdr()->count_check = MAGIC_COUNT(write);
02902 
02903         ink_release_assert(build_initial_vector(CLUSTER_WRITE));
02904         free_locks(CLUSTER_WRITE);
02905         write.state = ClusterState::WRITE_INITIATE;
02906         break;
02907       }
02908       ///////////////////////////////////////////////
02909     case ClusterState::WRITE_INITIATE:
02910       ///////////////////////////////////////////////
02911       {
02912 #ifdef CLUSTER_STATS
02913         _n_write_initiate++;
02914 #endif
02915         write.state = ClusterState::WRITE_AWAIT_COMPLETION;
02916         if (!write.doIO()) {
02917           // i/o not initiated, retry later
02918           write.state = ClusterState::WRITE_INITIATE;
02919           return 0;
02920         }
02921         break;
02922       }
02923       ///////////////////////////////////////////////
02924     case ClusterState::WRITE_AWAIT_COMPLETION:
02925       ///////////////////////////////////////////////
02926       {
02927 #ifdef CLUSTER_STATS
02928         _n_write_await_completion++;
02929 #endif
02930         if (!write.io_complete) {
02931           // Still waiting for write i/o completion
02932           return 0;
02933         } else {
02934           if (write.io_complete < 0) {
02935             // write error, declare node down
02936             machine_down();
02937             write.state = ClusterState::WRITE_INITIATE;
02938             break;
02939           }
02940           if (write.to_do) {
02941             if (write.bytes_xfered) {
02942               CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_WRITES_STAT);
02943               write.state = ClusterState::WRITE_INITIATE;
02944               break;
02945             } else {
02946               // Zero byte write
02947               write.state = ClusterState::WRITE_INITIATE;
02948               return 0;
02949             }
02950           }
02951           CLUSTER_SUM_DYN_STAT(CLUSTER_WRITE_BYTES_STAT, write.bytes_xfered);
02952           write.sequence_number++;
02953           write.state = ClusterState::WRITE_POST_COMPLETE;
02954         }
02955         break;
02956       }
02957       ///////////////////////////////////////////////
02958     case ClusterState::WRITE_POST_COMPLETE:
02959       ///////////////////////////////////////////////
02960       {
02961 #ifdef CLUSTER_STATS
02962         _n_write_post_complete++;
02963 #endif
02964         if (!get_write_locks()) {
02965           CLUSTER_INCREMENT_DYN_STAT(CLUSTER_WRITE_LOCK_MISSES_STAT);
02966           return 0;
02967         }
02968         //
02969         // Move the channels into their new buckets based on how much
02970         // was written
02971         //
02972         update_channels_written();
02973         free_locks(CLUSTER_WRITE);
02974         write.state = ClusterState::WRITE_COMPLETE;
02975         break;
02976       }
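      //
      // get_write_locks() fails when the per-channel locks cannot all be
      // acquired; the handler records the miss in
      // CLUSTER_WRITE_LOCK_MISSES_STAT and retries this state on the next
      // callback.
      //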
02977       ///////////////////////////////////////////////
02978     case ClusterState::WRITE_COMPLETE:
02979       ///////////////////////////////////////////////
02980       {
02981 #ifdef CLUSTER_STATS
02982         _n_write_complete++;
02983 #endif
02984         write.state = ClusterState::WRITE_START;
02985         ink_hrtime curtime = ink_get_hrtime();
02986 
02987         if (!on_stolen_thread) {
02988           //
02989           // Complete all work in the current bucket before moving to the next
02990           //
02991           pw_time_expired = (curtime - now) > CLUSTER_MAX_RUN_TIME;
02992 
02993           if (!control_message_write && !pw_write_descriptors_built
02994               && !pw_freespace_descriptors_built && !pw_controldata_descriptors_built) {
02995             // skip to the next bucket
02996             cur_vcs = (cur_vcs + 1) % CLUSTER_BUCKETS;
02997           }
02998         } else {
02999           //
03000           // Place an upper bound on thread stealing
03001           //
03002           pw_time_expired = (curtime - now) > CLUSTER_MAX_THREAD_STEAL_TIME;
03003           if (pw_time_expired) {
03004             CLUSTER_INCREMENT_DYN_STAT(CLUSTER_THREAD_STEAL_EXPIRES_STAT);
03005           }
03006         }
03007         //
03008         // periodic activities
03009         //
03010         if (!on_stolen_thread && !cur_vcs && !dead) {
03011           //
03012           // check if this machine is supposed to be in the cluster
03013           //
03014           MachineList *mc = the_cluster_machines_config();
03015           if (mc && !mc->find(ip, port)) {
03016             Note("Cluster [%u.%u.%u.%u:%d] not in config, declaring down", DOT_SEPARATED(ip), port);
03017             machine_down();
03018           }
03019         }
03020         if (pw_time_expired) {
03021           return -1;            // thread run time expired
03022         } else {
03023           if (pw_write_descriptors_built || pw_freespace_descriptors_built || pw_controldata_descriptors_built) {
03024             break;              // start another write
03025           } else {
03026             return 0;           // no more data to write
03027           }
03028         }
03029       }
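      //
      // Return-code protocol for process_write(): -1 means the time budget
      // (CLUSTER_MAX_RUN_TIME, or CLUSTER_MAX_THREAD_STEAL_TIME on a stolen
      // thread) is exhausted, 0 means there is currently nothing more to
      // write, and the break above starts another write pass while built
      // descriptors remain.
      //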
03030       //////////////////
03031     default:
03032       //////////////////
03033       {
03034         ink_release_assert(!"ClusterHandler::process_write invalid state");
03035       }
03036 
03037     }                           // End of switch
03038   }                             // End of for
03039 }
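//
// Illustrative sketch (not part of the original source): how a caller might
// drive the process_write() state machine given the return-code protocol
// noted above.  The signature process_write(now, on_stolen_thread) is
// inferred from the variables used in the function body; the handler name
// and event wiring here are hypothetical.
//
//   int
//   ClusterHandler::write_event_sketch(int /* event */, Event * /* e */)
//   {
//     ink_hrtime now = ink_get_hrtime();
//     // false: running on our own ET_CLUSTER thread, not a stolen thread
//     if (process_write(now, false) == -1) {
//       // Time budget exhausted with work remaining; the periodic
//       // callback resumes writing on its next invocation.
//     }
//     return EVENT_CONT;
//   }
//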
03040 
03041 int
03042 ClusterHandler::do_open_local_requests()
03043 {
03044   //
03045   // open_local requests that are unable to obtain the ClusterHandler
03046   // mutex are deferred and placed onto the external_incoming_open_local
03047   // queue.  Those deferred requests are processed here, within the
03048   // ET_CLUSTER thread.
03049   //
03050   int pending_request = 0;
03051   ClusterVConnection *cvc;
03052   ClusterVConnection *cvc_ext;
03053   ClusterVConnection *cvc_ext_next;
03054   EThread *tt = this_ethread();
03055   Queue<ClusterVConnection> local_incoming_open_local;
03056 
03057   //
03058   // Atomically dequeue all requests from the external queue and
03059   // move them to the local working queue while maintaining insertion order.
03060   //
03061   while (true) {
03062     cvc_ext = (ClusterVConnection *)
03063       ink_atomiclist_popall(&external_incoming_open_local);
03064     if (cvc_ext == 0)
03065       break;
03066 
03067     while (cvc_ext) {
03068       cvc_ext_next = (ClusterVConnection *) cvc_ext->link.next;
03069       cvc_ext->link.next = NULL;
03070       local_incoming_open_local.push(cvc_ext);
03071       cvc_ext = cvc_ext_next;
03072     }
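    //
    // ink_atomiclist_popall() returns the pushed elements newest-first
    // (LIFO).  Pushing each element at the head of the local queue reverses
    // them once more, which is how insertion (FIFO) order is restored
    // before processing.
    //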
03073 
03074     // Process deferred open_local requests.
03075 
03076     while ((cvc = local_incoming_open_local.pop())) {
03077       MUTEX_TRY_LOCK(lock, cvc->action_.mutex, tt);
03078       if (lock) {
03079         if (cvc->start(tt) < 0) {
03080           cvc->token.clear();
03081           if (cvc->action_.continuation) {
03082             cvc->action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);
03083             clusterVCAllocator.free(cvc);
03084           }
03085         }
03086         MUTEX_RELEASE(lock);
03087 
03088       } else {
03089         // unable to get mutex, insert request back onto global queue.
03090         Debug(CL_TRACE, "do_open_local_requests() unable to acquire mutex (cvc=%p)", cvc);
03091         pending_request = 1;
03092         ink_atomiclist_push(&external_incoming_open_local, (void *) cvc);
03093       }
03094     }
03095   }
03096   return pending_request;
03097 }
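//
// Illustrative sketch (not part of the original source): the producer side
// of the external_incoming_open_local queue.  The request-path context and
// the ch/vc variables are hypothetical; ink_atomiclist_push() and the queue
// itself appear in do_open_local_requests() above.
//
//   // On the request path, when the ClusterHandler mutex cannot be taken:
//   ink_atomiclist_push(&ch->external_incoming_open_local, (void *) vc);
//   // The deferred request is then picked up and started (or failed with
//   // CLUSTER_EVENT_OPEN_FAILED) by do_open_local_requests() on a
//   // subsequent ET_CLUSTER callback.
//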
03098 
03099 // End of ClusterHandler.cc
