• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ClusterHandlerBase.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026   ClusterHandlerBase.cc
00027 ****************************************************************************/
00028 
00029 #include "P_Cluster.h"
00030 
00031 extern int cluster_receive_buffer_size;
00032 extern int cluster_send_buffer_size;
00033 extern uint32_t cluster_sockopt_flags;
00034 extern uint32_t cluster_packet_mark;
00035 extern uint32_t cluster_packet_tos;
00036 extern int num_of_cluster_threads;
00037 
00038 
00039 ///////////////////////////////////////////////////////////////
00040 // Incoming message continuation for periodic callout threads
00041 ///////////////////////////////////////////////////////////////
00042 
00043 ClusterCalloutContinuation::ClusterCalloutContinuation(struct ClusterHandler *ch)
00044   :
00045 Continuation(0),
00046 _ch(ch)
00047 {
00048   mutex = new_ProxyMutex();
00049   SET_HANDLER((ClstCoutContHandler)
00050               & ClusterCalloutContinuation::CalloutHandler);
00051 }
00052 
00053 ClusterCalloutContinuation::~ClusterCalloutContinuation()
00054 {
00055   mutex = 0;
00056 }
00057 
00058 int
00059 ClusterCalloutContinuation::CalloutHandler(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00060 {
00061   return _ch->process_incoming_callouts(this->mutex);
00062 }
00063 
00064 /*************************************************************************/
00065 // ClusterControl member functions (Internal Class)
00066 /*************************************************************************/
00067 ClusterControl::ClusterControl():
00068 Continuation(NULL), len(0), size_index(-1), real_data(0), data(0), free_proc(0), free_proc_arg(0), iob_block(0)
00069 {
00070 }
00071 
void
ClusterControl::real_alloc_data(int read_access, bool align_int32_on_non_int64_boundary)
{
  // Allocate the backing IOBufferBlock for a control message of 'len'
  // bytes, reserving DATA_HDR leading bytes for a header holding the
  // size index, a magic byte and the 'this' pointer (validated later by
  // free_data()).
  //
  // read_access: non-zero primes the block so read_avail() == len;
  //              zero sets it up so write_avail() == len.
  // align_int32_on_non_int64_boundary: shift the payload an extra
  //              sizeof(int32_t) so 32-bit fields land off 64-bit
  //              boundaries (matches the on-wire layout).
  EThread *thread = this_ethread();
  ProxyMutex *mutex = thread->mutex;  // needed by CLUSTER_INCREMENT_DYN_STAT

  ink_assert(!data);
  if ((len + DATA_HDR + sizeof(int32_t)) <= DEFAULT_MAX_BUFFER_SIZE) {
    // Small message: draw a fixed-size buffer from the allocator pools.
    size_index = buffer_size_to_index(len + DATA_HDR + sizeof(int32_t), MAX_BUFFER_SIZE_INDEX);
    iob_block = new_IOBufferBlock();
    iob_block->alloc(size_index);       // aligns on 8 byte boundary
    real_data = (int64_t *) iob_block->buf();

    if (align_int32_on_non_int64_boundary) {
      data = ((char *) real_data) + sizeof(int32_t) + DATA_HDR;
    } else {
      data = ((char *) real_data) + DATA_HDR;
    }
  } else {
    // Oversized message: round up to whole int64_t's (plus one extra for
    // DOUBLE_ALIGN slack) and allocate outside the pools.
    int size = sizeof(int64_t) * (((len + DATA_HDR + sizeof(int32_t) + sizeof(int64_t) - 1) / sizeof(int64_t)) + 1);
    size_index = -1;  // -1 marks a non-pool (xmalloc) allocation
    iob_block = new_IOBufferBlock();
    iob_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size));
    real_data = (int64_t *) iob_block->buf();

    if (align_int32_on_non_int64_boundary) {
      data = (char *) DOUBLE_ALIGN(real_data) + sizeof(int32_t) + DATA_HDR;
    } else {
      data = (char *) DOUBLE_ALIGN(real_data) + DATA_HDR;
    }
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_ALLOC_DATA_NEWS_STAT);
  }

  // IOBufferBlock adjustments
  if (read_access) {
    // Make iob_block->read_avail() == len
    iob_block->fill((char *) data - (char *) real_data);        // skip header
    iob_block->consume((char *) data - (char *) real_data);     // skip header
    iob_block->fill(len);
  } else {
    // Make iob_block->write_avail() == len
    iob_block->fill((char *) data - (char *) real_data);        // skip header
    iob_block->consume((char *) data - (char *) real_data);     // skip header
    iob_block->_buf_end = iob_block->end() + len;
  }

  // Write size_index, magic number and 'this' in leading bytes
  char *size_index_ptr = (char *) data - DATA_HDR;
  *size_index_ptr = size_index;
  ++size_index_ptr;

  *size_index_ptr = (char) ALLOC_DATA_MAGIC;
  ++size_index_ptr;

  void *val = (void *) this;
  memcpy(size_index_ptr, (char *) &val, sizeof(void *));
}
00129 
void
ClusterControl::free_data()
{
  // Release the message buffer set up by real_alloc_data(), or hand an
  // externally owned buffer back through the free callback.
  if (data && iob_block) {
    if (free_proc) {
      // Free memory via callback proc
      (*free_proc) (free_proc_arg);
      iob_block = 0;            // really free memory
      return;
    }
    if (real_data) {
      // Header written by real_alloc_data(): byte 1 is the magic value.
      // Verify it, then invert it so a double free trips this assert.
      ink_release_assert(*(((uint8_t *) data) - DATA_HDR + 1) == (uint8_t) ALLOC_DATA_MAGIC);
      *(((uint8_t *) data) - DATA_HDR + 1) = (uint8_t) ~ ALLOC_DATA_MAGIC;

      // Byte 0 must still hold the size index recorded at allocation.
      ink_release_assert(*(((char *) data) - DATA_HDR) == size_index);
    } else {
      // malloc'ed memory, not alloced via real_alloc_data().
      // Data will be ats_free()'ed when IOBufferBlock is freed
    }
    iob_block = 0;              // free memory
  }
}
00152 
00153 /*************************************************************************/
00154 // IncomingControl member functions (Internal Class)
00155 /*************************************************************************/
00156 IncomingControl *
00157 IncomingControl::alloc()
00158 {
00159   return inControlAllocator.alloc();
00160 }
00161 
00162 IncomingControl::IncomingControl()
00163 :recognized_time(0)
00164 {
00165 }
00166 
00167 void
00168 IncomingControl::freeall()
00169 {
00170   free_data();
00171   inControlAllocator.free(this);
00172 }
00173 
00174 /*************************************************************************/
00175 // OutgoingControl member functions (Internal Class)
00176 /*************************************************************************/
00177 OutgoingControl *
00178 OutgoingControl::alloc()
00179 {
00180   return outControlAllocator.alloc();
00181 }
00182 
00183 OutgoingControl::OutgoingControl()
00184 :ch(NULL), submit_time(0)
00185 {
00186 }
00187 
00188 int
00189 OutgoingControl::startEvent(int event, Event * e)
00190 {
00191   //
00192   // This event handler is used by ClusterProcessor::invoke_remote()
00193   // to delay (CLUSTER_OPT_DELAY) the enqueuing of the control message.
00194   //
00195   (void) event;
00196   (void) e;
00197   // verify that the machine has not gone down
00198   if (!ch || !ch->thread)
00199     return EVENT_DONE;
00200 
00201   int32_t cluster_fn = *(int32_t *) this->data;
00202   int32_t pri = ClusterFuncToQpri(cluster_fn);
00203   ink_atomiclist_push(&ch->outgoing_control_al[pri], (void *) this);
00204 
00205   return EVENT_DONE;
00206 }
00207 
00208 void
00209 OutgoingControl::freeall()
00210 {
00211   free_data();
00212   outControlAllocator.free(this);
00213 }
00214 
00215 /*************************************************************************/
00216 // ClusterState member functions (Internal Class)
00217 /*************************************************************************/
// Per-direction (read or write) i/o state attached to a ClusterHandler.
// The continuation handler is selected by direction: doIO_read_event for
// the read channel, doIO_write_event for the write channel.  The iovec
// and descriptor/message buffers are each preceded by one untouched page
// so that a buffer underrun faults immediately.
ClusterState::ClusterState(ClusterHandler * c, bool read_chan):
Continuation(0),
ch(c),
read_channel(read_chan),
do_iodone_event(false),
n_descriptors(0),
sequence_number(0),
to_do(0),
did(0),
n_iov(0),
io_complete(1),
io_complete_event(0),
v(0),
bytes_xfered(0),
last_ndone(0),
total_bytes_xfered(0),
iov(NULL),
iob_iov(NULL),
byte_bank(NULL),
n_byte_bank(0), byte_bank_size(0), missed(0), missed_msg(false), read_state_t(READ_START), write_state_t(WRITE_START)
{
  mutex = new_ProxyMutex();
  if (read_channel) {
    state = ClusterState::READ_START;
    SET_HANDLER(&ClusterState::doIO_read_event);
  } else {
    state = ClusterState::WRITE_START;
    SET_HANDLER(&ClusterState::doIO_write_event);
  }
  last_time = HRTIME_SECONDS(0);
  start_time = HRTIME_SECONDS(0);
  int size;
  //
  // Note: we allocate space for maximum iovec(s), descriptor(s)
  //       and small control message data.
  //

  //////////////////////////////////////////////////
  // Place an invalid page in front of iovec data.
  //////////////////////////////////////////////////
  size_t pagesize = ats_pagesize();
  size = ((MAX_TCOUNT + 1) * sizeof(IOVec)) + (2 * pagesize);
  iob_iov = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(size));
  char *addr = (char *) align_pointer_forward(iob_iov->data(), pagesize);

  // iov points one page past the aligned start; the page before it is
  // never written, acting as an underrun guard.
  iov = (IOVec *) (addr + pagesize);

  ///////////////////////////////////////////////////
  // Place an invalid page in front of message data.
  ///////////////////////////////////////////////////
  size = sizeof(ClusterMsgHeader) + (MAX_TCOUNT + 1) * sizeof(Descriptor)
    + CONTROL_DATA + (2 * pagesize);
  msg.iob_descriptor_block = new_IOBufferBlock();
  msg.iob_descriptor_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size));

  addr = (char *) align_pointer_forward(msg.iob_descriptor_block->data->data(), pagesize);

  // Skip the guard page, zero the usable region, and carve the
  // descriptor array out just past the message header.
  addr = addr + pagesize;
  memset(addr, 0, size - (2 * pagesize));
  msg.descriptor = (Descriptor *) (addr + sizeof(ClusterMsgHeader));

  mbuf = new_empty_MIOBuffer();
}
00281 
00282 ClusterState::~ClusterState()
00283 {
00284   mutex = 0;
00285   if (iov) {
00286     iob_iov = 0;                // Free memory
00287   }
00288 
00289   if (msg.descriptor) {
00290     msg.iob_descriptor_block = 0;       // Free memory
00291   }
00292   // Deallocate IO Core structures
00293   int n;
00294   for (n = 0; n < MAX_TCOUNT; ++n) {
00295     block[n] = 0;
00296   }
00297   free_empty_MIOBuffer(mbuf);
00298   mbuf = 0;
00299 }
00300 
void
ClusterState::build_do_io_vector()
{
  //
  // Construct the do_io_xxx data structures allowing transfer
  // of the data described by the iovec structure.
  //
  // Precondition: block[0..n_iov-1] already hold the IOBufferBlock for
  // each iovec entry; they are spliced here into one chain rooted at
  // mbuf->_writer.  The byte total must equal to_do.
  //
  int bytes_to_xfer = 0;
  int n;
  IOBufferBlock *last_block = 0;

  mbuf->clear();

  // Build the IOBufferBlock chain.

  for (n = 0; n < n_iov; ++n) {
    bytes_to_xfer += iov[n].iov_len;

    if (last_block) {
      last_block->next = block[n];
    }
    last_block = block[n];
    // block[n] may itself be a chain; advance to its tail before linking
    // the next entry.
    while (last_block->next) {
      last_block = last_block->next;
    }
  }
  mbuf->_writer = block[0];
  ink_release_assert(bytes_to_xfer == to_do);
  ink_assert(bytes_to_xfer == bytes_IOBufferBlockList(mbuf->_writer, !read_channel));
}
00331 
// REENABLE_IO() resumes a pending VIO ('v') when the last i/o has not
// completed.  Its definition varies by build configuration; comments are
// kept outside the macro bodies (a // inside a \-continued macro would
// truncate it).
#ifdef CLUSTER_TOMCAT
// Tomcat build: skip the re-enable when running on a stolen (borrowed
// net) thread.
#define REENABLE_IO() \
  if (!ch->on_stolen_thread && !io_complete) { \
    v->reenable_re(); \
  }

#else // !CLUSTER_TOMCAT

#ifdef CLUSTER_IMMEDIATE_NETIO
// Immediate net i/o build: drive the NetVConnection synchronously.
#define REENABLE_IO() \
  if (!io_complete) { \
    ((NetVConnection *) v->vc_server)->reenable_re_now(v); \
  }

#else // !CLUSTER_IMMEDIATE_NETIO

// Default build: hand the VIO back to the net processor.
#define REENABLE_IO() \
  if (!io_complete) { \
    v->reenable_re(); \
  }
#endif // !CLUSTER_IMMEDIATE_NETIO

#endif // !CLUSTER_TOMCAT
00355 
int
ClusterState::doIO()
{
  // Initiate or resume the cluster transfer described by (iov, to_do) on
  // the node-to-node NetVConnection.  Returns 0 if the operation could
  // not be started (lock miss), 1 otherwise — including the case where
  // the node is down and the transfer is simulated as complete.
  ink_release_assert(io_complete);
#if !defined(CLUSTER_IMMEDIATE_NETIO)
  MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
  if (!lock) {
    return 0;                   // unable to initiate operation
  }
#endif

  if (!ch->net_vc) {
    // Node has gone down, simulate successful transfer
    io_complete = 1;
    bytes_xfered += to_do;
    to_do = 0;
    return 1;
  }
  //
  // Setup and initiate or resume Cluster i/o request to the NetProcessor.
  //
  // A prior i/o that ended with a READY event (partial transfer) is
  // resumed on the existing VIO; otherwise a fresh do_io_xxx is issued.
  if ((to_do && (io_complete_event == VC_EVENT_READ_READY)) || (io_complete_event == VC_EVENT_WRITE_READY)) {

    if (read_channel) {
      // Partial read case
      ink_assert(v->buffer.writer()->current_write_avail() == to_do);

    } else {
      // Partial write case
      ink_assert(v->buffer.reader()->read_avail() == to_do);
    }

    // Resume operation
    v->nbytes = to_do + did;
    ink_release_assert(v->nbytes > v->ndone);

    io_complete = false;
    io_complete_event = 0;
    REENABLE_IO();

  } else {
    // Start new do_io_xxx operation.
    // Initialize globals

    io_complete = false;
    io_complete_event = 0;
    bytes_xfered = 0;
    last_ndone = 0;

    build_do_io_vector();

    if (read_channel) {
      ink_assert(mbuf->current_write_avail() == to_do);
#ifdef CLUSTER_IMMEDIATE_NETIO
      v = ch->net_vc->do_io_read_now(this, to_do, mbuf);
#else
      v = ch->net_vc->do_io_read(this, to_do, mbuf);
#endif
      REENABLE_IO();

    } else {
      // Writes need a reader over the block chain built above.
      IOBufferReader *r = mbuf->alloc_reader();
      r->block = mbuf->_writer;
      ink_assert(r->read_avail() == to_do);
#ifdef CLUSTER_IMMEDIATE_NETIO
      v = ch->net_vc->do_io_write_now(this, to_do, r);
#else
      v = ch->net_vc->do_io_write(this, to_do, r);
#endif
      REENABLE_IO();
    }
  }
  return 1;                     // operation initiated
}
00430 
int
ClusterState::doIO_read_event(int event, void *d)
{
  // Net i/o completion callback for the read channel.  Records transfer
  // progress in (did, to_do, bytes_xfered), publishes the completion
  // event, then pokes the ClusterHandler via IOComplete().
  // io_complete is set to 1 on success, -1 on error/termination.
  ink_release_assert(!io_complete);
  if (!v) {
    v = (VIO *) d;              // Immediate callback on first NetVC read
  }
  ink_assert((VIO *) d == v);

  switch (event) {
  case VC_EVENT_READ_READY:
    {
      // Disable read processing
      v->nbytes = v->ndone;
      // fall through
    }
  case VC_EVENT_READ_COMPLETE:
    {
      // Account for bytes transferred since the last callback.
      bytes_xfered = v->ndone - last_ndone;
      if (bytes_xfered) {
        total_bytes_xfered += bytes_xfered;
        did += bytes_xfered;
        to_do -= bytes_xfered;
      }
      last_ndone = v->ndone;
      // Publish the event before flipping io_complete: other threads
      // poll io_complete without holding this mutex.
      io_complete_event = event;
      INK_WRITE_MEMORY_BARRIER;

      io_complete = 1;
      IOComplete();

      break;
    }
  case VC_EVENT_EOS:
  case VC_EVENT_ERROR:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case VC_EVENT_ACTIVE_TIMEOUT:
  default:
    {
      // Error/termination path: io_complete == -1 marks the failure.
      io_complete_event = event;
      INK_WRITE_MEMORY_BARRIER;

      io_complete = -1;
      IOComplete();
      break;
    }
  }                             // End of switch

  return EVENT_DONE;
}
00481 
int
ClusterState::doIO_write_event(int event, void *d)
{
  // Net i/o completion callback for the write channel.  Mirrors
  // doIO_read_event, except that (without CLUSTER_IMMEDIATE_NETIO) a
  // WRITE_READY with data still pending simply re-enables the VIO
  // instead of signalling completion.
  ink_release_assert(!io_complete);
  if (!v) {
    v = (VIO *) d;              // Immediate callback on first NetVC write
  }
  ink_assert((VIO *) d == v);

  switch (event) {
  case VC_EVENT_WRITE_READY:
#ifdef CLUSTER_IMMEDIATE_NETIO
    {
      // Disable write processing
      v->nbytes = v->ndone;
      // fall through
    }
#endif
  case VC_EVENT_WRITE_COMPLETE:
    {
      // Account for bytes transferred since the last callback.
      bytes_xfered = v->ndone - last_ndone;
      if (bytes_xfered) {
        total_bytes_xfered += bytes_xfered;
        did += bytes_xfered;
        to_do -= bytes_xfered;
      }
      last_ndone = v->ndone;
#ifdef CLUSTER_IMMEDIATE_NETIO
      // Immediate-netio build: every event here completes the i/o.
      io_complete_event = event;
      INK_WRITE_MEMORY_BARRIER;

      io_complete = 1;
      IOComplete();
#else
      if (event == VC_EVENT_WRITE_COMPLETE) {
        // Publish the event before flipping io_complete: other threads
        // poll io_complete without holding this mutex.
        io_complete_event = event;
        INK_WRITE_MEMORY_BARRIER;

        io_complete = 1;
        IOComplete();
      } else {
        // WRITE_READY with work remaining: keep the VIO running.
        if (bytes_xfered) {
          v->reenable_re();     // Immediate action
        } else {
          v->reenable();
        }
        return EVENT_DONE;
      }
#endif
      break;
    }
  case VC_EVENT_EOS:
  case VC_EVENT_ERROR:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case VC_EVENT_ACTIVE_TIMEOUT:
  default:
    {
      // Error/termination path: io_complete == -1 marks the failure.
      io_complete_event = event;
      INK_WRITE_MEMORY_BARRIER;

      io_complete = -1;
      IOComplete();
      break;
    }
  }                             // End of switch

  return EVENT_DONE;
}
00550 
00551 void
00552 ClusterState::IOComplete()
00553 {
00554   // If no thread appears (approximate check) to be holding
00555   // the ClusterHandler mutex (no cluster processing in progress)
00556   // and immediate i/o completion events are allowed,
00557   // start i/o completion processing.
00558 
00559   if (do_iodone_event && !ch->mutex->thread_holding) {
00560     MUTEX_TRY_LOCK(lock, ch->mutex, this_ethread());
00561     if (lock) {
00562       ch->handleEvent(EVENT_IMMEDIATE, (void *) 0);
00563     } else {
00564       eventProcessor.schedule_imm_signal(ch, ET_CLUSTER);
00565     }
00566   }
00567 }
00568 
00569 int
00570 ClusterHandler::cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s)
00571 {
00572   s->vio._cont->handleEvent(event, &s->vio);
00573 
00574   if (vc->closed) {
00575     if (!vc->write_list && !vc->write_bytes_in_transit) {
00576       close_ClusterVConnection(vc);
00577     }
00578     return EVENT_DONE;
00579   } else {
00580     ink_assert((event != VC_EVENT_ERROR) || ((event == VC_EVENT_ERROR) && vc->closed));
00581     return EVENT_CONT;
00582   }
00583 }
00584 
00585 int
00586 ClusterHandler::cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s)
00587 {
00588   // should assert we have s->vio.mutex
00589   s->vio._cont->handleEvent(event, &s->vio);
00590 
00591   if (vc->closed) {
00592     if (!vc->write_list && !vc->write_bytes_in_transit) {
00593       close_free_lock(vc, s);
00594     }
00595     return EVENT_DONE;
00596   } else
00597     return EVENT_CONT;
00598 }
00599 
00600 int
00601 ClusterHandler::cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno)
00602 {
00603   s->enabled = 0;
00604   vc->lerrno = lerrno;
00605   return cluster_signal_and_update(VC_EVENT_ERROR, vc, s);
00606 }
00607 
00608 bool ClusterHandler::check_channel(int c)
00609 {
00610   //
00611   // Check to see that there is enough room to store channel c
00612   //
00613   while (n_channels <= c) {
00614     int
00615       old_channels = n_channels;
00616     if (!n_channels) {
00617       n_channels = MIN_CHANNELS;
00618     } else {
00619       if ((n_channels * 2) <= MAX_CHANNELS) {
00620         n_channels = n_channels * 2;
00621       } else {
00622         return false;           // Limit exceeded
00623       }
00624     }
00625     // Allocate ClusterVConnection table entries
00626     channels = (ClusterVConnection **)ats_realloc(channels, n_channels * sizeof(ClusterVConnection *));
00627 
00628     // Allocate ChannelData table entries
00629     channel_data = (struct ChannelData **)ats_realloc(channel_data, n_channels * sizeof(struct ChannelData *));
00630 
00631     for (int i = old_channels; i < n_channels; i++) {
00632       if (local_channel(i)) {
00633         if (i > LAST_DEDICATED_CHANNEL) {
00634           channels[i] = (ClusterVConnection *) 1;       // mark as invalid
00635           channel_data[i] = (struct ChannelData *)ats_malloc(sizeof(struct ChannelData));
00636           memset(channel_data[i], 0, sizeof(struct ChannelData));
00637           channel_data[i]->channel_number = i;
00638           free_local_channels.enqueue(channel_data[i]);
00639         } else {
00640           channels[i] = NULL;
00641           channel_data[i] = NULL;
00642         }
00643       } else {
00644         channels[i] = NULL;
00645         channel_data[i] = NULL;
00646       }
00647     }
00648   }
00649   return true;                  // OK
00650 }
00651 
int
ClusterHandler::alloc_channel(ClusterVConnection * vc, int requested)
{
  //
  // Allocate a channel
  //
  // requested == 0: take any free locally-owned channel from the
  //   free_local_channels queue (growing the tables once if empty).
  // requested != 0: claim that specific (remotely chosen) channel.
  // Returns the channel number on success, -1 if the requested channel
  // is already in use, or -2 if the channel table limit is exceeded.
  //
  struct ChannelData *cdp = 0;
  int i = requested;

  if (!i) {
    // One retry: if the free queue is empty, grow the tables (which
    // refills the queue) and dequeue again.
    int loops = 1;
    do {
      cdp = free_local_channels.dequeue();
      if (!cdp) {
        if (!check_channel(n_channels)) {
          return -2;            // Limit exceeded
        }
      } else {
        ink_assert(cdp == channel_data[cdp->channel_number]);
        i = cdp->channel_number;
        break;
      }
    } while (loops--);

    ink_release_assert(i != 0); // required
    ink_release_assert(channels[i] == (ClusterVConnection *) 1);        // required
    Debug(CL_TRACE, "alloc_channel local chan=%d VC=%p", i, vc);

  } else {
    if (!check_channel(i)) {
      return -2;                // Limit exceeded
    }
    if (channels[i]) {
      Debug(CL_TRACE, "alloc_channel remote inuse chan=%d VC=%p", i, vc);
      return -1;                // channel in use
    } else {
      Debug(CL_TRACE, "alloc_channel remote chan=%d VC=%p", i, vc);
    }
  }
  // Bind the channel slot to the VC in both directions.
  channels[i] = vc;
  vc->channel = i;
  return i;
}
00695 
00696 void
00697 ClusterHandler::free_channel(ClusterVConnection * vc)
00698 {
00699   //
00700   // Free a channel
00701   //
00702   int i = vc->channel;
00703   if (i > LAST_DEDICATED_CHANNEL && channels[i] == vc) {
00704     if (local_channel(i)) {
00705       channels[i] = (ClusterVConnection *) 1;
00706       free_local_channels.enqueue(channel_data[i]);
00707       Debug(CL_TRACE, "free_channel local chan=%d VC=%p", i, vc);
00708     } else {
00709       channels[i] = 0;
00710       Debug(CL_TRACE, "free_channel remote chan=%d VC=%p", i, vc);
00711     }
00712   }
00713   vc->channel = 0;
00714 }
00715 
int
ClusterHandler::machine_down()
{
  // textbuf sized for the longest possible "ip:port" string.
  char textbuf[sizeof("255.255.255.255:65535")];

  if (dead) {
    return EVENT_DONE;          // already processed
  }
  //
  // Looks like this machine dropped out of the cluster.
  // Deal with it.
  // Fatal read/write errors on the node to node connection along
  // with failure of the cluster membership check in the periodic event
  // result in machine_down().
  //
#ifdef LOCAL_CLUSTER_TEST_MODE
  Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), port);
#else
  Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), id);
#endif
  machine_offline_APIcallout(ip);
  snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port);
  RecSignalManager(REC_SIGNAL_MACHINE_DOWN, textbuf);
  if (net_vc) {
    net_vc->do_io(VIO::CLOSE);
    net_vc = 0;
  }
  // Cancel pending cluster reads and writes
  read.io_complete = -1;
  write.io_complete = -1;

  // Under the cluster config lock: detach this handler and, if this was
  // the machine's last connection, remove it from the configuration.
  MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
  ClusterConfiguration *c = this_cluster()->current_configuration();
  machine->clusterHandlers[id] = NULL;
  if ((--machine->now_connections == 0) && c->find(ip, port)) {
    ClusterConfiguration *cc = configuration_remove_machine(c, machine);
    CLUSTER_DECREMENT_DYN_STAT(CLUSTER_NODES_STAT);
    this_cluster()->configurations.push(cc);
    machine->dead = true;
  }
  MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
  // If the node is still configured and we initiated the connection,
  // start a reconnect attempt.
  MachineList *cc = the_cluster_config();
  if (cc && cc->find(ip, port) && connector) {
    Debug(CL_NOTE, "cluster connect retry for %hhu.%hhu.%hhu.%hhu", DOT_SEPARATED(ip));
    clusterProcessor.connect(ip, port, id);
  }
  return zombify();             // defer deletion of *this
}
00764 
int
ClusterHandler::zombify(Event * /* e ATS_UNUSED */)
{
  //
  // Node associated with *this is declared down, setup the event to cleanup
  // and defer deletion of *this
  //
  dead = true;
  if (cluster_periodic_event) {
    cluster_periodic_event->cancel(this);
    cluster_periodic_event = NULL;
  }
  clm->cancel_monitor();

  // protoZombieEvent performs the actual node-down cleanup when the
  // scheduled event fires.
  SET_HANDLER((ClusterContHandler) & ClusterHandler::protoZombieEvent);
  //
  // At this point, allow the caller (either process_read/write to complete)
  // prior to performing node down actions.
  //
  eventProcessor.schedule_in(this, HRTIME_SECONDS(1), ET_CLUSTER);
  return EVENT_DONE;
}
00787 
int
ClusterHandler::connectClusterEvent(int event, Event * e)
{
  // Drive the outbound connect to a peer node.  On EVENT_IMMEDIATE or
  // EVENT_INTERVAL, (re)attempt the TCP connect; on NET_EVENT_OPEN,
  // switch to the node-to-node handshake (startClusterEvent); any other
  // event triggers a delayed retry.

  if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) {
    //
    // Attempt connect to target node and if successful, setup the event
    // to initiate the node to node connection protocol.
    // Initiated via ClusterProcessor::connect().
    //
    MachineList *cc = the_cluster_config();
    if (!machine)
      machine = new ClusterMachine(hostname, ip, port);
#ifdef LOCAL_CLUSTER_TEST_MODE
    if (!(cc && cc->find(ip, port))) {
#else
    if (this_cluster_machine()->ip == machine->ip || !(cc && cc->find(ip, port))) {
#endif
      // Target is ourselves or no longer configured: abort and
      // self-destruct this handler.
      if (this_cluster_machine()->ip != machine->ip)
        Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u not in cluster", DOT_SEPARATED(machine->ip));
      delete machine;
      machine = NULL;
      delete this;
      return EVENT_DONE;
    }
    // Connect to cluster member
    Debug(CL_NOTE, "connect_re from %u.%u.%u.%u to %u.%u.%u.%u",
          DOT_SEPARATED(this_cluster_machine()->ip), DOT_SEPARATED(machine->ip));
    ip = machine->ip;

    // Socket options for the node-to-node link come from the cluster
    // configuration globals declared at the top of this file.
    NetVCOptions opt;
    opt.socket_send_bufsize = cluster_send_buffer_size;
    opt.socket_recv_bufsize = cluster_receive_buffer_size;
    opt.sockopt_flags = cluster_sockopt_flags;
    opt.packet_mark = cluster_packet_mark;
    opt.packet_tos = cluster_packet_tos;
    opt.etype = ET_CLUSTER;
    opt.addr_binding = NetVCOptions::INTF_ADDR;
    opt.local_ip = this_cluster_machine()->ip;

    struct sockaddr_in addr;
    ats_ip4_set(&addr, machine->ip,
        htons(machine->cluster_port ? machine->cluster_port : cluster_port));

    // TODO: Should we check the Action* returned here?
    netProcessor.connect_re(this, ats_ip_sa_cast(&addr), &opt);
    return EVENT_DONE;
  } else {
    if (event == NET_EVENT_OPEN) {
      // Connected: begin the clustering handshake on an ET_CLUSTER thread.
      net_vc = (NetVConnection *) e;
      SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent);
      eventProcessor.schedule_imm(this, ET_CLUSTER);
      return EVENT_DONE;

    } else {
      // Connect failed: retry after CLUSTER_MEMBER_DELAY.
      eventProcessor.schedule_in(this, CLUSTER_MEMBER_DELAY);
      return EVENT_CONT;
    }
  }
}
00848 
00849 int
00850 ClusterHandler::startClusterEvent(int event, Event * e)
00851 {
00852   char textbuf[sizeof("255.255.255.255:65535")];
00853 
00854   // Perform the node to node connection establish protocol.
00855 
00856   (void) event;
00857   ink_assert(!read_vcs);
00858   ink_assert(!write_vcs);
00859 
00860   if (event == EVENT_IMMEDIATE) {
00861     if (cluster_connect_state == ClusterHandler::CLCON_INITIAL) {
00862       cluster_connect_state = ClusterHandler::CLCON_SEND_MSG;
00863     } else {
00864       ink_release_assert(!"startClusterEvent, EVENT_IMMEDIATE not expected");
00865     }
00866   } else {
00867     ink_release_assert(event == EVENT_INTERVAL);
00868   }
00869 
00870   for (;;) {
00871 
00872     switch (cluster_connect_state) {
00873       ////////////////////////////////////////////////////////////////////////////
00874     case ClusterHandler::CLCON_INITIAL:
00875       ////////////////////////////////////////////////////////////////////////////
00876       {
00877         ink_release_assert(!"Invalid state [CLCON_INITIAL]");
00878       }
00879       ////////////////////////////////////////////////////////////////////////////
00880     case ClusterHandler::CLCON_SEND_MSG:
00881       ////////////////////////////////////////////////////////////////////////////
00882       {
00883         // Send initial message.
00884 #ifdef LOCAL_CLUSTER_TEST_MODE
00885         nodeClusteringVersion._port = cluster_port;
00886 #endif
00887         cluster_connect_state = ClusterHandler::CLCON_SEND_MSG_COMPLETE;
00888         if (connector)
00889           nodeClusteringVersion._id = id;
00890         build_data_vector((char *) &nodeClusteringVersion, sizeof(nodeClusteringVersion), false);
00891         if (!write.doIO()) {
00892           // i/o not initiated, delay and retry
00893           cluster_connect_state = ClusterHandler::CLCON_SEND_MSG;
00894           eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
00895           return EVENT_DONE;
00896         }
00897         break;
00898       }
00899       ////////////////////////////////////////////////////////////////////////////
00900     case ClusterHandler::CLCON_SEND_MSG_COMPLETE:
00901       ////////////////////////////////////////////////////////////////////////////
00902       {
00903         if (write.io_complete) {
00904           if ((write.io_complete < 0)
00905               || ((size_t) write.did < sizeof(nodeClusteringVersion))) {
00906             Debug(CL_NOTE, "unable to write to cluster node %u.%u.%u.%u: %d",
00907                   DOT_SEPARATED(ip), write.io_complete_event);
00908             cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
00909             break;              // goto next state
00910           }
00911           // Write OK, await message from peer node.
00912           build_data_vector((char *) &clusteringVersion, sizeof(clusteringVersion), true);
00913           cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
00914           break;
00915         } else {
00916           // Delay and check for i/o completion
00917           eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
00918           return EVENT_DONE;
00919         }
00920       }
00921       ////////////////////////////////////////////////////////////////////////////
00922     case ClusterHandler::CLCON_READ_MSG:
00923       ////////////////////////////////////////////////////////////////////////////
00924       {
00925         cluster_connect_state = ClusterHandler::CLCON_READ_MSG_COMPLETE;
00926         if (!read.doIO()) {
00927           // i/o not initiated, delay and retry
00928           cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
00929           eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
00930           return EVENT_DONE;
00931         }
00932         break;
00933       }
00934       ////////////////////////////////////////////////////////////////////////////
00935     case ClusterHandler::CLCON_READ_MSG_COMPLETE:
00936       ////////////////////////////////////////////////////////////////////////////
00937       {
00938         if (read.io_complete) {
00939           if (read.io_complete < 0) {
00940             // Read error, abort connect
00941             cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
00942             break;              // goto next state
00943           }
00944           if ((size_t) read.did < sizeof(clusteringVersion)) {
00945             // Partial read, resume read.
00946             cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
00947             break;
00948           }
00949           cluster_connect_state = ClusterHandler::CLCON_VALIDATE_MSG;
00950           break;
00951         } else {
00952           // Delay and check for i/o completion
00953           eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
00954           return EVENT_DONE;
00955         }
00956       }
00957       ////////////////////////////////////////////////////////////////////////////
00958     case ClusterHandler::CLCON_VALIDATE_MSG:
00959       ////////////////////////////////////////////////////////////////////////////
00960       {
00961         int proto_major = -1;
00962         int proto_minor = -1;
00963 
00964         clusteringVersion.AdjustByteOrder();
00965         /////////////////////////////////////////////////////////////////////////
00966         // Determine the message protocol major version to use, by stepping down
00967         // from current to the minimium level until a match is found.
00968         // Derive the minor number as follows, if the current (major, minor)
00969         // is the current node (major, minor) use the given minor number.
00970         // Otherwise, minor number is zero.
00971         /////////////////////////////////////////////////////////////////////////
00972         for (int major = clusteringVersion._major; major >= clusteringVersion._min_major; --major) {
00973           if ((major >= nodeClusteringVersion._min_major) && (major <= nodeClusteringVersion._major)) {
00974             proto_major = major;
00975           }
00976         }
00977         if (proto_major > 0) {
00978           ///////////////////////////
00979           // Compute minor version
00980           ///////////////////////////
00981           if (proto_major == clusteringVersion._major) {
00982             proto_minor = clusteringVersion._minor;
00983 
00984             if (proto_minor != nodeClusteringVersion._minor)
00985               Warning("Different clustering minor versions (%d,%d) for node %u.%u.%u.%u, continuing",
00986                       proto_minor, nodeClusteringVersion._minor, DOT_SEPARATED(ip));
00987           } else {
00988             proto_minor = 0;
00989           }
00990 
00991         } else {
00992           Warning("Bad cluster major version range (%d-%d) for node %u.%u.%u.%u connect failed",
00993                   clusteringVersion._min_major, clusteringVersion._major, DOT_SEPARATED(ip));
00994           cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
00995           break;                // goto next state
00996         }
00997 
00998 #ifdef LOCAL_CLUSTER_TEST_MODE
00999         port = clusteringVersion._port & 0xffff;
01000 #endif
01001         if (!connector)
01002           id = clusteringVersion._id & 0xffff;
01003 
01004         machine->msg_proto_major = proto_major;
01005         machine->msg_proto_minor = proto_minor;
01006 
01007         if (eventProcessor.n_threads_for_type[ET_CLUSTER] != num_of_cluster_threads) {
01008           cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
01009           break;
01010         }
01011 
01012         thread = eventProcessor.eventthread[ET_CLUSTER][id % num_of_cluster_threads];
01013         if (net_vc->thread == thread) {
01014           cluster_connect_state = CLCON_CONN_BIND_OK;
01015           break;
01016         } else { 
01017           cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_CLEAR;
01018         }
01019       }
01020 
01021     case ClusterHandler::CLCON_CONN_BIND_CLEAR:
01022       {
01023         UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; 
01024         MUTEX_TRY_LOCK(lock, vc->nh->mutex, e->ethread);
01025         MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread);
01026         if (lock && lock1) {
01027           vc->ep.stop();
01028           vc->nh->open_list.remove(vc);
01029           vc->thread = NULL;
01030           if (vc->nh->read_ready_list.in(vc))
01031             vc->nh->read_ready_list.remove(vc);
01032           if (vc->nh->write_ready_list.in(vc))
01033             vc->nh->write_ready_list.remove(vc);
01034           if (vc->read.in_enabled_list)
01035             vc->nh->read_enable_list.remove(vc);
01036           if (vc->write.in_enabled_list)
01037             vc->nh->write_enable_list.remove(vc);
01038 
01039           // CLCON_CONN_BIND handle in bind vc->thread (bind thread nh)
01040           cluster_connect_state = ClusterHandler::CLCON_CONN_BIND;
01041           thread->schedule_in(this, CLUSTER_PERIOD);
01042           return EVENT_DONE;
01043         } else {
01044           // CLCON_CONN_BIND_CLEAR handle in origin vc->thread (origin thread nh)
01045           vc->thread->schedule_in(this, CLUSTER_PERIOD);
01046           return EVENT_DONE;
01047         }
01048       }
01049 
01050     case ClusterHandler::CLCON_CONN_BIND:
01051       {
01052         // 
01053         NetHandler *nh = get_NetHandler(e->ethread);
01054         UnixNetVConnection *vc = (UnixNetVConnection *)net_vc; 
01055         MUTEX_TRY_LOCK(lock, nh->mutex, e->ethread);
01056         MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread);
01057         if (lock && lock1) {
01058           if (vc->read.in_enabled_list)
01059             nh->read_enable_list.push(vc);
01060           if (vc->write.in_enabled_list)
01061             nh->write_enable_list.push(vc);
01062 
01063           vc->nh = nh;
01064           vc->thread = e->ethread;
01065           PollDescriptor *pd = get_PollDescriptor(e->ethread);
01066           if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) {
01067             cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
01068             break;                // goto next state
01069           }
01070 
01071           nh->open_list.enqueue(vc);
01072           cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_OK;
01073         } else {
01074           thread->schedule_in(this, CLUSTER_PERIOD);
01075           return EVENT_DONE;
01076         }
01077       }
01078 
01079     case ClusterHandler::CLCON_CONN_BIND_OK:
01080       {
01081         int failed = 0;
01082 
01083         // include this node into the cluster configuration
01084         MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
01085         MachineList *cc = the_cluster_config();
01086         if (cc && cc->find(ip, port)) {
01087           ClusterConfiguration *c = this_cluster()->current_configuration();
01088           ClusterMachine *m = c->find(ip, port);
01089           
01090           if (!m) { // this first connection
01091             ClusterConfiguration *cconf = configuration_add_machine(c, machine);
01092             CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT);
01093             this_cluster()->configurations.push(cconf);
01094           } else {
01095             // close new connection if old connections is exist
01096             if (id >= m->num_connections || m->clusterHandlers[id]) {
01097               failed = -2;
01098               MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
01099               goto failed;
01100             }
01101             machine = m;
01102           }
01103           machine->now_connections++;
01104           machine->clusterHandlers[id] = this;
01105           machine->dead = false;
01106           dead = false;
01107         } else {
01108           Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u:%d not in cluster", DOT_SEPARATED(ip), port);
01109           failed = -1;
01110         }
01111         MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
01112 failed:
01113         if (failed) {
01114           if (failed == -1) {
01115             if (++configLookupFails <= CONFIG_LOOKUP_RETRIES) {
01116               thread->schedule_in(this, CLUSTER_PERIOD);
01117               return EVENT_DONE;
01118             }
01119           }
01120           cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
01121           break;                // goto next state
01122         }
01123 
01124         this->needByteSwap = !clusteringVersion.NativeByteOrder();
01125         machine_online_APIcallout(ip);
01126 
01127         // Signal the manager
01128         snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port);
01129         RecSignalManager(REC_SIGNAL_MACHINE_UP, textbuf);
01130 #ifdef LOCAL_CLUSTER_TEST_MODE
01131         Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
01132              DOT_SEPARATED(ip), port, clusteringVersion._major, clusteringVersion._minor);
01133 #else
01134         Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
01135              DOT_SEPARATED(ip), id, clusteringVersion._major, clusteringVersion._minor);
01136 #endif
01137 
01138         read_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS];
01139         write_vcs = new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link>[CLUSTER_BUCKETS];
01140         SET_HANDLER((ClusterContHandler) & ClusterHandler::beginClusterEvent);
01141 
01142         // enable schedule_imm() on i/o completion (optimization)
01143         read.do_iodone_event = true;
01144         write.do_iodone_event = true;
01145 
01146         cluster_periodic_event = thread->schedule_every(this, -CLUSTER_PERIOD);
01147 
01148         // Startup the periodic events to process entries in
01149         //  external_incoming_control.
01150 
01151         int procs_online = ink_number_of_processors();
01152         int total_callbacks = min(procs_online, MAX_COMPLETION_CALLBACK_EVENTS);
01153         for (int n = 0; n < total_callbacks; ++n) {
01154           callout_cont[n] = new ClusterCalloutContinuation(this);
01155           callout_events[n] = eventProcessor.schedule_every(callout_cont[n], COMPLETION_CALLBACK_PERIOD, ET_NET);
01156         }
01157 
01158         // Start cluster interconnect load monitoring
01159 
01160         if (!clm) {
01161           clm = new ClusterLoadMonitor(this);
01162           clm->init();
01163         }
01164         return EVENT_DONE;
01165       }
01166       ////////////////////////////////////////////////////////////////////////////
01167     case ClusterHandler::CLCON_ABORT_CONNECT:
01168       ////////////////////////////////////////////////////////////////////////////
01169       {
01170         if (connector) {
01171           Debug(CL_NOTE, "cluster connect retry for %u.%u.%u.%u", DOT_SEPARATED(ip));
01172           // check for duplicate cluster connect
01173           clusterProcessor.connect(ip, port, id, true);
01174         }
01175         cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
01176         break;                  // goto next state
01177       }
01178       ////////////////////////////////////////////////////////////////////////////
01179     case ClusterHandler::CLCON_DELETE_CONNECT:
01180       ////////////////////////////////////////////////////////////////////////////
01181       {
01182         // No references possible, so just delete it.
01183         delete machine;
01184         machine = NULL;
01185         delete this;
01186         Debug(CL_NOTE, "Failed cluster connect, deleting");
01187         return EVENT_DONE;
01188       }
01189       ////////////////////////////////////////////////////////////////////////////
01190     default:
01191       ////////////////////////////////////////////////////////////////////////////
01192       {
01193         Warning("startClusterEvent invalid state %d", cluster_connect_state);
01194         ink_release_assert(!"ClusterHandler::startClusterEvent invalid state");
01195         return EVENT_DONE;
01196       }
01197 
01198     }                           // End of switch
01199   }                             // End of for
01200   return EVENT_DONE;
01201 }
01202 
int
ClusterHandler::beginClusterEvent(int /* event ATS_UNUSED */, Event * e)
{
  // One-shot transition handler: switch this continuation over to the main
  // periodic cluster handler (mainClusterEvent) and invoke it immediately
  // with the current event so no scheduling quantum is lost.
#ifdef CLUSTER_IMMEDIATE_NETIO
  build_poll(false);            // prime poll state when immediate net i/o is compiled in
#endif
  SET_HANDLER((ClusterContHandler) & ClusterHandler::mainClusterEvent);
  return handleEvent(EVENT_INTERVAL, e);
}
01213 
int
ClusterHandler::zombieClusterEvent(int event, Event * e)
{
  //
  // The ZOMBIE state is entered when the handler may still be referenced
  // by short running tasks (one scheduling quanta).  The object is delayed
  // after some unreasonably long (in comparison) time.
  //
  // By the time this fires, protoZombieEvent has already completed a clean
  // teardown pass and waited NO_RACE_DELAY, so no references remain and the
  // handler can simply be destroyed.
  (void) event;                 // silence unused-parameter warnings
  (void) e;
  delete this;                  // I am out of here
  return EVENT_DONE;
}
01227 
int
ClusterHandler::protoZombieEvent(int /* event ATS_UNUSED */, Event * e)
{
  //
  // Node associated with *this is declared down.
  // After cleanup is complete, setup handler to delete *this
  // after NO_RACE_DELAY
  //
  // Teardown is best-effort per pass: any lock that cannot be taken or queue
  // still holding work sets 'failed', and the whole routine is rescheduled
  // after 'delay'.  Only a fully clean pass switches the handler to
  // zombieClusterEvent (which deletes *this) after NO_RACE_DELAY.
  bool failed = false;
  ink_hrtime delay = CLUSTER_MEMBER_DELAY * 5;
  EThread *t = e ? e->ethread : this_ethread();   // thread used for try-locks below
  head_p item;

  /////////////////////////////////////////////////////////////////
  // Complete pending i/o operations
  /////////////////////////////////////////////////////////////////
  mainClusterEvent(EVENT_INTERVAL, e);

  // If any inbound/outbound work is still queued, teardown cannot start yet.
  item.data = external_incoming_open_local.head.data;
  if (TO_PTR(FREELIST_POINTER(item)) ||
      delayed_reads.head || pw_write_descriptors_built
      || pw_freespace_descriptors_built || pw_controldata_descriptors_built) {
    // Operations still pending, retry later
    if (e) {
      e->schedule_in(delay);
      return EVENT_CONT;
    } else {
      eventProcessor.schedule_in(this, delay, ET_CLUSTER);
      return EVENT_DONE;
    }
  }
  ///////////////////////////////////////////////////////////////
  // Deallocate current read control data
  ///////////////////////////////////////////////////////////////
  IncomingControl *ic;
  while ((ic = incoming_control.dequeue())) {
    failed = true;              // work was drained this pass, so another pass is required
    ic->mutex = NULL;
    ic->freeall();
  }

  /////////////////////////////////////////////////////////////////
  // Post error completion on all active read/write VC(s) and
  // deallocate closed VC(s).
  /////////////////////////////////////////////////////////////////
  for (int i = 0; i < n_channels; i++) {
    ClusterVConnection *vc = channels[i];
    if (VALID_CHANNEL(vc)) {
      if (!vc->closed && vc->read.vio.op == VIO::READ) {
        MUTEX_TRY_LOCK(lock, vc->read.vio.mutex, t);
        if (lock) {
          cluster_signal_error_and_update(vc, &vc->read, 0);
        } else {
          failed = true;        // missed lock; retry this channel on a later pass
        }
      }
      // Re-read channels[i]: presumably the error callback above can change
      // the slot — NOTE(review): confirm against cluster_signal_error_and_update.
      vc = channels[i];
      if (VALID_CHANNEL(vc)
          && !vc->closed && vc->write.vio.op == VIO::WRITE) {
        MUTEX_TRY_LOCK(lock, vc->write.vio.mutex, t);
        if (lock) {
          cluster_signal_error_and_update(vc, &vc->write, 0);
        } else {
          failed = true;
        }
      }
      vc = channels[i];
      if (VALID_CHANNEL(vc)) {
        if (vc->closed) {
          // Detach the VC's write state from this handler before reclaiming it.
          vc->ch = 0;
          vc->write_list = 0;
          vc->write_list_tail = 0;
          vc->write_list_bytes = 0;
          vc->write_bytes_in_transit = 0;
          close_ClusterVConnection(vc);
        } else {
          failed = true;        // still open; cannot finish teardown yet
        }
      }
    }
  }

  ///////////////////////////////////////////////////////////////
  // Empty the external_incoming_control queue before aborting
  //   the completion callbacks.
  ///////////////////////////////////////////////////////////////
  item.data = external_incoming_control.head.data;
  if (TO_PTR(FREELIST_POINTER(item)) == NULL) {
    // Queue is empty: cancel and free each periodic callout continuation,
    // but only under its mutex.
    for (int n = 0; n < MAX_COMPLETION_CALLBACK_EVENTS; ++n) {
      if (callout_cont[n]) {
        MUTEX_TRY_LOCK(lock, callout_cont[n]->mutex, t);
        if (lock) {
          callout_events[n]->cancel(callout_cont[n]);
          callout_events[n] = 0;
          delete callout_cont[n];
          callout_cont[n] = 0;
        } else {
          failed = true;
        }
      }
    }
  } else {
    failed = true;              // control messages still queued
  }

  if (!failed) {
    // Clean pass: switch to the final self-delete handler and wait
    // NO_RACE_DELAY so short-lived references drain (see zombieClusterEvent).
    Debug("cluster_down", "ClusterHandler zombie [%u.%u.%u.%u]", DOT_SEPARATED(ip));
    SET_HANDLER((ClusterContHandler) & ClusterHandler::zombieClusterEvent);
    delay = NO_RACE_DELAY;
  }
  if (e) {
    e->schedule_in(delay);
    return EVENT_CONT;
  } else {
    eventProcessor.schedule_in(this, delay, ET_CLUSTER);
    return EVENT_DONE;
  }
}
01346 
01347 int dump_verbose = 0;
01348 
01349 int
01350 ClusterHandler::compute_active_channels()
01351 {
01352   ClusterHandler *ch = this;
01353   int active_chans = 0;
01354 
01355   for (int i = LAST_DEDICATED_CHANNEL + 1; i < ch->n_channels; i++) {
01356     ClusterVConnection *vc = ch->channels[i];
01357     if (VALID_CHANNEL(vc) && (vc->iov_map != CLUSTER_IOV_NOT_OPEN)) {
01358       ++active_chans;
01359       if (dump_verbose) {
01360         printf("ch[%d] vc=0x%p remote_free=%d last_local_free=%d\n", i, vc,
01361                vc->remote_free, vc->last_local_free);
01362         printf("  r_bytes=%d r_done=%d w_bytes=%d w_done=%d\n",
01363                (int)vc->read.vio.nbytes, (int)vc->read.vio.ndone,
01364                (int)vc->write.vio.nbytes, (int)vc->write.vio.ndone);
01365       }
01366     }
01367   }
01368   return active_chans;
01369 }
01370 
01371 void
01372 ClusterHandler::dump_internal_data()
01373 {
01374   if (!message_blk) {
01375     message_blk = new_IOBufferBlock();
01376     message_blk->alloc(MAX_IOBUFFER_SIZE);
01377   }
01378   int r;
01379   int n = 0;
01380   char *b = message_blk->data->data();
01381   unsigned int b_size = message_blk->data->block_size();
01382 
01383   r = snprintf(&b[n], b_size - n, "Host: %hhu.%hhu.%hhu.%hhu\n", DOT_SEPARATED(ip));
01384   n += r;
01385 
01386   r = snprintf(&b[n], b_size - n,
01387                "chans: %d vc_writes: %" PRId64 " write_bytes: %" PRId64 "(d)+%" PRId64 "(c)=%" PRId64 "\n",
01388                compute_active_channels(),
01389                _vc_writes, _vc_write_bytes, _control_write_bytes, _vc_write_bytes + _control_write_bytes);
01390 
01391   n += r;
01392   r = snprintf(&b[n], b_size - n,
01393                "dw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n",
01394                _dw_missed_lock, _dw_not_enabled, _dw_wait_remote_fill, _dw_no_active_vio);
01395 
01396   n += r;
01397   r = snprintf(&b[n], b_size - n,
01398                "dw: not_enabled_or_no_write: %d set_data_pending: %d no_free_space: %d\n",
01399                _dw_not_enabled_or_no_write, _dw_set_data_pending, _dw_no_free_space);
01400 
01401   n += r;
01402   r = snprintf(&b[n], b_size - n,
01403                "fw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n",
01404                _fw_missed_lock, _fw_not_enabled, _fw_wait_remote_fill, _fw_no_active_vio);
01405 
01406   n += r;
01407   r = snprintf(&b[n], b_size - n, "fw: not_enabled_or_no_read: %d\n", _fw_not_enabled_or_no_read);
01408 
01409   n += r;
01410   r = snprintf(&b[n], b_size - n,
01411                "rd(%d): st:%d rh:%d ahd:%d sd:%d rd:%d ad:%d sda:%d rda:%d awd:%d p:%d c:%d\n",
01412                _process_read_calls, _n_read_start, _n_read_header, _n_read_await_header,
01413                _n_read_setup_descriptor, _n_read_descriptor, _n_read_await_descriptor,
01414                _n_read_setup_data, _n_read_data, _n_read_await_data, _n_read_post_complete, _n_read_complete);
01415 
01416   n += r;
01417   r = snprintf(&b[n], b_size - n,
01418                "wr(%d): st:%d set:%d ini:%d wait:%d post:%d comp:%d\n",
01419                _process_write_calls, _n_write_start, _n_write_setup, _n_write_initiate,
01420                _n_write_await_completion, _n_write_post_complete, _n_write_complete);
01421 
01422   n += r;
01423   ink_release_assert((n + 1) <= BUFFER_SIZE_FOR_INDEX(MAX_IOBUFFER_SIZE));
01424   Note("%s", b);
01425   clear_cluster_stats();
01426 }
01427 
01428 void
01429 ClusterHandler::dump_write_msg(int res)
01430 {
01431   // Debug support for inter cluster message trace
01432   Alias32 x;
01433   x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr;
01434 
01435   fprintf(stderr,
01436           "[W] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d Todo=%d, Res=%d\n",
01437           x.byte[0], x.byte[1], x.byte[2], x.byte[3], write.sequence_number, write.msg.count, write.msg.control_bytes, write.to_do, res);
01438   for (int i = 0; i < write.msg.count; ++i) {
01439     fprintf(stderr, "   d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n",
01440             i, (write.msg.descriptor[i].type ? 1 : 0),
01441             (int) write.msg.descriptor[i].channel,
01442             (int) write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length);
01443   }
01444 }
01445 
01446 void
01447 ClusterHandler::dump_read_msg()
01448 {
01449   // Debug support for inter cluster message trace
01450   Alias32 x;
01451   x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr;
01452 
01453   fprintf(stderr, "[R] %hhu.%hhu.%hhu.%hhu  SeqNo=%u, Cnt=%d, CntlCnt=%d\n",
01454           x.byte[0], x.byte[1], x.byte[2], x.byte[3], read.sequence_number, read.msg.count, read.msg.control_bytes);
01455   for (int i = 0; i < read.msg.count; ++i) {
01456     fprintf(stderr, "   d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n",
01457             i, (read.msg.descriptor[i].type ? 1 : 0),
01458             (int) read.msg.descriptor[i].channel,
01459             (int) read.msg.descriptor[i].sequence_number, read.msg.descriptor[i].length);
01460   }
01461 }
01462 
01463 // End of  ClusterHandlerBase.cc

Generated by  doxygen 1.7.1