00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "ink_config.h"
00030 #include <unistd.h>
00031 #include "P_Cluster.h"
00032 #include "I_Layout.h"
00033 extern int num_of_cluster_threads;
00034
00035 MachineList *machines_config = NULL;
00036 MachineList *cluster_config = NULL;
00037
00038 ProxyMutex *the_cluster_config_mutex;
00039
00040 static ClusterMachine *cluster_machine;
00041
00042 MachineList *
00043 the_cluster_machines_config()
00044 {
00045 return machines_config;
00046 }
00047
00048 MachineList *
00049 the_cluster_config()
00050 {
00051 return cluster_config;
00052 }
00053
00054 ClusterMachine *
00055 this_cluster_machine()
00056 {
00057 return cluster_machine;
00058 }
00059
00060 void
00061 create_this_cluster_machine()
00062 {
00063 the_cluster_config_mutex = new_ProxyMutex();
00064 cluster_machine = new ClusterMachine;
00065 }
00066
00067 ClusterMachine::ClusterMachine(char *ahostname, unsigned int aip, int aport)
00068 : dead(false),
00069 hostname(ahostname),
00070 ip(aip),
00071 cluster_port(aport),
00072 num_connections(0),
00073 now_connections(0),
00074 free_connections(0),
00075 rr_count(0),
00076 msg_proto_major(0),
00077 msg_proto_minor(0),
00078 clusterHandlers(0)
00079 {
00080 EThread *thread = this_ethread();
00081 ProxyMutex *mutex = thread->mutex;
00082 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
00083 if (!aip) {
00084 char localhost[1024];
00085 if (!ahostname) {
00086 ink_release_assert(!gethostname(localhost, 1023));
00087 ahostname = localhost;
00088 }
00089 hostname = ats_strdup(ahostname);
00090
00091
00092
00093
00094 #ifdef LOCAL_CLUSTER_TEST_MODE
00095 ip = inet_addr("127.0.0.1");
00096 #else
00097 #ifdef CLUSTER_TEST
00098 int clustering_enabled = true;
00099 #else
00100 int clustering_enabled = !!getenv("PROXY_CLUSTER_ADDR");
00101 #endif
00102 if (clustering_enabled) {
00103 char *clusterIP = getenv("PROXY_CLUSTER_ADDR");
00104 Debug("cluster_note", "[Machine::Machine] Cluster IP addr: %s\n", clusterIP);
00105 ip = inet_addr(clusterIP);
00106 } else {
00107
00108 ink_gethostbyname_r_data data;
00109 struct hostent *r = ink_gethostbyname_r(ahostname, &data);
00110 if (!r) {
00111 Warning("unable to DNS %s: %d", ahostname, data.herrno);
00112 ip = 0;
00113 } else {
00114
00115
00116
00117 ip = (unsigned int) -1;
00118 for (int i = 0; r->h_addr_list[i]; i++)
00119 if (ip > *(unsigned int *) r->h_addr_list[i])
00120 ip = *(unsigned int *) r->h_addr_list[i];
00121 if (ip == (unsigned int) -1)
00122 ip = 0;
00123 }
00124
00125 }
00126 #endif // LOCAL_CLUSTER_TEST_MODE
00127 } else {
00128
00129 ip = aip;
00130
00131 ink_gethostbyaddr_r_data data;
00132 struct hostent *r = ink_gethostbyaddr_r((char *) &ip, sizeof(int), AF_INET, &data);
00133
00134 if (r == NULL) {
00135 Alias32 x;
00136 memcpy(&x.u32, &ip, sizeof(x.u32));
00137 Debug("machine_debug", "unable to reverse DNS %u.%u.%u.%u: %d", x.byte[0], x.byte[1], x.byte[2], x.byte[3], data.herrno);
00138 } else
00139 hostname = ats_strdup(r->h_name);
00140 }
00141 if (hostname)
00142 hostname_len = strlen(hostname);
00143 else
00144 hostname_len = 0;
00145
00146 num_connections = num_of_cluster_threads;
00147 clusterHandlers = (ClusterHandler **)ats_calloc(num_connections, sizeof(ClusterHandler *));
00148 }
00149
00150 ClusterHandler *ClusterMachine::pop_ClusterHandler(int no_rr)
00151 {
00152 int find = 0;
00153 int64_t now = rr_count;
00154 if (no_rr == 0) {
00155 ink_atomic_increment(&rr_count, 1);
00156 }
00157
00158
00159 while (!clusterHandlers[now % this->num_connections] && (find < this->num_connections)) {
00160 now++;
00161 find++;
00162 }
00163 return this->clusterHandlers[now % this->num_connections];
00164 }
00165
00166 ClusterMachine::~ClusterMachine()
00167 {
00168 ats_free(hostname);
00169 ats_free(clusterHandlers);
00170 }
00171
00172 struct MachineTimeoutContinuation;
00173 typedef int (MachineTimeoutContinuation::*McTimeoutContHandler) (int, void *);
00174 struct MachineTimeoutContinuation: public Continuation
00175 {
00176 ClusterMachine *m;
00177 int dieEvent(int event, Event * e)
00178 {
00179 (void) event;
00180 (void) e;
00181 delete m;
00182 delete this;
00183 return EVENT_DONE;
00184 }
00185
00186 MachineTimeoutContinuation(ClusterMachine * am)
00187 : Continuation(NULL), m(am)
00188 {
00189 SET_HANDLER((McTimeoutContHandler) & MachineTimeoutContinuation::dieEvent);
00190 }
00191 };
00192
00193 void
00194 free_ClusterMachine(ClusterMachine * m)
00195 {
00196 EThread *thread = this_ethread();
00197 ProxyMutex *mutex = thread->mutex;
00198
00199 CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
00200 m->dead = true;
00201 eventProcessor.schedule_in(new MachineTimeoutContinuation(m), MACHINE_TIMEOUT, ET_CALL);
00202 }
00203
00204 void
00205 free_MachineList(MachineList * l)
00206 {
00207 new_Freer(l, MACHINE_TIMEOUT);
00208 }
00209
00210 MachineList *
00211 read_MachineList(char *filename, int afd)
00212 {
00213 char line[256];
00214 int n = -1, i = 0, ln = 0;
00215 MachineList *l = NULL;
00216 ink_assert(filename || (afd != -1));
00217 ats_scoped_str path(Layout::get()->relative_to(Layout::get()->sysconfdir, filename));
00218
00219 int fd = ((afd != -1) ? afd : open(path, O_RDONLY));
00220 if (fd >= 0) {
00221 while (ink_file_fd_readline(fd, sizeof(line) - 1, line) > 0) {
00222 ln++;
00223 if (*line == '#')
00224 continue;
00225 if (n == -1 && ParseRules::is_digit(*line)) {
00226 n = atoi(line);
00227 if (n > 0) {
00228 l = (MachineList *)ats_malloc(sizeof(MachineList) + (n - 1) * sizeof(MachineListElement));
00229 l->n = 0;
00230 } else {
00231 l = NULL;
00232 }
00233 continue;
00234 }
00235 if (l && ParseRules::is_digit(*line) && i < n) {
00236 char *port = strchr(line, ':');
00237 if (!port)
00238 goto Lfail;
00239 *port++ = 0;
00240 l->machine[i].ip = inet_addr(line);
00241 if (-1 == (int) l->machine[i].ip) {
00242 if (afd == -1) {
00243 Warning("read machine list failure, bad ip, line %d", ln);
00244 return NULL;
00245 } else {
00246 char s[256];
00247 snprintf(s, sizeof s, "bad ip, line %d", ln);
00248 return (MachineList *) ats_strdup(s);
00249 }
00250 }
00251 l->machine[i].port = atoi(port);
00252 if (!l->machine[i].port)
00253 goto Lfail;
00254 i++;
00255 l->n++;
00256 continue;
00257 Lfail:
00258 if (afd == -1) {
00259 Warning("read machine list failure, bad port, line %d", ln);
00260 return NULL;
00261 } else {
00262 char s[256];
00263 snprintf(s, sizeof s, "bad port, line %d", ln);
00264 return (MachineList *) ats_strdup(s);
00265 }
00266 }
00267 }
00268 close(fd);
00269 } else {
00270 Warning("read machine list failure, open failed");
00271 return NULL;
00272 }
00273 if (n >= 0) {
00274 if (i != n) {
00275 if (afd == -1) {
00276 Warning("read machine list failure, length mismatch");
00277 return NULL;
00278 } else
00279 ats_free(l);
00280 return (MachineList *) ats_strdup("number of machines does not match length of list\n");
00281 }
00282 }
00283 return (afd != -1) ? (MachineList *) NULL : l;
00284 }