00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifndef _P_ClusterHandler_h
00031 #define _P_ClusterHandler_h
00032
00033
// Forward declaration only; ClusterHandler below holds a ClusterLoadMonitor *.
class ClusterLoadMonitor;

struct ClusterCalloutContinuation;
// Pointer-to-member handler type for ClusterCalloutContinuation taking
// (int, void *) -- presumably mirrors the generic continuation event-handler
// signature; confirm against Continuation's handler typedef.
typedef int (ClusterCalloutContinuation::*ClstCoutContHandler) (int, void *);
00038
00039 struct ClusterCalloutContinuation:public Continuation
00040 {
00041 struct ClusterHandler *_ch;
00042
00043 int CalloutHandler(int event, Event * e);
00044 ClusterCalloutContinuation(struct ClusterHandler *ch);
00045 ~ClusterCalloutContinuation();
00046 };
00047
00048 struct ClusterControl: public Continuation
00049 {
00050 int len;
00051 char size_index;
00052 int64_t *real_data;
00053 char *data;
00054 void (*free_proc) (void *);
00055 void *free_proc_arg;
00056 Ptr<IOBufferBlock> iob_block;
00057
00058 IOBufferBlock *get_block()
00059 {
00060 return iob_block;
00061 }
00062 bool fast_data()
00063 {
00064 return (len <= MAX_FAST_CONTROL_MESSAGE);
00065 }
00066 bool valid_alloc_data()
00067 {
00068 return iob_block && real_data && data;
00069 }
00070
00071 enum
00072 {
00073
00074
00075 DATA_HDR = (sizeof(int64_t) * 2)
00076 };
00077
00078 ClusterControl();
00079 void real_alloc_data(int, bool);
00080 void free_data();
00081 virtual void freeall() = 0;
00082 };
00083
00084 struct OutgoingControl: public ClusterControl
00085 {
00086 ClusterHandler *ch;
00087 ink_hrtime submit_time;
00088
00089 static OutgoingControl *alloc();
00090
00091 OutgoingControl();
00092 void alloc_data(bool align_int32_on_non_int64_boundary = true) {
00093 real_alloc_data(1, align_int32_on_non_int64_boundary);
00094 }
00095
00096 void set_data(char *adata, int alen)
00097 {
00098 data = adata;
00099 len = alen;
00100 free_proc = 0;
00101 free_proc_arg = 0;
00102 real_data = 0;
00103
00104
00105
00106 iob_block = new_IOBufferBlock();
00107 iob_block->set_internal(adata, alen, BUFFER_SIZE_FOR_XMALLOC(alen));
00108 iob_block->_buf_end = iob_block->end();
00109 }
00110
00111 void set_data(IOBufferBlock * buf, void (*free_data_proc) (void *), void *free_data_arg)
00112 {
00113 data = buf->data->data();
00114 len = bytes_IOBufferBlockList(buf, 1);
00115 free_proc = free_data_proc;
00116 free_proc_arg = free_data_arg;
00117 real_data = 0;
00118 iob_block = buf;
00119 }
00120 int startEvent(int event, Event * e);
00121 virtual void freeall();
00122 };
00123
00124
00125
00126
00127 struct IncomingControl: public ClusterControl
00128 {
00129 ink_hrtime recognized_time;
00130
00131 static IncomingControl *alloc();
00132
00133 IncomingControl();
00134 void alloc_data(bool align_int32_on_non_int64_boundary = true) {
00135 real_alloc_data(0, align_int32_on_non_int64_boundary);
00136 }
00137 virtual void freeall();
00138 };
00139
00140
00141
00142
00143 struct invoke_remote_data_args
00144 {
00145 int32_t magicno;
00146 OutgoingControl *msg_oc;
00147 OutgoingControl *data_oc;
00148 int dest_channel;
00149 ClusterVCToken token;
00150
00151 enum
00152 {
00153 MagicNo = 0x04141998
00154 };
00155 invoke_remote_data_args():magicno(MagicNo), msg_oc(NULL), data_oc(NULL), dest_channel(0)
00156 {
00157 }
00158 };
00159
00160
00161
00162
00163
00164
00165
// Values for Descriptor::type.
#define CLUSTER_SEND_FREE 0
#define CLUSTER_SEND_DATA 1
// Low 16 bits of _x as unsigned int.  The argument is parenthesized so the
// cast applies to the whole expression (e.g. CLUSTER_SEQUENCE_NUMBER(a / b)
// must not turn into an unsigned division).
#define CLUSTER_SEQUENCE_NUMBER(_x) (((unsigned int)(_x)) & 0xFFFF)
00169
// One entry of a cluster message's descriptor array (ClusterMsg::descriptor).
// NOTE(review): SwapBytes() reinterprets the first two bytes of the object as
// one uint16_t holding the `type`/`channel` bit-fields.  That relies on the
// compiler packing those 1+15 bits into the first 16 bits of the struct,
// which is implementation-defined -- confirm on every supported ABI.
struct Descriptor
{
  uint32_t type:1;          // 1-bit kind; presumably CLUSTER_SEND_FREE / CLUSTER_SEND_DATA
  uint32_t channel:15;      // 15-bit channel number
  uint16_t sequence_number; // 16 bits wide, matching CLUSTER_SEQUENCE_NUMBER's mask
  uint32_t length;

  // Byte-swap each field in place: the 16-bit word holding type/channel,
  // then sequence_number, then length.
  inline void SwapBytes()
  {
    ats_swap16((uint16_t *) this);
    ats_swap16((uint16_t *) & sequence_number);
    ats_swap32((uint32_t *) & length);
  }
};
00184
00185 struct ClusterMsgHeader
00186 {
00187 uint16_t count;
00188 uint16_t descriptor_cksum;
00189 uint16_t control_bytes_cksum;
00190 uint16_t unused;
00191 uint32_t control_bytes;
00192 uint32_t count_check;
00193
00194 void clear()
00195 {
00196 count = 0;
00197 descriptor_cksum = 0;
00198 control_bytes_cksum = 0;
00199 unused = 0;
00200 control_bytes = 0;
00201 count_check = 0;
00202 }
00203 ClusterMsgHeader():count(0), descriptor_cksum(0), control_bytes_cksum(0), unused(0), control_bytes(0), count_check(0)
00204 {
00205 }
00206 inline void SwapBytes()
00207 {
00208 ats_swap16((uint16_t *) & count);
00209 ats_swap16((uint16_t *) & descriptor_cksum);
00210 ats_swap16((uint16_t *) & control_bytes_cksum);
00211 ats_swap16((uint16_t *) & unused);
00212 ats_swap32((uint32_t *) & control_bytes);
00213 ats_swap32((uint32_t *) & count_check);
00214 }
00215 };
00216
00217 struct ClusterMsg
00218 {
00219 Descriptor *descriptor;
00220 Ptr<IOBufferBlock> iob_descriptor_block;
00221 int count;
00222 int control_bytes;
00223 int descriptor_cksum;
00224 int control_bytes_cksum;
00225 int unused;
00226 int state;
00227
00228 Queue<OutgoingControl> outgoing_control;
00229 Queue<OutgoingControl> outgoing_small_control;
00230 Queue<OutgoingControl> outgoing_callout;
00231
00232
00233 int control_data_offset;
00234 int did_small_control_set_data;
00235 int did_large_control_set_data;
00236 int did_small_control_msgs;
00237 int did_large_control_msgs;
00238 int did_freespace_msgs;
00239
00240 ClusterMsgHeader *hdr()
00241 {
00242 return (ClusterMsgHeader *) (((char *) descriptor)
00243 - sizeof(ClusterMsgHeader));
00244 }
00245
00246 IOBufferBlock *get_block()
00247 {
00248 return iob_descriptor_block;
00249 }
00250
00251 IOBufferBlock *get_block_header()
00252 {
00253 int start_offset;
00254
00255 start_offset = (char *) hdr() - iob_descriptor_block->buf();
00256 iob_descriptor_block->reset();
00257 iob_descriptor_block->next = 0;
00258 iob_descriptor_block->fill(start_offset);
00259 iob_descriptor_block->consume(start_offset);
00260 return iob_descriptor_block;
00261 }
00262
00263 IOBufferBlock *get_block_descriptor()
00264 {
00265 int start_offset;
00266
00267 start_offset = ((char *) hdr() + sizeof(ClusterMsgHeader))
00268 - iob_descriptor_block->buf();
00269 iob_descriptor_block->reset();
00270 iob_descriptor_block->next = 0;
00271 iob_descriptor_block->fill(start_offset);
00272 iob_descriptor_block->consume(start_offset);
00273 return iob_descriptor_block;
00274 }
00275
00276 void clear()
00277 {
00278 hdr()->clear();
00279 count = 0;
00280 control_bytes = 0;
00281 descriptor_cksum = 0;
00282 control_bytes_cksum = 0;
00283 unused = 0;
00284 state = 0;
00285 outgoing_control.clear();
00286 outgoing_small_control.clear();
00287 control_data_offset = 0;
00288 did_small_control_set_data = 0;
00289 did_large_control_set_data = 0;
00290 did_small_control_msgs = 0;
00291 did_large_control_msgs = 0;
00292 did_freespace_msgs = 0;
00293 }
00294 uint16_t calc_control_bytes_cksum()
00295 {
00296 uint16_t cksum = 0;
00297 char *p = (char *) &descriptor[count];
00298 char *endp = p + control_bytes;
00299 while (p < endp) {
00300 cksum += *p;
00301 ++p;
00302 }
00303 return cksum;
00304 }
00305 uint16_t calc_descriptor_cksum()
00306 {
00307 uint16_t cksum = 0;
00308 char *p = (char *) &descriptor[0];
00309 char *endp = (char *) &descriptor[count];
00310 while (p < endp) {
00311 cksum += *p;
00312 ++p;
00313 }
00314 return cksum;
00315 }
00316 ClusterMsg():descriptor(NULL), iob_descriptor_block(NULL), count(0),
00317 control_bytes(0),
00318 descriptor_cksum(0), control_bytes_cksum(0),
00319 unused(0), state(0),
00320 control_data_offset(0),
00321 did_small_control_set_data(0),
00322 did_large_control_set_data(0), did_small_control_msgs(0), did_large_control_msgs(0), did_freespace_msgs(0) {
00323 }
00324
00325 };
00326
00327
00328
00329
struct ClusterHandler;
// I/O state for one direction of a ClusterHandler connection.  ClusterHandler
// holds two of these (`read` and `write`); the bool constructor argument
// presumably selects the direction and lands in `read_channel` -- confirm in
// the constructor.  Data-member order defines the object layout.
struct ClusterState: public Continuation
{
  ClusterHandler *ch;
  bool read_channel;
  bool do_iodone_event;
  int n_descriptors;
  ClusterMsg msg;               // message currently being read or written
  unsigned int sequence_number;
  int to_do;
  int did;
  int n_iov;
  int io_complete;
  int io_complete_event;
  VIO *v;
  int bytes_xfered;
  int last_ndone;
  int total_bytes_xfered;
  IOVec *iov;
  Ptr<IOBufferData> iob_iov;


  char *byte_bank;
  int n_byte_bank;
  int byte_bank_size;

  int missed;
  bool missed_msg;
  ink_hrtime last_time;
  ink_hrtime start_time;

  Ptr<IOBufferBlock> block[MAX_TCOUNT];
  class MIOBuffer *mbuf;
  int state;                    // presumably one of the READ_* / WRITE_* values below


  // NOTE(review): `enum {...} read_state_t;` declares a DATA MEMBER named
  // read_state_t of an anonymous enum type, not a type alias (same for
  // write_state_t).  Changing it would alter the layout, so it is left as-is.
  enum
  {
    READ_START = 1,
    READ_HEADER,
    READ_AWAIT_HEADER,
    READ_SETUP_DESCRIPTOR,
    READ_DESCRIPTOR,
    READ_AWAIT_DESCRIPTOR,
    READ_SETUP_DATA,
    READ_DATA,
    READ_AWAIT_DATA,
    READ_POST_COMPLETE,
    READ_COMPLETE
  } read_state_t;

  enum
  {
    WRITE_START = 1,
    WRITE_SETUP,
    WRITE_INITIATE,
    WRITE_AWAIT_COMPLETION,
    WRITE_POST_COMPLETE,
    WRITE_COMPLETE
  } write_state_t;

  // All member functions below are defined out of line.
  ClusterState(ClusterHandler *, bool);
  ~ClusterState();
  IOBufferData *get_data();
  void build_do_io_vector();
  int doIO();
  int doIO_read_event(int, void *);
  int doIO_write_event(int, void *);
  void IOComplete();
};
00400
00401
00402
00403
00404
00405 struct ClusterHandlerBase: public Continuation
00406 {
00407
00408
00409
00410 Queue<ClusterVConnectionBase, ClusterVConnection::Link_read_link> *read_vcs;
00411 Queue<ClusterVConnectionBase, ClusterVConnection::Link_write_link> *write_vcs;
00412 int cur_vcs;
00413 int min_priority;
00414 Event *trigger_event;
00415
00416 ClusterHandlerBase():Continuation(NULL), read_vcs(NULL), write_vcs(NULL), cur_vcs(0), min_priority(1)
00417 {
00418 }
00419 };
00420
// Per-peer cluster connection handler.  Holds the peer's network connection
// and identity, channel tables, control-message queues, the two ClusterState
// objects (`read`, `write`) and assorted timing/statistics fields.  All
// non-inline member functions are defined out of line.  Data-member order
// defines the object layout.
struct ClusterHandler:public ClusterHandlerBase
{
#ifdef MSG_TRACE
  FILE *t_fd;
#endif
  // --- connection / peer identity ---
  NetVConnection *net_vc;
  EThread *thread;
  unsigned int ip;
  int port;
  char *hostname;
  ClusterMachine *machine;
  int ifd;
  int id;
  bool dead;
  bool downing;

  int32_t active;
  bool on_stolen_thread;

  // Free-list node for a channel number (linked via LINK()).
  struct ChannelData
  {
    int channel_number;
    LINK(ChannelData, link);
  };

  // --- channel tables ---
  int n_channels;
  ClusterVConnection **channels;
  struct ChannelData **channel_data;
  Queue<ChannelData> free_local_channels;

  // --- connection setup ---
  bool connector;
  int cluster_connect_state;    // presumably one of the CLCON_* values below
  ClusterHelloMessage clusteringVersion;
  ClusterHelloMessage nodeClusteringVersion;
  bool needByteSwap;
  int configLookupFails;

// Presumably the retry limit for configLookupFails -- confirm at use site.
#define CONFIG_LOOKUP_RETRIES 10

  // NOTE(review): this declares a DATA MEMBER named clcon_state_t of an
  // anonymous enum type, not a type alias.  Left as-is to keep the layout.
  enum
  {
    CLCON_INITIAL = 1,
    CLCON_SEND_MSG,
    CLCON_SEND_MSG_COMPLETE,
    CLCON_READ_MSG,
    CLCON_READ_MSG_COMPLETE,
    CLCON_VALIDATE_MSG,
    CLCON_CONN_BIND_CLEAR,
    CLCON_CONN_BIND,
    CLCON_CONN_BIND_OK,
    CLCON_ABORT_CONNECT,
    CLCON_DELETE_CONNECT
  } clcon_state_t;

  // --- control-message queues and callouts ---
  InkAtomicList outgoing_control_al[CLUSTER_CMSG_QUEUES];
  InkAtomicList external_incoming_control;
  InkAtomicList external_incoming_open_local;
  ClusterCalloutContinuation * callout_cont[MAX_COMPLETION_CALLBACK_EVENTS];
  Event *callout_events[MAX_COMPLETION_CALLBACK_EVENTS];
  Event *cluster_periodic_event;
  Queue<OutgoingControl> outgoing_control[CLUSTER_CMSG_QUEUES];
  Queue<IncomingControl> incoming_control;
  InkAtomicList read_vcs_ready;
  InkAtomicList write_vcs_ready;
  ClusterState read;
  ClusterState write;

  // --- timing ---
  ink_hrtime current_time;
  ink_hrtime last;
  ink_hrtime last_report;
  int n_since_last_report;
  ink_hrtime last_cluster_op_enable;
  ink_hrtime last_trace_dump;

  DLL<ClusterVConnectionBase> delayed_reads;
  ClusterLoadMonitor *clm;
  bool disable_remote_cluster_ops;


  // --- per-write counters (pw_ prefix) ---
  int pw_write_descriptors_built;
  int pw_freespace_descriptors_built;
  int pw_controldata_descriptors_built;
  int pw_time_expired;
  bool started_on_stolen_thread;
  bool control_message_write;

#ifdef CLUSTER_STATS
  Ptr<IOBufferBlock> message_blk;

  int64_t _vc_writes;
  int64_t _vc_write_bytes;
  int64_t _control_write_bytes;
  int _dw_missed_lock;
  int _dw_not_enabled;
  int _dw_wait_remote_fill;
  int _dw_no_active_vio;
  int _dw_not_enabled_or_no_write;
  int _dw_set_data_pending;
  int _dw_no_free_space;
  int _fw_missed_lock;
  int _fw_not_enabled;
  int _fw_wait_remote_fill;
  int _fw_no_active_vio;
  int _fw_not_enabled_or_no_read;
  int _process_read_calls;
  int _n_read_start;
  int _n_read_header;
  int _n_read_await_header;
  int _n_read_setup_descriptor;
  int _n_read_descriptor;
  int _n_read_await_descriptor;
  int _n_read_setup_data;
  int _n_read_data;
  int _n_read_await_data;
  int _n_read_post_complete;
  int _n_read_complete;
  int _process_write_calls;
  int _n_write_start;
  int _n_write_setup;
  int _n_write_initiate;
  int _n_write_await_completion;
  int _n_write_post_complete;
  int _n_write_complete;

  // Reset every CLUSTER_STATS counter above to zero (message_blk untouched).
  void clear_cluster_stats()
  {
    _vc_writes = 0;
    _vc_write_bytes = 0;
    _control_write_bytes = 0;
    _dw_missed_lock = 0;
    _dw_not_enabled = 0;
    _dw_wait_remote_fill = 0;
    _dw_no_active_vio = 0;
    _dw_not_enabled_or_no_write = 0;
    _dw_set_data_pending = 0;
    _dw_no_free_space = 0;
    _fw_missed_lock = 0;
    _fw_not_enabled = 0;
    _fw_wait_remote_fill = 0;
    _fw_no_active_vio = 0;
    _fw_not_enabled_or_no_read = 0;
    _process_read_calls = 0;
    _n_read_start = 0;
    _n_read_header = 0;
    _n_read_await_header = 0;
    _n_read_setup_descriptor = 0;
    _n_read_descriptor = 0;
    _n_read_await_descriptor = 0;
    _n_read_setup_data = 0;
    _n_read_data = 0;
    _n_read_await_data = 0;
    _n_read_post_complete = 0;
    _n_read_complete = 0;
    _process_write_calls = 0;
    _n_write_start = 0;
    _n_write_setup = 0;
    _n_write_initiate = 0;
    _n_write_await_completion = 0;
    _n_write_post_complete = 0;
    _n_write_complete = 0;
  }
#endif // CLUSTER_STATS

  ClusterHandler();
  ~ClusterHandler();
  bool check_channel(int c);
  int alloc_channel(ClusterVConnection * vc, int requested_channel = 0);
  void free_channel(ClusterVConnection * vc);




  // Channel-parity test: true for odd `i` when `connector` is set and for
  // even `i` otherwise (presumably: channels allocated by this side).
  inline bool local_channel(int i)
  {
    return !connector == !(i & 1);
  }

  void close_ClusterVConnection(ClusterVConnection *);
  int cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s);
  int cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s);
  int cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno);
  void close_free_lock(ClusterVConnection *, ClusterVConnState *);

// Presumably values for the bool direction arguments below (e.g. the last
// parameter of build_data_vector / build_initial_vector) -- confirm.
#define CLUSTER_READ true
#define CLUSTER_WRITE false

  bool build_data_vector(char *, int, bool);
  bool build_initial_vector(bool);

  // --- read side ---
  void add_to_byte_bank(ClusterVConnection *);
  void update_channels_read();
  int process_incoming_callouts(ProxyMutex *);
  void update_channels_partial_read();
  void process_set_data_msgs();
  void process_small_control_msgs();
  void process_large_control_msgs();
  void process_freespace_msgs();
  bool complete_channel_read(int, ClusterVConnection * vc);
  void finish_delayed_reads();


  // --- write side ---
  void update_channels_written();

  int build_write_descriptors();
  int build_freespace_descriptors();
  int build_controlmsg_descriptors();
  int add_small_controlmsg_descriptors();
  int valid_for_data_write(ClusterVConnection * vc);
  int valid_for_freespace_write(ClusterVConnection * vc);

  int machine_down();
  int remote_close(ClusterVConnection * vc, ClusterVConnState * ns);
  void steal_thread(EThread * t);

// Default for free_locks()' index argument.
#define CLUSTER_FREE_ALL_LOCKS -1
  void free_locks(bool read_flag, int i = CLUSTER_FREE_ALL_LOCKS);
  bool get_read_locks();
  bool get_write_locks();
  int zombify(Event * e = NULL);

  // --- event handlers ---
  int connectClusterEvent(int event, Event * e);
  int startClusterEvent(int event, Event * e);
  int mainClusterEvent(int event, Event * e);
  int beginClusterEvent(int event, Event * e);
  int zombieClusterEvent(int event, Event * e);
  int protoZombieEvent(int event, Event * e);

  void vcs_push(ClusterVConnection * vc, int type);
  bool vc_ok_read(ClusterVConnection *);
  bool vc_ok_write(ClusterVConnection *);
  int do_open_local_requests();
  void swap_descriptor_bytes();
  int process_read(ink_hrtime);
  int process_write(ink_hrtime, bool);

  // --- debugging ---
  void dump_write_msg(int);
  void dump_read_msg();
  int compute_active_channels();
  void dump_internal_data();

#ifdef CLUSTER_IMMEDIATE_NETIO
  void build_poll(bool);
#endif
};
00665
00666
// True when vc is non-NULL and its low address bit is clear (a set low bit
// presumably marks a tagged/non-VC channel entry).  The argument is
// parenthesized so expressions such as `p + 1` are cast as a whole; note it
// is still evaluated twice.
#define VALID_CHANNEL(vc) ((vc) && !(((uintptr_t) (vc)) & 1))
00668
00669
// Allocator for OutgoingControl objects; defined in an implementation file.
extern ClassAllocator<OutgoingControl> outControlAllocator;


// Allocator for IncomingControl objects; defined in an implementation file.
extern ClassAllocator<IncomingControl> inControlAllocator;
00674
00675 #endif