00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
#include "P_Cluster.h"

#include <stdint.h>
00031
00032 int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER; // File-level cluster port; compared against peer ports in make_cluster_connections() when LOCAL_CLUSTER_TEST_MODE is defined.
00033
00034 ClusterAccept::ClusterAccept(int *port, int send_bufsize, int recv_bufsize)
00035 : Continuation(0),
00036 p_cluster_port(port),
00037 socket_send_bufsize(send_bufsize),
00038 socket_recv_bufsize(recv_bufsize),
00039 current_cluster_port(-1),
00040 accept_action(0),
00041 periodic_event(0)
00042 {
00043 mutex = new_ProxyMutex();
00044 SET_HANDLER(&ClusterAccept::ClusterAcceptEvent);
00045 }
00046
00047 ClusterAccept::~ClusterAccept()
00048 {
00049 mutex = 0;
00050 }
00051
00052 void
00053 ClusterAccept::Init()
00054 {
00055
00056
00057
00058 current_cluster_port = ~*p_cluster_port;
00059 ClusterAcceptEvent(EVENT_INTERVAL, 0);
00060
00061
00062 periodic_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(60));
00063 }
00064
00065 void
00066 ClusterAccept::ShutdownDelete()
00067 {
00068 MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00069 if (!lock) {
00070 eventProcessor.schedule_imm(this, ET_CALL);
00071 return;
00072 }
00073
00074 if (accept_action) {
00075 accept_action->cancel();
00076 accept_action = 0;
00077 }
00078 if (periodic_event) {
00079 periodic_event->cancel();
00080 periodic_event = 0;
00081 }
00082 delete this;
00083 }
00084
00085 int
00086 ClusterAccept::ClusterAcceptEvent(int event, void *data)
00087 {
00088 switch (event) {
00089 case EVENT_IMMEDIATE:
00090 {
00091 ShutdownDelete();
00092 return EVENT_DONE;
00093 }
00094 case EVENT_INTERVAL:
00095 {
00096 int cluster_port = *p_cluster_port;
00097
00098 if (cluster_port != current_cluster_port) {
00099
00100 if (accept_action) {
00101 accept_action->cancel();
00102 accept_action = 0;
00103 }
00104
00105 NetProcessor::AcceptOptions opt;
00106 opt.recv_bufsize = socket_recv_bufsize;
00107 opt.send_bufsize = socket_send_bufsize;
00108 opt.etype = ET_CLUSTER;
00109 opt.local_port = cluster_port;
00110 opt.ip_family = AF_INET;
00111 opt.localhost_only = false;
00112
00113 accept_action = netProcessor.main_accept(this, NO_FD, opt);
00114 if (!accept_action) {
00115 Warning("Unable to accept cluster connections on port: %d", cluster_port);
00116 } else {
00117 current_cluster_port = cluster_port;
00118 }
00119 }
00120 return EVENT_CONT;
00121 }
00122 case NET_EVENT_ACCEPT:
00123 {
00124 ClusterAcceptMachine((NetVConnection *) data);
00125 return EVENT_DONE;
00126 }
00127 default:
00128 {
00129 Warning("ClusterAcceptEvent: received unknown event %d", event);
00130 return EVENT_DONE;
00131 }
00132 }
00133 }
00134
00135 int
00136 ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
00137 {
00138
00139 unsigned int remote_ip = NetVC->get_remote_ip();
00140 MachineList *mc = the_cluster_machines_config();
00141
00142 if (mc && !mc->find(remote_ip)) {
00143 Note("Illegal cluster connection from %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
00144 NetVC->do_io(VIO::CLOSE);
00145 return 0;
00146 }
00147
00148 Debug(CL_NOTE, "Accepting machine %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
00149 ClusterHandler *ch = new ClusterHandler;
00150 ch->machine = new ClusterMachine(NULL, remote_ip);
00151 ch->ip = remote_ip;
00152 ch->net_vc = NetVC;
00153 eventProcessor.schedule_imm_signal(ch, ET_CLUSTER);
00154 return 1;
00155 }
00156
00157 static void
00158 make_cluster_connections(MachineList * l)
00159 {
00160
00161
00162
00163 uint32_t ip = this_cluster_machine()->ip;
00164 int num_connections = this_cluster_machine()->num_connections;
00165
00166 if (l) {
00167 for (int i = 0; i < l->n; i++) {
00168 #ifdef LOCAL_CLUSTER_TEST_MODE
00169 if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port))) {
00170 #else
00171 if (ip < l->machine[i].ip) {
00172 #endif
00173 for (int j = 0; j < num_connections; j++) {
00174 clusterProcessor.connect(l->machine[i].ip, l->machine[i].port, j);
00175 }
00176 }
00177 }
00178 }
00179 }
00180
00181 int
00182 machine_config_change(const char * , RecDataT , RecData data,
00183 void *cookie)
00184 {
00185
00186
00187
00188
00189
00190
00191
00192 char *filename = (char *) data.rec_string;
00193 MachineList *l = read_MachineList(filename);
00194 MachineList *old = NULL;
00195 #ifdef USE_SEPARATE_MACHINE_CONFIG
00196 switch ((int) cookie) {
00197 case MACHINE_CONFIG:
00198 old = machines_config;
00199 machines_config = l;
00200 break;
00201 case CLUSTER_CONFIG:
00202 old = cluster_config;
00203 cluster_config = l;
00204 make_cluster_connections(l);
00205 break;
00206 }
00207 #else
00208 (void) cookie;
00209 old = cluster_config;
00210 machines_config = l;
00211 cluster_config = l;
00212 make_cluster_connections(l);
00213 #endif
00214 if (old)
00215 free_MachineList(old);
00216 return 0;
00217 }
00218
00219 void
00220 do_machine_config_change(void *d, const char *s)
00221 {
00222 char cluster_config_filename[PATH_NAME_MAX] = "";
00223 REC_ReadConfigString(cluster_config_filename, s, sizeof(cluster_config_filename) - 1);
00224 RecData data;
00225 data.rec_string = cluster_config_filename;
00226 machine_config_change(s, RECD_STRING, data, d);
00227 }
00228
00229
00230
00231
00232 ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0)
00233 {
00234 memset(machines, 0, sizeof(machines));
00235 memset(hash_table, 0, sizeof(hash_table));
00236 }
00237
00238
00239
00240
00241 struct ConfigurationContinuation;
00242 typedef int (ConfigurationContinuation::*CfgContHandler) (int, void *);
00243 struct ConfigurationContinuation: public Continuation
00244 {
00245 ClusterConfiguration *c;
00246 ClusterConfiguration *prev;
00247
00248 int
00249 zombieEvent(int , Event * e)
00250 {
00251 prev->link.next = NULL;
00252 SET_HANDLER((CfgContHandler) & ConfigurationContinuation::dieEvent);
00253 e->schedule_in(CLUSTER_CONFIGURATION_ZOMBIE);
00254 return EVENT_CONT;
00255 }
00256
00257 int
00258 dieEvent(int event, Event * e)
00259 {
00260 (void) event;
00261 (void) e;
00262 delete c;
00263 delete this;
00264 return EVENT_DONE;
00265 }
00266
00267 ConfigurationContinuation(ClusterConfiguration * cc, ClusterConfiguration * aprev)
00268 : Continuation(NULL), c(cc), prev(aprev) {
00269 mutex = new_ProxyMutex();
00270 SET_HANDLER((CfgContHandler) & ConfigurationContinuation::zombieEvent);
00271 }
00272 };
00273
00274 static void
00275 free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev)
00276 {
00277
00278
00279
00280
00281
00282
00283
00284
00285 eventProcessor.schedule_in(new ConfigurationContinuation(c, prev), CLUSTER_CONFIGURATION_TIMEOUT, ET_CALL);
00286 }
00287
00288 ClusterConfiguration *
00289 configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
00290 {
00291
00292
00293
00294 EThread *thread = this_ethread();
00295 ProxyMutex *mutex = thread->mutex;
00296 int i = 0;
00297 ClusterConfiguration *cc = new ClusterConfiguration(*c);
00298
00299
00300
00301 for (i = 0; i < cc->n_machines; i++) {
00302 if (cc->machines[i]->ip > m->ip)
00303 break;
00304 }
00305
00306
00307
00308 for (int j = cc->n_machines - 1; j >= i; j--)
00309 cc->machines[j + 1] = cc->machines[j];
00310
00311
00312
00313 cc->machines[i] = m;
00314 cc->n_machines++;
00315
00316 cc->link.next = c;
00317 cc->changed = ink_get_hrtime();
00318 ink_assert(cc->n_machines < CLUSTER_MAX_MACHINES);
00319
00320 build_cluster_hash_table(cc);
00321 INK_MEMORY_BARRIER;
00322 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00323
00324 free_configuration(c, cc);
00325 return cc;
00326 }
00327
00328 ClusterConfiguration *
00329 configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
00330 {
00331 EThread *thread = this_ethread();
00332 ProxyMutex *mutex = thread->mutex;
00333
00334
00335
00336
00337 ClusterConfiguration *cc = new ClusterConfiguration(*c);
00338
00339
00340
00341 for (int i = 0; i < cc->n_machines - 1; i++)
00342 if (m == cc->machines[i])
00343 m = cc->machines[i] = cc->machines[i + 1];
00344 cc->n_machines--;
00345
00346 ink_assert(cc->n_machines > 0);
00347
00348 cc->link.next = c;
00349 cc->changed = ink_get_hrtime();
00350
00351 build_cluster_hash_table(cc);
00352 INK_MEMORY_BARRIER;
00353 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00354
00355 free_configuration(c, cc);
00356 return cc;
00357 }
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367 ClusterMachine *
00368 cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** past_probes)
00369 {
00370 #ifdef CLUSTER_TOMCAT
00371 if (!cache_clustering_enabled)
00372 return NULL;
00373 #endif
00374 ClusterConfiguration *
00375 cc = this_cluster()->current_configuration();
00376 ClusterConfiguration *
00377 next_cc = cc;
00378 ink_hrtime now = ink_get_hrtime();
00379 int fake_probe_depth = 0;
00380 int &probe_depth = pprobe_depth ? (*pprobe_depth) : fake_probe_depth;
00381 int tprobe_depth = probe_depth;
00382
00383 #ifdef CLUSTER_TEST
00384 if (cc->n_machines > 1) {
00385 for (int i = 0; i < cc->n_machines; ++i) {
00386 if (cc->machines[i] != this_cluster_machine()) {
00387 return cc->machines[i];
00388 }
00389 }
00390 }
00391 #endif // CLUSTER_TEST
00392
00393 while (1) {
00394
00395
00396 if (probe_depth > CONFIGURATION_HISTORY_PROBE_DEPTH)
00397 break;
00398
00399
00400
00401 if (!cc || !next_cc)
00402 break;
00403
00404 cc = next_cc;
00405 next_cc = next_cc->link.next;
00406
00407
00408
00409 if (tprobe_depth) {
00410 if (cc->changed > (now + CLUSTER_CONFIGURATION_TIMEOUT))
00411 break;
00412 tprobe_depth--;
00413 continue;
00414 }
00415
00416 ClusterMachine *
00417 m = cc->machine_hash(hash);
00418
00419
00420
00421
00422 bool
00423 ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead);
00424
00425
00426
00427
00428 if (past_probes && probe_depth < CONFIGURATION_HISTORY_PROBE_DEPTH)
00429 past_probes[probe_depth] = m;
00430 probe_depth++;
00431
00432 if (!ok) {
00433 if (!pprobe_depth)
00434 break;
00435 continue;
00436 }
00437
00438 return (m != this_cluster_machine()) ? m : NULL;
00439 }
00440 return NULL;
00441 }
00442
00443
00444
00445
00446
00447
00448
00449 void
00450 initialize_thread_for_cluster(EThread * e)
00451 {
00452 (void) e;
00453 }
00454
00455
00456
00457
00458 Cluster::Cluster()
00459 {
00460 }
00461
00462