00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "P_Cluster.h"
00031
00032
00033
// Cluster port override. ClusterProcessor::init() copies this into
// cluster_port when it differs from DEFAULT_CLUSTER_PORT_NUMBER; otherwise
// the port is read from "proxy.config.cluster.cluster_port".
int cluster_port_number = DEFAULT_CLUSTER_PORT_NUMBER;
// Clustering mode flag: ClusterProcessor::init() sets it to 1 or 0, and
// ClusterProcessor::compute_cluster_mode() may switch it between 1 and -1.
int cache_clustering_enabled = 0;
// Thread count handed to spawn_event_threads() in ClusterProcessor::start();
// init() reads "proxy.config.cluster.threads" when this is still the default.
int num_of_cluster_threads = DEFAULT_NUMBER_OF_CLUSTER_THREADS;

// The ClusterProcessor instance driven by init_clusterprocessor().
ClusterProcessor clusterProcessor;
// Raw stat block; allocated by ClusterProcessor::init().
RecRawStatBlock *cluster_rsb = NULL;
// Event thread type returned by spawn_event_threads() in
// ClusterProcessor::start().
int ET_CLUSTER;
00041
00042 ClusterProcessor::ClusterProcessor():accept_handler(NULL), this_cluster(NULL)
00043 {
00044 }
00045
00046 ClusterProcessor::~ClusterProcessor()
00047 {
00048 if (accept_handler) {
00049 accept_handler->ShutdownDelete();
00050 accept_handler = 0;
00051 }
00052 }
00053
00054 int
00055 ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn,
00056 void *data, int len, int options, void *cmsg)
00057 {
00058 EThread *thread = this_ethread();
00059 ProxyMutex *mutex = thread->mutex;
00060
00061
00062
00063
00064 bool steal = (options & CLUSTER_OPT_STEAL ? true : false);
00065 bool delay = (options & CLUSTER_OPT_DELAY ? true : false);
00066 bool data_in_ocntl = (options & CLUSTER_OPT_DATA_IS_OCONTROL ? true : false);
00067 bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
00068 OutgoingControl *c;
00069
00070 if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
00071
00072 if (cmsg) {
00073 invoke_remote_data_args *args = (invoke_remote_data_args *)
00074 (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
00075 ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
00076
00077 args->data_oc->freeall();
00078 ((OutgoingControl *) cmsg)->freeall();
00079 }
00080 if (data_in_ocntl) {
00081 c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
00082 c->freeall();
00083 }
00084 if (malloced) {
00085 ats_free(data);
00086 }
00087 return -1;
00088 }
00089
00090 if (data_in_ocntl) {
00091 c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
00092 } else {
00093 c = OutgoingControl::alloc();
00094 }
00095 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
00096 c->submit_time = ink_get_hrtime();
00097
00098 if (malloced) {
00099 c->set_data((char *) data, len);
00100 } else {
00101 if (!data_in_ocntl) {
00102 c->len = len + sizeof(int32_t);
00103 c->alloc_data();
00104 }
00105 if (!c->fast_data()) {
00106 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
00107 }
00108 *(int32_t *) c->data = cluster_fn;
00109 if (!data_in_ocntl) {
00110 memcpy(c->data + sizeof(int32_t), data, len);
00111 }
00112 }
00113
00114 SET_CONTINUATION_HANDLER(c, (OutgoingCtrlHandler) & OutgoingControl::startEvent);
00115
00116
00117
00118
00119 if (cmsg) {
00120 invoke_remote_data_args *args = (invoke_remote_data_args *)
00121 (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
00122 ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
00123 args->msg_oc = c;
00124 c = (OutgoingControl *) cmsg;
00125 }
00126 #ifndef CLUSTER_THREAD_STEALING
00127 delay = true;
00128 #endif
00129 if (!delay) {
00130 EThread *tt = this_ethread();
00131 {
00132 int q = ClusterFuncToQpri(cluster_fn);
00133 ink_atomiclist_push(&ch->outgoing_control_al[q], (void *) c);
00134
00135 MUTEX_TRY_LOCK(lock, ch->mutex, tt);
00136 if (!lock) {
00137 if(ch->thread && ch->thread->signal_hook)
00138 ch->thread->signal_hook(ch->thread);
00139 return 1;
00140 }
00141 if (steal)
00142 ch->steal_thread(tt);
00143 return 1;
00144 }
00145 } else {
00146 c->mutex = ch->mutex;
00147 eventProcessor.schedule_imm_signal(c);
00148 return 0;
00149 }
00150 }
00151
00152 int
00153 ClusterProcessor::invoke_remote(ClusterHandler *ch, int cluster_fn, void *data, int len, int options)
00154 {
00155 return internal_invoke_remote(ch, cluster_fn, data, len, options, (void *) NULL);
00156 }
00157
00158 int
00159 ClusterProcessor::invoke_remote_data(ClusterHandler *ch, int cluster_fn,
00160 void *data, int data_len,
00161 IOBufferBlock * buf,
00162 int dest_channel, ClusterVCToken * token,
00163 void (*bufdata_free_proc) (void *), void *bufdata_free_proc_arg, int options)
00164 {
00165 if (!buf) {
00166
00167 return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL);
00168 }
00169 ink_assert(data);
00170 ink_assert(data_len);
00171 ink_assert(dest_channel);
00172 ink_assert(token);
00173 ink_assert(bufdata_free_proc);
00174 ink_assert(bufdata_free_proc_arg);
00175
00176
00177
00178
00179
00180
00181 OutgoingControl *bufdata_oc = OutgoingControl::alloc();
00182 bufdata_oc->set_data(buf, bufdata_free_proc, bufdata_free_proc_arg);
00183
00184
00185 invoke_remote_data_args mh;
00186 mh.msg_oc = 0;
00187 mh.data_oc = bufdata_oc;
00188 mh.dest_channel = dest_channel;
00189 mh.token = *token;
00190
00191 OutgoingControl *chdr = OutgoingControl::alloc();
00192 chdr->submit_time = ink_get_hrtime();
00193 chdr->len = sizeof(int32_t) + sizeof(mh);
00194 chdr->alloc_data();
00195 *(int32_t *) chdr->data = -1;
00196 memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
00197
00198 return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
00199 }
00200
00201
00202 void
00203 ClusterProcessor::free_remote_data(char *p, int )
00204 {
00205 char *d = p - sizeof(int32_t);
00206 int data_hdr = ClusterControl::DATA_HDR;
00207
00208 ink_release_assert(*((uint8_t *) (d - data_hdr + 1)) == (uint8_t) ALLOC_DATA_MAGIC);
00209 unsigned char size_index = *(d - data_hdr);
00210 if (!(size_index & 0x80)) {
00211 ink_release_assert(size_index <= (DEFAULT_BUFFER_SIZES - 1));
00212 } else {
00213 ink_release_assert(size_index == 0xff);
00214 }
00215
00216
00217
00218 ClusterControl *ccl;
00219 memcpy((char *) &ccl, (d - data_hdr + 2), sizeof(void *));
00220 ink_assert(ccl->valid_alloc_data());
00221
00222
00223
00224 ccl->freeall();
00225 }
00226
00227 ClusterVConnection *
00228 ClusterProcessor::open_local(Continuation * cont, ClusterMachine *, ClusterVCToken & token, int options)
00229 {
00230
00231
00232
00233
00234
00235
00236
00237 bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
00238 bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
00239
00240 ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
00241 if (!ch)
00242 return NULL;
00243 EThread *t = ch->thread;
00244 if (!t)
00245 return NULL;
00246
00247 EThread *thread = this_ethread();
00248 ProxyMutex *mutex = thread->mutex;
00249 ClusterVConnection *vc = clusterVCAllocator.alloc();
00250 vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
00251 vc->start_time = ink_get_hrtime();
00252 vc->last_activity_time = vc->start_time;
00253 vc->ch = ch;
00254 vc->token.alloc();
00255 vc->token.ch_id = ch->id;
00256 token = vc->token;
00257 #ifdef CLUSTER_THREAD_STEALING
00258 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00259 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00260 MUTEX_TRY_LOCK(lock, ch->mutex, thread);
00261 if (!lock) {
00262 #endif
00263 if (immediate) {
00264 clusterVCAllocator_free(vc);
00265 return NULL;
00266 }
00267 vc->action_ = cont;
00268 ink_atomiclist_push(&ch->external_incoming_open_local, (void *) vc);
00269 if(ch->thread && ch->thread->signal_hook)
00270 ch->thread->signal_hook(ch->thread);
00271 return CLUSTER_DELAYED_OPEN;
00272
00273 #ifdef CLUSTER_THREAD_STEALING
00274 } else {
00275 if (!(immediate || allow_immediate))
00276 vc->action_ = cont;
00277 if (vc->start(thread) < 0) {
00278 return NULL;
00279 }
00280 if (immediate || allow_immediate) {
00281 return vc;
00282 } else {
00283 return CLUSTER_DELAYED_OPEN;
00284 }
00285 }
00286 #endif
00287 }
00288
00289 ClusterVConnection *
00290 ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int channel, int options)
00291 {
00292
00293
00294
00295
00296 bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
00297 bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
00298
00299 #ifdef LOCAL_CLUSTER_TEST_MODE
00300 int ip = inet_addr("127.0.0.1");
00301 ClusterMachine *m;
00302 m = this_cluster->current_configuration()->find(ip, token->ip_created);
00303 #else
00304 ClusterMachine *m = this_cluster->current_configuration()->find(token->ip_created);
00305 #endif
00306 if (!m)
00307 return NULL;
00308 if (token->ch_id >= (uint32_t)m->num_connections)
00309 return NULL;
00310 ClusterHandler *ch = m->clusterHandlers[token->ch_id];
00311 if (!ch)
00312 return NULL;
00313 EThread *t = ch->thread;
00314 if (!t)
00315 return NULL;
00316
00317 EThread *thread = this_ethread();
00318 ProxyMutex *mutex = thread->mutex;
00319 ClusterVConnection *vc = clusterVCAllocator.alloc();
00320 vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
00321 vc->start_time = ink_get_hrtime();
00322 vc->last_activity_time = vc->start_time;
00323 vc->ch = ch;
00324 vc->token = *token;
00325 vc->channel = channel;
00326 #ifdef CLUSTER_THREAD_STEALING
00327 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00328 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00329 MUTEX_TRY_LOCK(lock, ch->mutex, thread);
00330 if (!lock) {
00331 #endif
00332 if (immediate) {
00333 clusterVCAllocator_free(vc);
00334 return NULL;
00335 }
00336 vc->mutex = ch->mutex;
00337 vc->action_ = cont;
00338 ch->thread->schedule_imm_signal(vc);
00339 return CLUSTER_DELAYED_OPEN;
00340 #ifdef CLUSTER_THREAD_STEALING
00341 } else {
00342 if (!(immediate || allow_immediate))
00343 vc->action_ = cont;
00344 if (vc->start(thread) < 0) {
00345 return NULL;
00346 }
00347 if (immediate || allow_immediate) {
00348 return vc;
00349 } else {
00350 return CLUSTER_DELAYED_OPEN;
00351 }
00352 }
00353 #endif
00354 }
00355
00356 bool ClusterProcessor::disable_remote_cluster_ops(ClusterMachine * m)
00357 {
00358 ClusterHandler *ch = m->pop_ClusterHandler(1);
00359 if (ch) {
00360 return ch->disable_remote_cluster_ops;
00361 } else {
00362 return true;
00363 }
00364 }
00365
00366
00367
00368
00369
00370
// Periodic cluster event object; created and initialized by
// ClusterProcessor::init().
GlobalClusterPeriodicEvent *
  PeriodicClusterEvent;

#ifdef CLUSTER_TOMCAT
extern int cache_clustering_enabled;

// Bound to "proxy.config.cluster.enable_monitor" in ClusterProcessor::init().
int CacheClusterMonitorEnabled = 0;
// Bound to "proxy.config.cluster.monitor_interval_secs" in init().
int CacheClusterMonitorIntervalSecs = 1;

// Socket tuning values, read in init() from
// "proxy.config.cluster.send_buffer_size", ".receive_buffer_size",
// ".sock_option_flag", ".sock_packet_mark" and ".sock_packet_tos".
int cluster_send_buffer_size = 0;
int cluster_receive_buffer_size = 0;
unsigned long cluster_sockopt_flags = 0;
unsigned long cluster_packet_mark = 0;
unsigned long cluster_packet_tos = 0;

// Bound to "proxy.config.cluster.rpc_cache_cluster" in init(); consulted by
// ClusterProcessor::compute_cluster_mode().
int RPC_only_CacheCluster = 0;
#endif
00388
00389 int
00390 ClusterProcessor::init()
00391 {
00392 cluster_rsb = RecAllocateRawStatBlock((int) cluster_stat_count);
00393
00394
00395
00396 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00397 "proxy.process.cluster.connections_open",
00398 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_OPEN_STAT, RecRawStatSyncSum);
00399 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
00400 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00401 "proxy.process.cluster.connections_opened",
00402 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_OPENNED_STAT, RecRawStatSyncSum);
00403 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
00404 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00405 "proxy.process.cluster.connections_closed",
00406 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CON_TOTAL_TIME_STAT, RecRawStatSyncCount);
00407 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT);
00408 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00409 "proxy.process.cluster.slow_ctrl_msgs_sent",
00410 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SLOW_CTRL_MSGS_SENT_STAT, RecRawStatSyncCount);
00411 CLUSTER_CLEAR_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
00412 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00413 "proxy.process.cluster.connections_read_locked",
00414 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_READ_LOCKED_STAT, RecRawStatSyncSum);
00415 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_READ_LOCKED_STAT);
00416 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00417 "proxy.process.cluster.connections_write_locked",
00418 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT, RecRawStatSyncSum);
00419 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT);
00420 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00421 "proxy.process.cluster.reads",
00422 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_READ_BYTES_STAT, RecRawStatSyncCount);
00423 CLUSTER_CLEAR_DYN_STAT(CLUSTER_READ_BYTES_STAT);
00424 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00425 "proxy.process.cluster.read_bytes",
00426 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_READ_BYTES_STAT, RecRawStatSyncSum);
00427 CLUSTER_CLEAR_DYN_STAT(CLUSTER_READ_BYTES_STAT);
00428 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00429 "proxy.process.cluster.writes",
00430 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BYTES_STAT, RecRawStatSyncCount);
00431 CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BYTES_STAT);
00432 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00433 "proxy.process.cluster.write_bytes",
00434 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BYTES_STAT, RecRawStatSyncSum);
00435 CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BYTES_STAT);
00436 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00437 "proxy.process.cluster.control_messages_sent",
00438 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_SEND_TIME_STAT, RecRawStatSyncCount);
00439 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT);
00440 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00441 "proxy.process.cluster.control_messages_received",
00442 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_RECV_TIME_STAT, RecRawStatSyncCount);
00443 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT);
00444 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00445 "proxy.process.cluster.op_delayed_for_lock",
00446 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_OP_DELAYED_FOR_LOCK_STAT, RecRawStatSyncSum);
00447 CLUSTER_CLEAR_DYN_STAT(CLUSTER_OP_DELAYED_FOR_LOCK_STAT);
00448 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00449 "proxy.process.cluster.connections_bumped",
00450 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONNECTIONS_BUMPED_STAT, RecRawStatSyncSum);
00451 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONNECTIONS_BUMPED_STAT);
00452 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00453 "proxy.process.cluster.net_backup",
00454 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NET_BACKUP_STAT, RecRawStatSyncCount);
00455 CLUSTER_CLEAR_DYN_STAT(CLUSTER_NET_BACKUP_STAT);
00456 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00457 "proxy.process.cluster.nodes",
00458 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NODES_STAT, RecRawStatSyncSum);
00459 CLUSTER_CLEAR_DYN_STAT(CLUSTER_NODES_STAT);
00460 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00461 "proxy.process.cluster.machines_allocated",
00462 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MACHINES_ALLOCATED_STAT, RecRawStatSyncSum);
00463 CLUSTER_CLEAR_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
00464 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00465 "proxy.process.cluster.machines_freed",
00466 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MACHINES_FREED_STAT, RecRawStatSyncSum);
00467 CLUSTER_CLEAR_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
00468 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00469 "proxy.process.cluster.configuration_changes",
00470 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CONFIGURATION_CHANGES_STAT, RecRawStatSyncCount);
00471 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00472 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00473 "proxy.process.cluster.delayed_reads",
00474 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_DELAYED_READS_STAT, RecRawStatSyncSum);
00475 CLUSTER_CLEAR_DYN_STAT(CLUSTER_DELAYED_READS_STAT);
00476 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00477 "proxy.process.cluster.byte_bank_used",
00478 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_BYTE_BANK_USED_STAT, RecRawStatSyncSum);
00479 CLUSTER_CLEAR_DYN_STAT(CLUSTER_BYTE_BANK_USED_STAT);
00480 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00481 "proxy.process.cluster.alloc_data_news",
00482 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_ALLOC_DATA_NEWS_STAT, RecRawStatSyncSum);
00483 CLUSTER_CLEAR_DYN_STAT(CLUSTER_ALLOC_DATA_NEWS_STAT);
00484 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00485 "proxy.process.cluster.write_bb_mallocs",
00486 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_BB_MALLOCS_STAT, RecRawStatSyncSum);
00487 CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_BB_MALLOCS_STAT);
00488 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00489 "proxy.process.cluster.partial_reads",
00490 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_PARTIAL_READS_STAT, RecRawStatSyncSum);
00491 CLUSTER_CLEAR_DYN_STAT(CLUSTER_PARTIAL_READS_STAT);
00492 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00493 "proxy.process.cluster.partial_writes",
00494 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_PARTIAL_WRITES_STAT, RecRawStatSyncSum);
00495 CLUSTER_CLEAR_DYN_STAT(CLUSTER_PARTIAL_WRITES_STAT);
00496 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00497 "proxy.process.cluster.cache_outstanding",
00498 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_OUTSTANDING_STAT, RecRawStatSyncSum);
00499 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
00500 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00501 "proxy.process.cluster.remote_op_timeouts",
00502 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_OP_TIMEOUTS_STAT, RecRawStatSyncSum);
00503 CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
00504 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00505 "proxy.process.cluster.remote_op_reply_timeouts",
00506 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT, RecRawStatSyncSum);
00507 CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
00508 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00509 "proxy.process.cluster.chan_inuse",
00510 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CHAN_INUSE_STAT, RecRawStatSyncSum);
00511 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
00512 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00513 "proxy.process.cluster.open_delays",
00514 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_OPEN_DELAY_TIME_STAT, RecRawStatSyncSum);
00515 CLUSTER_CLEAR_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT);
00516 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00517 "proxy.process.cluster.connections_avg_time",
00518 RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CON_TOTAL_TIME_STAT, RecRawStatSyncHrTimeAvg);
00519 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT);
00520 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00521 "proxy.process.cluster.control_messages_avg_send_time",
00522 RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_SEND_TIME_STAT, RecRawStatSyncHrTimeAvg);
00523 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT);
00524 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00525 "proxy.process.cluster.control_messages_avg_receive_time",
00526 RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CTRL_MSGS_RECV_TIME_STAT, RecRawStatSyncHrTimeAvg);
00527 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT);
00528 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00529 "proxy.process.cluster.open_delay_time",
00530 RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_OPEN_DELAY_TIME_STAT, RecRawStatSyncHrTimeAvg);
00531 CLUSTER_CLEAR_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT);
00532 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00533 "proxy.process.cluster.cache_callback_time",
00534 RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00535 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT);
00536 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00537 "proxy.process.cluster.rmt_cache_callback_time",
00538 RECD_FLOAT, RECP_NON_PERSISTENT,
00539 (int) CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00540 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT);
00541 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00542 "proxy.process.cluster.lkrmt_cache_callback_time",
00543 RECD_FLOAT, RECP_NON_PERSISTENT,
00544 (int) CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, RecRawStatSyncHrTimeAvg);
00545 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT);
00546 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00547 "proxy.process.cluster.local_connection_time",
00548 RECD_FLOAT, RECP_NON_PERSISTENT,
00549 (int) CLUSTER_LOCAL_CONNECTION_TIME_STAT, RecRawStatSyncHrTimeAvg);
00550 CLUSTER_CLEAR_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT);
00551 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00552 "proxy.process.cluster.remote_connection_time",
00553 RECD_FLOAT, RECP_NON_PERSISTENT,
00554 (int) CLUSTER_REMOTE_CONNECTION_TIME_STAT, RecRawStatSyncHrTimeAvg);
00555 CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT);
00556 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00557 "proxy.process.cluster.rdmsg_assemble_time",
00558 RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_RDMSG_ASSEMBLE_TIME_STAT, RecRawStatSyncHrTimeAvg);
00559 CLUSTER_CLEAR_DYN_STAT(CLUSTER_RDMSG_ASSEMBLE_TIME_STAT);
00560 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00561 "proxy.process.cluster.cluster_ping_time",
00562 RECD_FLOAT, RECP_NON_PERSISTENT, (int) CLUSTER_PING_TIME_STAT, RecRawStatSyncHrTimeAvg);
00563 CLUSTER_CLEAR_DYN_STAT(CLUSTER_PING_TIME_STAT);
00564 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00565 "proxy.process.cluster.cache_callbacks",
00566 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00567 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT);
00568 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00569 "proxy.process.cluster.rmt_cache_callbacks",
00570 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00571 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT);
00572 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00573 "proxy.process.cluster.lkrmt_cache_callbacks",
00574 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, RecRawStatSyncCount);
00575 CLUSTER_CLEAR_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT);
00576 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00577 "proxy.process.cluster.local_connections_closed",
00578 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_LOCAL_CONNECTION_TIME_STAT, RecRawStatSyncCount);
00579 CLUSTER_CLEAR_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT);
00580 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00581 "proxy.process.cluster.remote_connections_closed",
00582 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_REMOTE_CONNECTION_TIME_STAT, RecRawStatSyncCount);
00583 CLUSTER_CLEAR_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT);
00584 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00585 "proxy.process.cluster.setdata_no_clustervc",
00586 RECD_INT, RECP_NON_PERSISTENT, (int) cluster_setdata_no_CLUSTERVC_STAT, RecRawStatSyncCount);
00587 CLUSTER_CLEAR_DYN_STAT(cluster_setdata_no_CLUSTERVC_STAT);
00588 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00589 "proxy.process.cluster.setdata_no_tunnel",
00590 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SETDATA_NO_TUNNEL_STAT, RecRawStatSyncCount);
00591 CLUSTER_CLEAR_DYN_STAT(CLUSTER_SETDATA_NO_TUNNEL_STAT);
00592 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00593 "proxy.process.cluster.setdata_no_cachevc",
00594 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_SETDATA_NO_CACHEVC_STAT, RecRawStatSyncCount);
00595 CLUSTER_CLEAR_DYN_STAT(CLUSTER_SETDATA_NO_CACHEVC_STAT);
00596 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00597 "proxy.process.cluster.setdata_no_cluster",
00598 RECD_INT, RECP_NON_PERSISTENT, (int) cluster_setdata_no_CLUSTER_STAT, RecRawStatSyncCount);
00599 CLUSTER_CLEAR_DYN_STAT(cluster_setdata_no_CLUSTER_STAT);
00600 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00601 "proxy.process.cluster.vc_write_stall",
00602 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_WRITE_STALL_STAT, RecRawStatSyncCount);
00603 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_WRITE_STALL_STAT);
00604 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00605 "proxy.process.cluster.no_remote_space",
00606 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_NO_REMOTE_SPACE_STAT, RecRawStatSyncCount);
00607 CLUSTER_CLEAR_DYN_STAT(CLUSTER_NO_REMOTE_SPACE_STAT);
00608 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00609 "proxy.process.cluster.level1_bank",
00610 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_LEVEL1_BANK_STAT, RecRawStatSyncCount);
00611 CLUSTER_CLEAR_DYN_STAT(CLUSTER_LEVEL1_BANK_STAT);
00612 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00613 "proxy.process.cluster.multilevel_bank",
00614 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_MULTILEVEL_BANK_STAT, RecRawStatSyncCount);
00615 CLUSTER_CLEAR_DYN_STAT(CLUSTER_MULTILEVEL_BANK_STAT);
00616 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00617 "proxy.process.cluster.vc_cache_insert_lock_misses",
00618 RECD_INT, RECP_NON_PERSISTENT,
00619 (int) CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT, RecRawStatSyncCount);
00620 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
00621 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00622 "proxy.process.cluster.vc_cache_inserts",
00623 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_INSERTS_STAT, RecRawStatSyncCount);
00624 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
00625 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00626 "proxy.process.cluster.vc_cache_lookup_lock_misses",
00627 RECD_INT, RECP_NON_PERSISTENT,
00628 (int) CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT, RecRawStatSyncCount);
00629 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
00630 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00631 "proxy.process.cluster.vc_cache_lookup_hits",
00632 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_LOOKUP_HITS_STAT, RecRawStatSyncCount);
00633 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_HITS_STAT);
00634 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00635 "proxy.process.cluster.vc_cache_lookup_misses",
00636 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT, RecRawStatSyncCount);
00637 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
00638 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00639 "proxy.process.cluster.vc_cache_scans",
00640 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_SCANS_STAT, RecRawStatSyncCount);
00641 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
00642 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00643 "proxy.process.cluster.vc_cache_scan_lock_misses",
00644 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT, RecRawStatSyncCount);
00645 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT);
00646 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00647 "proxy.process.cluster.vc_cache_purges",
00648 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_CACHE_PURGES_STAT, RecRawStatSyncCount);
00649 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_CACHE_PURGES_STAT);
00650 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00651 "proxy.process.cluster.write_lock_misses",
00652 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_WRITE_LOCK_MISSES_STAT, RecRawStatSyncCount);
00653 CLUSTER_CLEAR_DYN_STAT(CLUSTER_WRITE_LOCK_MISSES_STAT);
00654 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00655 "proxy.process.cluster.vc_read_list_len",
00656 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_READ_LIST_LEN_STAT, RecRawStatSyncAvg);
00657 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_READ_LIST_LEN_STAT);
00658 RecRegisterRawStat(cluster_rsb, RECT_PROCESS,
00659 "proxy.process.cluster.vc_write_list_len",
00660 RECD_INT, RECP_NON_PERSISTENT, (int) CLUSTER_VC_WRITE_LIST_LEN_STAT, RecRawStatSyncAvg);
00661 CLUSTER_CLEAR_DYN_STAT(CLUSTER_VC_WRITE_LIST_LEN_STAT);
00662 CLUSTER_CLEAR_DYN_STAT(CLUSTER_NODES_STAT);
00663
00664
00665 CLUSTER_SUM_GLOBAL_DYN_STAT(CLUSTER_NODES_STAT, 1);
00666
00667 REC_ReadConfigInteger(ClusterLoadMonitor::cf_monitor_enabled, "proxy.config.cluster.load_monitor_enabled");
00668 REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_message_send_msec_interval, "proxy.config.cluster.ping_send_interval_msecs");
00669 REC_ReadConfigInteger(ClusterLoadMonitor::cf_num_ping_response_buckets, "proxy.config.cluster.ping_response_buckets");
00670 REC_ReadConfigInteger(ClusterLoadMonitor::cf_msecs_per_ping_response_bucket, "proxy.config.cluster.msecs_per_ping_response_bucket");
00671 REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_latency_threshold_msecs, "proxy.config.cluster.ping_latency_threshold_msecs");
00672 REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_compute_msec_interval, "proxy.config.cluster.load_compute_interval_msecs");
00673 REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_periodic_msec_interval, "proxy.config.cluster.periodic_timer_interval_msecs");
00674 REC_ReadConfigInteger(ClusterLoadMonitor::cf_ping_history_buf_length, "proxy.config.cluster.ping_history_buf_length");
00675 REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_clear_duration, "proxy.config.cluster.cluster_load_clear_duration");
00676 REC_ReadConfigInteger(ClusterLoadMonitor::cf_cluster_load_exceed_duration, "proxy.config.cluster.cluster_load_exceed_duration");
00677
00678
00679
00680
00681 if (cluster_port_number != DEFAULT_CLUSTER_PORT_NUMBER)
00682 cluster_port = cluster_port_number;
00683 else {
00684 REC_ReadConfigInteger(cluster_port, "proxy.config.cluster.cluster_port");
00685 }
00686 if (num_of_cluster_threads == DEFAULT_NUMBER_OF_CLUSTER_THREADS)
00687 REC_ReadConfigInteger(num_of_cluster_threads, "proxy.config.cluster.threads");
00688
00689 REC_EstablishStaticConfigInt32(CacheClusterMonitorEnabled, "proxy.config.cluster.enable_monitor");
00690 REC_EstablishStaticConfigInt32(CacheClusterMonitorIntervalSecs, "proxy.config.cluster.monitor_interval_secs");
00691 REC_ReadConfigInteger(cluster_receive_buffer_size, "proxy.config.cluster.receive_buffer_size");
00692 REC_ReadConfigInteger(cluster_send_buffer_size, "proxy.config.cluster.send_buffer_size");
00693 REC_ReadConfigInteger(cluster_sockopt_flags, "proxy.config.cluster.sock_option_flag");
00694 REC_ReadConfigInteger(cluster_packet_mark, "proxy.config.cluster.sock_packet_mark");
00695 REC_ReadConfigInteger(cluster_packet_tos, "proxy.config.cluster.sock_packet_tos");
00696 REC_EstablishStaticConfigInt32(RPC_only_CacheCluster, "proxy.config.cluster.rpc_cache_cluster");
00697
00698 int cluster_type = 0;
00699 REC_ReadConfigInteger(cluster_type, "proxy.local.cluster.type");
00700
00701 create_this_cluster_machine();
00702
00703 clusterAPI_init();
00704
00705 PeriodicClusterEvent = new GlobalClusterPeriodicEvent;
00706 PeriodicClusterEvent->init();
00707
00708 this_cluster = new Cluster;
00709 ClusterConfiguration *cc = new ClusterConfiguration;
00710 this_cluster->configurations.push(cc);
00711 cc->n_machines = 1;
00712 cc->machines[0] = this_cluster_machine();
00713 memset(cc->hash_table, 0, CLUSTER_HASH_TABLE_SIZE);
00714
00715
00716 memset(channel_dummy_output, 0, sizeof(channel_dummy_output));
00717
00718 if (cluster_type == 1) {
00719 cache_clustering_enabled = 1;
00720 Note("cache clustering enabled");
00721 compute_cluster_mode();
00722 } else {
00723 cache_clustering_enabled = 0;
00724 Note("cache clustering disabled");
00725 }
00726 return 0;
00727 }
00728
00729
00730 int
00731 init_clusterprocessor(void)
00732 {
00733 return clusterProcessor.init();
00734 }
00735
00736 int
00737 ClusterProcessor::start()
00738 {
00739 #ifdef LOCAL_CLUSTER_TEST_MODE
00740 this_cluster_machine()->cluster_port = cluster_port;
00741 #endif
00742 if (cache_clustering_enabled && (cacheProcessor.IsCacheEnabled() == CACHE_INITIALIZED)) {
00743 size_t stacksize;
00744
00745 REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
00746 ET_CLUSTER = eventProcessor.spawn_event_threads(num_of_cluster_threads, "ET_CLUSTER", stacksize);
00747 for (int i = 0; i < eventProcessor.n_threads_for_type[ET_CLUSTER]; i++) {
00748 initialize_thread_for_net(eventProcessor.eventthread[ET_CLUSTER][i]);
00749 }
00750 REC_RegisterConfigUpdateFunc("proxy.config.cluster.cluster_configuration", machine_config_change, (void *) CLUSTER_CONFIG);
00751 do_machine_config_change((void *) CLUSTER_CONFIG, "proxy.config.cluster.cluster_configuration");
00752
00753 #ifdef USE_SEPARATE_MACHINE_CONFIG
00754 REC_RegisterConfigUpdateFunc("proxy.config.cluster.machine_configuration", machine_config_change, (void *) MACHINE_CONFIG);
00755 do_machine_config_change((void *) MACHINE_CONFIG, "proxy.config.cluster.machine_configuration");
00756 #endif
00757
00758 accept_handler = new ClusterAccept(&cluster_port, cluster_receive_buffer_size, cluster_send_buffer_size);
00759 accept_handler->Init();
00760 }
00761 return 0;
00762 }
00763
00764 void
00765 ClusterProcessor::connect(char *hostname, int16_t id)
00766 {
00767
00768
00769
00770 ClusterHandler *ch = new ClusterHandler;
00771 SET_CONTINUATION_HANDLER(ch, (ClusterContHandler) & ClusterHandler::connectClusterEvent);
00772 ch->hostname = ats_strdup(hostname);
00773 ch->connector = true;
00774 ch->id = id;
00775 eventProcessor.schedule_imm(ch, ET_CLUSTER);
00776 }
00777
00778 void
00779 ClusterProcessor::connect(unsigned int ip, int port, int16_t id, bool delay)
00780 {
00781
00782
00783
00784 ClusterHandler *ch = new ClusterHandler;
00785 SET_CONTINUATION_HANDLER(ch, (ClusterContHandler) & ClusterHandler::connectClusterEvent);
00786 ch->ip = ip;
00787 ch->port = port;
00788 ch->connector = true;
00789 ch->id = id;
00790 if (delay)
00791 eventProcessor.schedule_in(ch, CLUSTER_MEMBER_DELAY, ET_CLUSTER);
00792 else
00793 eventProcessor.schedule_imm(ch, ET_CLUSTER);
00794 }
00795
00796 void
00797 ClusterProcessor::send_machine_list(ClusterMachine * m)
00798 {
00799
00800
00801
00802
00803
00804 MachineListMessage mlistmsg;
00805 int vers = MachineListMessage::protoToVersion(m->msg_proto_major);
00806 ClusterConfiguration *cc = this_cluster->current_configuration();
00807 void *data;
00808 int len;
00809
00810 if (vers == MachineListMessage::MACHINE_LIST_MESSAGE_VERSION) {
00811 int n = 0;
00812 MachineListMessage *msg = &mlistmsg;
00813
00814 while (n < cc->n_machines) {
00815 msg->ip[n] = cc->machines[n]->ip;
00816 n++;
00817 }
00818 msg->n_ip = n;
00819 data = (void *) msg;
00820 len = msg->sizeof_fixedlen_msg() + (n * sizeof(uint32_t));
00821
00822 } else {
00823
00824
00825
00826 ink_release_assert(!"send_machine_list() bad msg version");
00827 }
00828 invoke_remote(m->pop_ClusterHandler(), MACHINE_LIST_CLUSTER_FUNCTION, data, len);
00829 }
00830
00831 void
00832 ClusterProcessor::compute_cluster_mode()
00833 {
00834 if (RPC_only_CacheCluster) {
00835 if (cache_clustering_enabled > 0) {
00836 cache_clustering_enabled = -1;
00837 Note("RPC only cache clustering");
00838 }
00839 } else {
00840 if (cache_clustering_enabled < 0) {
00841 cache_clustering_enabled = 1;
00842 Note("RPC only cache clustering disabled");
00843 }
00844 }
00845 }
00846
00847