00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #ifndef _P_ClusterHandler_h
00031 #define _P_ClusterHandler_h
00032 
00033 
00034 class ClusterLoadMonitor;
00035 
00036 struct ClusterCalloutContinuation;
00037 typedef int (ClusterCalloutContinuation::*ClstCoutContHandler) (int, void *);
00038 
00039 struct ClusterCalloutContinuation:public Continuation
00040 {
00041   struct ClusterHandler *_ch;
00042 
00043   int CalloutHandler(int event, Event * e);
00044     ClusterCalloutContinuation(struct ClusterHandler *ch);
00045    ~ClusterCalloutContinuation();
00046 };
00047 
00048 struct ClusterControl: public Continuation
00049 {
00050   int len; 
00051   char size_index;
00052   int64_t *real_data;
00053   char *data;
00054   void (*free_proc) (void *);
00055   void *free_proc_arg;
00056     Ptr<IOBufferBlock> iob_block;
00057 
00058   IOBufferBlock *get_block()
00059   {
00060     return iob_block;
00061   }
00062   bool fast_data()
00063   {
00064     return (len <= MAX_FAST_CONTROL_MESSAGE);
00065   }
00066   bool valid_alloc_data()
00067   {
00068     return iob_block && real_data && data;
00069   }
00070 
00071   enum
00072   {
00073     
00074 
00075     DATA_HDR = (sizeof(int64_t) * 2)      
00076   };
00077 
00078   ClusterControl();
00079   void real_alloc_data(int, bool);
00080   void free_data();
00081   virtual void freeall() = 0;
00082 };
00083 
00084 struct OutgoingControl: public ClusterControl
00085 {
00086   ClusterHandler *ch;
00087   ink_hrtime submit_time;
00088 
00089   static OutgoingControl *alloc();
00090 
00091     OutgoingControl();
00092   void alloc_data(bool align_int32_on_non_int64_boundary = true) {
00093     real_alloc_data(1, align_int32_on_non_int64_boundary);      
00094   }
00095 
00096   void set_data(char *adata, int alen)
00097   {
00098     data = adata;
00099     len = alen;
00100     free_proc = 0;
00101     free_proc_arg = 0;
00102     real_data = 0;
00103 
00104     
00105 
00106     iob_block = new_IOBufferBlock();
00107     iob_block->set_internal(adata, alen, BUFFER_SIZE_FOR_XMALLOC(alen));
00108     iob_block->_buf_end = iob_block->end();
00109   }
00110 
00111   void set_data(IOBufferBlock * buf, void (*free_data_proc) (void *), void *free_data_arg)
00112   {
00113     data = buf->data->data();
00114     len = bytes_IOBufferBlockList(buf, 1);      
00115     free_proc = free_data_proc;
00116     free_proc_arg = free_data_arg;
00117     real_data = 0;
00118     iob_block = buf;
00119   }
00120   int startEvent(int event, Event * e);
00121   virtual void freeall();
00122 };
00123 
00124 
00125 
00126 
00127 struct IncomingControl: public ClusterControl
00128 {
00129   ink_hrtime recognized_time;
00130 
00131   static IncomingControl *alloc();
00132 
00133     IncomingControl();
00134   void alloc_data(bool align_int32_on_non_int64_boundary = true) {
00135     real_alloc_data(0, align_int32_on_non_int64_boundary);      
00136   }
00137   virtual void freeall();
00138 };
00139 
00140 
00141 
00142 
00143 struct invoke_remote_data_args
00144 {
00145   int32_t magicno;
00146   OutgoingControl *msg_oc;
00147   OutgoingControl *data_oc;
00148   int dest_channel;
00149   ClusterVCToken token;
00150 
00151   enum
00152   {
00153     MagicNo = 0x04141998
00154   };
00155     invoke_remote_data_args():magicno(MagicNo), msg_oc(NULL), data_oc(NULL), dest_channel(0)
00156   {
00157   }
00158 };
00159 
00160 
00161 
00162 
00163 
00164 
00165 
// NOTE(review): presumably the two values of the 1-bit Descriptor::type
// field -- confirm against the users of these macros.
#define CLUSTER_SEND_FREE   0
#define CLUSTER_SEND_DATA   1

// Low 16 bits of _x after conversion to unsigned int.  The argument is
// parenthesized so that the cast applies to the whole expression passed in
// (e.g. CLUSTER_SEQUENCE_NUMBER(a / b)), not just to its first operand.
#define CLUSTER_SEQUENCE_NUMBER(_x) (((unsigned int)(_x)) & 0xFFFF)
00169 
00170 struct Descriptor
00171 {                               
00172   uint32_t type:1;
00173   uint32_t channel:15;
00174   uint16_t sequence_number;       
00175   uint32_t length;
00176 
00177   inline void SwapBytes()
00178   {
00179     ats_swap16((uint16_t *) this);    
00180     ats_swap16((uint16_t *) & sequence_number);
00181     ats_swap32((uint32_t *) & length);
00182   }
00183 };
00184 
00185 struct ClusterMsgHeader
00186 {                               
00187   uint16_t count;
00188   uint16_t descriptor_cksum;
00189   uint16_t control_bytes_cksum;
00190   uint16_t unused;
00191   uint32_t control_bytes;
00192   uint32_t count_check;
00193 
00194   void clear()
00195   {
00196     count = 0;
00197     descriptor_cksum = 0;
00198     control_bytes_cksum = 0;
00199     unused = 0;
00200     control_bytes = 0;
00201     count_check = 0;
00202   }
00203   ClusterMsgHeader():count(0), descriptor_cksum(0), control_bytes_cksum(0), unused(0), control_bytes(0), count_check(0)
00204   {
00205   }
00206   inline void SwapBytes()
00207   {
00208     ats_swap16((uint16_t *) & count);
00209     ats_swap16((uint16_t *) & descriptor_cksum);
00210     ats_swap16((uint16_t *) & control_bytes_cksum);
00211     ats_swap16((uint16_t *) & unused);
00212     ats_swap32((uint32_t *) & control_bytes);
00213     ats_swap32((uint32_t *) & count_check);
00214   }
00215 };
00216 
00217 struct ClusterMsg
00218 {
00219   Descriptor *descriptor;
00220     Ptr<IOBufferBlock> iob_descriptor_block;
00221   int count;
00222   int control_bytes;
00223   int descriptor_cksum;
00224   int control_bytes_cksum;
00225   int unused;
00226   int state;                    
00227   
00228     Queue<OutgoingControl> outgoing_control;
00229     Queue<OutgoingControl> outgoing_small_control;
00230     Queue<OutgoingControl> outgoing_callout; 
00231 
00232   
00233   int control_data_offset;
00234   int did_small_control_set_data;
00235   int did_large_control_set_data;
00236   int did_small_control_msgs;
00237   int did_large_control_msgs;
00238   int did_freespace_msgs;
00239 
00240   ClusterMsgHeader *hdr()
00241   {
00242     return (ClusterMsgHeader *) (((char *) descriptor)
00243                                  - sizeof(ClusterMsgHeader));
00244   }
00245 
00246   IOBufferBlock *get_block()
00247   {
00248     return iob_descriptor_block;
00249   }
00250 
00251   IOBufferBlock *get_block_header()
00252   {
00253     int start_offset;
00254 
00255     start_offset = (char *) hdr() - iob_descriptor_block->buf();
00256     iob_descriptor_block->reset();
00257     iob_descriptor_block->next = 0;
00258     iob_descriptor_block->fill(start_offset);
00259     iob_descriptor_block->consume(start_offset);
00260     return iob_descriptor_block;
00261   }
00262 
00263   IOBufferBlock *get_block_descriptor()
00264   {
00265     int start_offset;
00266 
00267     start_offset = ((char *) hdr() + sizeof(ClusterMsgHeader))
00268       - iob_descriptor_block->buf();
00269     iob_descriptor_block->reset();
00270     iob_descriptor_block->next = 0;
00271     iob_descriptor_block->fill(start_offset);
00272     iob_descriptor_block->consume(start_offset);
00273     return iob_descriptor_block;
00274   }
00275 
00276   void clear()
00277   {
00278     hdr()->clear();
00279     count = 0;
00280     control_bytes = 0;
00281     descriptor_cksum = 0;
00282     control_bytes_cksum = 0;
00283     unused = 0;
00284     state = 0;
00285     outgoing_control.clear();
00286     outgoing_small_control.clear();
00287     control_data_offset = 0;
00288     did_small_control_set_data = 0;
00289     did_large_control_set_data = 0;
00290     did_small_control_msgs = 0;
00291     did_large_control_msgs = 0;
00292     did_freespace_msgs = 0;
00293   }
00294   uint16_t calc_control_bytes_cksum()
00295   {
00296     uint16_t cksum = 0;
00297     char *p = (char *) &descriptor[count];
00298     char *endp = p + control_bytes;
00299     while (p < endp) {
00300       cksum += *p;
00301       ++p;
00302     }
00303     return cksum;
00304   }
00305   uint16_t calc_descriptor_cksum()
00306   {
00307     uint16_t cksum = 0;
00308     char *p = (char *) &descriptor[0];
00309     char *endp = (char *) &descriptor[count];
00310     while (p < endp) {
00311       cksum += *p;
00312       ++p;
00313     }
00314     return cksum;
00315   }
00316 ClusterMsg():descriptor(NULL), iob_descriptor_block(NULL), count(0),
00317     control_bytes(0),
00318     descriptor_cksum(0), control_bytes_cksum(0),
00319     unused(0), state(0),
00320     control_data_offset(0),
00321     did_small_control_set_data(0),
00322     did_large_control_set_data(0), did_small_control_msgs(0), did_large_control_msgs(0), did_freespace_msgs(0) {
00323   }
00324 
00325 };
00326 
00327 
00328 
00329 
00330 struct ClusterHandler;
00331 struct ClusterState: public Continuation
00332 {
00333   ClusterHandler *ch;
00334   bool read_channel;
00335   bool do_iodone_event;         
00336   int n_descriptors;
00337   ClusterMsg msg;
00338   unsigned int sequence_number;
00339   int to_do;                    
00340   int did;                      
00341   int n_iov;                    
00342   int io_complete;              
00343   int io_complete_event;        
00344   VIO *v;                       
00345   int bytes_xfered;             
00346   int last_ndone;               
00347   int total_bytes_xfered;
00348   IOVec *iov;                   
00349   Ptr<IOBufferData> iob_iov;
00350 
00351   
00352   char *byte_bank;              
00353   int n_byte_bank;              
00354   int byte_bank_size;           
00355 
00356   int missed;
00357   bool missed_msg;
00358   ink_hrtime last_time;
00359   ink_hrtime start_time;
00360 
00361   Ptr<IOBufferBlock> block[MAX_TCOUNT];
00362   class MIOBuffer *mbuf;
00363   int state;                    
00364 
00365 
00366   enum
00367   {
00368     READ_START = 1,
00369     READ_HEADER,
00370     READ_AWAIT_HEADER,
00371     READ_SETUP_DESCRIPTOR,
00372     READ_DESCRIPTOR,
00373     READ_AWAIT_DESCRIPTOR,
00374     READ_SETUP_DATA,
00375     READ_DATA,
00376     READ_AWAIT_DATA,
00377     READ_POST_COMPLETE,
00378     READ_COMPLETE
00379   } read_state_t;
00380 
00381   enum
00382   {
00383     WRITE_START = 1,
00384     WRITE_SETUP,
00385     WRITE_INITIATE,
00386     WRITE_AWAIT_COMPLETION,
00387     WRITE_POST_COMPLETE,
00388     WRITE_COMPLETE
00389   } write_state_t;
00390 
00391   ClusterState(ClusterHandler *, bool);
00392   ~ClusterState();
00393   IOBufferData *get_data();
00394   void build_do_io_vector();
00395   int doIO();
00396   int doIO_read_event(int, void *);
00397   int doIO_write_event(int, void *);
00398   void IOComplete();
00399 };
00400 
00401 
00402 
00403 
00404 
00405 struct ClusterHandlerBase: public Continuation
00406 {
00407   
00408   
00409   
00410   Queue<ClusterVConnectionBase, ClusterVConnection::Link_read_link> *read_vcs;
00411   Queue<ClusterVConnectionBase, ClusterVConnection::Link_write_link> *write_vcs;
00412   int cur_vcs;
00413   int min_priority;
00414   Event *trigger_event;
00415 
00416   ClusterHandlerBase():Continuation(NULL), read_vcs(NULL), write_vcs(NULL), cur_vcs(0), min_priority(1)
00417   {
00418   }
00419 };
00420 
00421 struct ClusterHandler:public ClusterHandlerBase
00422 {
00423 #ifdef MSG_TRACE
00424   FILE *t_fd;
00425 #endif
00426   NetVConnection *net_vc;
00427   EThread *thread;
00428   unsigned int ip;
00429   int port;
00430   char *hostname;
00431   ClusterMachine *machine;
00432   int ifd;
00433   int id;
00434   bool dead;
00435   bool downing;
00436 
00437   int32_t active;                 
00438   bool on_stolen_thread;
00439 
00440   struct ChannelData
00441   {
00442     int channel_number;
00443     LINK(ChannelData, link);
00444   };
00445 
00446   int n_channels;
00447   ClusterVConnection **channels;
00448   struct ChannelData **channel_data;
00449   Queue<ChannelData> free_local_channels;
00450 
00451   bool connector;
00452   int cluster_connect_state;    
00453   ClusterHelloMessage clusteringVersion;
00454   ClusterHelloMessage nodeClusteringVersion;
00455   bool needByteSwap;
00456   int configLookupFails;
00457 
00458 #define CONFIG_LOOKUP_RETRIES   10
00459 
00460   enum
00461   {
00462     CLCON_INITIAL = 1,
00463     CLCON_SEND_MSG,
00464     CLCON_SEND_MSG_COMPLETE,
00465     CLCON_READ_MSG,
00466     CLCON_READ_MSG_COMPLETE,
00467     CLCON_VALIDATE_MSG,
00468     CLCON_CONN_BIND_CLEAR,
00469     CLCON_CONN_BIND,
00470     CLCON_CONN_BIND_OK,
00471     CLCON_ABORT_CONNECT,
00472     CLCON_DELETE_CONNECT
00473   } clcon_state_t;
00474 
00475   InkAtomicList outgoing_control_al[CLUSTER_CMSG_QUEUES];
00476   InkAtomicList external_incoming_control;
00477   InkAtomicList external_incoming_open_local;
00478   ClusterCalloutContinuation * callout_cont[MAX_COMPLETION_CALLBACK_EVENTS];
00479   Event *callout_events[MAX_COMPLETION_CALLBACK_EVENTS];
00480   Event *cluster_periodic_event;
00481   Queue<OutgoingControl> outgoing_control[CLUSTER_CMSG_QUEUES];
00482   Queue<IncomingControl> incoming_control;
00483   InkAtomicList read_vcs_ready;
00484   InkAtomicList write_vcs_ready;
00485   ClusterState read;
00486   ClusterState write;
00487 
00488   ink_hrtime current_time;
00489   ink_hrtime last;
00490   ink_hrtime last_report;
00491   int n_since_last_report;
00492   ink_hrtime last_cluster_op_enable;
00493   ink_hrtime last_trace_dump;
00494 
00495   DLL<ClusterVConnectionBase> delayed_reads;
00496   ClusterLoadMonitor *clm;
00497   bool disable_remote_cluster_ops;
00498 
00499   
00500   int pw_write_descriptors_built;
00501   int pw_freespace_descriptors_built;
00502   int pw_controldata_descriptors_built;
00503   int pw_time_expired;
00504   bool started_on_stolen_thread;
00505   bool control_message_write;
00506 
00507 #ifdef CLUSTER_STATS
00508     Ptr<IOBufferBlock> message_blk;
00509 
00510   int64_t _vc_writes;
00511   int64_t _vc_write_bytes;
00512   int64_t _control_write_bytes;
00513   int _dw_missed_lock;
00514   int _dw_not_enabled;
00515   int _dw_wait_remote_fill;
00516   int _dw_no_active_vio;
00517   int _dw_not_enabled_or_no_write;
00518   int _dw_set_data_pending;
00519   int _dw_no_free_space;
00520   int _fw_missed_lock;
00521   int _fw_not_enabled;
00522   int _fw_wait_remote_fill;
00523   int _fw_no_active_vio;
00524   int _fw_not_enabled_or_no_read;
00525   int _process_read_calls;
00526   int _n_read_start;
00527   int _n_read_header;
00528   int _n_read_await_header;
00529   int _n_read_setup_descriptor;
00530   int _n_read_descriptor;
00531   int _n_read_await_descriptor;
00532   int _n_read_setup_data;
00533   int _n_read_data;
00534   int _n_read_await_data;
00535   int _n_read_post_complete;
00536   int _n_read_complete;
00537   int _process_write_calls;
00538   int _n_write_start;
00539   int _n_write_setup;
00540   int _n_write_initiate;
00541   int _n_write_await_completion;
00542   int _n_write_post_complete;
00543   int _n_write_complete;
00544 
00545   void clear_cluster_stats()
00546   {
00547     _vc_writes = 0;
00548     _vc_write_bytes = 0;
00549     _control_write_bytes = 0;
00550     _dw_missed_lock = 0;
00551     _dw_not_enabled = 0;
00552     _dw_wait_remote_fill = 0;
00553     _dw_no_active_vio = 0;
00554     _dw_not_enabled_or_no_write = 0;
00555     _dw_set_data_pending = 0;
00556     _dw_no_free_space = 0;
00557     _fw_missed_lock = 0;
00558     _fw_not_enabled = 0;
00559     _fw_wait_remote_fill = 0;
00560     _fw_no_active_vio = 0;
00561     _fw_not_enabled_or_no_read = 0;
00562     _process_read_calls = 0;
00563     _n_read_start = 0;
00564     _n_read_header = 0;
00565     _n_read_await_header = 0;
00566     _n_read_setup_descriptor = 0;
00567     _n_read_descriptor = 0;
00568     _n_read_await_descriptor = 0;
00569     _n_read_setup_data = 0;
00570     _n_read_data = 0;
00571     _n_read_await_data = 0;
00572     _n_read_post_complete = 0;
00573     _n_read_complete = 0;
00574     _process_write_calls = 0;
00575     _n_write_start = 0;
00576     _n_write_setup = 0;
00577     _n_write_initiate = 0;
00578     _n_write_await_completion = 0;
00579     _n_write_post_complete = 0;
00580     _n_write_complete = 0;
00581   }
00582 #endif                          // CLUSTER_STATS
00583 
00584   ClusterHandler();
00585   ~ClusterHandler();
00586   bool check_channel(int c);
00587   int alloc_channel(ClusterVConnection * vc, int requested_channel = 0);
00588   void free_channel(ClusterVConnection * vc);
00589 
00590 
00591 
00592 
00593   inline bool local_channel(int i)
00594   {
00595     return !connector == !(i & 1);
00596   }
00597 
00598   void close_ClusterVConnection(ClusterVConnection *);
00599   int cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s);
00600   int cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s);
00601   int cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno);
00602   void close_free_lock(ClusterVConnection *, ClusterVConnState *);
00603 
00604 #define CLUSTER_READ       true
00605 #define CLUSTER_WRITE      false
00606 
00607   bool build_data_vector(char *, int, bool);
00608   bool build_initial_vector(bool);
00609 
00610   void add_to_byte_bank(ClusterVConnection *);
00611   void update_channels_read();
00612   int process_incoming_callouts(ProxyMutex *);
00613   void update_channels_partial_read();
00614   void process_set_data_msgs();
00615   void process_small_control_msgs();
00616   void process_large_control_msgs();
00617   void process_freespace_msgs();
00618   bool complete_channel_read(int, ClusterVConnection * vc);
00619   void finish_delayed_reads();
00620   
00621 
00622   void update_channels_written();
00623 
00624   int build_write_descriptors();
00625   int build_freespace_descriptors();
00626   int build_controlmsg_descriptors();
00627   int add_small_controlmsg_descriptors();
00628   int valid_for_data_write(ClusterVConnection * vc);
00629   int valid_for_freespace_write(ClusterVConnection * vc);
00630 
00631   int machine_down();
00632   int remote_close(ClusterVConnection * vc, ClusterVConnState * ns);
00633   void steal_thread(EThread * t);
00634 
00635 #define CLUSTER_FREE_ALL_LOCKS -1
00636   void free_locks(bool read_flag, int i = CLUSTER_FREE_ALL_LOCKS);
00637   bool get_read_locks();
00638   bool get_write_locks();
00639   int zombify(Event * e = NULL);        
00640 
00641   int connectClusterEvent(int event, Event * e);
00642   int startClusterEvent(int event, Event * e);
00643   int mainClusterEvent(int event, Event * e);
00644   int beginClusterEvent(int event, Event * e);
00645   int zombieClusterEvent(int event, Event * e);
00646   int protoZombieEvent(int event, Event * e);
00647 
00648   void vcs_push(ClusterVConnection * vc, int type);
00649   bool vc_ok_read(ClusterVConnection *);
00650   bool vc_ok_write(ClusterVConnection *);
00651   int do_open_local_requests();
00652   void swap_descriptor_bytes();
00653   int process_read(ink_hrtime);
00654   int process_write(ink_hrtime, bool);
00655 
00656   void dump_write_msg(int);
00657   void dump_read_msg();
00658   int compute_active_channels();
00659   void dump_internal_data();
00660 
00661 #ifdef CLUSTER_IMMEDIATE_NETIO
00662   void build_poll(bool);
00663 #endif
00664 };
00665 
00666 
// True when vc is non-null and the low bit of its address is clear.
// The argument is parenthesized at every use so that expression arguments
// (pointer arithmetic, conditional expressions) are evaluated as a whole
// before the null test and before the cast to uintptr_t.
#define VALID_CHANNEL(vc) ((vc) && !(((uintptr_t)(vc)) & 1))
00668 
00669 
00670 extern ClassAllocator<OutgoingControl> outControlAllocator;
00671 
00672 
00673 extern ClassAllocator<IncomingControl> inControlAllocator;
00674 
00675 #endif