00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "P_Cluster.h"
00031 
00032 
00033 
00034 int cluster_port_number = DEFAULT_CLUSTER_PORT_NUMBER;
00035 int cache_clustering_enabled = 0;
00036 int num_of_cluster_threads = DEFAULT_NUMBER_OF_CLUSTER_THREADS;
00037 
00038 ClusterProcessor clusterProcessor;
00039 RecRawStatBlock *cluster_rsb = NULL;
00040 int ET_CLUSTER;
00041 
00042 ClusterProcessor::ClusterProcessor():accept_handler(NULL), this_cluster(NULL)
00043 {
00044 }
00045 
00046 ClusterProcessor::~ClusterProcessor()
00047 {
00048   if (accept_handler) {
00049     accept_handler->ShutdownDelete();
00050     accept_handler = 0;
00051   }
00052 }
00053 
00054 int
00055 ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn,
00056                                          void *data, int len, int options, void *cmsg)
00057 {
00058   EThread *thread = this_ethread();
00059   ProxyMutex *mutex = thread->mutex;
00060   
00061   
00062   
00063   
00064   bool steal = (options & CLUSTER_OPT_STEAL ? true : false);
00065   bool delay = (options & CLUSTER_OPT_DELAY ? true : false);
00066   bool data_in_ocntl = (options & CLUSTER_OPT_DATA_IS_OCONTROL ? true : false);
00067   bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
00068   OutgoingControl *c;
00069 
00070   if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
00071     
00072     if (cmsg) {
00073       invoke_remote_data_args *args = (invoke_remote_data_args *)
00074         (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
00075       ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
00076 
00077       args->data_oc->freeall();
00078       ((OutgoingControl *) cmsg)->freeall();
00079     }
00080     if (data_in_ocntl) {
00081       c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
00082       c->freeall();
00083     }
00084     if (malloced) {
00085       ats_free(data);
00086     }
00087     return -1;
00088   }
00089 
00090   if (data_in_ocntl) {
00091     c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
00092   } else {
00093     c = OutgoingControl::alloc();
00094   }
00095   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
00096   c->submit_time = ink_get_hrtime();
00097 
00098   if (malloced) {
00099     c->set_data((char *) data, len);
00100   } else {
00101     if (!data_in_ocntl) {
00102       c->len = len + sizeof(int32_t);
00103       c->alloc_data();
00104     }
00105     if (!c->fast_data()) {
00106       CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
00107     }
00108     *(int32_t *) c->data = cluster_fn;
00109     if (!data_in_ocntl) {
00110       memcpy(c->data + sizeof(int32_t), data, len);
00111     }
00112   }
00113 
00114   SET_CONTINUATION_HANDLER(c, (OutgoingCtrlHandler) & OutgoingControl::startEvent);
00115 
00116 
00117   
00118 
00119   if (cmsg) {
00120     invoke_remote_data_args *args = (invoke_remote_data_args *)
00121       (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
00122     ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
00123     args->msg_oc = c;
00124     c = (OutgoingControl *) cmsg;
00125   }
00126 #ifndef CLUSTER_THREAD_STEALING
00127   delay = true;
00128 #endif
00129   if (!delay) {
00130     EThread *tt = this_ethread();
00131     {
00132       int q = ClusterFuncToQpri(cluster_fn);
00133       ink_atomiclist_push(&ch->outgoing_control_al[q], (void *) c);
00134 
00135       MUTEX_TRY_LOCK(lock, ch->mutex, tt);
00136       if (!lock) {
00137                 if(ch->thread && ch->thread->signal_hook)
00138                   ch->thread->signal_hook(ch->thread);
00139                 return 1;
00140       }
00141       if (steal)
00142         ch->steal_thread(tt);
00143       return 1;
00144     }
00145   } else {
00146     c->mutex = ch->mutex;
00147     eventProcessor.schedule_imm_signal(c);
00148     return 0;
00149   }
00150 }
00151 
00152 int
00153 ClusterProcessor::invoke_remote(ClusterHandler *ch, int cluster_fn, void *data, int len, int options)
00154 {
00155   return internal_invoke_remote(ch, cluster_fn, data, len, options, (void *) NULL);
00156 }
00157 
00158 int
00159 ClusterProcessor::invoke_remote_data(ClusterHandler *ch, int cluster_fn,
00160                                      void *data, int data_len,
00161                                      IOBufferBlock * buf,
00162                                      int dest_channel, ClusterVCToken * token,
00163                                      void (*bufdata_free_proc) (void *), void *bufdata_free_proc_arg, int options)
00164 {
00165   if (!buf) {
00166     
00167     return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL);
00168   }
00169   ink_assert(data);
00170   ink_assert(data_len);
00171   ink_assert(dest_channel);
00172   ink_assert(token);
00173   ink_assert(bufdata_free_proc);
00174   ink_assert(bufdata_free_proc_arg);
00175 
00176 
00177   
00178 
00179 
00180   
00181   OutgoingControl *bufdata_oc = OutgoingControl::alloc();
00182   bufdata_oc->set_data(buf, bufdata_free_proc, bufdata_free_proc_arg);
00183 
00184   
00185   invoke_remote_data_args mh;
00186   mh.msg_oc = 0;
00187   mh.data_oc = bufdata_oc;
00188   mh.dest_channel = dest_channel;
00189   mh.token = *token;
00190 
00191   OutgoingControl *chdr = OutgoingControl::alloc();
00192   chdr->submit_time = ink_get_hrtime();
00193   chdr->len = sizeof(int32_t) + sizeof(mh);
00194   chdr->alloc_data();
00195   *(int32_t *) chdr->data = -1;   
00196   memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
00197 
00198   return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
00199 }
00200 
00201 
00202 void
00203 ClusterProcessor::free_remote_data(char *p, int )
00204 {
00205   char *d = p - sizeof(int32_t);  
00206   int data_hdr = ClusterControl::DATA_HDR;
00207 
00208   ink_release_assert(*((uint8_t *) (d - data_hdr + 1)) == (uint8_t) ALLOC_DATA_MAGIC);
00209   unsigned char size_index = *(d - data_hdr);
00210   if (!(size_index & 0x80)) {
00211     ink_release_assert(size_index <= (DEFAULT_BUFFER_SIZES - 1));
00212   } else {
00213     ink_release_assert(size_index == 0xff);
00214   }
00215 
00216   
00217 
00218   ClusterControl *ccl;
00219   memcpy((char *) &ccl, (d - data_hdr + 2), sizeof(void *));
00220   ink_assert(ccl->valid_alloc_data());
00221 
00222   
00223 
00224   ccl->freeall();
00225 }
00226 
00227 ClusterVConnection *
00228 ClusterProcessor::open_local(Continuation * cont, ClusterMachine *, ClusterVCToken & token, int options)
00229 {
00230   
00231   
00232   
00233   
00234   
00235   
00236   
00237   bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
00238   bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
00239 
00240   ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
00241   if (!ch)
00242     return NULL;
00243   EThread *t = ch->thread;
00244   if (!t)
00245     return NULL;
00246 
00247   EThread *thread = this_ethread();
00248   ProxyMutex *mutex = thread->mutex;
00249   ClusterVConnection *vc = clusterVCAllocator.alloc();
00250   vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
00251   vc->start_time = ink_get_hrtime();
00252   vc->last_activity_time = vc->start_time;
00253   vc->ch = ch;
00254   vc->token.alloc();
00255   vc->token.ch_id = ch->id;
00256   token = vc->token;
00257 #ifdef CLUSTER_THREAD_STEALING
00258   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00259   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00260   MUTEX_TRY_LOCK(lock, ch->mutex, thread);
00261   if (!lock) {
00262 #endif
00263     if (immediate) {
00264       clusterVCAllocator_free(vc);
00265       return NULL;
00266     }
00267     vc->action_ = cont;
00268     ink_atomiclist_push(&ch->external_incoming_open_local, (void *) vc);
00269         if(ch->thread && ch->thread->signal_hook)
00270           ch->thread->signal_hook(ch->thread);
00271     return CLUSTER_DELAYED_OPEN;
00272 
00273 #ifdef CLUSTER_THREAD_STEALING
00274   } else {
00275     if (!(immediate || allow_immediate))
00276       vc->action_ = cont;
00277     if (vc->start(thread) < 0) {
00278       return NULL;
00279     }
00280     if (immediate || allow_immediate) {
00281       return vc;
00282     } else {
00283       return CLUSTER_DELAYED_OPEN;
00284     }
00285   }
00286 #endif
00287 }
00288 
00289 ClusterVConnection *
00290 ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int channel, int options)
00291 {
00292   
00293   
00294   
00295   
00296   bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
00297   bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
00298 
00299 #ifdef LOCAL_CLUSTER_TEST_MODE
00300   int ip = inet_addr("127.0.0.1");
00301   ClusterMachine *m;
00302   m = this_cluster->current_configuration()->find(ip, token->ip_created);
00303 #else
00304   ClusterMachine *m = this_cluster->current_configuration()->find(token->ip_created);
00305 #endif
00306   if (!m)
00307     return NULL;
00308   if (token->ch_id >= (uint32_t)m->num_connections)
00309     return NULL;
00310   ClusterHandler *ch = m->clusterHandlers[token->ch_id];
00311   if (!ch)
00312     return NULL;
00313   EThread *t = ch->thread;
00314   if (!t)
00315     return NULL;
00316 
00317   EThread *thread = this_ethread();
00318   ProxyMutex *mutex = thread->mutex;
00319   ClusterVConnection *vc = clusterVCAllocator.alloc();
00320   vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
00321   vc->start_time = ink_get_hrtime();
00322   vc->last_activity_time = vc->start_time;
00323   vc->ch = ch;
00324   vc->token = *token;
00325   vc->channel = channel;
00326 #ifdef CLUSTER_THREAD_STEALING
00327   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00328   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00329   MUTEX_TRY_LOCK(lock, ch->mutex, thread);
00330   if (!lock) {
00331 #endif
00332     if (immediate) {
00333       clusterVCAllocator_free(vc);
00334       return NULL;
00335     }
00336     vc->mutex = ch->mutex;
00337     vc->action_ = cont;
00338     ch->thread->schedule_imm_signal(vc);
00339     return CLUSTER_DELAYED_OPEN;
00340 #ifdef CLUSTER_THREAD_STEALING
00341   } else {
00342     if (!(immediate || allow_immediate))
00343       vc->action_ = cont;
00344     if (vc->start(thread) < 0) {
00345       return NULL;
00346     }
00347     if (immediate || allow_immediate) {
00348       return vc;
00349     } else {
00350       return CLUSTER_DELAYED_OPEN;
00351     }
00352   }
00353 #endif
00354 }
00355 
00356 bool ClusterProcessor::disable_remote_cluster_ops(ClusterMachine * m)
00357 {
00358   ClusterHandler *ch = m->pop_ClusterHandler(1);
00359   if (ch) {
00360     return ch->disable_remote_cluster_ops;
00361   } else {
00362     return true;
00363   }
00364 }
00365 
00366 
00367 
00368 
00369 
00370 
00371 GlobalClusterPeriodicEvent *
00372   PeriodicClusterEvent;
00373 
00374 #ifdef CLUSTER_TOMCAT
00375 extern int cache_clustering_enabled;
00376 
00377 int CacheClusterMonitorEnabled = 0;
00378 int CacheClusterMonitorIntervalSecs = 1;
00379 
00380 int cluster_send_buffer_size = 0;
00381 int cluster_receive_buffer_size = 0;
00382 unsigned long cluster_sockopt_flags = 0;
00383 unsigned long cluster_packet_mark = 0;
00384 unsigned long cluster_packet_tos = 0;
00385 
00386 int RPC_only_CacheCluster = 0;
00387 #endif
00388 
00389 int
00390 ClusterProcessor::init()
00391 {
00392   cluster_rsb = RecAllocateRawStatBlock((int) cluster_stat_count);
00393   
00394   
00395   
00396   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00397                      "proxy.process.cluster.connections_open",
00398                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_OPEN_STAT, RecRawStatSyncSum);
00399   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00400   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00401                      "proxy.process.cluster.connections_opened",
00402                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_OPENNED_STAT, RecRawStatSyncSum);
00403   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00404   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00405                      "proxy.process.cluster.connections_closed",
00406                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CON_TOTAL_TIME_STAT, RecRawStatSyncCount);
00407   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT);
00408   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00409                      "proxy.process.cluster.slow_ctrl_msgs_sent",
00410                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SLOW_CTRL_MSGS_SENT_STAT, RecRawStatSyncCount);
00411   CLUSTER_CLEAR_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
00412   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00413                      "proxy.process.cluster.connections_read_locked",
00414                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_READ_LOCKED_STAT, RecRawStatSyncSum);
00415   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_READ_LOCKED_STAT);
00416   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00417                      "proxy.process.cluster.connections_write_locked",
00418                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT, RecRawStatSyncSum);
00419   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT);
00420   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00421                      "proxy.process.cluster.reads",
00422                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_READ_BYTES_STAT, RecRawStatSyncCount);
00423   CLUSTER_CLEAR_DYN_STAT(CLUSTER_READ_BYTES_STAT);
00424   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00425                      "proxy.process.cluster.read_bytes",
00426                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_READ_BYTES_STAT, RecRawStatSyncSum);
00427   CLUSTER_CLEAR_DYN_STAT(CLUSTER_READ_BYTES_STAT);
00428   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00429                      "proxy.process.cluster.writes",
00430                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BYTES_STAT, RecRawStatSyncCount);
00431   CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BYTES_STAT);
00432   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00433                      "proxy.process.cluster.write_bytes",
00434                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BYTES_STAT, RecRawStatSyncSum);
00435   CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BYTES_STAT);
00436   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00437                      "proxy.process.cluster.control_messages_sent",
00438                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_SEND_TIME_STAT, RecRawStatSyncCount);
00439   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT);
00440   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00441                      "proxy.process.cluster.control_messages_received",
00442                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_RECV_TIME_STAT, RecRawStatSyncCount);
00443   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT);
00444   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00445                      "proxy.process.cluster.op_delayed_for_lock",
00446                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_OP_DELAYED_FOR_LOCK_STAT, RecRawStatSyncSum);
00447   CLUSTER_CLEAR_DYN_STAT(CLUSTER_OP_DELAYED_FOR_LOCK_STAT);
00448   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00449                      "proxy.process.cluster.connections_bumped",
00450                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_BUMPED_STAT, RecRawStatSyncSum);
00451   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_BUMPED_STAT);
00452   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00453                      "proxy.process.cluster.net_backup",
00454                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NET_BACKUP_STAT, RecRawStatSyncCount);
00455   CLUSTER_CLEAR_DYN_STAT(CLUSTER_NET_BACKUP_STAT);
00456   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00457                      "proxy.process.cluster.nodes",
00458                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NODES_STAT, RecRawStatSyncSum);
00459   CLUSTER_CLEAR_DYN_STAT(CLUSTER_NODES_STAT);
00460   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00461                      "proxy.process.cluster.machines_allocated",
00462                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MACHINES_ALLOCATED_STAT, RecRawStatSyncSum);
00463   CLUSTER_CLEAR_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
00464   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00465                      "proxy.process.cluster.machines_freed",
00466                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MACHINES_FREED_STAT, RecRawStatSyncSum);
00467   CLUSTER_CLEAR_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
00468   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00469                      "proxy.process.cluster.configuration_changes",
00470                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONFIGURATION_CHANGES_STAT, RecRawStatSyncCount);
00471   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00472   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00473                      "proxy.process.cluster.delayed_reads",
00474                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_DELAYED_READS_STAT, RecRawStatSyncSum);
00475   CLUSTER_CLEAR_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
00476   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00477                      "proxy.process.cluster.byte_bank_used",
00478                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_BYTE_BANK_USED_STAT, RecRawStatSyncSum);
00479   CLUSTER_CLEAR_DYN_STAT(CLUSTER_BYTE_BANK_USED_STAT);
00480   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00481                      "proxy.process.cluster.alloc_data_news",
00482                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_ALLOC_DATA_NEWS_STAT, RecRawStatSyncSum);
00483   CLUSTER_CLEAR_DYN_STAT(CLUSTER_ALLOC_DATA_NEWS_STAT);
00484   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00485                      "proxy.process.cluster.write_bb_mallocs",
00486                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BB_MALLOCS_STAT, RecRawStatSyncSum);
00487   CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BB_MALLOCS_STAT);
00488   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00489                      "proxy.process.cluster.partial_reads",
00490                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_PARTIAL_READS_STAT, RecRawStatSyncSum);
00491   CLUSTER_CLEAR_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
00492   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00493                      "proxy.process.cluster.partial_writes",
00494                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_PARTIAL_WRITES_STAT, RecRawStatSyncSum);
00495   CLUSTER_CLEAR_DYN_STAT(CLUSTER_PARTIAL_WRITES_STAT);
00496   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00497                      "proxy.process.cluster.cache_outstanding",
00498                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_OUTSTANDING_STAT, RecRawStatSyncSum);
00499   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
00500   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00501                      "proxy.process.cluster.remote_op_timeouts",
00502                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_OP_TIMEOUTS_STAT, RecRawStatSyncSum);
00503   CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
00504   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00505                      "proxy.process.cluster.remote_op_reply_timeouts",
00506                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT, RecRawStatSyncSum);
00507   CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
00508   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00509                      "proxy.process.cluster.chan_inuse",
00510                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CHAN_INUSE_STAT, RecRawStatSyncSum);
00511   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
00512   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00513                      "proxy.process.cluster.open_delays",
00514                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_OPEN_DELAY_TIME_STAT, RecRawStatSyncSum);
00515   CLUSTER_CLEAR_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT);
00516   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00517                      "proxy.process.cluster.connections_avg_time",
00518                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CON_TOTAL_TIME_STAT, RecRawStatSyncHrTimeAvg);
00519   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT);
00520   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00521                      "proxy.process.cluster.control_messages_avg_send_time",
00522                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_SEND_TIME_STAT, RecRawStatSyncHrTimeAvg);
00523   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT);
00524   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00525                      "proxy.process.cluster.control_messages_avg_receive_time",
00526                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_RECV_TIME_STAT, RecRawStatSyncHrTimeAvg);
00527   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT);
00528   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00529                      "proxy.process.cluster.open_delay_time",
00530                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_OPEN_DELAY_TIME_STAT, RecRawStatSyncHrTimeAvg);
00531   CLUSTER_CLEAR_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT);
00532   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00533                      "proxy.process.cluster.cache_callback_time",
00534                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00535   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT);
00536   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00537                      "proxy.process.cluster.rmt_cache_callback_time",
00538                      RECD_FLOAT, RECP_NON_PERSISTENT,
00539                      (int) CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00540   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT);
00541   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00542                      "proxy.process.cluster.lkrmt_cache_callback_time",
00543                      RECD_FLOAT, RECP_NON_PERSISTENT,
00544                      (int) CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00545   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT);
00546   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00547                      "proxy.process.cluster.local_connection_time",
00548                      RECD_FLOAT, RECP_NON_PERSISTENT,
00549                      (int) CLUSTER_LOCAL_CONNECTION_TIME_STAT, RecRawStatSyncHrTimeAvg);
00550   CLUSTER_CLEAR_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT);
00551   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00552                      "proxy.process.cluster.remote_connection_time",
00553                      RECD_FLOAT, RECP_NON_PERSISTENT,
00554                      (int) CLUSTER_REMOTE_CONNECTION_TIME_STAT, RecRawStatSyncHrTimeAvg);
00555   CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT);
00556   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00557                      "proxy.process.cluster.rdmsg_assemble_time",
00558                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_RDMSG_ASSEMBLE_TIME_STAT, RecRawStatSyncHrTimeAvg);
00559   CLUSTER_CLEAR_DYN_STAT(CLUSTER_RDMSG_ASSEMBLE_TIME_STAT);
00560   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00561                      "proxy.process.cluster.cluster_ping_time",
00562                      RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_PING_TIME_STAT, RecRawStatSyncHrTimeAvg);
00563   CLUSTER_CLEAR_DYN_STAT(CLUSTER_PING_TIME_STAT);
00564   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00565                      "proxy.process.cluster.cache_callbacks",
00566                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00567   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT);
00568   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00569                      "proxy.process.cluster.rmt_cache_callbacks",
00570                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00571   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT);
00572   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00573                      "proxy.process.cluster.lkrmt_cache_callbacks",
00574                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00575   CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT);
00576   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00577                      "proxy.process.cluster.local_connections_closed",
00578                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_LOCAL_CONNECTION_TIME_STAT, RecRawStatSyncCount);
00579   CLUSTER_CLEAR_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT);
00580   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00581                      "proxy.process.cluster.remote_connections_closed",
00582                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_CONNECTION_TIME_STAT, RecRawStatSyncCount);
00583   CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT);
00584   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00585                      "proxy.process.cluster.setdata_no_clustervc",
00586                      RECD_INT, RECP_NON_PERSISTENT, (int) cluster_setdata_no_CLUSTERVC_STAT, RecRawStatSyncCount);
00587   CLUSTER_CLEAR_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00588   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00589                      "proxy.process.cluster.setdata_no_tunnel",
00590                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SETDATA_NO_TUNNEL_STAT, RecRawStatSyncCount);
00591   CLUSTER_CLEAR_DYN_STAT(CLUSTER_SETDATA_NO_TUNNEL_STAT);
00592   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00593                      "proxy.process.cluster.setdata_no_cachevc",
00594                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SETDATA_NO_CACHEVC_STAT, RecRawStatSyncCount);
00595   CLUSTER_CLEAR_DYN_STAT(CLUSTER_SETDATA_NO_CACHEVC_STAT);
00596   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00597                      "proxy.process.cluster.setdata_no_cluster",
00598                      RECD_INT, RECP_NON_PERSISTENT, (int) cluster_setdata_no_CLUSTER_STAT, RecRawStatSyncCount);
00599   CLUSTER_CLEAR_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00600   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00601                      "proxy.process.cluster.vc_write_stall",
00602                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_WRITE_STALL_STAT, RecRawStatSyncCount);
00603   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_WRITE_STALL_STAT);
00604   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00605                      "proxy.process.cluster.no_remote_space",
00606                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NO_REMOTE_SPACE_STAT, RecRawStatSyncCount);
00607   CLUSTER_CLEAR_DYN_STAT(CLUSTER_NO_REMOTE_SPACE_STAT);
00608   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00609                      "proxy.process.cluster.level1_bank",
00610                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_LEVEL1_BANK_STAT, RecRawStatSyncCount);
00611   CLUSTER_CLEAR_DYN_STAT(CLUSTER_LEVEL1_BANK_STAT);
00612   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00613                      "proxy.process.cluster.multilevel_bank",
00614                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MULTILEVEL_BANK_STAT, RecRawStatSyncCount);
00615   CLUSTER_CLEAR_DYN_STAT(CLUSTER_MULTILEVEL_BANK_STAT);
00616   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00617                      "proxy.process.cluster.vc_cache_insert_lock_misses",
00618                      RECD_INT, RECP_NON_PERSISTENT,
00619                      (int) CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT, RecRawStatSyncCount);
00620   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
00621   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00622                      "proxy.process.cluster.vc_cache_inserts",
00623                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_INSERTS_STAT, RecRawStatSyncCount);
00624   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
00625   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00626                      "proxy.process.cluster.vc_cache_lookup_lock_misses",
00627                      RECD_INT, RECP_NON_PERSISTENT,
00628                      (int) CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT, RecRawStatSyncCount);
00629   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
00630   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00631                      "proxy.process.cluster.vc_cache_lookup_hits",
00632                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_LOOKUP_HITS_STAT, RecRawStatSyncCount);
00633   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_HITS_STAT);
00634   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00635                      "proxy.process.cluster.vc_cache_lookup_misses",
00636                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT, RecRawStatSyncCount);
00637   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
00638   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00639                      "proxy.process.cluster.vc_cache_scans",
00640                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_SCANS_STAT, RecRawStatSyncCount);
00641   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
00642   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00643                      "proxy.process.cluster.vc_cache_scan_lock_misses",
00644                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT, RecRawStatSyncCount);
00645   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT);
00646   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00647                      "proxy.process.cluster.vc_cache_purges",
00648                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_PURGES_STAT, RecRawStatSyncCount);
00649   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_PURGES_STAT);
00650   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00651                      "proxy.process.cluster.write_lock_misses",
00652                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_LOCK_MISSES_STAT, RecRawStatSyncCount);
00653   CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_LOCK_MISSES_STAT);
00654   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00655                      "proxy.process.cluster.vc_read_list_len",
00656                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_READ_LIST_LEN_STAT, RecRawStatSyncAvg);
00657   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_READ_LIST_LEN_STAT);
00658   RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00659                      "proxy.process.cluster.vc_write_list_len",
00660                      RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_WRITE_LIST_LEN_STAT, RecRawStatSyncAvg);
00661   CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_WRITE_LIST_LEN_STAT);
00662   CLUSTER_CLEAR_DYN_STAT(CLUSTER_NODES_STAT);   
00663   
00664   
00665   CLUSTER_SUM_GLOBAL_DYN_STAT(CLUSTER_NODES_STAT, 1);   
00666 
00667   REC_ReadConfigInteger(ClusterLoadMonitor::cf_monitor_enabled, "proxy.config.cluster.load_monitor_enabled");
00668   REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_message_send_msec_interval, "proxy.config.cluster.ping_send_interval_msecs");
00669   REC_ReadConfigInteger(ClusterLoadMonitor::cf_num_ping_response_buckets, "proxy.config.cluster.ping_response_buckets");
00670   REC_ReadConfigInteger(ClusterLoadMonitor::cf_msecs_per_ping_response_bucket, "proxy.config.cluster.msecs_per_ping_response_bucket");
00671   REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_latency_threshold_msecs, "proxy.config.cluster.ping_latency_threshold_msecs");
00672   REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_compute_msec_interval, "proxy.config.cluster.load_compute_interval_msecs");
00673   REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_periodic_msec_interval, "proxy.config.cluster.periodic_timer_interval_msecs");
00674   REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_history_buf_length, "proxy.config.cluster.ping_history_buf_length");
00675   REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_clear_duration, "proxy.config.cluster.cluster_load_clear_duration");
00676   REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_exceed_duration, "proxy.config.cluster.cluster_load_exceed_duration");
00677 
00678   
00679   
00680   
00681   if (cluster_port_number != DEFAULT_CLUSTER_PORT_NUMBER)
00682     cluster_port = cluster_port_number;
00683   else {
00684     REC_ReadConfigInteger(cluster_port, "proxy.config.cluster.cluster_port");
00685   }
00686   if (num_of_cluster_threads == DEFAULT_NUMBER_OF_CLUSTER_THREADS)
00687     REC_ReadConfigInteger(num_of_cluster_threads, "proxy.config.cluster.threads");
00688 
00689   REC_EstablishStaticConfigInt32(CacheClusterMonitorEnabled, "proxy.config.cluster.enable_monitor");
00690   REC_EstablishStaticConfigInt32(CacheClusterMonitorIntervalSecs, "proxy.config.cluster.monitor_interval_secs");
00691   REC_ReadConfigInteger(cluster_receive_buffer_size, "proxy.config.cluster.receive_buffer_size");
00692   REC_ReadConfigInteger(cluster_send_buffer_size, "proxy.config.cluster.send_buffer_size");
00693   REC_ReadConfigInteger(cluster_sockopt_flags, "proxy.config.cluster.sock_option_flag");
00694   REC_ReadConfigInteger(cluster_packet_mark, "proxy.config.cluster.sock_packet_mark");
00695   REC_ReadConfigInteger(cluster_packet_tos, "proxy.config.cluster.sock_packet_tos");
00696   REC_EstablishStaticConfigInt32(RPC_only_CacheCluster, "proxy.config.cluster.rpc_cache_cluster");
00697 
00698   int cluster_type = 0;
00699   REC_ReadConfigInteger(cluster_type, "proxy.local.cluster.type");
00700 
00701   create_this_cluster_machine();
00702   
00703   clusterAPI_init();
00704   
00705   PeriodicClusterEvent = new GlobalClusterPeriodicEvent;
00706   PeriodicClusterEvent->init();
00707 
00708   this_cluster = new Cluster;
00709   ClusterConfiguration *cc = new ClusterConfiguration;
00710   this_cluster->configurations.push(cc);
00711   cc->n_machines = 1;
00712   cc->machines[0] = this_cluster_machine();
00713   memset(cc->hash_table, 0, CLUSTER_HASH_TABLE_SIZE);
00714   
00715 
00716   memset(channel_dummy_output, 0, sizeof(channel_dummy_output));
00717 
00718   if (cluster_type == 1) {
00719     cache_clustering_enabled = 1;
00720     Note("cache clustering enabled");
00721     compute_cluster_mode();
00722   } else {
00723     cache_clustering_enabled = 0;
00724     Note("cache clustering disabled");
00725   }
00726   return 0;
00727 }
00728 
00729 
00730 int
00731 init_clusterprocessor(void)
00732 {
00733   return clusterProcessor.init();
00734 }
00735 
00736 int
00737 ClusterProcessor::start()
00738 {
00739 #ifdef LOCAL_CLUSTER_TEST_MODE
00740   this_cluster_machine()->cluster_port = cluster_port;
00741 #endif
00742   if (cache_clustering_enabled && (cacheProcessor.IsCacheEnabled() == CACHE_INITIALIZED)) {
00743     size_t stacksize;
00744 
00745     REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
00746     ET_CLUSTER = eventProcessor.spawn_event_threads(num_of_cluster_threads, "ET_CLUSTER", stacksize);
00747     for (int i = 0; i < eventProcessor.n_threads_for_type[ET_CLUSTER]; i++) {
00748       initialize_thread_for_net(eventProcessor.eventthread[ET_CLUSTER][i]);
00749     }
00750     REC_RegisterConfigUpdateFunc("proxy.config.cluster.cluster_configuration", machine_config_change, (void *) CLUSTER_CONFIG);
00751     do_machine_config_change((void *) CLUSTER_CONFIG, "proxy.config.cluster.cluster_configuration");
00752     
00753 #ifdef USE_SEPARATE_MACHINE_CONFIG
00754     REC_RegisterConfigUpdateFunc("proxy.config.cluster.machine_configuration", machine_config_change, (void *) MACHINE_CONFIG);
00755     do_machine_config_change((void *) MACHINE_CONFIG, "proxy.config.cluster.machine_configuration");
00756 #endif
00757 
00758     accept_handler = new ClusterAccept(&cluster_port, cluster_receive_buffer_size, cluster_send_buffer_size);
00759     accept_handler->Init();
00760   }
00761   return 0;
00762 }
00763 
00764 void
00765 ClusterProcessor::connect(char *hostname, int16_t id)
00766 {
00767   
00768   
00769   
00770   ClusterHandler *ch = new ClusterHandler;
00771   SET_CONTINUATION_HANDLER(ch, (ClusterContHandler) & ClusterHandler::connectClusterEvent);
00772   ch->hostname = ats_strdup(hostname);
00773   ch->connector = true;
00774   ch->id = id;
00775   eventProcessor.schedule_imm(ch, ET_CLUSTER);
00776 }
00777 
00778 void
00779 ClusterProcessor::connect(unsigned int ip, int port, int16_t id, bool delay)
00780 {
00781   
00782   
00783   
00784   ClusterHandler *ch = new ClusterHandler;
00785   SET_CONTINUATION_HANDLER(ch, (ClusterContHandler) & ClusterHandler::connectClusterEvent);
00786   ch->ip = ip;
00787   ch->port = port;
00788   ch->connector = true;
00789   ch->id = id;
00790   if (delay)
00791     eventProcessor.schedule_in(ch, CLUSTER_MEMBER_DELAY, ET_CLUSTER);
00792   else
00793     eventProcessor.schedule_imm(ch, ET_CLUSTER);
00794 }
00795 
00796 void
00797 ClusterProcessor::send_machine_list(ClusterMachine * m)
00798 {
00799   
00800   
00801   
00802   
00803   
00804   MachineListMessage mlistmsg;
00805   int vers = MachineListMessage::protoToVersion(m->msg_proto_major);
00806   ClusterConfiguration *cc = this_cluster->current_configuration();
00807   void *data;
00808   int len;
00809 
00810   if (vers == MachineListMessage::MACHINE_LIST_MESSAGE_VERSION) {
00811     int n = 0;
00812     MachineListMessage *msg = &mlistmsg;
00813 
00814     while (n < cc->n_machines) {
00815       msg->ip[n] = cc->machines[n]->ip;
00816       n++;
00817     }
00818     msg->n_ip = n;
00819     data = (void *) msg;
00820     len = msg->sizeof_fixedlen_msg() + (n * sizeof(uint32_t));
00821 
00822   } else {
00823 
00824     
00825 
00826     ink_release_assert(!"send_machine_list() bad msg version");
00827   }
00828   invoke_remote(m->pop_ClusterHandler(), MACHINE_LIST_CLUSTER_FUNCTION, data, len);
00829 }
00830 
00831 void
00832 ClusterProcessor::compute_cluster_mode()
00833 {
00834   if (RPC_only_CacheCluster) {
00835     if (cache_clustering_enabled > 0) {
00836       cache_clustering_enabled = -1;
00837       Note("RPC only cache clustering");
00838     }
00839   } else {
00840     if (cache_clustering_enabled < 0) {
00841       cache_clustering_enabled = 1;
00842       Note("RPC only cache clustering disabled");
00843     }
00844   }
00845 }
00846 
00847