#include "P_Cluster.h"

#include "InkAPIInternal.h"

class ClusterAPIPeriodicSM;
static void send_machine_online_list(TSClusterStatusHandle_t *);

typedef struct node_callout_entry
{
  Ptr<ProxyMutex> mutex;
  TSClusterStatusFunction func;
  int state;
} node_callout_entry_t;

#define NE_STATE_FREE 0
#define NE_STATE_INITIALIZED 1

#define MAX_CLUSTERSTATUS_CALLOUTS 32

static ProxyMutex *ClusterAPI_mutex;
static ClusterAPIPeriodicSM *periodicSM;

static node_callout_entry_t status_callouts[MAX_CLUSTERSTATUS_CALLOUTS];
static TSClusterRPCFunction RPC_Functions[API_END_CLUSTER_FUNCTION];

#define INDEX_TO_CLUSTER_STATUS_HANDLE(i) ((TSClusterStatusHandle_t)((i)))
#define CLUSTER_STATUS_HANDLE_TO_INDEX(h) ((int) ((h)))
#define NODE_HANDLE_TO_IP(h) (*((struct in_addr *) &((h))))
#define RPC_FUNCTION_KEY_TO_CLUSTER_NUMBER(k) ((int)((k)))
#define IP_TO_NODE_HANDLE(ip) ((TSNodeHandle_t)((ip)))
#define SIZEOF_RPC_MSG_LESS_DATA (sizeof(TSClusterRPCMsg_t) - \
        (sizeof(TSClusterRPCMsg_t) - sizeof(TSClusterRPCHandle_t)))
typedef struct RPCHandle
{
  union
  {
    TSClusterRPCHandle_t external;
    struct real_format
    {
      int cluster_function;
      int magic;
    } internal;
  } u;
} RPCHandle_t;

#define RPC_HANDLE_MAGIC 0x12345678
class MachineStatusSM;
typedef int (MachineStatusSM::*MachineStatusSMHandler) (int, void *);

class MachineStatusSM : public Continuation
{
public:
  /* Broadcast the given node status change to all registered callouts. */
  MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s)
    : _node_handle(h), _node_status(s), _status_handle(0), _broadcast(1), _restart(0), _next_n(0)
  {
    SET_HANDLER((MachineStatusSMHandler) & MachineStatusSM::MachineStatusSMEvent);
  }

  /* Send the given node status change to a single registered callout. */
  MachineStatusSM(TSNodeHandle_t h, TSNodeStatus_t s, TSClusterStatusHandle_t sh)
    : _node_handle(h), _node_status(s), _status_handle(sh), _broadcast(0), _restart(0), _next_n(0)
  {
    SET_HANDLER((MachineStatusSMHandler) & MachineStatusSM::MachineStatusSMEvent);
  }

  /* Send the current online-node list to a single registered callout. */
  MachineStatusSM(TSClusterStatusHandle_t sh)
    : _node_handle(0), _node_status(NODE_ONLINE), _status_handle(sh), _broadcast(0), _restart(0), _next_n(0)
  {
    SET_HANDLER((MachineStatusSMHandler) & MachineStatusSM::MachineStatusSMEvent);
  }

  ~MachineStatusSM() {
  }

  int MachineStatusSMEvent(Event *e, void *d);

private:
  TSNodeHandle_t _node_handle;
  TSNodeStatus_t _node_status;
  TSClusterStatusHandle_t _status_handle;
  int _broadcast;
  int _restart;
  int _next_n;
};

int
MachineStatusSM::MachineStatusSMEvent(Event *, void *)
{
  int n;
  EThread *et = this_ethread();

  if (_broadcast) {
    /* Broadcast the status change to every registered, initialized callout. */
    n = _restart ? _next_n : 0;
    for (; n < MAX_CLUSTERSTATUS_CALLOUTS; ++n) {
      if (status_callouts[n].func && (status_callouts[n].state == NE_STATE_INITIALIZED)) {
        MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
        if (lock) {
          status_callouts[n].func(&_node_handle, _node_status);
          Debug("cluster_api", "callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(_node_handle), _node_status);
        } else {
          /* Could not acquire the callout mutex; resume from this entry on the next event. */
          _restart = 1;
          _next_n = n;
          return EVENT_CONT;
        }
      }
    }
  } else {
    if (!_node_handle) {
      /* No target node: deliver the current online-node list to the given callout. */
      n = CLUSTER_STATUS_HANDLE_TO_INDEX(_status_handle);
      if (status_callouts[n].func) {
        MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
        if (lock) {
          int mi;
          unsigned int my_ipaddr = (this_cluster_machine())->ip;
          ClusterConfiguration *cc;
          TSNodeHandle_t nh;

          cc = this_cluster()->current_configuration();
          if (cc) {
            for (mi = 0; mi < cc->n_machines; ++mi) {
              if (cc->machines[mi]->ip != my_ipaddr) {
                nh = IP_TO_NODE_HANDLE(cc->machines[mi]->ip);
                status_callouts[n].func(&nh, NODE_ONLINE);
                Debug("cluster_api",
                      "initial callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(cc->machines[mi]->ip), NODE_ONLINE);
              }
            }
          }
          status_callouts[n].state = NE_STATE_INITIALIZED;
        } else {
          _restart = 1;
          _next_n = n;
          return EVENT_CONT;
        }
      }
    } else {
      /* Directed status change: deliver it to the single registered callout. */
      n = CLUSTER_STATUS_HANDLE_TO_INDEX(_status_handle);
      if (status_callouts[n].func) {
        MUTEX_TRY_LOCK(lock, status_callouts[n].mutex, et);
        if (lock) {
          status_callouts[n].func(&_node_handle, _node_status);
          Debug("cluster_api",
                "directed callout: n %d ([%u.%u.%u.%u], %d)", n, DOT_SEPARATED(_node_handle), _node_status);
        } else {
          _restart = 1;
          _next_n = n;
          return EVENT_CONT;
        }
      }
    }
  }
  delete this;
  return EVENT_DONE;
}

class ClusterAPIPeriodicSM;
typedef int (ClusterAPIPeriodicSM::*ClusterAPIPeriodicSMHandler) (int, void *);

class ClusterAPIPeriodicSM : public Continuation
{
public:
  ClusterAPIPeriodicSM(ProxyMutex *m) : Continuation(m), _active_msmp(0)
  {
    SET_HANDLER((ClusterAPIPeriodicSMHandler) & ClusterAPIPeriodicSM::ClusterAPIPeriodicSMEvent);
  }
  ~ClusterAPIPeriodicSM()
  {
  }
  int ClusterAPIPeriodicSMEvent(int, void *);
  MachineStatusSM *GetNextSM();

private:
  MachineStatusSM *_active_msmp;
};

static InkAtomicList status_callout_atomic_q;
static Queue<MachineStatusSM> status_callout_q;

MachineStatusSM *
ClusterAPIPeriodicSM::GetNextSM()
{
  MachineStatusSM *msmp;
  MachineStatusSM *msmp_next;

  while (1) {
    msmp = status_callout_q.pop();
    if (!msmp) {
      /* Local queue is empty; drain the atomic list into it and retry. */
      msmp = (MachineStatusSM *) ink_atomiclist_popall(&status_callout_atomic_q);
      if (msmp) {
        while (msmp) {
          msmp_next = (MachineStatusSM *) msmp->link.next;
          msmp->link.next = 0;
          status_callout_q.push(msmp);
          msmp = msmp_next;
        }
        continue;
      } else {
        break;
      }
    } else {
      break;
    }
  }
  return msmp;
}

int
ClusterAPIPeriodicSM::ClusterAPIPeriodicSMEvent(int e, void *d)
{
  int ret;

  while (1) {
    if (_active_msmp) {
      ret = _active_msmp->handleEvent(e, d);
      if (ret != EVENT_DONE) {
        return EVENT_CONT;
      }
    }
    _active_msmp = GetNextSM();
    if (!_active_msmp) {
      break;
    }
  }
  return EVENT_CONT;
}

void
clusterAPI_init()
{
  /* Compute the offset of link.next for the atomic list from a null object pointer. */
  MachineStatusSM *mssmp = 0;
  ink_atomiclist_init(&status_callout_atomic_q,
                      "cluster API status_callout_q", (char *) &mssmp->link.next - (char *) mssmp);
  ClusterAPI_mutex = new_ProxyMutex();
  MUTEX_TRY_LOCK(lock, ClusterAPI_mutex, this_ethread());
  ink_release_assert(lock);     /* Should never fail at init time. */
  periodicSM = new ClusterAPIPeriodicSM(ClusterAPI_mutex);

  /* Drive the status callout queue once per second. */
  eventProcessor.schedule_every(periodicSM, HRTIME_SECONDS(1), ET_CALL);
}

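/*
 *  Add the given function to the node status callout table.  The callout
 *  is invoked, under the given mutex, on subsequent node status changes.
 *  Returns 0 on success and 1 if all callout slots are in use.
 */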
int
TSAddClusterStatusFunction(TSClusterStatusFunction Status_Function, TSMutex m, TSClusterStatusHandle_t *h)
{
  Debug("cluster_api", "TSAddClusterStatusFunction func %p", Status_Function);
  int n;
  EThread *e = this_ethread();

  ink_release_assert(Status_Function);
  MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
  for (n = 0; n < MAX_CLUSTERSTATUS_CALLOUTS; ++n) {
    if (!status_callouts[n].func) {
      status_callouts[n].mutex = (ProxyMutex *) m;
      status_callouts[n].func = Status_Function;
      MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
      *h = INDEX_TO_CLUSTER_STATUS_HANDLE(n);

      Debug("cluster_api", "TSAddClusterStatusFunction: func %p n %d", Status_Function, n);
      return 0;
    }
  }
  MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
  return 1;
}

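/*
 *  Remove the node status callout identified by the given handle and
 *  mark its slot as free.  Always returns 0.
 */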
int
TSDeleteClusterStatusFunction(TSClusterStatusHandle_t *h)
{
  int n = CLUSTER_STATUS_HANDLE_TO_INDEX(*h);
  EThread *e = this_ethread();

  ink_release_assert((n >= 0) && (n < MAX_CLUSTERSTATUS_CALLOUTS));
  Debug("cluster_api", "TSDeleteClusterStatusFunction: n %d", n);

  MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
  status_callouts[n].mutex = 0;
  status_callouts[n].func = (TSClusterStatusFunction) 0;
  status_callouts[n].state = NE_STATE_FREE;
  MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);

  return 0;
}

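/*
 *  Convert a node handle into the node's IP address (struct in_addr).
 */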
int
TSNodeHandleToIPAddr(TSNodeHandle_t *h, struct in_addr *in)
{
  *in = NODE_HANDLE_TO_IP(*h);
  return 0;
}

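/*
 *  Return the node handle for the local node.
 */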
void
TSGetMyNodeHandle(TSNodeHandle_t *h)
{
  *h = IP_TO_NODE_HANDLE((this_cluster_machine())->ip);
}

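/*
 *  Enable node status callouts for the given handle.  If the callout has
 *  not yet been initialized, queue delivery of the current online-node list.
 */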
void
TSEnableClusterStatusCallout(TSClusterStatusHandle_t *h)
{
  int ci = CLUSTER_STATUS_HANDLE_TO_INDEX(*h);

  ink_release_assert((ci >= 0) && (ci < MAX_CLUSTERSTATUS_CALLOUTS));

  if (status_callouts[ci].state == NE_STATE_INITIALIZED) {
    return;
  }

  Debug("cluster_api", "TSEnableClusterStatusCallout: n %d", ci);
  send_machine_online_list(h);
}

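/*
 *  Queue a state machine which delivers the current online-node list
 *  to the callout identified by the given handle.
 */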
static void
send_machine_online_list(TSClusterStatusHandle_t *h)
{
  MachineStatusSM *msm = new MachineStatusSM(*h);

  ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
}

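/*
 *  Queue a directed NODE_ONLINE callout for the given node (currently unused).
 */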
#ifdef NOT_USED_HERE
static void
directed_machine_online(int Ipaddr, TSClusterStatusHandle_t *h)
{
  MachineStatusSM *msm = new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_ONLINE, *h);

  ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
}
#endif

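/*
 *  Queue a broadcast of NODE_ONLINE for the given node to all
 *  registered status callouts.
 */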
void
machine_online_APIcallout(int Ipaddr)
{
  MachineStatusSM *msm = new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_ONLINE);

  ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
}

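/*
 *  Queue a broadcast of NODE_OFFLINE for the given node to all
 *  registered status callouts.
 */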
void
machine_offline_APIcallout(int Ipaddr)
{
  MachineStatusSM *msm = new MachineStatusSM(IP_TO_NODE_HANDLE(Ipaddr), NODE_OFFLINE);

  ink_atomiclist_push(&status_callout_atomic_q, (void *) msm);
}

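/*
 *  Register an RPC function for the given key.  The returned handle encodes
 *  the cluster function number together with a magic value and is required
 *  to allocate and send RPC messages.  Returns 0 on success.
 */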
int
TSAddClusterRPCFunction(TSClusterRPCKey_t k, TSClusterRPCFunction func, TSClusterRPCHandle_t *h)
{
  RPCHandle_t handle;
  int n = RPC_FUNCTION_KEY_TO_CLUSTER_NUMBER(k);
  EThread *e = this_ethread();

  ink_release_assert(func);
  ink_release_assert((n >= API_STARECT_CLUSTER_FUNCTION)
                     && (n <= API_END_CLUSTER_FUNCTION));
  Debug("cluster_api", "TSAddClusterRPCFunction: key %d func %p", k, func);

  handle.u.internal.cluster_function = n;
  handle.u.internal.magic = RPC_HANDLE_MAGIC;

  MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
  if (n < API_END_CLUSTER_FUNCTION)
    RPC_Functions[n] = func;
  MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);

  *h = handle.u.external;
  return 0;
}

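/*
 *  Remove the RPC function associated with the given handle.
 *  Always returns 0.
 */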
int
TSDeleteClusterRPCFunction(TSClusterRPCHandle_t *rpch)
{
  RPCHandle_t *h = (RPCHandle_t *) rpch;
  EThread *e = this_ethread();

  ink_release_assert(((h->u.internal.cluster_function >= API_STARECT_CLUSTER_FUNCTION)
                      && (h->u.internal.cluster_function <= API_END_CLUSTER_FUNCTION)));
  Debug("cluster_api", "TSDeleteClusterRPCFunction: n %d", h->u.internal.cluster_function);

  MUTEX_TAKE_LOCK(ClusterAPI_mutex, e);
  if (h->u.internal.cluster_function < API_END_CLUSTER_FUNCTION)   /* guard the table bound, as in TSAddClusterRPCFunction */
    RPC_Functions[h->u.internal.cluster_function] = 0;
  MUTEX_UNTAKE_LOCK(ClusterAPI_mutex, e);
  return 0;
}

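/*
 *  Cluster-side dispatcher for incoming API RPC messages.  Looks up the
 *  registered RPC function from the handle embedded in the message and
 *  invokes it; messages with no registered function are freed here.
 */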
void
default_api_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
  Debug("cluster_api", "default_api_ClusterFunction: [%u.%u.%u.%u] data %p len %d", DOT_SEPARATED(ch->machine->ip), data, len);

  TSClusterRPCMsg_t *msg = (TSClusterRPCMsg_t *) data;
  RPCHandle_t *rpch = (RPCHandle_t *) &msg->m_handle;
  int cluster_function = rpch->u.internal.cluster_function;

  ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t));
  ink_release_assert(((cluster_function >= API_STARECT_CLUSTER_FUNCTION)
                      && (cluster_function <= API_END_CLUSTER_FUNCTION)));

  if (cluster_function < API_END_CLUSTER_FUNCTION && RPC_Functions[cluster_function]) {
    int msg_data_len = len - SIZEOF_RPC_MSG_LESS_DATA;
    TSNodeHandle_t nh = IP_TO_NODE_HANDLE(ch->machine->ip);
    (*RPC_Functions[cluster_function]) (&nh, msg, msg_data_len);
  } else {
    clusterProcessor.free_remote_data((char *) data, len);
  }
}

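/*
 *  Free an RPC message previously passed to a registered RPC function.
 */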
void
TSFreeRPCMsg(TSClusterRPCMsg_t *msg, int msg_data_len)
{
  RPCHandle_t *rpch = (RPCHandle_t *) &msg->m_handle;

  ink_release_assert(rpch->u.internal.magic == RPC_HANDLE_MAGIC);
  Debug("cluster_api", "TSFreeRPCMsg: msg %p msg_data_len %d", msg, msg_data_len);

  clusterProcessor.free_remote_data((char *) msg, msg_data_len + SIZEOF_RPC_MSG_LESS_DATA);
}

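/*
 *  Allocate an RPC message with room for data_size bytes of caller data.
 *  data_size must be at least 4 bytes; returns NULL otherwise.
 */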
TSClusterRPCMsg_t *
TSAllocClusterRPCMsg(TSClusterRPCHandle_t *h, int data_size)
{
  ink_assert(data_size >= 4);
  if (data_size < 4) {
    /* Message must be large enough to hold the minimal data payload. */
    return (TSClusterRPCMsg_t *) 0;
  }

  TSClusterRPCMsg_t *rpcm;
  OutgoingControl *c = OutgoingControl::alloc();

  c->len = sizeof(OutgoingControl *) + SIZEOF_RPC_MSG_LESS_DATA + data_size;
  c->alloc_data();
  *((OutgoingControl **) c->data) = c;

  /*
   * The OutgoingControl pointer is stashed immediately before the message
   * returned to the caller, so TSSendClusterRPC can recover it later.
   */
  rpcm = (TSClusterRPCMsg_t *) (c->data + sizeof(OutgoingControl *));
  rpcm->m_handle = *h;
  return rpcm;
}

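/*
 *  Send the given RPC message to the target node.  The message is dropped
 *  and freed if the node is not in the current cluster configuration.
 *  Always returns 0.
 */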
int
TSSendClusterRPC(TSNodeHandle_t *nh, TSClusterRPCMsg_t *msg)
{
  struct in_addr ipaddr = NODE_HANDLE_TO_IP(*nh);
  RPCHandle_t *rpch = (RPCHandle_t *) &msg->m_handle;

  /* Recover the OutgoingControl pointer stashed just before the message. */
  OutgoingControl *c = *((OutgoingControl **)
                         ((char *) msg - sizeof(OutgoingControl *)));
  ClusterConfiguration *cc = this_cluster()->current_configuration();
  ClusterMachine *m;

  ink_release_assert(rpch->u.internal.magic == RPC_HANDLE_MAGIC);

  if ((m = cc->find(ipaddr.s_addr))) {
    int len = c->len - sizeof(OutgoingControl *);
    ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t));

    Debug("cluster_api", "TSSendClusterRPC: msg %p dlen %d [%u.%u.%u.%u] sent", msg, len, DOT_SEPARATED(ipaddr.s_addr));
    clusterProcessor.invoke_remote(m->pop_ClusterHandler(), rpch->u.internal.cluster_function,
                                   msg, len, (CLUSTER_OPT_STEAL | CLUSTER_OPT_DATA_IS_OCONTROL));
  } else {
    Debug("cluster_api", "TSSendClusterRPC: msg %p to [%u.%u.%u.%u] dropped", msg, DOT_SEPARATED(ipaddr.s_addr));
    c->freeall();
  }

  return 0;
}