• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ClusterProcessor.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Cluster processor: RPC and virtual-connection entry points for inter-node cluster communication.
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 /****************************************************************************
00026 
00027   ClusterProcessor.cc
00028 ****************************************************************************/
00029 
00030 #include "P_Cluster.h"
00031 /*************************************************************************/
00032 // ClusterProcessor member functions (Public class)
00033 /*************************************************************************/
// Cluster configuration values; the initial values come from DEFAULT_* macros
// that are defined outside this file.
00034 int cluster_port_number = DEFAULT_CLUSTER_PORT_NUMBER;
00035 int cache_clustering_enabled = 0;
00036 int num_of_cluster_threads = DEFAULT_NUMBER_OF_CLUSTER_THREADS;
00037 
// Global ClusterProcessor instance.
00038 ClusterProcessor clusterProcessor;
// Raw stat block for the cluster statistics; allocated in ClusterProcessor::init().
00039 RecRawStatBlock *cluster_rsb = NULL;
// NOTE(review): presumably the event-thread type id used for cluster threads;
// it is not assigned anywhere in the visible code -- confirm where it is set.
00040 int ET_CLUSTER;
00041 
// Default constructor: starts with no accept handler and no cluster object
// (both pointers are NULL).
00042 ClusterProcessor::ClusterProcessor():accept_handler(NULL), this_cluster(NULL)
00043 {
00044 }
00045 
// Destructor: if an accept handler is attached, ask it to shut down and delete
// itself via ShutdownDelete(), then clear the pointer.
00046 ClusterProcessor::~ClusterProcessor()
00047 {
00048   if (accept_handler) {
00049     accept_handler->ShutdownDelete();
00050     accept_handler = 0;
00051   }
00052 }
00053 
// Common implementation behind invoke_remote() and invoke_remote_data():
// wraps a control message in an OutgoingControl and hands it to the given
// ClusterHandler for transmission.
//
// Parameters:
//   ch         - handler to send through; NULL is treated as "invalid message
//                or node is down" and the message resources are released.
//   cluster_fn - function code; must be below SIZE_clusterFunction unless it
//                equals CLUSTER_FUNCTION_MALLOCED.
//   data, len  - message payload and its length in bytes.
//   options    - bit mask; CLUSTER_OPT_STEAL, CLUSTER_OPT_DELAY and
//                CLUSTER_OPT_DATA_IS_OCONTROL are examined here.
//   cmsg       - optional OutgoingControl holding a compound-message header
//                (an invoke_remote_data_args placed after an int32_t), or NULL.
//
// Returns:
//   -1 if ch is NULL or cluster_fn is out of range (resources are freed),
//    1 if the message was pushed directly onto the handler's outgoing list,
//    0 if the message was scheduled through eventProcessor.
00054 int
00055 ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn,
00056                                          void *data, int len, int options, void *cmsg)
00057 {
00058   EThread *thread = this_ethread();
        // NOTE(review): 'mutex' is not referenced by name below; presumably the
        // CLUSTER_*_DYN_STAT macros expand to code that uses it -- confirm.
00059   ProxyMutex *mutex = thread->mutex;
00060   //
00061   // RPC facility for intercluster communication available to other
00062   //  subsystems.
00063   //
00064   bool steal = (options & CLUSTER_OPT_STEAL ? true : false);
00065   bool delay = (options & CLUSTER_OPT_DELAY ? true : false);
00066   bool data_in_ocntl = (options & CLUSTER_OPT_DATA_IS_OCONTROL ? true : false);
00067   bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
00068   OutgoingControl *c;
00069 
00070   if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
00071     // Invalid message or node is down, free message data
00072     if (cmsg) {
            // The compound header args live right after the leading int32_t of
            // the cmsg data; release the attached data control, then cmsg itself.
00073       invoke_remote_data_args *args = (invoke_remote_data_args *)
00074         (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
00075       ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
00076 
00077       args->data_oc->freeall();
00078       ((OutgoingControl *) cmsg)->freeall();
00079     }
00080     if (data_in_ocntl) {
            // The owning OutgoingControl pointer is read from the pointer-sized
            // slot immediately preceding 'data'.
00081       c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
00082       c->freeall();
00083     }
00084     if (malloced) {
00085       ats_free(data);
00086     }
00087     return -1;
00088   }
00089 
        // Obtain the OutgoingControl: reuse the one that already owns 'data',
        // or allocate a fresh one.
00090   if (data_in_ocntl) {
00091     c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
00092   } else {
00093     c = OutgoingControl::alloc();
00094   }
00095   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
00096   c->submit_time = ink_get_hrtime();
00097 
00098   if (malloced) {
          // The caller's buffer is attached as-is; no function code is written
          // in front of it on this path.
00099     c->set_data((char *) data, len);
00100   } else {
00101     if (!data_in_ocntl) {
            // Reserve room for the int32_t function code plus the payload.
00102       c->len = len + sizeof(int32_t);
00103       c->alloc_data();
00104     }
00105     if (!c->fast_data()) {
00106       CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
00107     }
          // The message starts with the function code, followed by the payload.
00108     *(int32_t *) c->data = cluster_fn;
00109     if (!data_in_ocntl) {
00110       memcpy(c->data + sizeof(int32_t), data, len);
00111     }
00112   }
00113 
00114   SET_CONTINUATION_HANDLER(c, (OutgoingCtrlHandler) & OutgoingControl::startEvent);
00115 
00116   /////////////////////////////////////
00117   // Compound message adjustments
00118   /////////////////////////////////////
00119   if (cmsg) {
          // Record the control message inside the compound header, then send
          // the compound header control in its place.
00120     invoke_remote_data_args *args = (invoke_remote_data_args *)
00121       (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
00122     ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
00123     args->msg_oc = c;
00124     c = (OutgoingControl *) cmsg;
00125   }
        // Unless CLUSTER_THREAD_STEALING is defined, the delayed (scheduled)
        // path is always taken regardless of CLUSTER_OPT_DELAY.
00126 #ifndef CLUSTER_THREAD_STEALING
00127   delay = true;
00128 #endif
00129   if (!delay) {
00130     EThread *tt = this_ethread();
00131     {
            // Push onto the outgoing list selected by the function's queue
            // priority.
00132       int q = ClusterFuncToQpri(cluster_fn);
00133       ink_atomiclist_push(&ch->outgoing_control_al[q], (void *) c);
00134 
00135       MUTEX_TRY_LOCK(lock, ch->mutex, tt);
00136       if (!lock) {
                      // Handler lock not acquired: signal the handler's thread
                      // (when it has a signal hook) and return.
00137                 if(ch->thread && ch->thread->signal_hook)
00138                   ch->thread->signal_hook(ch->thread);
00139                 return 1;
00140       }
00141       if (steal)
00142         ch->steal_thread(tt);
00143       return 1;
00144     }
00145   } else {
          // Delayed path: run the control under the handler's mutex via the
          // event processor.
00146     c->mutex = ch->mutex;
00147     eventProcessor.schedule_imm_signal(c);
00148     return 0;
00149   }
00150 }
00151 
// Send a control message through 'ch'. Forwards all arguments to
// internal_invoke_remote() with no compound-message header (cmsg == NULL) and
// returns its result unchanged.
00152 int
00153 ClusterProcessor::invoke_remote(ClusterHandler *ch, int cluster_fn, void *data, int len, int options)
00154 {
00155   return internal_invoke_remote(ch, cluster_fn, data, len, options, (void *) NULL);
00156 }
00157 
// Send a control message together with buffer data as one compound message.
//
// When 'buf' is NULL this degenerates to a plain invoke_remote()-style call.
// Otherwise two OutgoingControls are built -- one wrapping the buffer data and
// one carrying an invoke_remote_data_args header -- and the header is passed
// to internal_invoke_remote() as 'cmsg'.
//
// Parameters:
//   ch, cluster_fn, data, data_len, options - as for invoke_remote().
//   buf                   - buffer block chain to send, or NULL for none.
//   dest_channel, token   - copied into the compound header.
//   bufdata_free_proc, bufdata_free_proc_arg
//                         - passed to OutgoingControl::set_data() along with 'buf'.
//
// Returns the result of internal_invoke_remote().
00158 int
00159 ClusterProcessor::invoke_remote_data(ClusterHandler *ch, int cluster_fn,
00160                                      void *data, int data_len,
00161                                      IOBufferBlock * buf,
00162                                      int dest_channel, ClusterVCToken * token,
00163                                      void (*bufdata_free_proc) (void *), void *bufdata_free_proc_arg, int options)
00164 {
00165   if (!buf) {
00166     // No buffer data, translate this into a invoke_remote() request
00167     return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL);
00168   }
        // With buffer data present, all remaining arguments are required.
00169   ink_assert(data);
00170   ink_assert(data_len);
00171   ink_assert(dest_channel);
00172   ink_assert(token);
00173   ink_assert(bufdata_free_proc);
00174   ink_assert(bufdata_free_proc_arg);
00175 
00176   /////////////////////////////////////////////////////////////////////////
00177   // Build the compound message as described by invoke_remote_data_args.
00178   /////////////////////////////////////////////////////////////////////////
00179 
00180   // Build OutgoingControl for buffer data
00181   OutgoingControl *bufdata_oc = OutgoingControl::alloc();
00182   bufdata_oc->set_data(buf, bufdata_free_proc, bufdata_free_proc_arg);
00183 
00184   // Build OutgoingControl for compound message header
        // msg_oc is left empty here; internal_invoke_remote() fills it in with
        // the control message it builds.
00185   invoke_remote_data_args mh;
00186   mh.msg_oc = 0;
00187   mh.data_oc = bufdata_oc;
00188   mh.dest_channel = dest_channel;
00189   mh.token = *token;
00190 
        // Header layout: int32_t function code (-1) followed by a copy of 'mh'.
00191   OutgoingControl *chdr = OutgoingControl::alloc();
00192   chdr->submit_time = ink_get_hrtime();
00193   chdr->len = sizeof(int32_t) + sizeof(mh);
00194   chdr->alloc_data();
00195   *(int32_t *) chdr->data = -1;   // always -1 for compound message
00196   memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
00197 
00198   return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
00199 }
00200 
// Release a message buffer given a pointer to its payload.
//
// 'p' must point just past the int32_t function code of a buffer that is
// preceded by a ClusterControl::DATA_HDR-byte header. As read by this
// function, the header holds: byte 0 = size index, byte 1 = ALLOC_DATA_MAGIC,
// bytes 2.. = pointer to the owning ClusterControl. The header is validated
// with release asserts and the owning control is then freed via freeall().
// The length argument is ignored.
00201 // TODO: Why pass in the length here if not used ?
00202 void
00203 ClusterProcessor::free_remote_data(char *p, int /* l ATS_UNUSED */)
00204 {
00205   char *d = p - sizeof(int32_t);  // reset to ptr to function code
00206   int data_hdr = ClusterControl::DATA_HDR;
00207 
00208   ink_release_assert(*((uint8_t *) (d - data_hdr + 1)) == (uint8_t) ALLOC_DATA_MAGIC);
00209   unsigned char size_index = *(d - data_hdr);
        // A size index with the high bit clear must be a valid buffer size
        // index; with the high bit set, only 0xff is accepted.
00210   if (!(size_index & 0x80)) {
00211     ink_release_assert(size_index <= (DEFAULT_BUFFER_SIZES - 1));
00212   } else {
00213     ink_release_assert(size_index == 0xff);
00214   }
00215 
00216   // Extract 'this' pointer
00217 
        // memcpy is used to read the stored pointer from header offset 2
        // (presumably because that offset is not pointer-aligned -- confirm).
00218   ClusterControl *ccl;
00219   memcpy((char *) &ccl, (d - data_hdr + 2), sizeof(void *));
00220   ink_assert(ccl->valid_alloc_data());
00221 
00222   // Deallocate control structure and data
00223 
00224   ccl->freeall();
00225 }
00226 
// Allocate the initiator side of a cluster virtual connection and produce the
// token the caller must send to the remote node.
//
// Parameters:
//   cont    - continuation requesting the connection; it is cast to
//             CacheContinuation to obtain the ClusterHandler, so it must
//             actually be one.
//   (unnamed ClusterMachine *) - unused.
//   token   - output; receives the newly allocated VC token.
//   options - bit mask; CLUSTER_OPT_IMMEDIATE, CLUSTER_OPT_ALLOW_IMMEDIATE and
//             CLUSTER_OPT_CONN_READ are examined here.
//
// Returns:
//   NULL                 if the continuation has no handler, the handler has
//                        no thread, CLUSTER_OPT_IMMEDIATE was requested but
//                        the open could not be done inline, or vc->start()
//                        failed;
//   CLUSTER_DELAYED_OPEN if the open was queued to the handler (or started
//                        inline without an immediate option);
//   the new VC           if it was started inline with an immediate option
//                        (only possible when CLUSTER_THREAD_STEALING is defined).
00227 ClusterVConnection *
00228 ClusterProcessor::open_local(Continuation * cont, ClusterMachine */* m ATS_UNUSED */, ClusterVCToken & token, int options)
00229 {
00230   //
00231   //  New connect protocol.
00232   //  As a VC initiator, establish the VC connection to the remote node
00233   //  by allocating the VC locally and requiring the caller to pass the
00234   //  token and channel id in the remote request.  The remote handler calls
00235   //  connect_local to establish the remote side of the connection.
00236   //
00237   bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
00238   bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
00239 
00240   ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
00241   if (!ch)
00242     return NULL;
00243   EThread *t = ch->thread;
00244   if (!t)
00245     return NULL;
00246 
00247   EThread *thread = this_ethread();
        // NOTE(review): 'mutex' is not referenced by name below; presumably the
        // CLUSTER_INCREMENT_DYN_STAT macro uses it -- confirm.
00248   ProxyMutex *mutex = thread->mutex;
00249   ClusterVConnection *vc = clusterVCAllocator.alloc();
00250   vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
00251   vc->start_time = ink_get_hrtime();
00252   vc->last_activity_time = vc->start_time;
00253   vc->ch = ch;
00254   vc->token.alloc();
00255   vc->token.ch_id = ch->id;
00256   token = vc->token;
        // NOTE(review): the connection stats below are only incremented when
        // CLUSTER_THREAD_STEALING is defined -- confirm this is intended.
00257 #ifdef CLUSTER_THREAD_STEALING
00258   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00259   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00260   MUTEX_TRY_LOCK(lock, ch->mutex, thread);
00261   if (!lock) {
00262 #endif
          // Queued path (the only path without CLUSTER_THREAD_STEALING): an
          // immediate-only request cannot be satisfied, so the VC is released.
00263     if (immediate) {
00264       clusterVCAllocator_free(vc);
00265       return NULL;
00266     }
          // Hand the VC to the handler through its incoming open list and
          // signal the handler's thread if it has a signal hook.
00267     vc->action_ = cont;
00268     ink_atomiclist_push(&ch->external_incoming_open_local, (void *) vc);
00269         if(ch->thread && ch->thread->signal_hook)
00270           ch->thread->signal_hook(ch->thread);
00271     return CLUSTER_DELAYED_OPEN;
00272 
00273 #ifdef CLUSTER_THREAD_STEALING
00274   } else {
          // Handler lock acquired: start the VC inline on this thread.
00275     if (!(immediate || allow_immediate))
00276       vc->action_ = cont;
          // NOTE(review): 'vc' is not freed here on failure; presumably
          // start() releases it itself -- confirm.
00277     if (vc->start(thread) < 0) {
00278       return NULL;
00279     }
00280     if (immediate || allow_immediate) {
00281       return vc;
00282     } else {
00283       return CLUSTER_DELAYED_OPEN;
00284     }
00285   }
00286 #endif
00287 }
00288 
// Establish the local side of a virtual connection that a remote node
// initiated, using the token and channel id supplied by that node.
//
// Parameters:
//   cont    - continuation to associate with the VC (stored in vc->action_).
//   token   - token created by the initiating node; token->ip_created selects
//             the ClusterMachine and token->ch_id selects its ClusterHandler.
//   channel - channel id to assign to the VC.
//   options - bit mask; CLUSTER_OPT_IMMEDIATE, CLUSTER_OPT_ALLOW_IMMEDIATE and
//             CLUSTER_OPT_CONN_READ are examined here.
//
// Returns:
//   NULL                 if the machine, handler or handler thread cannot be
//                        found, token->ch_id is out of range,
//                        CLUSTER_OPT_IMMEDIATE was requested but the connect
//                        could not be done inline, or vc->start() failed;
//   CLUSTER_DELAYED_OPEN if the VC was scheduled on the handler's thread (or
//                        started inline without an immediate option);
//   the new VC           if it was started inline with an immediate option
//                        (only possible when CLUSTER_THREAD_STEALING is defined).
00289 ClusterVConnection *
00290 ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int channel, int options)
00291 {
00292   //
00293   // Establish VC connection initiated by remote node on the local node
00294   // using the given token and channel id.
00295   //
00296   bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
00297   bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
00298 
        // Look up the machine that created the token in the current cluster
        // configuration; test mode looks it up under 127.0.0.1 instead.
00299 #ifdef LOCAL_CLUSTER_TEST_MODE
00300   int ip = inet_addr("127.0.0.1");
00301   ClusterMachine *m;
00302   m = this_cluster->current_configuration()->find(ip, token->ip_created);
00303 #else
00304   ClusterMachine *m = this_cluster->current_configuration()->find(token->ip_created);
00305 #endif
00306   if (!m)
00307     return NULL;
        // Reject handler ids beyond the machine's connection count before
        // indexing clusterHandlers[].
00308   if (token->ch_id >= (uint32_t)m->num_connections)
00309     return NULL;
00310   ClusterHandler *ch = m->clusterHandlers[token->ch_id];
00311   if (!ch)
00312     return NULL;
00313   EThread *t = ch->thread;
00314   if (!t)
00315     return NULL;
00316 
00317   EThread *thread = this_ethread();
        // NOTE(review): 'mutex' is not referenced by name below; presumably the
        // CLUSTER_INCREMENT_DYN_STAT macro uses it -- confirm.
00318   ProxyMutex *mutex = thread->mutex;
00319   ClusterVConnection *vc = clusterVCAllocator.alloc();
00320   vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
00321   vc->start_time = ink_get_hrtime();
00322   vc->last_activity_time = vc->start_time;
00323   vc->ch = ch;
00324   vc->token = *token;
00325   vc->channel = channel;
        // NOTE(review): the connection stats below are only incremented when
        // CLUSTER_THREAD_STEALING is defined -- confirm this is intended.
00326 #ifdef CLUSTER_THREAD_STEALING
00327   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00328   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00329   MUTEX_TRY_LOCK(lock, ch->mutex, thread);
00330   if (!lock) {
00331 #endif
          // Scheduled path (the only path without CLUSTER_THREAD_STEALING): an
          // immediate-only request cannot be satisfied, so the VC is released.
00332     if (immediate) {
00333       clusterVCAllocator_free(vc);
00334       return NULL;
00335     }
          // Run the VC on the handler's thread under the handler's mutex.
00336     vc->mutex = ch->mutex;
00337     vc->action_ = cont;
00338     ch->thread->schedule_imm_signal(vc);
00339     return CLUSTER_DELAYED_OPEN;
00340 #ifdef CLUSTER_THREAD_STEALING
00341   } else {
          // Handler lock acquired: start the VC inline on this thread.
00342     if (!(immediate || allow_immediate))
00343       vc->action_ = cont;
          // NOTE(review): 'vc' is not freed here on failure; presumably
          // start() releases it itself -- confirm.
00344     if (vc->start(thread) < 0) {
00345       return NULL;
00346     }
00347     if (immediate || allow_immediate) {
00348       return vc;
00349     } else {
00350       return CLUSTER_DELAYED_OPEN;
00351     }
00352   }
00353 #endif
00354 }
00355 
// Report whether remote cluster operations are disabled for machine 'm'.
// Obtains a ClusterHandler from m->pop_ClusterHandler(1) and returns that
// handler's disable_remote_cluster_ops flag; if no handler is returned, the
// answer is true (disabled).
// NOTE(review): the meaning of the argument 1 to pop_ClusterHandler() is not
// visible here -- confirm against its declaration.
00356 bool ClusterProcessor::disable_remote_cluster_ops(ClusterMachine * m)
00357 {
00358   ClusterHandler *ch = m->pop_ClusterHandler(1);
00359   if (ch) {
00360     return ch->disable_remote_cluster_ops;
00361   } else {
00362     return true;
00363   }
00364 }
00365 
00366 ////////////////////////////////////////////////////////////////////////////
00367 // Simplify debug access to stats
00368 ////////////////////////////////////////////////////////////////////////////
00369 ////////////////////////////////////////////////////////////////////////////
00370 
// Global pointer to the periodic cluster event object; it is not assigned in
// the visible part of this file.
00371 GlobalClusterPeriodicEvent *
00372   PeriodicClusterEvent;
00373 
// The definitions below are compiled only when CLUSTER_TOMCAT is defined.
00374 #ifdef CLUSTER_TOMCAT
00375 extern int cache_clustering_enabled;
00376 
00377 int CacheClusterMonitorEnabled = 0;
00378 int CacheClusterMonitorIntervalSecs = 1;
00379 
// NOTE(review): presumably socket buffer sizes and socket option values for
// cluster connections, judging by the names only -- confirm against their users.
00380 int cluster_send_buffer_size = 0;
00381 int cluster_receive_buffer_size = 0;
00382 unsigned long cluster_sockopt_flags = 0;
00383 unsigned long cluster_packet_mark = 0;
00384 unsigned long cluster_packet_tos = 0;
00385 
00386 int RPC_only_CacheCluster = 0;
00387 #endif
00388 
00389 int
00390 ClusterProcessor::init()
00391 {
00392   cluster_rsb = RecAllocateRawStatBlock((int) cluster_stat_count);
00393   //
00394   // Statistics callbacks
00395   //
00396   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00397                      "proxy.process.cluster.connections_open",
00398                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_OPEN_STAT, RecRawStatSyncSum);
00399   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00400   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00401                      "proxy.process.cluster.connections_opened",
00402                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_OPENNED_STAT, RecRawStatSyncSum);
00403   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00404   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00405                      "proxy.process.cluster.connections_closed",
00406                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CON_TOTAL_TIME_STAT, RecRawStatSyncCount);
00407   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT);
00408   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00409                      "proxy.process.cluster.slow_ctrl_msgs_sent",
00410                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SLOW_CTRL_MSGS_SENT_STAT, RecRawStatSyncCount);
00411   CLUSTER_CLEAR_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
00412   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00413                      "proxy.process.cluster.connections_read_locked",
00414                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_READ_LOCKED_STAT, RecRawStatSyncSum);
00415   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_READ_LOCKED_STAT);
00416   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00417                      "proxy.process.cluster.connections_write_locked",
00418                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT, RecRawStatSyncSum);
00419   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT);
00420   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00421                      "proxy.process.cluster.reads",
00422                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_READ_BYTES_STAT, RecRawStatSyncCount);
00423   CLUSTER_CLEAR_DYN_STAT(CLUSTER_READ_BYTES_STAT);
00424   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00425                      "proxy.process.cluster.read_bytes",
00426                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_READ_BYTES_STAT, RecRawStatSyncSum);
00427   CLUSTER_CLEAR_DYN_STAT(CLUSTER_READ_BYTES_STAT);
00428   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00429                      "proxy.process.cluster.writes",
00430                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BYTES_STAT, RecRawStatSyncCount);
00431   CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BYTES_STAT);
00432   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00433                      "proxy.process.cluster.write_bytes",
00434                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BYTES_STAT, RecRawStatSyncSum);
00435   CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BYTES_STAT);
00436   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00437                      "proxy.process.cluster.control_messages_sent",
00438                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_SEND_TIME_STAT, RecRawStatSyncCount);
00439   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT);
00440   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00441                      "proxy.process.cluster.control_messages_received",
00442                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_RECV_TIME_STAT, RecRawStatSyncCount);
00443   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT);
00444   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00445                      "proxy.process.cluster.op_delayed_for_lock",
00446                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_OP_DELAYED_FOR_LOCK_STAT, RecRawStatSyncSum);
00447   CLUSTER_CLEAR_DYN_STAT(CLUSTER_OP_DELAYED_FOR_LOCK_STAT);
00448   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00449                      "proxy.process.cluster.connections_bumped",
00450                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_BUMPED_STAT, RecRawStatSyncSum);
00451   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_BUMPED_STAT);
00452   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00453                      "proxy.process.cluster.net_backup",
00454                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NET_BACKUP_STAT, RecRawStatSyncCount);
00455   CLUSTER_CLEAR_DYN_STAT(CLUSTER_NET_BACKUP_STAT);
00456   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00457                      "proxy.process.cluster.nodes",
00458                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NODES_STAT, RecRawStatSyncSum);
00459   CLUSTER_CLEAR_DYN_STAT(CLUSTER_NODES_STAT);
00460   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00461                      "proxy.process.cluster.machines_allocated",
00462                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MACHINES_ALLOCATED_STAT, RecRawStatSyncSum);
00463   CLUSTER_CLEAR_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
00464   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00465                      "proxy.process.cluster.machines_freed",
00466                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MACHINES_FREED_STAT, RecRawStatSyncSum);
00467   CLUSTER_CLEAR_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
00468   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00469                      "proxy.process.cluster.configuration_changes",
00470                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONFIGURATION_CHANGES_STAT, RecRawStatSyncCount);
00471   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00472   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00473                      "proxy.process.cluster.delayed_reads",
00474                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_DELAYED_READS_STAT, RecRawStatSyncSum);
00475   CLUSTER_CLEAR_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
00476   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00477                      "proxy.process.cluster.byte_bank_used",
00478                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_BYTE_BANK_USED_STAT, RecRawStatSyncSum);
00479   CLUSTER_CLEAR_DYN_STAT(CLUSTER_BYTE_BANK_USED_STAT);
00480   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00481                      "proxy.process.cluster.alloc_data_news",
00482                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_ALLOC_DATA_NEWS_STAT, RecRawStatSyncSum);
00483   CLUSTER_CLEAR_DYN_STAT(CLUSTER_ALLOC_DATA_NEWS_STAT);
00484   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00485                      "proxy.process.cluster.write_bb_mallocs",
00486                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BB_MALLOCS_STAT, RecRawStatSyncSum);
00487   CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BB_MALLOCS_STAT);
00488   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00489                      "proxy.process.cluster.partial_reads",
00490                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_PARTIAL_READS_STAT, RecRawStatSyncSum);
00491   CLUSTER_CLEAR_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
00492   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00493                      "proxy.process.cluster.partial_writes",
00494                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_PARTIAL_WRITES_STAT, RecRawStatSyncSum);
00495   CLUSTER_CLEAR_DYN_STAT(CLUSTER_PARTIAL_WRITES_STAT);
00496   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00497                      "proxy.process.cluster.cache_outstanding",
00498                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_OUTSTANDING_STAT, RecRawStatSyncSum);
00499   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
00500   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00501                      "proxy.process.cluster.remote_op_timeouts",
00502                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_OP_TIMEOUTS_STAT, RecRawStatSyncSum);
00503   CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
00504   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00505                      "proxy.process.cluster.remote_op_reply_timeouts",
00506                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT, RecRawStatSyncSum);
00507   CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
00508   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00509                      "proxy.process.cluster.chan_inuse",
00510                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CHAN_INUSE_STAT, RecRawStatSyncSum);
00511   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
00512   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00513                      "proxy.process.cluster.open_delays",
00514                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_OPEN_DELAY_TIME_STAT, RecRawStatSyncSum);
00515   CLUSTER_CLEAR_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT);
00516   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00517                      "proxy.process.cluster.connections_avg_time",
00518                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CON_TOTAL_TIME_STAT, RecRawStatSyncHrTimeAvg);
00519   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT);
00520   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00521                      "proxy.process.cluster.control_messages_avg_send_time",
00522                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_SEND_TIME_STAT, RecRawStatSyncHrTimeAvg);
00523   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT);
00524   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00525                      "proxy.process.cluster.control_messages_avg_receive_time",
00526                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_RECV_TIME_STAT, RecRawStatSyncHrTimeAvg);
00527   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT);
00528   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00529                      "proxy.process.cluster.open_delay_time",
00530                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_OPEN_DELAY_TIME_STAT, RecRawStatSyncHrTimeAvg);
00531   CLUSTER_CLEAR_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT);
00532   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00533                      "proxy.process.cluster.cache_callback_time",
00534                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00535   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT);
00536   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00537                      "proxy.process.cluster.rmt_cache_callback_time",
00538                      RECD_FLOAT, RECP_NON_PERSISTENT,
00539                      (int) CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00540   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT);
00541   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00542                      "proxy.process.cluster.lkrmt_cache_callback_time",
00543                      RECD_FLOAT, RECP_NON_PERSISTENT,
00544                      (int) CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00545   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT);
00546   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00547                      "proxy.process.cluster.local_connection_time",
00548                      RECD_FLOAT, RECP_NON_PERSISTENT,
00549                      (int) CLUSTER_LOCAL_CONNECTION_TIME_STAT, RecRawStatSyncHrTimeAvg);
00550   CLUSTER_CLEAR_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT);
00551   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00552                      "proxy.process.cluster.remote_connection_time",
00553                      RECD_FLOAT, RECP_NON_PERSISTENT,
00554                      (int) CLUSTER_REMOTE_CONNECTION_TIME_STAT, RecRawStatSyncHrTimeAvg);
00555   CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT);
00556   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00557                      "proxy.process.cluster.rdmsg_assemble_time",
00558                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_RDMSG_ASSEMBLE_TIME_STAT, RecRawStatSyncHrTimeAvg);
00559   CLUSTER_CLEAR_DYN_STAT(CLUSTER_RDMSG_ASSEMBLE_TIME_STAT);
00560   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00561                      "proxy.process.cluster.cluster_ping_time",
00562                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_PING_TIME_STAT, RecRawStatSyncHrTimeAvg);
00563   CLUSTER_CLEAR_DYN_STAT(CLUSTER_PING_TIME_STAT);
00564   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00565                      "proxy.process.cluster.cache_callbacks",
00566                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00567   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT);
00568   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00569                      "proxy.process.cluster.rmt_cache_callbacks",
00570                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00571   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT);
00572   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00573                      "proxy.process.cluster.lkrmt_cache_callbacks",
00574                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00575   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT);
00576   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00577                      "proxy.process.cluster.local_connections_closed",
00578                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_LOCAL_CONNECTION_TIME_STAT, RecRawStatSyncCount);
00579   CLUSTER_CLEAR_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT);
00580   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00581                      "proxy.process.cluster.remote_connections_closed",
00582                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_CONNECTION_TIME_STAT, RecRawStatSyncCount);
00583   CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT);
00584   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00585                      "proxy.process.cluster.setdata_no_clustervc",
00586                      RECD_INT, RECP_NON_PERSISTENT, (int) cluster_setdata_no_CLUSTERVC_STAT, RecRawStatSyncCount);
00587   CLUSTER_CLEAR_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00588   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00589                      "proxy.process.cluster.setdata_no_tunnel",
00590                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SETDATA_NO_TUNNEL_STAT, RecRawStatSyncCount);
00591   CLUSTER_CLEAR_DYN_STAT(CLUSTER_SETDATA_NO_TUNNEL_STAT);
00592   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00593                      "proxy.process.cluster.setdata_no_cachevc",
00594                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SETDATA_NO_CACHEVC_STAT, RecRawStatSyncCount);
00595   CLUSTER_CLEAR_DYN_STAT(CLUSTER_SETDATA_NO_CACHEVC_STAT);
00596   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00597                      "proxy.process.cluster.setdata_no_cluster",
00598                      RECD_INT, RECP_NON_PERSISTENT, (int) cluster_setdata_no_CLUSTER_STAT, RecRawStatSyncCount);
00599   CLUSTER_CLEAR_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00600   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00601                      "proxy.process.cluster.vc_write_stall",
00602                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_WRITE_STALL_STAT, RecRawStatSyncCount);
00603   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_WRITE_STALL_STAT);
00604   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00605                      "proxy.process.cluster.no_remote_space",
00606                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NO_REMOTE_SPACE_STAT, RecRawStatSyncCount);
00607   CLUSTER_CLEAR_DYN_STAT(CLUSTER_NO_REMOTE_SPACE_STAT);
00608   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00609                      "proxy.process.cluster.level1_bank",
00610                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_LEVEL1_BANK_STAT, RecRawStatSyncCount);
00611   CLUSTER_CLEAR_DYN_STAT(CLUSTER_LEVEL1_BANK_STAT);
00612   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00613                      "proxy.process.cluster.multilevel_bank",
00614                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MULTILEVEL_BANK_STAT, RecRawStatSyncCount);
00615   CLUSTER_CLEAR_DYN_STAT(CLUSTER_MULTILEVEL_BANK_STAT);
00616   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00617                      "proxy.process.cluster.vc_cache_insert_lock_misses",
00618                      RECD_INT, RECP_NON_PERSISTENT,
00619                      (int) CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT, RecRawStatSyncCount);
00620   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
00621   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00622                      "proxy.process.cluster.vc_cache_inserts",
00623                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_INSERTS_STAT, RecRawStatSyncCount);
00624   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
00625   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00626                      "proxy.process.cluster.vc_cache_lookup_lock_misses",
00627                      RECD_INT, RECP_NON_PERSISTENT,
00628                      (int) CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT, RecRawStatSyncCount);
00629   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
00630   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00631                      "proxy.process.cluster.vc_cache_lookup_hits",
00632                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_LOOKUP_HITS_STAT, RecRawStatSyncCount);
00633   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_HITS_STAT);
00634   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00635                      "proxy.process.cluster.vc_cache_lookup_misses",
00636                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT, RecRawStatSyncCount);
00637   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
00638   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00639                      "proxy.process.cluster.vc_cache_scans",
00640                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_SCANS_STAT, RecRawStatSyncCount);
00641   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
00642   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00643                      "proxy.process.cluster.vc_cache_scan_lock_misses",
00644                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT, RecRawStatSyncCount);
00645   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT);
00646   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00647                      "proxy.process.cluster.vc_cache_purges",
00648                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_PURGES_STAT, RecRawStatSyncCount);
00649   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_PURGES_STAT);
00650   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00651                      "proxy.process.cluster.write_lock_misses",
00652                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_LOCK_MISSES_STAT, RecRawStatSyncCount);
00653   CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_LOCK_MISSES_STAT);
00654   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00655                      "proxy.process.cluster.vc_read_list_len",
00656                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_READ_LIST_LEN_STAT, RecRawStatSyncAvg);
00657   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_READ_LIST_LEN_STAT);
00658   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00659                      "proxy.process.cluster.vc_write_list_len",
00660                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_WRITE_LIST_LEN_STAT, RecRawStatSyncAvg);
00661   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_WRITE_LIST_LEN_STAT);
00662   CLUSTER_CLEAR_DYN_STAT(CLUSTER_NODES_STAT);   // clear sum and count
00663   // INKqa08033: win2k: ui: cluster warning light on
00664   // Used to call CLUSTER_INCREMENT_DYN_STAT here; switch to SUM_GLOBAL_DYN_STAT
00665   CLUSTER_SUM_GLOBAL_DYN_STAT(CLUSTER_NODES_STAT, 1);   // one node in cluster, ME
00666 
00667   REC_ReadConfigInteger(ClusterLoadMonitor::cf_monitor_enabled, "proxy.config.cluster.load_monitor_enabled");
00668   REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_message_send_msec_interval, "proxy.config.cluster.ping_send_interval_msecs");
00669   REC_ReadConfigInteger(ClusterLoadMonitor::cf_num_ping_response_buckets, "proxy.config.cluster.ping_response_buckets");
00670   REC_ReadConfigInteger(ClusterLoadMonitor::cf_msecs_per_ping_response_bucket, "proxy.config.cluster.msecs_per_ping_response_bucket");
00671   REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_latency_threshold_msecs, "proxy.config.cluster.ping_latency_threshold_msecs");
00672   REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_compute_msec_interval, "proxy.config.cluster.load_compute_interval_msecs");
00673   REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_periodic_msec_interval, "proxy.config.cluster.periodic_timer_interval_msecs");
00674   REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_history_buf_length, "proxy.config.cluster.ping_history_buf_length");
00675   REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_clear_duration, "proxy.config.cluster.cluster_load_clear_duration");
00676   REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_exceed_duration, "proxy.config.cluster.cluster_load_exceed_duration");
00677 
00678   //
00679   // Configuration callbacks
00680   //
00681   if (cluster_port_number != DEFAULT_CLUSTER_PORT_NUMBER)
00682     cluster_port = cluster_port_number;
00683   else {
00684     REC_ReadConfigInteger(cluster_port, "proxy.config.cluster.cluster_port");
00685   }
00686   if (num_of_cluster_threads == DEFAULT_NUMBER_OF_CLUSTER_THREADS)
00687     REC_ReadConfigInteger(num_of_cluster_threads, "proxy.config.cluster.threads");
00688 
00689   REC_EstablishStaticConfigInt32(CacheClusterMonitorEnabled, "proxy.config.cluster.enable_monitor");
00690   REC_EstablishStaticConfigInt32(CacheClusterMonitorIntervalSecs, "proxy.config.cluster.monitor_interval_secs");
00691   REC_ReadConfigInteger(cluster_receive_buffer_size, "proxy.config.cluster.receive_buffer_size");
00692   REC_ReadConfigInteger(cluster_send_buffer_size, "proxy.config.cluster.send_buffer_size");
00693   REC_ReadConfigInteger(cluster_sockopt_flags, "proxy.config.cluster.sock_option_flag");
00694   REC_ReadConfigInteger(cluster_packet_mark, "proxy.config.cluster.sock_packet_mark");
00695   REC_ReadConfigInteger(cluster_packet_tos, "proxy.config.cluster.sock_packet_tos");
00696   REC_EstablishStaticConfigInt32(RPC_only_CacheCluster, "proxy.config.cluster.rpc_cache_cluster");
00697 
00698   int cluster_type = 0;
00699   REC_ReadConfigInteger(cluster_type, "proxy.local.cluster.type");
00700 
00701   create_this_cluster_machine();
00702   // Cluster API Initializations
00703   clusterAPI_init();
00704   // Start global Cluster periodic event
00705   PeriodicClusterEvent = new GlobalClusterPeriodicEvent;
00706   PeriodicClusterEvent->init();
00707 
00708   this_cluster = new Cluster;
00709   ClusterConfiguration *cc = new ClusterConfiguration;
00710   this_cluster->configurations.push(cc);
00711   cc->n_machines = 1;
00712   cc->machines[0] = this_cluster_machine();
00713   memset(cc->hash_table, 0, CLUSTER_HASH_TABLE_SIZE);
00714   // 0 dummy output data
00715 
00716   memset(channel_dummy_output, 0, sizeof(channel_dummy_output));
00717 
00718   if (cluster_type == 1) {
00719     cache_clustering_enabled = 1;
00720     Note("cache clustering enabled");
00721     compute_cluster_mode();
00722   } else {
00723     cache_clustering_enabled = 0;
00724     Note("cache clustering disabled");
00725   }
00726   return 0;
00727 }
00728 
00729 // function added to adhere to the name calling convention of init functions
00730 int
00731 init_clusterprocessor(void)
00732 {
00733   return clusterProcessor.init();
00734 }
00735 
00736 int
00737 ClusterProcessor::start()
00738 {
00739 #ifdef LOCAL_CLUSTER_TEST_MODE
00740   this_cluster_machine()->cluster_port = cluster_port;
00741 #endif
00742   if (cache_clustering_enabled && (cacheProcessor.IsCacheEnabled() == CACHE_INITIALIZED)) {
00743     size_t stacksize;
00744 
00745     REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
00746     ET_CLUSTER = eventProcessor.spawn_event_threads(num_of_cluster_threads, "ET_CLUSTER", stacksize);
00747     for (int i = 0; i < eventProcessor.n_threads_for_type[ET_CLUSTER]; i++) {
00748       initialize_thread_for_net(eventProcessor.eventthread[ET_CLUSTER][i]);
00749     }
00750     REC_RegisterConfigUpdateFunc("proxy.config.cluster.cluster_configuration", machine_config_change, (void *) CLUSTER_CONFIG);
00751     do_machine_config_change((void *) CLUSTER_CONFIG, "proxy.config.cluster.cluster_configuration");
00752     // TODO: Remove this?
00753 #ifdef USE_SEPARATE_MACHINE_CONFIG
00754     REC_RegisterConfigUpdateFunc("proxy.config.cluster.machine_configuration", machine_config_change, (void *) MACHINE_CONFIG);
00755     do_machine_config_change((void *) MACHINE_CONFIG, "proxy.config.cluster.machine_configuration");
00756 #endif
00757 
00758     accept_handler = new ClusterAccept(&cluster_port, cluster_receive_buffer_size, cluster_send_buffer_size);
00759     accept_handler->Init();
00760   }
00761   return 0;
00762 }
00763 
00764 void
00765 ClusterProcessor::connect(char *hostname, int16_t id)
00766 {
00767   //
00768   // Construct a cluster link to the given machine
00769   //
00770   ClusterHandler *ch = new ClusterHandler;
00771   SET_CONTINUATION_HANDLER(ch, (ClusterContHandler) & ClusterHandler::connectClusterEvent);
00772   ch->hostname = ats_strdup(hostname);
00773   ch->connector = true;
00774   ch->id = id;
00775   eventProcessor.schedule_imm(ch, ET_CLUSTER);
00776 }
00777 
00778 void
00779 ClusterProcessor::connect(unsigned int ip, int port, int16_t id, bool delay)
00780 {
00781   //
00782   // Construct a cluster link to the given machine
00783   //
00784   ClusterHandler *ch = new ClusterHandler;
00785   SET_CONTINUATION_HANDLER(ch, (ClusterContHandler) & ClusterHandler::connectClusterEvent);
00786   ch->ip = ip;
00787   ch->port = port;
00788   ch->connector = true;
00789   ch->id = id;
00790   if (delay)
00791     eventProcessor.schedule_in(ch, CLUSTER_MEMBER_DELAY, ET_CLUSTER);
00792   else
00793     eventProcessor.schedule_imm(ch, ET_CLUSTER);
00794 }
00795 
00796 void
00797 ClusterProcessor::send_machine_list(ClusterMachine * m)
00798 {
00799   //
00800   // In testing mode, cluster nodes automagically connect to all
00801   // known hosts.  This function is called on connect to exchange those
00802   // lists.
00803   //
00804   MachineListMessage mlistmsg;
00805   int vers = MachineListMessage::protoToVersion(m->msg_proto_major);
00806   ClusterConfiguration *cc = this_cluster->current_configuration();
00807   void *data;
00808   int len;
00809 
00810   if (vers == MachineListMessage::MACHINE_LIST_MESSAGE_VERSION) {
00811     int n = 0;
00812     MachineListMessage *msg = &mlistmsg;
00813 
00814     while (n < cc->n_machines) {
00815       msg->ip[n] = cc->machines[n]->ip;
00816       n++;
00817     }
00818     msg->n_ip = n;
00819     data = (void *) msg;
00820     len = msg->sizeof_fixedlen_msg() + (n * sizeof(uint32_t));
00821 
00822   } else {
00823     //////////////////////////////////////////////////////////////
00824     // Create the specified down rev version of this message
00825     //////////////////////////////////////////////////////////////
00826     ink_release_assert(!"send_machine_list() bad msg version");
00827   }
00828   invoke_remote(m->pop_ClusterHandler(), MACHINE_LIST_CLUSTER_FUNCTION, data, len);
00829 }
00830 
00831 void
00832 ClusterProcessor::compute_cluster_mode()
00833 {
00834   if (RPC_only_CacheCluster) {
00835     if (cache_clustering_enabled > 0) {
00836       cache_clustering_enabled = -1;
00837       Note("RPC only cache clustering");
00838     }
00839   } else {
00840     if (cache_clustering_enabled < 0) {
00841       cache_clustering_enabled = 1;
00842       Note("RPC only cache clustering disabled");
00843     }
00844   }
00845 }
00846 
00847 // End of ClusterProcessor.cc

Generated by  doxygen 1.7.1