• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ClusterConfig.cc

Go to the documentation of this file.
00001 /** @file
00002 
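00003   Cluster configuration support: the ClusterAccept continuation that listens for cluster connections, the machine-list configuration change callbacks, and the routines that build, retire and probe ClusterConfiguration history.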
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 /****************************************************************************
00026 
00027   ClusterConfig.cc
00028 ****************************************************************************/
00029 
00030 #include "P_Cluster.h"
00031 // updated from the cluster port configuration variable
00032 int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER;
00033 
00034 ClusterAccept::ClusterAccept(int *port, int send_bufsize, int recv_bufsize)
00035   : Continuation(0),
00036     p_cluster_port(port),
00037     socket_send_bufsize(send_bufsize),
00038     socket_recv_bufsize(recv_bufsize),
00039     current_cluster_port(-1),
00040     accept_action(0),
00041     periodic_event(0)
00042 {
00043   mutex = new_ProxyMutex();
00044   SET_HANDLER(&ClusterAccept::ClusterAcceptEvent);
00045 }
00046 
00047 ClusterAccept::~ClusterAccept()
00048 {
00049   mutex = 0;
00050 }
00051 
00052 void
00053 ClusterAccept::Init()
00054 {
00055   // Setup initial accept by simulating EVENT_INTERVAL
00056   // where cluster accept port has changed.
00057 
00058   current_cluster_port = ~*p_cluster_port;
00059   ClusterAcceptEvent(EVENT_INTERVAL, 0);
00060 
00061   // Setup periodic event to handle changing cluster accept port.
00062   periodic_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(60));
00063 }
00064 
00065 void
00066 ClusterAccept::ShutdownDelete()
00067 {
00068   MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00069   if (!lock) {
00070     eventProcessor.schedule_imm(this, ET_CALL);
00071     return;
00072   }
00073   // Kill all events and delete.
00074   if (accept_action) {
00075     accept_action->cancel();
00076     accept_action = 0;
00077   }
00078   if (periodic_event) {
00079     periodic_event->cancel();
00080     periodic_event = 0;
00081   }
00082   delete this;
00083 }
00084 
00085 int
00086 ClusterAccept::ClusterAcceptEvent(int event, void *data)
00087 {
00088   switch (event) {
00089   case EVENT_IMMEDIATE:
00090     {
00091       ShutdownDelete();
00092       return EVENT_DONE;
00093     }
00094   case EVENT_INTERVAL:
00095     {
00096       int cluster_port = *p_cluster_port;
00097 
00098       if (cluster_port != current_cluster_port) {
00099         // Configuration changed cluster port, redo accept on new port.
00100         if (accept_action) {
00101           accept_action->cancel();
00102           accept_action = 0;
00103         }
00104 
00105         NetProcessor::AcceptOptions opt;
00106         opt.recv_bufsize = socket_recv_bufsize;
00107         opt.send_bufsize = socket_send_bufsize;
00108         opt.etype = ET_CLUSTER;
00109         opt.local_port = cluster_port;
00110         opt.ip_family = AF_INET;
00111         opt.localhost_only = false;
00112 
00113         accept_action = netProcessor.main_accept(this, NO_FD, opt);
00114         if (!accept_action) {
00115           Warning("Unable to accept cluster connections on port: %d", cluster_port);
00116         } else {
00117           current_cluster_port = cluster_port;
00118         }
00119       }
00120       return EVENT_CONT;
00121     }
00122   case NET_EVENT_ACCEPT:
00123     {
00124       ClusterAcceptMachine((NetVConnection *) data);
00125       return EVENT_DONE;
00126     }
00127   default:
00128     {
00129       Warning("ClusterAcceptEvent: received unknown event %d", event);
00130       return EVENT_DONE;
00131     }
00132   }                             // End of switch
00133 }
00134 
00135 int
00136 ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
00137 {
00138   // Validate remote IP address.
00139   unsigned int remote_ip = NetVC->get_remote_ip();
00140   MachineList *mc = the_cluster_machines_config();
00141 
00142   if (mc && !mc->find(remote_ip)) {
00143     Note("Illegal cluster connection from %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
00144     NetVC->do_io(VIO::CLOSE);
00145     return 0;
00146   }
00147 
00148   Debug(CL_NOTE, "Accepting machine %u.%u.%u.%u", DOT_SEPARATED(remote_ip));
00149   ClusterHandler *ch = new ClusterHandler;
00150   ch->machine = new ClusterMachine(NULL, remote_ip);
00151   ch->ip = remote_ip;
00152   ch->net_vc = NetVC;
00153   eventProcessor.schedule_imm_signal(ch, ET_CLUSTER);
00154   return 1;
00155 }
00156 
00157 static void
00158 make_cluster_connections(MachineList * l)
00159 {
00160   //
00161   // Connect to all new machines.
00162   //
00163   uint32_t ip = this_cluster_machine()->ip;
00164   int num_connections = this_cluster_machine()->num_connections;
00165 
00166   if (l) {
00167     for (int i = 0; i < l->n; i++) {
00168 #ifdef LOCAL_CLUSTER_TEST_MODE
00169       if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port))) {
00170 #else
00171       if (ip < l->machine[i].ip) {
00172 #endif
00173         for (int j = 0; j < num_connections; j++) {
00174           clusterProcessor.connect(l->machine[i].ip, l->machine[i].port, j);
00175         }
00176       }
00177     }
00178   }
00179 }
00180 
00181 int
00182 machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data,
00183                       void *cookie)
00184 {
00185   // Handle changes to the cluster.config or machines.config
00186   // file.  cluster.config is the list of machines in the
00187   // cluster proper ( in the cluster hash table ).  machines.config
00188   // is the list of machines which communicate with the cluster.
00189   // This may include front-end load redirectors, machines going
00190   // up or coming down etc.
00191   //
00192   char *filename = (char *) data.rec_string;
00193   MachineList *l = read_MachineList(filename);
00194   MachineList *old = NULL;
00195 #ifdef USE_SEPARATE_MACHINE_CONFIG
00196   switch ((int) cookie) {
00197   case MACHINE_CONFIG:
00198     old = machines_config;
00199     machines_config = l;
00200     break;
00201   case CLUSTER_CONFIG:
00202     old = cluster_config;
00203     cluster_config = l;
00204     make_cluster_connections(l);
00205     break;
00206   }
00207 #else
00208   (void) cookie;
00209   old = cluster_config;
00210   machines_config = l;
00211   cluster_config = l;
00212   make_cluster_connections(l);
00213 #endif
00214   if (old)
00215     free_MachineList(old);
00216   return 0;
00217 }
00218 
00219 void
00220 do_machine_config_change(void *d, const char *s)
00221 {
00222   char cluster_config_filename[PATH_NAME_MAX] = "";
00223   REC_ReadConfigString(cluster_config_filename, s, sizeof(cluster_config_filename) - 1);
00224   RecData data;
00225   data.rec_string = cluster_config_filename;
00226   machine_config_change(s, RECD_STRING, data, d);
00227 }
00228 
00229 /*************************************************************************/
00230 // ClusterConfiguration member functions (Public Class)
00231 /*************************************************************************/
00232 ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0)
00233 {
00234   memset(machines, 0, sizeof(machines));
00235   memset(hash_table, 0, sizeof(hash_table));
00236 }
00237 
00238 /*************************************************************************/
00239 // ConfigurationContinuation member functions (Internal Class)
00240 /*************************************************************************/
00241 struct ConfigurationContinuation;
00242 typedef int (ConfigurationContinuation::*CfgContHandler) (int, void *);
00243 struct ConfigurationContinuation: public Continuation
00244 {
00245   ClusterConfiguration *c;
00246   ClusterConfiguration *prev;
00247 
00248   int
00249   zombieEvent(int /* event ATS_UNUSED */, Event * e)
00250   {
00251     prev->link.next = NULL;     // remove that next pointer
00252     SET_HANDLER((CfgContHandler) & ConfigurationContinuation::dieEvent);
00253     e->schedule_in(CLUSTER_CONFIGURATION_ZOMBIE);
00254     return EVENT_CONT;
00255   }
00256 
00257   int
00258   dieEvent(int event, Event * e)
00259   {
00260     (void) event;
00261     (void) e;
00262     delete c;
00263     delete this;
00264     return EVENT_DONE;
00265   }
00266 
00267   ConfigurationContinuation(ClusterConfiguration * cc, ClusterConfiguration * aprev)
00268     : Continuation(NULL), c(cc), prev(aprev) {
00269     mutex = new_ProxyMutex();
00270     SET_HANDLER((CfgContHandler) & ConfigurationContinuation::zombieEvent);
00271   }
00272 };
00273 
00274 static void
00275 free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev)
00276 {
00277   //
00278   // Delete the configuration after a time.
00279   // The problem is that configurations change infrequently, and
00280   // are used in different threads, so reference counts are
00281   // relatively difficult and expensive.  The solution I have
00282   // chosen is to simply delete the object after some (very long)
00283   // time after it has ceased to be accessible.
00284   //
00285   eventProcessor.schedule_in(new ConfigurationContinuation(c, prev), CLUSTER_CONFIGURATION_TIMEOUT, ET_CALL);
00286 }
00287 
00288 ClusterConfiguration *
00289 configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
00290 {
00291   // Build a new cluster configuration with the new machine.
00292   // Machines are stored in ip sorted order.
00293   //
00294   EThread *thread = this_ethread();
00295   ProxyMutex *mutex = thread->mutex;
00296   int i = 0;
00297   ClusterConfiguration *cc = new ClusterConfiguration(*c);
00298 
00299   // Find the place to insert this new machine
00300   //
00301   for (i = 0; i < cc->n_machines; i++) {
00302     if (cc->machines[i]->ip > m->ip)
00303       break;
00304   }
00305 
00306   // Move the other machines out of the way
00307   //
00308   for (int j = cc->n_machines - 1; j >= i; j--)
00309     cc->machines[j + 1] = cc->machines[j];
00310 
00311   // Insert it
00312   //
00313   cc->machines[i] = m;
00314   cc->n_machines++;
00315 
00316   cc->link.next = c;
00317   cc->changed = ink_get_hrtime();
00318   ink_assert(cc->n_machines < CLUSTER_MAX_MACHINES);
00319 
00320   build_cluster_hash_table(cc);
00321   INK_MEMORY_BARRIER;           // commit writes before freeing old hash table
00322   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00323 
00324   free_configuration(c, cc);
00325   return cc;
00326 }
00327 
00328 ClusterConfiguration *
00329 configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
00330 {
00331   EThread *thread = this_ethread();
00332   ProxyMutex *mutex = thread->mutex;
00333 
00334   //
00335   // Build a new cluster configuration without a machine
00336   //
00337   ClusterConfiguration *cc = new ClusterConfiguration(*c);
00338   //
00339   // remove m and move others down
00340   //
00341   for (int i = 0; i < cc->n_machines - 1; i++)
00342     if (m == cc->machines[i])
00343       m = cc->machines[i] = cc->machines[i + 1];
00344   cc->n_machines--;
00345 
00346   ink_assert(cc->n_machines > 0);
00347 
00348   cc->link.next = c;
00349   cc->changed = ink_get_hrtime();
00350 
00351   build_cluster_hash_table(cc);
00352   INK_MEMORY_BARRIER;           // commit writes before freeing old hash table
00353   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
00354 
00355   free_configuration(c, cc);
00356   return cc;
00357 }
00358 
00359 //
00360 // cluster_machine_at_depth()
00361 //   Find a machine at a particular depth into the past.
00362 //   We don't want to probe the current machine or machines
00363 //   we have probed before, so we store a list of "past_probes".
00364 //   If probe_depth and past_probes are NULL we only want the
00365 //   owner (machine now as opposed to in the past).
00366 //
00367 ClusterMachine *
00368 cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** past_probes)
00369 {
00370 #ifdef CLUSTER_TOMCAT
00371   if (!cache_clustering_enabled)
00372     return NULL;
00373 #endif
00374   ClusterConfiguration *
00375     cc = this_cluster()->current_configuration();
00376   ClusterConfiguration *
00377     next_cc = cc;
00378   ink_hrtime now = ink_get_hrtime();
00379   int fake_probe_depth = 0;
00380   int &probe_depth = pprobe_depth ? (*pprobe_depth) : fake_probe_depth;
00381   int tprobe_depth = probe_depth;
00382 
00383 #ifdef CLUSTER_TEST
00384   if (cc->n_machines > 1) {
00385     for (int i = 0; i < cc->n_machines; ++i) {
00386       if (cc->machines[i] != this_cluster_machine()) {
00387         return cc->machines[i];
00388       }
00389     }
00390   }
00391 #endif // CLUSTER_TEST
00392 
00393   while (1) {
00394     // If we are out of our depth, fail
00395     //
00396     if (probe_depth > CONFIGURATION_HISTORY_PROBE_DEPTH)
00397       break;
00398 
00399     // If there is no configuration, fail
00400     //
00401     if (!cc || !next_cc)
00402       break;
00403 
00404     cc = next_cc;
00405     next_cc = next_cc->link.next;
00406 
00407     // Find the correct configuration
00408     //
00409     if (tprobe_depth) {
00410       if (cc->changed > (now + CLUSTER_CONFIGURATION_TIMEOUT))
00411         break;
00412       tprobe_depth--;
00413       continue;
00414     }
00415 
00416     ClusterMachine *
00417       m = cc->machine_hash(hash);
00418 
00419     // If it is not this machine, or a machine we have done before
00420     // and one that is still up, try again
00421     //
00422     bool
00423       ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead);
00424 
00425     // Store the all but the last probe, so that we never return
00426     // the same machine
00427     //
00428     if (past_probes && probe_depth < CONFIGURATION_HISTORY_PROBE_DEPTH)
00429       past_probes[probe_depth] = m;
00430     probe_depth++;
00431 
00432     if (!ok) {
00433       if (!pprobe_depth)
00434         break;                  // don't go down if we don't have a depth
00435       continue;
00436     }
00437 
00438     return (m != this_cluster_machine()) ? m : NULL;
00439   }
00440   return NULL;
00441 }
00442 
00443 //
00444 // initialize_thread_for_cluster()
00445 //   This is not required since we have a separate handler
00446 //   for each machine-machine pair, the pointers to which are
00447 //   stored in the ClusterMachine structures
00448 //
00449 void
00450 initialize_thread_for_cluster(EThread * e)
00451 {
00452   (void) e;
00453 }
00454 
00455 /*************************************************************************/
00456 // Cluster member functions (Public Class)
00457 /*************************************************************************/
00458 Cluster::Cluster()
00459 {
00460 }
00461 
00462 // End of ClusterConfig.cc

Generated by  doxygen 1.7.1