00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #include "ink_config.h"
00030 #include <unistd.h>
00031 #include "P_Cluster.h"
00032 #include "I_Layout.h"
00033 extern int num_of_cluster_threads;
00034 
00035 MachineList *machines_config = NULL;
00036 MachineList *cluster_config = NULL;
00037 
00038 ProxyMutex *the_cluster_config_mutex;
00039 
00040 static ClusterMachine *cluster_machine;
00041 
00042 MachineList *
00043 the_cluster_machines_config()
00044 {
00045   return machines_config;
00046 }
00047 
00048 MachineList *
00049 the_cluster_config()
00050 {
00051   return cluster_config;
00052 }
00053 
00054 ClusterMachine *
00055 this_cluster_machine()
00056 {
00057   return cluster_machine;
00058 }
00059 
00060 void
00061 create_this_cluster_machine()
00062 {
00063   the_cluster_config_mutex = new_ProxyMutex();
00064   cluster_machine = new ClusterMachine;
00065 }
00066 
00067 ClusterMachine::ClusterMachine(char *ahostname, unsigned int aip, int aport)
00068   : dead(false),
00069     hostname(ahostname),
00070     ip(aip),
00071     cluster_port(aport),
00072     num_connections(0),
00073     now_connections(0),
00074     free_connections(0),
00075     rr_count(0),
00076     msg_proto_major(0),
00077     msg_proto_minor(0),
00078     clusterHandlers(0)
00079 {
00080   EThread *thread = this_ethread();
00081   ProxyMutex *mutex = thread->mutex;
00082   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
00083   if (!aip) {
00084     char localhost[1024];
00085     if (!ahostname) {
00086       ink_release_assert(!gethostname(localhost, 1023));
00087       ahostname = localhost;
00088     }
00089     hostname = ats_strdup(ahostname);
00090 
00091     
00092     
00093     
00094 #ifdef LOCAL_CLUSTER_TEST_MODE
00095     ip = inet_addr("127.0.0.1");
00096 #else
00097 #ifdef CLUSTER_TEST
00098     int clustering_enabled = true;
00099 #else
00100     int clustering_enabled = !!getenv("PROXY_CLUSTER_ADDR");
00101 #endif
00102     if (clustering_enabled) {
00103       char *clusterIP = getenv("PROXY_CLUSTER_ADDR");
00104       Debug("cluster_note", "[Machine::Machine] Cluster IP addr: %s\n", clusterIP);
00105       ip = inet_addr(clusterIP);
00106     } else {
00107 
00108       ink_gethostbyname_r_data data;
00109       struct hostent *r = ink_gethostbyname_r(ahostname, &data);
00110       if (!r) {
00111         Warning("unable to DNS %s: %d", ahostname, data.herrno);
00112         ip = 0;
00113       } else {
00114 
00115         
00116 
00117         ip = (unsigned int) -1; 
00118         for (int i = 0; r->h_addr_list[i]; i++)
00119           if (ip > *(unsigned int *) r->h_addr_list[i])
00120             ip = *(unsigned int *) r->h_addr_list[i];
00121         if (ip == (unsigned int) -1)
00122           ip = 0;
00123       }
00124       
00125     }
00126 #endif // LOCAL_CLUSTER_TEST_MODE
00127   } else {
00128 
00129     ip = aip;
00130 
00131     ink_gethostbyaddr_r_data data;
00132     struct hostent *r = ink_gethostbyaddr_r((char *) &ip, sizeof(int), AF_INET, &data);
00133 
00134     if (r == NULL) {
00135       Alias32 x;
00136       memcpy(&x.u32, &ip, sizeof(x.u32));
00137       Debug("machine_debug", "unable to reverse DNS %u.%u.%u.%u: %d", x.byte[0], x.byte[1], x.byte[2], x.byte[3], data.herrno);
00138     } else
00139       hostname = ats_strdup(r->h_name);
00140   }
00141   if (hostname)
00142     hostname_len = strlen(hostname);
00143   else
00144     hostname_len = 0;
00145 
00146   num_connections = num_of_cluster_threads;
00147   clusterHandlers = (ClusterHandler **)ats_calloc(num_connections, sizeof(ClusterHandler *));
00148 }
00149 
00150 ClusterHandler *ClusterMachine::pop_ClusterHandler(int no_rr)
00151 {
00152   int find = 0;
00153   int64_t now = rr_count;
00154   if (no_rr == 0) {
00155     ink_atomic_increment(&rr_count, 1);
00156   }
00157 
00158   
00159   while (!clusterHandlers[now % this->num_connections] && (find < this->num_connections)) {
00160     now++;
00161     find++;
00162   }
00163   return this->clusterHandlers[now % this->num_connections];
00164 }
00165 
00166 ClusterMachine::~ClusterMachine()
00167 {
00168   ats_free(hostname);
00169   ats_free(clusterHandlers);
00170 }
00171 
00172 struct MachineTimeoutContinuation;
00173 typedef int (MachineTimeoutContinuation::*McTimeoutContHandler) (int, void *);
00174 struct MachineTimeoutContinuation: public Continuation
00175 {
00176   ClusterMachine *m;
00177   int dieEvent(int event, Event * e)
00178   {
00179     (void) event;
00180     (void) e;
00181     delete m;
00182     delete this;
00183     return EVENT_DONE;
00184   }
00185 
00186   MachineTimeoutContinuation(ClusterMachine * am)
00187     : Continuation(NULL), m(am)
00188   {
00189     SET_HANDLER((McTimeoutContHandler) & MachineTimeoutContinuation::dieEvent);
00190   }
00191 };
00192 
00193 void
00194 free_ClusterMachine(ClusterMachine * m)
00195 {
00196   EThread *thread = this_ethread();
00197   ProxyMutex *mutex = thread->mutex;
00198   
00199   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
00200   m->dead = true;
00201   eventProcessor.schedule_in(new MachineTimeoutContinuation(m), MACHINE_TIMEOUT, ET_CALL);
00202 }
00203 
00204 void
00205 free_MachineList(MachineList * l)
00206 {
00207   new_Freer(l, MACHINE_TIMEOUT);
00208 }
00209 
00210 MachineList *
00211 read_MachineList(char *filename, int afd)
00212 {
00213   char line[256];
00214   int n = -1, i = 0, ln = 0;
00215   MachineList *l = NULL;
00216   ink_assert(filename || (afd != -1));
00217   ats_scoped_str path(Layout::get()->relative_to(Layout::get()->sysconfdir, filename));
00218 
00219   int fd = ((afd != -1) ? afd : open(path, O_RDONLY));
00220   if (fd >= 0) {
00221     while (ink_file_fd_readline(fd, sizeof(line) - 1, line) > 0) {
00222       ln++;
00223       if (*line == '#')
00224         continue;
00225       if (n == -1 && ParseRules::is_digit(*line)) {
00226         n = atoi(line);
00227         if (n > 0) {
00228           l = (MachineList *)ats_malloc(sizeof(MachineList) + (n - 1) * sizeof(MachineListElement));
00229           l->n = 0;
00230         } else {
00231           l = NULL;
00232         }
00233         continue;
00234       }
00235       if (l && ParseRules::is_digit(*line) && i < n) {
00236         char *port = strchr(line, ':');
00237         if (!port)
00238           goto Lfail;
00239         *port++ = 0;
00240         l->machine[i].ip = inet_addr(line);
00241         if (-1 == (int) l->machine[i].ip) {
00242           if (afd == -1) {
00243             Warning("read machine list failure, bad ip, line %d", ln);
00244             return NULL;
00245           } else {
00246             char s[256];
00247             snprintf(s, sizeof s, "bad ip, line %d", ln);
00248             return (MachineList *) ats_strdup(s);
00249           }
00250         }
00251         l->machine[i].port = atoi(port);
00252         if (!l->machine[i].port)
00253           goto Lfail;
00254         i++;
00255         l->n++;
00256         continue;
00257       Lfail:
00258         if (afd == -1) {
00259           Warning("read machine list failure, bad port, line %d", ln);
00260           return NULL;
00261         } else {
00262           char s[256];
00263           snprintf(s, sizeof s, "bad port, line %d", ln);
00264           return (MachineList *) ats_strdup(s);
00265         }
00266       }
00267     }
00268     close(fd);
00269   } else {
00270     Warning("read machine list failure, open failed");
00271     return NULL;
00272   }
00273   if (n >= 0) {
00274     if (i != n) {
00275       if (afd == -1) {
00276         Warning("read machine list failure, length mismatch");
00277         return NULL;
00278       } else
00279         ats_free(l);
00280       return (MachineList *) ats_strdup("number of machines does not match length of list\n");
00281     }
00282   }
00283   return (afd != -1) ? (MachineList *) NULL : l;
00284 }