00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #ifndef _P_Cluster_Cache_h
00032 #define _P_Cluster_Cache_h
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055 #include "P_ClusterMachine.h"
00056
00057
00058
00059
00060
00061
00062
00063
00064
// Cluster protocol version of this build. ClusterHelloMessage's constructor
// copies these two values into its _major/_minor fields.
00065 #define CLUSTER_MAJOR_VERSION 3
00066 #define CLUSTER_MINOR_VERSION 2
00067
00068
// Version values that ClusterHelloMessage's constructor copies into its
// _min_major/_min_minor fields. They currently alias the current version.
00069 #define MIN_CLUSTER_MAJOR_VERSION CLUSTER_MAJOR_VERSION
00070 #define MIN_CLUSTER_MINOR_VERSION CLUSTER_MINOR_VERSION
00071
00072
// Built-in defaults for cluster settings.
// NOTE(review): presumably used when no configured value is available - confirm at the use sites.
00073 #define DEFAULT_CLUSTER_PORT_NUMBER 0
00074 #define DEFAULT_NUMBER_OF_CLUSTER_THREADS 1
00075 #define DEFAULT_CLUSTER_HOST ""
00076
// Upper bound on a single send length; INT_MAX comes from <limits.h>/<climits>.
00077 #define MAX_CLUSTER_SEND_LENGTH INT_MAX
00078
// Capacity of ClusterConfiguration::machines[]. The hash table stores machine
// indexes as unsigned char, which can address exactly 256 slots.
00079 #define CLUSTER_MAX_MACHINES 256
00080
// Number of slots in ClusterConfiguration::hash_table[] and the modulus applied
// to hash values in ClusterConfiguration::machine_hash().
00081 #define CLUSTER_HASH_TABLE_SIZE 32707
00082
00083
// Time spans expressed in HRTIME_DAY units (HRTIME_DAY is defined elsewhere).
// NOTE(review): presumably the ages at which an old configuration stops being
// consulted and is finally discarded - confirm against the code that uses them.
00084 #define CLUSTER_CONFIGURATION_TIMEOUT HRTIME_DAY
00085
00086 #define CLUSTER_CONFIGURATION_ZOMBIE (HRTIME_DAY*2)
00087
00088
00089
00090
00091
00092
// NOTE(review): presumably the number of earlier configurations that
// cluster_machine_at_depth() may probe - confirm against its definition.
00093 #define CONFIGURATION_HISTORY_PROBE_DEPTH 1
00094
00095
00096
// Cluster event codes, numbered consecutively from CLUSTER_EVENT_EVENTS_START
// (which is defined outside this header).
00097 #define CLUSTER_EVENT_CHANGE (CLUSTER_EVENT_EVENTS_START)
00098 #define CLUSTER_EVENT_CONFIGURATION (CLUSTER_EVENT_EVENTS_START+1)
00099 #define CLUSTER_EVENT_OPEN (CLUSTER_EVENT_EVENTS_START+2)
00100 #define CLUSTER_EVENT_OPEN_EXISTS (CLUSTER_EVENT_EVENTS_START+3)
00101 #define CLUSTER_EVENT_OPEN_FAILED (CLUSTER_EVENT_EVENTS_START+4)
00102
00103
// Numbered at offset +50, well clear of the consecutive codes above.
00104 #define CLUSTER_EVENT_STEAL_THREAD (CLUSTER_EVENT_EVENTS_START+50)
00105
00106
00107
00108
// Exchange the two bytes of the 16-bit value at *d, in place.
//
// The swap is computed on the value, not by reassembling individual bytes
// from memory. Reassembling as (p[1] << 8) | p[0] is a little-endian decode,
// which leaves the value untouched on a little-endian host; the value-based
// form reverses the bytes on every host.
inline void
ats_swap16(uint16_t * d)
{
  const uint16_t v = *d;
  *d = (uint16_t) (((v & 0x00FFu) << 8) | ((v & 0xFF00u) >> 8));
}

// By-value form: returns d with its two bytes exchanged.
inline uint16_t
ats_swap16(uint16_t d)
{
  ats_swap16(&d);
  return d;
}
00122
// Reverse the four bytes of the 32-bit value at *d, in place.
//
// Works on the value with unsigned arithmetic, so the result is a true byte
// reversal on any host. This also avoids shifting a promoted (signed) int
// left by 24 bits, which overflows when the top byte is >= 0x80.
inline void
ats_swap32(uint32_t * d)
{
  const uint32_t v = *d;
  *d = ((v & 0x000000FFu) << 24) |
       ((v & 0x0000FF00u) << 8) |
       ((v & 0x00FF0000u) >> 8) |
       ((v & 0xFF000000u) >> 24);
}

// By-value form: returns d with its four bytes reversed.
inline uint32_t
ats_swap32(uint32_t d)
{
  ats_swap32(&d);
  return d;
}
00136
// Reverse the eight bytes of the 64-bit value at *d, in place.
//
// The low byte of the input is peeled off and pushed into the output eight
// times, so the reversal is computed on the value and does not depend on the
// host's byte order.
inline void
ats_swap64(uint64_t * d)
{
  uint64_t in = *d;
  uint64_t out = 0;

  for (int i = 0; i < 8; i++) {
    out = (out << 8) | (in & 0xFFu);
    in >>= 8;
  }
  *d = out;
}

// By-value form: returns d with its eight bytes reversed.
inline uint64_t
ats_swap64(uint64_t d)
{
  ats_swap64(&d);
  return d;
}
00152
00153
00154
00155 struct ClusterConfiguration
00156 {
00157 int n_machines;
00158 ClusterMachine *machines[CLUSTER_MAX_MACHINES];
00159
00160 ClusterMachine *machine_hash(unsigned int hash_value)
00161 {
00162 return machines[hash_table[hash_value % CLUSTER_HASH_TABLE_SIZE]];
00163 }
00164
00165 ClusterMachine *find(unsigned int ip, int port = 0) {
00166 for (int i = 0; i < n_machines; i++)
00167 if (ip == machines[i]->ip && (!port || !machines[i]->cluster_port || machines[i]->cluster_port == port))
00168 return machines[i];
00169 return NULL;
00170 }
00171
00172
00173
00174
00175 ClusterConfiguration();
00176 unsigned char hash_table[CLUSTER_HASH_TABLE_SIZE];
00177 ink_hrtime changed;
00178 SLINK(ClusterConfiguration, link);
00179 };
00180
00181 inline bool
00182 machine_in_vector(ClusterMachine * m, ClusterMachine ** mm, int len)
00183 {
00184 for (int i = 0; i < len; i++)
00185 if (m == mm[i])
00186 return true;
00187 return false;
00188 }
00189
00190
00191
00192
00193
00194
00195
00196
// Declared here, defined elsewhere. Both pointer arguments default to NULL.
// NOTE(review): presumably resolves `hash` to a machine while walking back
// through older configurations, using *probe_depth / past_probes to track the
// walk - confirm against the definition.
00197 inkcoreapi ClusterMachine *cluster_machine_at_depth(unsigned int hash, int *probe_depth = NULL,
00198 ClusterMachine ** past_probes = NULL);
00199
00200
00201
00202
00203
00204 struct Cluster
00205 {
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224 ClusterMachine *machine_hash(unsigned int hash_value)
00225 {
00226 return current_configuration()->machine_hash(hash_value);
00227 }
00228
00229
00230
00231
00232
00233
00234
00235
00236
00237 void cluster_change_callback(Continuation * cont);
00238
00239
00240
00241
00242 ClusterConfiguration *current_configuration()
00243 {
00244 return configurations.head;
00245 }
00246
00247
00248
00249
00250
00251 ClusterConfiguration *previous_configuration()
00252 {
00253 return configurations.head->link.next;
00254 }
00255
00256
00257
00258
00259
00260
00261
00262 SLL<ClusterConfiguration> configurations;
00263
00264 Cluster();
00265 };
00266
00267
00268
00269
00270
00271
00272 struct ClusterVCToken
00273 {
00274
00275
00276
00277 uint32_t ip_created;
00278 uint32_t ch_id;
00279 uint32_t sequence_number;
00280
00281 bool is_clear()
00282 {
00283 return !ip_created;
00284 }
00285 void clear()
00286 {
00287 ip_created = 0;
00288 sequence_number = 0;
00289 }
00290
00291 ClusterVCToken(unsigned int aip = 0, unsigned int id = 0, unsigned int aseq = 0)
00292 : ip_created(aip), ch_id(id), sequence_number(aseq) {
00293 }
00294
00295
00296
00297 void alloc();
00298
00299 inline void SwapBytes()
00300 {
00301 ats_swap32(&ch_id);
00302 ats_swap32(&sequence_number);
00303 }
00304 };
00305
00306
00307
00308
00309
00310
// Signature shared by all cluster message handlers: the ClusterHandler the
// call is associated with, a pointer to the payload, and the payload length.
// These are the functions stored in the clusterFunction[] table.
00311 typedef void ClusterFunction(ClusterHandler * ch, void *data, int len);
// Pointer-to-handler form used for the table's pfn/post_pfn fields.
00312 typedef ClusterFunction *ClusterFunctionPtr;
00313
00314 struct ClusterVConnectionBase;
00315
// Per-direction state of a cluster virtual connection. ClusterVConnectionBase
// holds one instance for reading and one for writing.
00316 struct ClusterVConnState
00317 {
00318
00319
00320
00321 volatile int enabled;
00322
00323 int priority;
00324 VIO vio;
// Type-erased pointer to the Queue this VC currently sits on. It is set by
// ClusterVC_enqueue_read/write and reset to NULL by ClusterVC_remove_read/write.
00325 void *queue;
00326 int ifd;
00327 Event *delay_timeout;
// Intrusive list link used by the LINKM() accessors in ClusterVConnectionBase.
00328 Link<ClusterVConnectionBase> link;
00329
00330
00331 ClusterVConnState();
00332 };
00333
// Common base for cluster virtual connections: the VConnection I/O entry
// points, active/inactivity timeout bookkeeping, and the read/write state.
00334 struct ClusterVConnectionBase: public CacheVConnection
00335 {
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345 virtual VIO *do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf);
00346 virtual VIO *do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * buf, bool owner = false);
// Shutdown is not supported on cluster connections: the argument is ignored
// and the assertion always fails where ink_assert is active.
00347 virtual void do_io_shutdown(ShutdownHowTo_t howto)
00348 {
00349 (void) howto;
00350 ink_assert(!"shutdown of cluster connection");
00351 }
00352 virtual void do_io_close(int lerrno = -1);
00353 virtual VIO* do_io_pread(Continuation*, int64_t, MIOBuffer*, int64_t);
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
// Timeout management; all four are defined inline later in this header.
00364 void set_active_timeout(ink_hrtime timeout_in);
00365 void set_inactivity_timeout(ink_hrtime timeout_in);
00366 void cancel_active_timeout();
00367 void cancel_inactivity_timeout();
00368
00369 ClusterVConnectionBase();
00370
00371 #ifdef DEBUG
00372
00373 static int enable_debug_trace;
00374 #endif
00375 Action action_;
// Thread on which the timeout events are scheduled (see set_*_timeout).
00376 EThread *thread;
00377 volatile int closed;
// Per-direction state; the LINKM lines give each direction's `link` member an
// accessor class (Link_read_link / Link_write_link, as named by the
// ClusterVC_enqueue/remove helpers).
00378 ClusterVConnState read;
00379 ClusterVConnState write;
00380 LINKM(ClusterVConnectionBase, read, link)
00381 LINKM(ClusterVConnectionBase, write, link)
// Last requested timeout values, and the scheduled events (NULL when none).
00382 ink_hrtime inactivity_timeout_in;
00383 ink_hrtime active_timeout_in;
00384 Event *inactivity_timeout;
00385 Event *active_timeout;
00386
00387 virtual void reenable(VIO *);
00388 virtual void reenable_re(VIO *);
00389 };
00390
00391 inline void
00392 ClusterVConnectionBase::set_active_timeout(ink_hrtime timeout)
00393 {
00394 active_timeout_in = timeout;
00395 if (active_timeout) {
00396 ink_assert(!active_timeout->cancelled);
00397 if (active_timeout->ethread == this_ethread())
00398 active_timeout->schedule_in(timeout);
00399 else {
00400 active_timeout->cancel(this);
00401 active_timeout = thread->schedule_in(this, timeout);
00402 }
00403 } else {
00404 if (thread) {
00405 active_timeout = thread->schedule_in(this, timeout);
00406 }
00407 }
00408 }
00409
00410 inline void
00411 ClusterVConnectionBase::set_inactivity_timeout(ink_hrtime timeout)
00412 {
00413 inactivity_timeout_in = timeout;
00414 if (inactivity_timeout) {
00415 ink_assert(!inactivity_timeout->cancelled);
00416 if (inactivity_timeout->ethread == this_ethread())
00417 inactivity_timeout->schedule_in(timeout);
00418 else {
00419 inactivity_timeout->cancel(this);
00420 inactivity_timeout = thread->schedule_in(this, timeout);
00421 }
00422 } else {
00423 if (thread) {
00424 inactivity_timeout = thread->schedule_in(this, timeout);
00425 }
00426 }
00427 }
00428
00429 inline void
00430 ClusterVConnectionBase::cancel_active_timeout()
00431 {
00432 if (active_timeout) {
00433 active_timeout->cancel(this);
00434 active_timeout = NULL;
00435 active_timeout_in = 0;
00436 }
00437 }
00438
00439 inline void
00440 ClusterVConnectionBase::cancel_inactivity_timeout()
00441 {
00442 if (inactivity_timeout) {
00443 inactivity_timeout->cancel(this);
00444 inactivity_timeout = NULL;
00445 inactivity_timeout_in = 0;
00446 }
00447 }
00448
00449
// Queue node that holds a reference (Ptr<>) to one IOBufferBlock.
// ClusterVConnection keeps these in its byte_bank_q.
00450 class ByteBankDescriptor
00451 {
00452 public:
00453 ByteBankDescriptor()
00454 {
00455 }
// Returns the held block as a raw pointer (via Ptr<>'s conversion).
00456 IOBufferBlock *get_block()
00457 {
00458 return block;
00459 }
00460
// Allocation/release helpers, defined elsewhere.
// NOTE(review): presumably the only intended way to create and destroy
// descriptors - confirm.
00461 static ByteBankDescriptor *ByteBankDescriptor_alloc(IOBufferBlock *);
00462 static void ByteBankDescriptor_free(ByteBankDescriptor *);
00463
00464 public:
00465 LINK(ByteBankDescriptor, link);
00466
00467 private:
00468 Ptr<IOBufferBlock> block;
00469 };
00470
// Kinds/states of a cluster virtual connection; values run 0..4 in declaration order.
// NOTE(review): presumably the values stored in ClusterVConnection::type via
// set_type() - confirm.
00471 enum TypeVConnection
00472 {
00473 VC_NULL,
00474 VC_CLUSTER,
00475 VC_CLUSTER_READ,
00476 VC_CLUSTER_WRITE,
00477 VC_CLUSTER_CLOSED
00478 };
00479
00480
00481
00482
00483 struct ClusterVConnection: public ClusterVConnectionBase
00484 {
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506 int startEvent(int event, Event * e);
00507 int mainEvent(int event, Event * e);
00508
00509
00510 int start(EThread * t);
00511
00512 ClusterVConnection(int is_new_connect_read = 0);
00513 ~ClusterVConnection();
00514 void free();
00515
00516 virtual void do_io_close(int lerrno = -1);
00517 virtual VIO *do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf);
00518 virtual VIO *do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * buf, bool owner = false);
00519 virtual void reenable(VIO *vio);
00520
00521 ClusterHandler *ch;
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531 int new_connect_read;
00532 int remote_free;
00533 int last_local_free;
00534 int channel;
00535 ClusterVCToken token;
00536 volatile int close_disabled;
00537 volatile int remote_closed;
00538 volatile int remote_close_disabled;
00539 volatile int remote_lerrno;
00540 volatile uint32_t in_vcs;
00541 volatile uint32_t type;
00542 SLINK(ClusterVConnection, ready_alink);
00543 int was_closed();
00544 void allow_close();
00545 void disable_close();
00546 int was_remote_closed();
00547 void allow_remote_close();
00548 bool schedule_write();
00549 void set_type(int);
00550 ink_hrtime start_time;
00551 ink_hrtime last_activity_time;
00552 Queue<ByteBankDescriptor> byte_bank_q;
00553 int n_set_data_msgs;
00554 int n_recv_set_data_msgs;
00555 volatile int pending_remote_fill;
00556 Ptr<IOBufferBlock> read_block;
00557 bool remote_ram_cache_hit;
00558 bool have_all_data;
00559 int initial_data_bytes;
00560 Ptr<IOBufferBlock> remote_write_block;
00561 void *current_cont;
00562
00563 #define CLUSTER_IOV_NOT_OPEN -2
00564 #define CLUSTER_IOV_NONE -1
00565 int iov_map;
00566
00567 Ptr<ProxyMutex> read_locked;
00568 Ptr<ProxyMutex> write_locked;
00569
00570
00571 Ptr<IOBufferData> marshal_buf;
00572
00573
00574 Ptr<IOBufferBlock> write_list;
00575 IOBufferBlock *write_list_tail;
00576 int write_list_bytes;
00577 int write_bytes_in_transit;
00578
00579 CacheHTTPInfo alternate;
00580 time_t time_pin;
00581 int disk_io_priority;
00582 void set_remote_fill_action(Action *);
00583
00584
00585 bool is_ram_cache_hit() const { return remote_ram_cache_hit; };
00586 void set_ram_cache_hit(bool remote_hit) { remote_ram_cache_hit = remote_hit; }
00587
00588
00589
00590 virtual bool get_data(int id, void *data);
00591 virtual void get_http_info(CacheHTTPInfo **);
00592 virtual int64_t get_object_size();
00593 virtual bool is_pread_capable();
00594
00595
00596
00597
00598 virtual void set_http_info(CacheHTTPInfo *);
00599
00600 virtual bool set_pin_in_cache(time_t time_pin);
00601 virtual time_t get_pin_in_cache();
00602 virtual bool set_disk_io_priority(int priority);
00603 virtual int get_disk_io_priority();
00604 virtual int get_header(void **ptr, int *len);
00605 virtual int set_header(void *ptr, int len);
00606 virtual int get_single_data(void **ptr, int *len);
00607 };
00608
00609
00610
00611
// Option bits for the `options` argument of the ClusterProcessor calls
// (invoke_remote, invoke_remote_data, open_local, connect_local).
// Each flag is a distinct bit, so flags can be OR-ed together.
#define CLUSTER_OPT_STEAL 0x0001 // allow thread stealing
#define CLUSTER_OPT_IMMEDIATE 0x0002 // require immediate response
#define CLUSTER_OPT_ALLOW_IMMEDIATE 0x0004 // allow immediate response
#define CLUSTER_OPT_DELAY 0x0008 // require delayed response
#define CLUSTER_OPT_CONN_READ 0x0010 // new conn read
#define CLUSTER_OPT_CONN_WRITE 0x0020 // new conn write
#define CLUSTER_OPT_DATA_IS_OCONTROL 0x0040 // data in OutgoingControl
// Function index passed by ClusterProcessor::invoke_remote_malloced().
// Parenthesized so the negative literal stays one unit in any expression.
#define CLUSTER_FUNCTION_MALLOCED (-1)
00620
// Header type accepted by ClusterProcessor::invoke_remote_malloced().
// NOTE(review): presumably placed at the front of a malloc'ed message so the
// real function index travels with the data - confirm.
00621 struct ClusterRemoteDataHeader
00622 {
00623 int32_t cluster_function;
00624 };
00625
00626
00627
00628 class ClusterAccept;
00629
// Entry point for cluster operations: remote function invocation, opening and
// connecting cluster VCs, and connections to peer machines. A single global
// instance, clusterProcessor, is declared after this struct.
00630 struct ClusterProcessor
00631 {
00632
00633
00634
00635
00636
00637
00638
00639
00640
00641
// Invoke the cluster function with index cluster_fn_index, passing len bytes
// at data. options is a mask of CLUSTER_OPT_* flags.
00642 int invoke_remote(ClusterHandler *ch, int cluster_fn_index, void *data, int len, int options = CLUSTER_OPT_STEAL);
00643
// Variant that also takes an IOBufferBlock chain, a channel/token pair and a
// release callback with its argument.
// NOTE(review): exact ownership rules are not visible here - confirm.
00644 int invoke_remote_data(ClusterHandler *ch, int cluster_fn_index,
00645 void *data, int data_len,
00646 IOBufferBlock * buf,
00647 int logical_channel, ClusterVCToken * token,
00648 void (*bufdata_free) (void *), void *bufdata_free_arg, int options = CLUSTER_OPT_STEAL);
00649
00650
// Forwards to invoke_remote() with the CLUSTER_FUNCTION_MALLOCED index and
// the default options.
00651 int invoke_remote_malloced(ClusterHandler *ch, ClusterRemoteDataHeader * data, int len )
00652 {
00653 return invoke_remote(ch, CLUSTER_FUNCTION_MALLOCED, data, len);
00654 }
00655 void free_remote_data(char *data, int len);
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
// Sentinel (non-dereferenceable) ClusterVConnection pointer values.
// NOTE(review): presumably returned by open_local()/connect_local() in place
// of a real VC - confirm.
00671 #define CLUSTER_DELAYED_OPEN ((ClusterVConnection*)-1)
00672 #define CLUSTER_NODE_DOWN ((ClusterVConnection*)-2)
00673 ClusterVConnection *open_local(Continuation * cont, ClusterMachine * mp, ClusterVCToken & token, int options = 0);
00674
00675
00676
00677
00678
00679
00680
00681 ClusterVConnection *connect_local(Continuation * cont, ClusterVCToken * token, int channel, int options = 0);
00682 inkcoreapi bool disable_remote_cluster_ops(ClusterMachine *);
00683
00684
00685
00686
00687 virtual int init();
00688 virtual int start();
00689
00690 ClusterProcessor();
00691 virtual ~ ClusterProcessor();
00692
00693
00694
00695
00696 ClusterAccept *accept_handler;
// The Cluster object returned by the free function this_cluster().
00697 Cluster *this_cluster;
00698
// Connect to a peer given by hostname or by ip/port; id defaults to -1.
00699 void connect(char *hostname, int16_t id = -1);
00700 void connect(unsigned int ip, int port = 0, int16_t id = -1, bool delay = false);
00701
00702 void send_machine_list(ClusterMachine * m);
00703 void compute_cluster_mode();
00704
00705 int internal_invoke_remote(ClusterHandler * m, int cluster_fn, void *data, int len, int options, void *cmsg);
00706 };
00707
// The global ClusterProcessor instance; defined in another translation unit.
00708 inkcoreapi extern ClusterProcessor clusterProcessor;
00709
// Convenience accessor: returns the Cluster pointer stored in the global
// clusterProcessor (its this_cluster member).
00710 inline Cluster *
00711 this_cluster()
00712 {
00713 return clusterProcessor.this_cluster;
00714 }
00715
00716
00717
00718
00719
00720
// Defined elsewhere.
// NOTE(review): presumably performs per-thread cluster setup for the given EThread - confirm.
00721 void initialize_thread_for_cluster(EThread * thread);
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731
// Pointer to a ClusterFunction; defined elsewhere.
// NOTE(review): presumably a test hook - confirm.
00732 extern ClusterFunction *ptest_ClusterFunction;
00733
// Handlers referenced by the clusterFunction[] table below. All share the
// ClusterFunction signature and are defined in other translation units.
00734 extern ClusterFunction test_ClusterFunction;
00735 extern ClusterFunction ping_ClusterFunction;
00736 extern ClusterFunction ping_reply_ClusterFunction;
00737 extern ClusterFunction machine_list_ClusterFunction;
00738 extern ClusterFunction close_channel_ClusterFunction;
00739 extern ClusterFunction get_hostinfo_ClusterFunction;
00740 extern ClusterFunction put_hostinfo_ClusterFunction;
00741 extern ClusterFunction cache_lookup_ClusterFunction;
00742 extern ClusterFunction cache_op_ClusterFunction;
00743 extern ClusterFunction cache_op_malloc_ClusterFunction;
00744 extern ClusterFunction cache_op_result_ClusterFunction;
00745 extern ClusterFunction set_channel_data_ClusterFunction;
00746 extern ClusterFunction post_setchan_send_ClusterFunction;
00747 extern ClusterFunction set_channel_pin_ClusterFunction;
00748 extern ClusterFunction post_setchan_pin_ClusterFunction;
00749 extern ClusterFunction set_channel_priority_ClusterFunction;
00750 extern ClusterFunction post_setchan_priority_ClusterFunction;
00751 extern ClusterFunction default_api_ClusterFunction;
00752
// One row of the clusterFunction[] dispatch table.
00753 struct ClusterFunctionDescriptor
00754 {
// NOTE(review): presumably true when the message data is malloc'ed - confirm.
00755 bool fMalloced;
// NOTE(review): meaning not visible in this header - confirm at the use sites.
00756 bool ClusterFunc;
00757
// Queue priority returned by ClusterFuncToQpri(); the table uses
// CMSG_MAX_PRI or CMSG_LOW_PRI.
00758 int q_priority;
// Handler for this function index (0 for unused rows).
00759 ClusterFunctionPtr pfn;
// Optional second handler; only the set_channel_* rows supply one.
// NOTE(review): presumably run after the message has been sent - confirm.
00760 ClusterFunctionPtr post_pfn;
00761 };
00762
// Number of control-message queues, and the priority values naming the first
// (0) and last (CLUSTER_CMSG_QUEUES-1) of them.
00763 #define CLUSTER_CMSG_QUEUES 2
00764 #define CMSG_MAX_PRI 0
00765 #define CMSG_LOW_PRI (CLUSTER_CMSG_QUEUES-1)
00766
// Dispatch table indexed by the *_CLUSTER_FUNCTION constants defined further
// down. Exactly one translation unit defines DEFINE_CLUSTER_FUNCTIONS before
// including this header and so gets the definition; all others see only the
// extern declaration.
//
// Row layout: {fMalloced, ClusterFunc, q_priority, pfn, post_pfn}.
00767 #ifndef DEFINE_CLUSTER_FUNCTIONS
00768 extern
00769 #endif
00770 ClusterFunctionDescriptor clusterFunction[]
00771 #ifdef DEFINE_CLUSTER_FUNCTIONS
00772 = {
// Indexes 0-16: core functions (11-13 are unused placeholders).
00773 {false, true, CMSG_LOW_PRI, test_ClusterFunction, 0},
00774 {false, true, CMSG_LOW_PRI, ping_ClusterFunction, 0},
00775 {false, true, CMSG_LOW_PRI, ping_reply_ClusterFunction, 0},
00776 {false, true, CMSG_LOW_PRI, machine_list_ClusterFunction, 0},
00777 {false, true, CMSG_LOW_PRI, close_channel_ClusterFunction, 0},
00778 {false, false, CMSG_LOW_PRI, get_hostinfo_ClusterFunction, 0},
00779 {false, false, CMSG_LOW_PRI, put_hostinfo_ClusterFunction, 0},
00780 {false, true, CMSG_LOW_PRI, cache_lookup_ClusterFunction, 0},
00781 {true, true, CMSG_LOW_PRI, cache_op_malloc_ClusterFunction, 0},
00782 {false, true, CMSG_LOW_PRI, cache_op_ClusterFunction, 0},
00783 {false, false, CMSG_LOW_PRI, cache_op_result_ClusterFunction, 0},
00784 {false, false, CMSG_LOW_PRI, 0, 0},
00785 {false, false, CMSG_LOW_PRI, 0, 0},
00786 {false, false, CMSG_LOW_PRI, 0, 0},
// Indexes 14-16: the only rows with CMSG_MAX_PRI and a post_pfn.
00787 {false, true, CMSG_MAX_PRI, set_channel_data_ClusterFunction, post_setchan_send_ClusterFunction},
00788 {false, true, CMSG_MAX_PRI, set_channel_pin_ClusterFunction, post_setchan_pin_ClusterFunction},
00789 {false, true, CMSG_MAX_PRI, set_channel_priority_ClusterFunction, post_setchan_priority_ClusterFunction},
00790
00791
00792
// Indexes 17-50: 34 empty rows matching INTERNAL_RESERVED1..34_CLUSTER_FUNCTION.
00793 {false, false, CMSG_LOW_PRI, 0, 0},
00794 {false, false, CMSG_LOW_PRI, 0, 0},
00795 {false, false, CMSG_LOW_PRI, 0, 0},
00796 {false, false, CMSG_LOW_PRI, 0, 0},
00797 {false, false, CMSG_LOW_PRI, 0, 0},
00798 {false, false, CMSG_LOW_PRI, 0, 0},
00799 {false, false, CMSG_LOW_PRI, 0, 0},
00800 {false, false, CMSG_LOW_PRI, 0, 0},
00801 {false, false, CMSG_LOW_PRI, 0, 0},
00802 {false, false, CMSG_LOW_PRI, 0, 0},
00803 {false, false, CMSG_LOW_PRI, 0, 0},
00804 {false, false, CMSG_LOW_PRI, 0, 0},
00805 {false, false, CMSG_LOW_PRI, 0, 0},
00806 {false, false, CMSG_LOW_PRI, 0, 0},
00807 {false, false, CMSG_LOW_PRI, 0, 0},
00808 {false, false, CMSG_LOW_PRI, 0, 0},
00809 {false, false, CMSG_LOW_PRI, 0, 0},
00810 {false, false, CMSG_LOW_PRI, 0, 0},
00811 {false, false, CMSG_LOW_PRI, 0, 0},
00812 {false, false, CMSG_LOW_PRI, 0, 0},
00813 {false, false, CMSG_LOW_PRI, 0, 0},
00814 {false, false, CMSG_LOW_PRI, 0, 0},
00815 {false, false, CMSG_LOW_PRI, 0, 0},
00816 {false, false, CMSG_LOW_PRI, 0, 0},
00817 {false, false, CMSG_LOW_PRI, 0, 0},
00818 {false, false, CMSG_LOW_PRI, 0, 0},
00819 {false, false, CMSG_LOW_PRI, 0, 0},
00820 {false, false, CMSG_LOW_PRI, 0, 0},
00821 {false, false, CMSG_LOW_PRI, 0, 0},
00822 {false, false, CMSG_LOW_PRI, 0, 0},
00823 {false, false, CMSG_LOW_PRI, 0, 0},
00824 {false, false, CMSG_LOW_PRI, 0, 0},
00825 {false, false, CMSG_LOW_PRI, 0, 0},
00826 {false, false, CMSG_LOW_PRI, 0, 0},
00827
00828
00829
00830
// Indexes 51-80: 30 API rows (API_F01..API_F30_CLUSTER_FUNCTION), all routed
// to default_api_ClusterFunction with fMalloced set.
00831 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00832 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00833 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00834 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00835 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00836 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00837 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00838 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00839 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00840 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00841 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00842 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00843 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00844 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00845 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00846 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00847 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00848 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00849 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00850 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00851 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00852 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00853 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00854 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00855 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00856 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00857 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00858 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00859 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00860 {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0}
00861
00862 }
00863 #endif
00864
00865 ;
// Defined elsewhere.
// NOTE(review): presumably the number of rows in clusterFunction[] - confirm
// whether it counts elements or bytes.
00866 extern unsigned SIZE_clusterFunction;
00867
00868
00869
00870
00871 inline int
00872 ClusterFuncToQpri(int cluster_func)
00873 {
00874 if (cluster_func < 0) {
00875 return CMSG_LOW_PRI;
00876 } else {
00877 return clusterFunction[cluster_func].q_priority;
00878 }
00879 }
00880
00881
00882
00883
// Indexes of the core rows in clusterFunction[]. Each value must match the
// position of the corresponding handler in the table. Indexes 11-13 have
// placeholder rows and no names.
00884 #define TEST_CLUSTER_FUNCTION 0
00885 #define PING_CLUSTER_FUNCTION 1
00886 #define PING_REPLY_CLUSTER_FUNCTION 2
00887 #define MACHINE_LIST_CLUSTER_FUNCTION 3
00888 #define CLOSE_CHANNEL_CLUSTER_FUNCTION 4
00889 #define GET_HOSTINFO_CLUSTER_FUNCTION 5
00890 #define PUT_HOSTINFO_CLUSTER_FUNCTION 6
00891 #define CACHE_LOOKUP_CLUSTER_FUNCTION 7
00892 #define CACHE_OP_MALLOCED_CLUSTER_FUNCTION 8
00893 #define CACHE_OP_CLUSTER_FUNCTION 9
00894 #define CACHE_OP_RESULT_CLUSTER_FUNCTION 10
00895 #define SET_CHANNEL_DATA_CLUSTER_FUNCTION 14
00896 #define SET_CHANNEL_PIN_CLUSTER_FUNCTION 15
00897 #define SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION 16
00898
00899
00900
00901
// Indexes 17-50: reserved slots. Their rows in clusterFunction[] have no
// handler (pfn is 0).
00902 #define INTERNAL_RESERVED1_CLUSTER_FUNCTION 17
00903 #define INTERNAL_RESERVED2_CLUSTER_FUNCTION 18
00904 #define INTERNAL_RESERVED3_CLUSTER_FUNCTION 19
00905 #define INTERNAL_RESERVED4_CLUSTER_FUNCTION 20
00906 #define INTERNAL_RESERVED5_CLUSTER_FUNCTION 21
00907 #define INTERNAL_RESERVED6_CLUSTER_FUNCTION 22
00908 #define INTERNAL_RESERVED7_CLUSTER_FUNCTION 23
00909 #define INTERNAL_RESERVED8_CLUSTER_FUNCTION 24
00910 #define INTERNAL_RESERVED9_CLUSTER_FUNCTION 25
00911 #define INTERNAL_RESERVED10_CLUSTER_FUNCTION 26
00912 #define INTERNAL_RESERVED11_CLUSTER_FUNCTION 27
00913 #define INTERNAL_RESERVED12_CLUSTER_FUNCTION 28
00914 #define INTERNAL_RESERVED13_CLUSTER_FUNCTION 29
00915 #define INTERNAL_RESERVED14_CLUSTER_FUNCTION 30
00916 #define INTERNAL_RESERVED15_CLUSTER_FUNCTION 31
00917 #define INTERNAL_RESERVED16_CLUSTER_FUNCTION 32
00918 #define INTERNAL_RESERVED17_CLUSTER_FUNCTION 33
00919 #define INTERNAL_RESERVED18_CLUSTER_FUNCTION 34
00920 #define INTERNAL_RESERVED19_CLUSTER_FUNCTION 35
00921 #define INTERNAL_RESERVED20_CLUSTER_FUNCTION 36
00922 #define INTERNAL_RESERVED21_CLUSTER_FUNCTION 37
00923 #define INTERNAL_RESERVED22_CLUSTER_FUNCTION 38
00924 #define INTERNAL_RESERVED23_CLUSTER_FUNCTION 39
00925 #define INTERNAL_RESERVED24_CLUSTER_FUNCTION 40
00926 #define INTERNAL_RESERVED25_CLUSTER_FUNCTION 41
00927 #define INTERNAL_RESERVED26_CLUSTER_FUNCTION 42
00928 #define INTERNAL_RESERVED27_CLUSTER_FUNCTION 43
00929 #define INTERNAL_RESERVED28_CLUSTER_FUNCTION 44
00930 #define INTERNAL_RESERVED29_CLUSTER_FUNCTION 45
00931 #define INTERNAL_RESERVED30_CLUSTER_FUNCTION 46
00932 #define INTERNAL_RESERVED31_CLUSTER_FUNCTION 47
00933 #define INTERNAL_RESERVED32_CLUSTER_FUNCTION 48
00934 #define INTERNAL_RESERVED33_CLUSTER_FUNCTION 49
00935 #define INTERNAL_RESERVED34_CLUSTER_FUNCTION 50
00936
00937
00938
00939
00940
00941
00942
00943
00944
00945
00946
00947
// Indexes 51-80: API slots. Every one of these rows in clusterFunction[]
// dispatches to default_api_ClusterFunction.
00948 #define API_F01_CLUSTER_FUNCTION 51
00949 #define API_F02_CLUSTER_FUNCTION 52
00950 #define API_F03_CLUSTER_FUNCTION 53
00951 #define API_F04_CLUSTER_FUNCTION 54
00952 #define API_F05_CLUSTER_FUNCTION 55
00953 #define API_F06_CLUSTER_FUNCTION 56
00954 #define API_F07_CLUSTER_FUNCTION 57
00955 #define API_F08_CLUSTER_FUNCTION 58
00956 #define API_F09_CLUSTER_FUNCTION 59
00957 #define API_F10_CLUSTER_FUNCTION 60
00958
00959
00960
00961
00962 #define API_F11_CLUSTER_FUNCTION 61
00963 #define API_F12_CLUSTER_FUNCTION 62
00964 #define API_F13_CLUSTER_FUNCTION 63
00965 #define API_F14_CLUSTER_FUNCTION 64
00966 #define API_F15_CLUSTER_FUNCTION 65
00967 #define API_F16_CLUSTER_FUNCTION 66
00968 #define API_F17_CLUSTER_FUNCTION 67
00969 #define API_F18_CLUSTER_FUNCTION 68
00970 #define API_F19_CLUSTER_FUNCTION 69
00971 #define API_F20_CLUSTER_FUNCTION 70
00972
00973 #define API_F21_CLUSTER_FUNCTION 71
00974 #define API_F22_CLUSTER_FUNCTION 72
00975 #define API_F23_CLUSTER_FUNCTION 73
00976 #define API_F24_CLUSTER_FUNCTION 74
00977 #define API_F25_CLUSTER_FUNCTION 75
00978 #define API_F26_CLUSTER_FUNCTION 76
00979 #define API_F27_CLUSTER_FUNCTION 77
00980 #define API_F28_CLUSTER_FUNCTION 78
00981 #define API_F29_CLUSTER_FUNCTION 79
00982 #define API_F30_CLUSTER_FUNCTION 80
00983
// First and last index of the API range. The spelling "STARECT" is kept as
// is, since other files may refer to the macro by this name.
00984 #define API_STARECT_CLUSTER_FUNCTION API_F01_CLUSTER_FUNCTION
00985 #define API_END_CLUSTER_FUNCTION API_F30_CLUSTER_FUNCTION
00986
// Marker value for "no cluster function"; it does not correspond to any row
// of clusterFunction[].
// NOTE(review): the literal exceeds INT_MAX for a 32-bit int, so it is
// presumably negative when stored in an int index - confirm how callers compare it.
00987 #define UNDEFINED_CLUSTER_FUNCTION 0xFDEFFDEF
00988
00989
00990
00991
00992 struct ClusterHelloMessage
00993 {
00994 uint16_t _NativeByteOrder;
00995 uint16_t _major;
00996 uint16_t _minor;
00997 uint16_t _min_major;
00998 uint16_t _min_minor;
00999 int16_t _id;
01000 #ifdef LOCAL_CLUSTER_TEST_MODE
01001 int16_t _port;
01002 char _pad[114];
01003 #else
01004 char _pad[116];
01005 #endif
01006
01007 ClusterHelloMessage():_NativeByteOrder(1)
01008 {
01009 _major = CLUSTER_MAJOR_VERSION;
01010 _minor = CLUSTER_MINOR_VERSION;
01011 _min_major = MIN_CLUSTER_MAJOR_VERSION;
01012 _min_minor = MIN_CLUSTER_MINOR_VERSION;
01013 memset(_pad, '\0', sizeof(_pad));
01014 }
01015 int NativeByteOrder()
01016 {
01017 return (_NativeByteOrder == 1);
01018 }
01019 void AdjustByteOrder()
01020 {
01021 if (!NativeByteOrder()) {
01022 ats_swap16(&_major);
01023 ats_swap16(&_minor);
01024 ats_swap16(&_min_major);
01025 ats_swap16(&_min_minor);
01026 }
01027 }
01028 };
01029
01030
01031
01032
01033 struct ClusterMessageHeader
01034 {
01035 uint16_t _InNativeByteOrder;
01036 uint16_t _MsgVersion;
01037
01038 void _init(uint16_t msg_version)
01039 {
01040 _InNativeByteOrder = 1;
01041 _MsgVersion = msg_version;
01042 }
01043 ClusterMessageHeader():_InNativeByteOrder(0), _MsgVersion(0)
01044 {
01045 }
01046 ClusterMessageHeader(uint16_t msg_version) {
01047 _init(msg_version);
01048 }
01049 int MsgInNativeByteOrder()
01050 {
01051 return (_InNativeByteOrder == 1);
01052 }
01053 int NeedByteSwap()
01054 {
01055 return (_InNativeByteOrder != 1);
01056 }
01057 int GetMsgVersion()
01058 {
01059 if (NeedByteSwap()) {
01060 return ats_swap16(_MsgVersion);
01061 } else {
01062 return _MsgVersion;
01063 }
01064 }
01065 };
01066
01067
01068
01069
01070
01071
// Callback type stored in PingMessage::fn; it has the same parameter list as
// ClusterFunction.
01072 typedef void (*PingReturnFunction) (ClusterHandler *, void *data, int len);
01073
01074 struct PingMessage:public ClusterMessageHeader
01075 {
01076 PingReturnFunction fn;
01077 char data[1];
01078
01079 enum
01080 {
01081 MIN_VERSION = 1,
01082 MAX_VERSION = 1,
01083 PING_MESSAGE_VERSION = MAX_VERSION
01084 };
01085
01086 PingMessage(uint16_t vers = PING_MESSAGE_VERSION)
01087 : ClusterMessageHeader(vers), fn(NULL) {
01088 data[0] = '\0';
01089 }
01090
01091 static int protoToVersion(int protoMajor)
01092 {
01093 (void) protoMajor;
01094 return PING_MESSAGE_VERSION;
01095 }
01096 static int sizeof_fixedlen_msg()
01097 {
01098 PingMessage *p = 0;
01099
01100 return (uintptr_t) (&p->data[0]);
01101 }
01102 void init(uint16_t vers = PING_MESSAGE_VERSION) {
01103 _init(vers);
01104 }
01105 inline void SwapBytes()
01106 {
01107 }
01108
01109 };
01110
01111 inline void
01112 cluster_ping(ClusterHandler *ch, PingReturnFunction fn, void *data, int len)
01113 {
01114 PingMessage *msg = (PingMessage *)alloca(PingMessage::sizeof_fixedlen_msg() + len);
01115 msg->init();
01116 msg->fn = fn;
01117 memcpy(msg->data, data, len);
01118 clusterProcessor.invoke_remote(ch, PING_CLUSTER_FUNCTION, (void *) msg, (msg->sizeof_fixedlen_msg() + len));
01119 }
01120
01121
01122 extern char channel_dummy_output[DEFAULT_MAX_BUFFER_SIZE];
01123
01124
01125
01126
// Configuration helpers, defined elsewhere.
// NOTE(review): presumably each returns a new configuration derived from c
// with machine m added or removed, leaving c itself unmodified - confirm.
01127 ClusterConfiguration *configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m);
01128 ClusterConfiguration *configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m);
// Global flags defined elsewhere.
// NOTE(review): presumably select the hashing strategy used by
// build_cluster_hash_table() - confirm.
01129 extern bool machineClusterHash;
01130 extern bool boundClusterHash;
01131 extern bool randClusterHash;
01132
// Defined elsewhere.
// NOTE(review): presumably fills in the configuration's hash_table[] - confirm.
01133 void build_cluster_hash_table(ClusterConfiguration *);
01134
01135 inline void
01136 ClusterVC_enqueue_read(Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link> &q, ClusterVConnectionBase * vc)
01137 {
01138 ClusterVConnState * cs = &vc->read;
01139 ink_assert(!cs->queue);
01140 cs->queue = &q;
01141 q.enqueue(vc);
01142 }
01143
01144 inline void
01145 ClusterVC_enqueue_write(Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link> &q, ClusterVConnectionBase * vc)
01146 {
01147 ClusterVConnState * cs = &vc->write;
01148 ink_assert(!cs->queue);
01149 cs->queue = &q;
01150 q.enqueue(vc);
01151 }
01152
01153 inline void
01154 ClusterVC_remove_read(ClusterVConnectionBase * vc)
01155 {
01156 ClusterVConnState * cs = &vc->read;
01157 ink_assert(cs->queue);
01158 ((Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link> *)cs->queue)->remove(vc);
01159 cs->queue = NULL;
01160 }
01161
01162 inline void
01163 ClusterVC_remove_write(ClusterVConnectionBase * vc)
01164 {
01165 ClusterVConnState * cs = &vc->write;
01166 ink_assert(cs->queue);
01167 ((Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link> *)cs->queue)->remove(vc);
01168 cs->queue = NULL;
01169 }
01170
01171
01172 #endif