00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "P_Cluster.h"
00031 
00032 int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER;
00033 
00034 ClusterAccept::ClusterAccept(int *port, int send_bufsize, int recv_bufsize)
00035   : Continuation(0),
00036     p_cluster_port(port),
00037     socket_send_bufsize(send_bufsize),
00038     socket_recv_bufsize(recv_bufsize),
00039     current_cluster_port(-1),
00040     accept_action(0),
00041     periodic_event(0)
00042 {
00043   mutex = new_ProxyMutex();
00044   SET_HANDLER(&ClusterAccept::ClusterAcceptEvent);
00045 }
00046 
00047 ClusterAccept::~ClusterAccept()
00048 {
00049   mutex = 0;
00050 }
00051 
00052 void
00053 ClusterAccept::Init()
00054 {
00055   
00056   
00057 
00058   current_cluster_port = ~*p_cluster_port;
00059   ClusterAcceptEvent(EVENT_INTERVAL, 0);
00060 
00061   
00062   periodic_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(60));
00063 }
00064 
00065 void
00066 ClusterAccept::ShutdownDelete()
00067 {
00068   MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00069   if (!lock) {
00070     eventProcessor.schedule_imm(this, ET_CALL);
00071     return;
00072   }
00073   
00074   if (accept_action) {
00075     accept_action->cancel();
00076     accept_action = 0;
00077   }
00078   if (periodic_event) {
00079     periodic_event->cancel();
00080     periodic_event = 0;
00081   }
00082   delete this;
00083 }
00084 
00085 int
00086 ClusterAccept::ClusterAcceptEvent(int event, void *data)
00087 {
00088   switch (event) {
00089   case EVENT_IMMEDIATE:
00090     {
00091       ShutdownDelete();
00092       return EVENT_DONE;
00093     }
00094   case EVENT_INTERVAL:
00095     {
00096       int cluster_port = *p_cluster_port;
00097 
00098       if (cluster_port != current_cluster_port) {
00099         
00100         if (accept_action) {
00101           accept_action->cancel();
00102           accept_action = 0;
00103         }
00104 
00105         NetProcessor::AcceptOptions opt;
00106         opt.recv_bufsize = socket_recv_bufsize;
00107         opt.send_bufsize = socket_send_bufsize;
00108         opt.etype = ET_CLUSTER;
00109         opt.local_port = cluster_port;
00110         opt.ip_family = AF_INET;
00111         opt.localhost_only = false;
00112 
00113         accept_action = netProcessor.main_accept(this, NO_FD, opt);
00114         if (!accept_action) {
00115           Warning("Unable to accept cluster connections on port: %d", cluster_port);
00116         } else {
00117           current_cluster_port = cluster_port;
00118         }
00119       }
00120       return EVENT_CONT;
00121     }
00122   case NET_EVENT_ACCEPT:
00123     {
00124       ClusterAcceptMachine((NetVConnection *) data);
00125       return EVENT_DONE;
00126     }
00127   default:
00128     {
00129       Warning("ClusterAcceptEvent: received unknown event %d", event);
00130       return EVENT_DONE;
00131     }
00132   }                             
00133 }
00134 
00135 int
00136 ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
00137 {
00138   
00139   unsigned int remote_ip = NetVC->get_remote_ip();
00140   MachineList *mc = the_cluster_machines_config();
00141 
00142   if (mc && !mc->find(remote_ip)) {
00143     Note("Illegal cluster connection from %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
00144     NetVC->do_io(VIO::CLOSE);
00145     return 0;
00146   }
00147 
00148   Debug(CL_NOTE, "Accepting machine %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
00149   ClusterHandler *ch = new ClusterHandler;
00150   ch->machine = new ClusterMachine(NULL, remote_ip);
00151   ch->ip = remote_ip;
00152   ch->net_vc = NetVC;
00153   eventProcessor.schedule_imm_signal(ch, ET_CLUSTER);
00154   return 1;
00155 }
00156 
00157 static void
00158 make_cluster_connections(MachineList * l)
00159 {
00160   
00161   
00162   
00163   uint32_t ip = this_cluster_machine()->ip;
00164   int num_connections = this_cluster_machine()->num_connections;
00165 
00166   if (l) {
00167     for (int i = 0; i < l->n; i++) {
00168 #ifdef LOCAL_CLUSTER_TEST_MODE
00169       if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port))) {
00170 #else
00171       if (ip < l->machine[i].ip) {
00172 #endif
00173         for (int j = 0; j < num_connections; j++) {
00174           clusterProcessor.connect(l->machine[i].ip, l->machine[i].port, j);
00175         }
00176       }
00177     }
00178   }
00179 }
00180 
00181 int
00182 machine_config_change(const char * , RecDataT , RecData data,
00183                       void *cookie)
00184 {
00185   
00186   
00187   
00188   
00189   
00190   
00191   
00192   char *filename = (char *) data.rec_string;
00193   MachineList *l = read_MachineList(filename);
00194   MachineList *old = NULL;
00195 #ifdef USE_SEPARATE_MACHINE_CONFIG
00196   switch ((int) cookie) {
00197   case MACHINE_CONFIG:
00198     old = machines_config;
00199     machines_config = l;
00200     break;
00201   case CLUSTER_CONFIG:
00202     old = cluster_config;
00203     cluster_config = l;
00204     make_cluster_connections(l);
00205     break;
00206   }
00207 #else
00208   (void) cookie;
00209   old = cluster_config;
00210   machines_config = l;
00211   cluster_config = l;
00212   make_cluster_connections(l);
00213 #endif
00214   if (old)
00215     free_MachineList(old);
00216   return 0;
00217 }
00218 
00219 void
00220 do_machine_config_change(void *d, const char *s)
00221 {
00222   char cluster_config_filename[PATH_NAME_MAX] = "";
00223   REC_ReadConfigString(cluster_config_filename, s, sizeof(cluster_config_filename) - 1);
00224   RecData data;
00225   data.rec_string = cluster_config_filename;
00226   machine_config_change(s, RECD_STRING, data, d);
00227 }
00228 
00229 
00230 
00231 
00232 ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0)
00233 {
00234   memset(machines, 0, sizeof(machines));
00235   memset(hash_table, 0, sizeof(hash_table));
00236 }
00237 
00238 
00239 
00240 
00241 struct ConfigurationContinuation;
00242 typedef int (ConfigurationContinuation::*CfgContHandler) (int, void *);
00243 struct ConfigurationContinuation: public Continuation
00244 {
00245   ClusterConfiguration *c;
00246   ClusterConfiguration *prev;
00247 
00248   int
00249   zombieEvent(int , Event * e)
00250   {
00251     prev->link.next = NULL;     
00252     SET_HANDLER((CfgContHandler) & ConfigurationContinuation::dieEvent);
00253     e->schedule_in(CLUSTER_CONFIGURATION_ZOMBIE);
00254     return EVENT_CONT;
00255   }
00256 
00257   int
00258   dieEvent(int event, Event * e)
00259   {
00260     (void) event;
00261     (void) e;
00262     delete c;
00263     delete this;
00264     return EVENT_DONE;
00265   }
00266 
00267   ConfigurationContinuation(ClusterConfiguration * cc, ClusterConfiguration * aprev)
00268     : Continuation(NULL), c(cc), prev(aprev) {
00269     mutex = new_ProxyMutex();
00270     SET_HANDLER((CfgContHandler) & ConfigurationContinuation::zombieEvent);
00271   }
00272 };
00273 
00274 static void
00275 free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev)
00276 {
00277   
00278   
00279   
00280   
00281   
00282   
00283   
00284   
00285   eventProcessor.schedule_in(new ConfigurationContinuation(c, prev), CLUSTER_CONFIGURATION_TIMEOUT, ET_CALL);
00286 }
00287 
00288 ClusterConfiguration *
00289 configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
00290 {
00291   
00292   
00293   
00294   EThread *thread = this_ethread();
00295   ProxyMutex *mutex = thread->mutex;
00296   int i = 0;
00297   ClusterConfiguration *cc = new ClusterConfiguration(*c);
00298 
00299   
00300   
00301   for (i = 0; i < cc->n_machines; i++) {
00302     if (cc->machines[i]->ip > m->ip)
00303       break;
00304   }
00305 
00306   
00307   
00308   for (int j = cc->n_machines - 1; j >= i; j--)
00309     cc->machines[j + 1] = cc->machines[j];
00310 
00311   
00312   
00313   cc->machines[i] = m;
00314   cc->n_machines++;
00315 
00316   cc->link.next = c;
00317   cc->changed = ink_get_hrtime();
00318   ink_assert(cc->n_machines < CLUSTER_MAX_MACHINES);
00319 
00320   build_cluster_hash_table(cc);
00321   INK_MEMORY_BARRIER;           
00322   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00323 
00324   free_configuration(c, cc);
00325   return cc;
00326 }
00327 
00328 ClusterConfiguration *
00329 configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
00330 {
00331   EThread *thread = this_ethread();
00332   ProxyMutex *mutex = thread->mutex;
00333 
00334   
00335   
00336   
00337   ClusterConfiguration *cc = new ClusterConfiguration(*c);
00338   
00339   
00340   
00341   for (int i = 0; i < cc->n_machines - 1; i++)
00342     if (m == cc->machines[i])
00343       m = cc->machines[i] = cc->machines[i + 1];
00344   cc->n_machines--;
00345 
00346   ink_assert(cc->n_machines > 0);
00347 
00348   cc->link.next = c;
00349   cc->changed = ink_get_hrtime();
00350 
00351   build_cluster_hash_table(cc);
00352   INK_MEMORY_BARRIER;           
00353   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00354 
00355   free_configuration(c, cc);
00356   return cc;
00357 }
00358 
00359 
00360 
00361 
00362 
00363 
00364 
00365 
00366 
00367 ClusterMachine *
00368 cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** past_probes)
00369 {
00370 #ifdef CLUSTER_TOMCAT
00371   if (!cache_clustering_enabled)
00372     return NULL;
00373 #endif
00374   ClusterConfiguration *
00375     cc = this_cluster()->current_configuration();
00376   ClusterConfiguration *
00377     next_cc = cc;
00378   ink_hrtime now = ink_get_hrtime();
00379   int fake_probe_depth = 0;
00380   int &probe_depth = pprobe_depth ? (*pprobe_depth) : fake_probe_depth;
00381   int tprobe_depth = probe_depth;
00382 
00383 #ifdef CLUSTER_TEST
00384   if (cc->n_machines > 1) {
00385     for (int i = 0; i < cc->n_machines; ++i) {
00386       if (cc->machines[i] != this_cluster_machine()) {
00387         return cc->machines[i];
00388       }
00389     }
00390   }
00391 #endif // CLUSTER_TEST
00392 
00393   while (1) {
00394     
00395     
00396     if (probe_depth > CONFIGURATION_HISTORY_PROBE_DEPTH)
00397       break;
00398 
00399     
00400     
00401     if (!cc || !next_cc)
00402       break;
00403 
00404     cc = next_cc;
00405     next_cc = next_cc->link.next;
00406 
00407     
00408     
00409     if (tprobe_depth) {
00410       if (cc->changed > (now + CLUSTER_CONFIGURATION_TIMEOUT))
00411         break;
00412       tprobe_depth--;
00413       continue;
00414     }
00415 
00416     ClusterMachine *
00417       m = cc->machine_hash(hash);
00418 
00419     
00420     
00421     
00422     bool
00423       ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead);
00424 
00425     
00426     
00427     
00428     if (past_probes && probe_depth < CONFIGURATION_HISTORY_PROBE_DEPTH)
00429       past_probes[probe_depth] = m;
00430     probe_depth++;
00431 
00432     if (!ok) {
00433       if (!pprobe_depth)
00434         break;                  
00435       continue;
00436     }
00437 
00438     return (m != this_cluster_machine()) ? m : NULL;
00439   }
00440   return NULL;
00441 }
00442 
00443 
00444 
00445 
00446 
00447 
00448 
00449 void
00450 initialize_thread_for_cluster(EThread * e)
00451 {
00452   (void) e;
00453 }
00454 
00455 
00456 
00457 
00458 Cluster::Cluster()
00459 {
00460 }
00461 
00462