• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

P_ClusterHandler.h

Go to the documentation of this file.
00001 /** @file
00002 
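00003   Declarations for the cluster handler: control message containers, the over-the-wire descriptor and message header, per-direction link state, and the ClusterHandler continuation.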
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 /****************************************************************************
00026 
00027   P_ClusterHandler.h
00028 ****************************************************************************/
00029 
00030 #ifndef _P_ClusterHandler_h
00031 #define _P_ClusterHandler_h
00032 
00033 
00034 class ClusterLoadMonitor;
00035 
00036 struct ClusterCalloutContinuation;
00037 typedef int (ClusterCalloutContinuation::*ClstCoutContHandler) (int, void *);
00038 
00039 struct ClusterCalloutContinuation:public Continuation
00040 {
00041   struct ClusterHandler *_ch;
00042 
00043   int CalloutHandler(int event, Event * e);
00044     ClusterCalloutContinuation(struct ClusterHandler *ch);
00045    ~ClusterCalloutContinuation();
00046 };
00047 
00048 struct ClusterControl: public Continuation
00049 {
00050   int len; // TODO: Should this be 64-bit ?
00051   char size_index;
00052   int64_t *real_data;
00053   char *data;
00054   void (*free_proc) (void *);
00055   void *free_proc_arg;
00056     Ptr<IOBufferBlock> iob_block;
00057 
00058   IOBufferBlock *get_block()
00059   {
00060     return iob_block;
00061   }
00062   bool fast_data()
00063   {
00064     return (len <= MAX_FAST_CONTROL_MESSAGE);
00065   }
00066   bool valid_alloc_data()
00067   {
00068     return iob_block && real_data && data;
00069   }
00070 
00071   enum
00072   {
00073     // DATA_HDR = size_index (1 byte) + magicno (1 byte) + sizeof(this)
00074 
00075     DATA_HDR = (sizeof(int64_t) * 2)      // must be multiple of sizeof(int64_t)
00076   };
00077 
00078   ClusterControl();
00079   void real_alloc_data(int, bool);
00080   void free_data();
00081   virtual void freeall() = 0;
00082 };
00083 
00084 struct OutgoingControl: public ClusterControl
00085 {
00086   ClusterHandler *ch;
00087   ink_hrtime submit_time;
00088 
00089   static OutgoingControl *alloc();
00090 
00091     OutgoingControl();
00092   void alloc_data(bool align_int32_on_non_int64_boundary = true) {
00093     real_alloc_data(1, align_int32_on_non_int64_boundary);      /* read access */
00094   }
00095 
00096   void set_data(char *adata, int alen)
00097   {
00098     data = adata;
00099     len = alen;
00100     free_proc = 0;
00101     free_proc_arg = 0;
00102     real_data = 0;
00103 
00104     // Create IOBufferBlock wrapper around passed data.
00105 
00106     iob_block = new_IOBufferBlock();
00107     iob_block->set_internal(adata, alen, BUFFER_SIZE_FOR_XMALLOC(alen));
00108     iob_block->_buf_end = iob_block->end();
00109   }
00110 
00111   void set_data(IOBufferBlock * buf, void (*free_data_proc) (void *), void *free_data_arg)
00112   {
00113     data = buf->data->data();
00114     len = bytes_IOBufferBlockList(buf, 1);      // read avail bytes
00115     free_proc = free_data_proc;
00116     free_proc_arg = free_data_arg;
00117     real_data = 0;
00118     iob_block = buf;
00119   }
00120   int startEvent(int event, Event * e);
00121   virtual void freeall();
00122 };
00123 
00124 //
00125 // incoming control messsage are received by this machine
00126 //
00127 struct IncomingControl: public ClusterControl
00128 {
00129   ink_hrtime recognized_time;
00130 
00131   static IncomingControl *alloc();
00132 
00133     IncomingControl();
00134   void alloc_data(bool align_int32_on_non_int64_boundary = true) {
00135     real_alloc_data(0, align_int32_on_non_int64_boundary);      /* write access */
00136   }
00137   virtual void freeall();
00138 };
00139 
00140 //
00141 // Interface structure for internal_invoke_remote()
00142 //
00143 struct invoke_remote_data_args
00144 {
00145   int32_t magicno;
00146   OutgoingControl *msg_oc;
00147   OutgoingControl *data_oc;
00148   int dest_channel;
00149   ClusterVCToken token;
00150 
00151   enum
00152   {
00153     MagicNo = 0x04141998
00154   };
00155     invoke_remote_data_args():magicno(MagicNo), msg_oc(NULL), data_oc(NULL), dest_channel(0)
00156   {
00157   }
00158 };
00159 
00160 //
00161 // Descriptor of a chunk of a message (see Memo.ClusterIODesign)
00162 // This has been tested for alignment on the Sparc using TestDescriptor.cc
00163 //
00164 
// Values for Descriptor::type
#define CLUSTER_SEND_FREE   0
#define CLUSTER_SEND_DATA   1

// Lower 16 bits of a sequence value.  The argument is parenthesized so that
// the cast applies to the whole expression passed in (e.g. "seq >> 8"), not
// just to its first operand.
#define CLUSTER_SEQUENCE_NUMBER(_x) (((unsigned int)(_x))&0xFFFF)
00169 
00170 struct Descriptor
00171 {                               // Note: Over the Wire structure
00172   uint32_t type:1;
00173   uint32_t channel:15;
00174   uint16_t sequence_number;       // lower 16 bits of the ClusterVCToken.seq
00175   uint32_t length;
00176 
00177   inline void SwapBytes()
00178   {
00179     ats_swap16((uint16_t *) this);    // Hack
00180     ats_swap16((uint16_t *) & sequence_number);
00181     ats_swap32((uint32_t *) & length);
00182   }
00183 };
00184 
00185 struct ClusterMsgHeader
00186 {                               // Note: Over the Wire structure
00187   uint16_t count;
00188   uint16_t descriptor_cksum;
00189   uint16_t control_bytes_cksum;
00190   uint16_t unused;
00191   uint32_t control_bytes;
00192   uint32_t count_check;
00193 
00194   void clear()
00195   {
00196     count = 0;
00197     descriptor_cksum = 0;
00198     control_bytes_cksum = 0;
00199     unused = 0;
00200     control_bytes = 0;
00201     count_check = 0;
00202   }
00203   ClusterMsgHeader():count(0), descriptor_cksum(0), control_bytes_cksum(0), unused(0), control_bytes(0), count_check(0)
00204   {
00205   }
00206   inline void SwapBytes()
00207   {
00208     ats_swap16((uint16_t *) & count);
00209     ats_swap16((uint16_t *) & descriptor_cksum);
00210     ats_swap16((uint16_t *) & control_bytes_cksum);
00211     ats_swap16((uint16_t *) & unused);
00212     ats_swap32((uint32_t *) & control_bytes);
00213     ats_swap32((uint32_t *) & count_check);
00214   }
00215 };
00216 
00217 struct ClusterMsg
00218 {
00219   Descriptor *descriptor;
00220     Ptr<IOBufferBlock> iob_descriptor_block;
00221   int count;
00222   int control_bytes;
00223   int descriptor_cksum;
00224   int control_bytes_cksum;
00225   int unused;
00226   int state;                    // Only used by read to denote
00227   //   read phase (count, descriptor, data)
00228     Queue<OutgoingControl> outgoing_control;
00229     Queue<OutgoingControl> outgoing_small_control;
00230     Queue<OutgoingControl> outgoing_callout; // compound msg callbacks
00231 
00232   // read processing usage.
00233   int control_data_offset;
00234   int did_small_control_set_data;
00235   int did_large_control_set_data;
00236   int did_small_control_msgs;
00237   int did_large_control_msgs;
00238   int did_freespace_msgs;
00239 
00240   ClusterMsgHeader *hdr()
00241   {
00242     return (ClusterMsgHeader *) (((char *) descriptor)
00243                                  - sizeof(ClusterMsgHeader));
00244   }
00245 
00246   IOBufferBlock *get_block()
00247   {
00248     return iob_descriptor_block;
00249   }
00250 
00251   IOBufferBlock *get_block_header()
00252   {
00253     int start_offset;
00254 
00255     start_offset = (char *) hdr() - iob_descriptor_block->buf();
00256     iob_descriptor_block->reset();
00257     iob_descriptor_block->next = 0;
00258     iob_descriptor_block->fill(start_offset);
00259     iob_descriptor_block->consume(start_offset);
00260     return iob_descriptor_block;
00261   }
00262 
00263   IOBufferBlock *get_block_descriptor()
00264   {
00265     int start_offset;
00266 
00267     start_offset = ((char *) hdr() + sizeof(ClusterMsgHeader))
00268       - iob_descriptor_block->buf();
00269     iob_descriptor_block->reset();
00270     iob_descriptor_block->next = 0;
00271     iob_descriptor_block->fill(start_offset);
00272     iob_descriptor_block->consume(start_offset);
00273     return iob_descriptor_block;
00274   }
00275 
00276   void clear()
00277   {
00278     hdr()->clear();
00279     count = 0;
00280     control_bytes = 0;
00281     descriptor_cksum = 0;
00282     control_bytes_cksum = 0;
00283     unused = 0;
00284     state = 0;
00285     outgoing_control.clear();
00286     outgoing_small_control.clear();
00287     control_data_offset = 0;
00288     did_small_control_set_data = 0;
00289     did_large_control_set_data = 0;
00290     did_small_control_msgs = 0;
00291     did_large_control_msgs = 0;
00292     did_freespace_msgs = 0;
00293   }
00294   uint16_t calc_control_bytes_cksum()
00295   {
00296     uint16_t cksum = 0;
00297     char *p = (char *) &descriptor[count];
00298     char *endp = p + control_bytes;
00299     while (p < endp) {
00300       cksum += *p;
00301       ++p;
00302     }
00303     return cksum;
00304   }
00305   uint16_t calc_descriptor_cksum()
00306   {
00307     uint16_t cksum = 0;
00308     char *p = (char *) &descriptor[0];
00309     char *endp = (char *) &descriptor[count];
00310     while (p < endp) {
00311       cksum += *p;
00312       ++p;
00313     }
00314     return cksum;
00315   }
00316 ClusterMsg():descriptor(NULL), iob_descriptor_block(NULL), count(0),
00317     control_bytes(0),
00318     descriptor_cksum(0), control_bytes_cksum(0),
00319     unused(0), state(0),
00320     control_data_offset(0),
00321     did_small_control_set_data(0),
00322     did_large_control_set_data(0), did_small_control_msgs(0), did_large_control_msgs(0), did_freespace_msgs(0) {
00323   }
00324 
00325 };
00326 
00327 //
00328 // State for a particular (read/write) direction of a cluster link
00329 //
00330 struct ClusterHandler;
00331 struct ClusterState: public Continuation
00332 {
00333   ClusterHandler *ch;
00334   bool read_channel;
00335   bool do_iodone_event;         // schedule_imm() on i/o complete
00336   int n_descriptors;
00337   ClusterMsg msg;
00338   unsigned int sequence_number;
00339   int to_do;                    // # of bytes to transact
00340   int did;                      // # of bytes transacted
00341   int n_iov;                    // defined iov(s) in this operation
00342   int io_complete;              // current i/o complete
00343   int io_complete_event;        // current i/o complete event
00344   VIO *v;                       // VIO associated with current op
00345   int bytes_xfered;             // bytes xfered at last callback
00346   int last_ndone;               // last do_io ndone
00347   int total_bytes_xfered;
00348   IOVec *iov;                   // io vector for readv, writev
00349   Ptr<IOBufferData> iob_iov;
00350 
00351   // Write byte bank structures
00352   char *byte_bank;              // bytes buffered for transit
00353   int n_byte_bank;              // number of bytes buffered for transit
00354   int byte_bank_size;           // allocated size of byte bank
00355 
00356   int missed;
00357   bool missed_msg;
00358   ink_hrtime last_time;
00359   ink_hrtime start_time;
00360 
00361   Ptr<IOBufferBlock> block[MAX_TCOUNT];
00362   class MIOBuffer *mbuf;
00363   int state;                    // See enum defs below
00364 
00365 
00366   enum
00367   {
00368     READ_START = 1,
00369     READ_HEADER,
00370     READ_AWAIT_HEADER,
00371     READ_SETUP_DESCRIPTOR,
00372     READ_DESCRIPTOR,
00373     READ_AWAIT_DESCRIPTOR,
00374     READ_SETUP_DATA,
00375     READ_DATA,
00376     READ_AWAIT_DATA,
00377     READ_POST_COMPLETE,
00378     READ_COMPLETE
00379   } read_state_t;
00380 
00381   enum
00382   {
00383     WRITE_START = 1,
00384     WRITE_SETUP,
00385     WRITE_INITIATE,
00386     WRITE_AWAIT_COMPLETION,
00387     WRITE_POST_COMPLETE,
00388     WRITE_COMPLETE
00389   } write_state_t;
00390 
00391   ClusterState(ClusterHandler *, bool);
00392   ~ClusterState();
00393   IOBufferData *get_data();
00394   void build_do_io_vector();
00395   int doIO();
00396   int doIO_read_event(int, void *);
00397   int doIO_write_event(int, void *);
00398   void IOComplete();
00399 };
00400 
00401 //
00402 // ClusterHandlerBase superclass for processors with
00403 // bi-directional VConnections.
00404 //
00405 struct ClusterHandlerBase: public Continuation
00406 {
00407   //
00408   // Private
00409   //
00410   Queue<ClusterVConnectionBase, ClusterVConnection::Link_read_link> *read_vcs;
00411   Queue<ClusterVConnectionBase, ClusterVConnection::Link_write_link> *write_vcs;
00412   int cur_vcs;
00413   int min_priority;
00414   Event *trigger_event;
00415 
00416   ClusterHandlerBase():Continuation(NULL), read_vcs(NULL), write_vcs(NULL), cur_vcs(0), min_priority(1)
00417   {
00418   }
00419 };
00420 
00421 struct ClusterHandler:public ClusterHandlerBase
00422 {
00423 #ifdef MSG_TRACE
00424   FILE *t_fd;
00425 #endif
00426   NetVConnection *net_vc;
00427   EThread *thread;
00428   unsigned int ip;
00429   int port;
00430   char *hostname;
00431   ClusterMachine *machine;
00432   int ifd;
00433   int id;
00434   bool dead;
00435   bool downing;
00436 
00437   int32_t active;                 // handler currently running
00438   bool on_stolen_thread;
00439 
00440   struct ChannelData
00441   {
00442     int channel_number;
00443     LINK(ChannelData, link);
00444   };
00445 
00446   int n_channels;
00447   ClusterVConnection **channels;
00448   struct ChannelData **channel_data;
00449   Queue<ChannelData> free_local_channels;
00450 
00451   bool connector;
00452   int cluster_connect_state;    // see clcon_state_t enum
00453   ClusterHelloMessage clusteringVersion;
00454   ClusterHelloMessage nodeClusteringVersion;
00455   bool needByteSwap;
00456   int configLookupFails;
00457 
00458 #define CONFIG_LOOKUP_RETRIES   10
00459 
00460   enum
00461   {
00462     CLCON_INITIAL = 1,
00463     CLCON_SEND_MSG,
00464     CLCON_SEND_MSG_COMPLETE,
00465     CLCON_READ_MSG,
00466     CLCON_READ_MSG_COMPLETE,
00467     CLCON_VALIDATE_MSG,
00468     CLCON_CONN_BIND_CLEAR,
00469     CLCON_CONN_BIND,
00470     CLCON_CONN_BIND_OK,
00471     CLCON_ABORT_CONNECT,
00472     CLCON_DELETE_CONNECT
00473   } clcon_state_t;
00474 
00475   InkAtomicList outgoing_control_al[CLUSTER_CMSG_QUEUES];
00476   InkAtomicList external_incoming_control;
00477   InkAtomicList external_incoming_open_local;
00478   ClusterCalloutContinuation * callout_cont[MAX_COMPLETION_CALLBACK_EVENTS];
00479   Event *callout_events[MAX_COMPLETION_CALLBACK_EVENTS];
00480   Event *cluster_periodic_event;
00481   Queue<OutgoingControl> outgoing_control[CLUSTER_CMSG_QUEUES];
00482   Queue<IncomingControl> incoming_control;
00483   InkAtomicList read_vcs_ready;
00484   InkAtomicList write_vcs_ready;
00485   ClusterState read;
00486   ClusterState write;
00487 
00488   ink_hrtime current_time;
00489   ink_hrtime last;
00490   ink_hrtime last_report;
00491   int n_since_last_report;
00492   ink_hrtime last_cluster_op_enable;
00493   ink_hrtime last_trace_dump;
00494 
00495   DLL<ClusterVConnectionBase> delayed_reads;
00496   ClusterLoadMonitor *clm;
00497   bool disable_remote_cluster_ops;
00498 
00499   // process_write() state data
00500   int pw_write_descriptors_built;
00501   int pw_freespace_descriptors_built;
00502   int pw_controldata_descriptors_built;
00503   int pw_time_expired;
00504   bool started_on_stolen_thread;
00505   bool control_message_write;
00506 
00507 #ifdef CLUSTER_STATS
00508     Ptr<IOBufferBlock> message_blk;
00509 
00510   int64_t _vc_writes;
00511   int64_t _vc_write_bytes;
00512   int64_t _control_write_bytes;
00513   int _dw_missed_lock;
00514   int _dw_not_enabled;
00515   int _dw_wait_remote_fill;
00516   int _dw_no_active_vio;
00517   int _dw_not_enabled_or_no_write;
00518   int _dw_set_data_pending;
00519   int _dw_no_free_space;
00520   int _fw_missed_lock;
00521   int _fw_not_enabled;
00522   int _fw_wait_remote_fill;
00523   int _fw_no_active_vio;
00524   int _fw_not_enabled_or_no_read;
00525   int _process_read_calls;
00526   int _n_read_start;
00527   int _n_read_header;
00528   int _n_read_await_header;
00529   int _n_read_setup_descriptor;
00530   int _n_read_descriptor;
00531   int _n_read_await_descriptor;
00532   int _n_read_setup_data;
00533   int _n_read_data;
00534   int _n_read_await_data;
00535   int _n_read_post_complete;
00536   int _n_read_complete;
00537   int _process_write_calls;
00538   int _n_write_start;
00539   int _n_write_setup;
00540   int _n_write_initiate;
00541   int _n_write_await_completion;
00542   int _n_write_post_complete;
00543   int _n_write_complete;
00544 
00545   void clear_cluster_stats()
00546   {
00547     _vc_writes = 0;
00548     _vc_write_bytes = 0;
00549     _control_write_bytes = 0;
00550     _dw_missed_lock = 0;
00551     _dw_not_enabled = 0;
00552     _dw_wait_remote_fill = 0;
00553     _dw_no_active_vio = 0;
00554     _dw_not_enabled_or_no_write = 0;
00555     _dw_set_data_pending = 0;
00556     _dw_no_free_space = 0;
00557     _fw_missed_lock = 0;
00558     _fw_not_enabled = 0;
00559     _fw_wait_remote_fill = 0;
00560     _fw_no_active_vio = 0;
00561     _fw_not_enabled_or_no_read = 0;
00562     _process_read_calls = 0;
00563     _n_read_start = 0;
00564     _n_read_header = 0;
00565     _n_read_await_header = 0;
00566     _n_read_setup_descriptor = 0;
00567     _n_read_descriptor = 0;
00568     _n_read_await_descriptor = 0;
00569     _n_read_setup_data = 0;
00570     _n_read_data = 0;
00571     _n_read_await_data = 0;
00572     _n_read_post_complete = 0;
00573     _n_read_complete = 0;
00574     _process_write_calls = 0;
00575     _n_write_start = 0;
00576     _n_write_setup = 0;
00577     _n_write_initiate = 0;
00578     _n_write_await_completion = 0;
00579     _n_write_post_complete = 0;
00580     _n_write_complete = 0;
00581   }
00582 #endif                          // CLUSTER_STATS
00583 
00584   ClusterHandler();
00585   ~ClusterHandler();
00586   bool check_channel(int c);
00587   int alloc_channel(ClusterVConnection * vc, int requested_channel = 0);
00588   void free_channel(ClusterVConnection * vc);
00589 //
00590 //  local_channel()
00591 //  - Initiator node-node TCP socket  &&  Odd channel  => Local Channel
00592 //  - !Initiator node-node TCP socket &&  Even channel => Local Channel
00593   inline bool local_channel(int i)
00594   {
00595     return !connector == !(i & 1);
00596   }
00597 
00598   void close_ClusterVConnection(ClusterVConnection *);
00599   int cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s);
00600   int cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s);
00601   int cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno);
00602   void close_free_lock(ClusterVConnection *, ClusterVConnState *);
00603 
00604 #define CLUSTER_READ       true
00605 #define CLUSTER_WRITE      false
00606 
00607   bool build_data_vector(char *, int, bool);
00608   bool build_initial_vector(bool);
00609 
00610   void add_to_byte_bank(ClusterVConnection *);
00611   void update_channels_read();
00612   int process_incoming_callouts(ProxyMutex *);
00613   void update_channels_partial_read();
00614   void process_set_data_msgs();
00615   void process_small_control_msgs();
00616   void process_large_control_msgs();
00617   void process_freespace_msgs();
00618   bool complete_channel_read(int, ClusterVConnection * vc);
00619   void finish_delayed_reads();
00620   // returns: false if the channel was closed
00621 
00622   void update_channels_written();
00623 
00624   int build_write_descriptors();
00625   int build_freespace_descriptors();
00626   int build_controlmsg_descriptors();
00627   int add_small_controlmsg_descriptors();
00628   int valid_for_data_write(ClusterVConnection * vc);
00629   int valid_for_freespace_write(ClusterVConnection * vc);
00630 
00631   int machine_down();
00632   int remote_close(ClusterVConnection * vc, ClusterVConnState * ns);
00633   void steal_thread(EThread * t);
00634 
00635 #define CLUSTER_FREE_ALL_LOCKS -1
00636   void free_locks(bool read_flag, int i = CLUSTER_FREE_ALL_LOCKS);
00637   bool get_read_locks();
00638   bool get_write_locks();
00639   int zombify(Event * e = NULL);        // optional event to use
00640 
00641   int connectClusterEvent(int event, Event * e);
00642   int startClusterEvent(int event, Event * e);
00643   int mainClusterEvent(int event, Event * e);
00644   int beginClusterEvent(int event, Event * e);
00645   int zombieClusterEvent(int event, Event * e);
00646   int protoZombieEvent(int event, Event * e);
00647 
00648   void vcs_push(ClusterVConnection * vc, int type);
00649   bool vc_ok_read(ClusterVConnection *);
00650   bool vc_ok_write(ClusterVConnection *);
00651   int do_open_local_requests();
00652   void swap_descriptor_bytes();
00653   int process_read(ink_hrtime);
00654   int process_write(ink_hrtime, bool);
00655 
00656   void dump_write_msg(int);
00657   void dump_read_msg();
00658   int compute_active_channels();
00659   void dump_internal_data();
00660 
00661 #ifdef CLUSTER_IMMEDIATE_NETIO
00662   void build_poll(bool);
00663 #endif
00664 };
00665 
// Valid (ClusterVConnection *) in ClusterHandler.channels[]:
// non-NULL and with the low address bit clear.  The argument is
// parenthesized so an expression such as "p + 1" is evaluated as a pointer
// before it is tested and cast.
#define VALID_CHANNEL(vc) ((vc) && !(((uintptr_t) (vc)) & 1))
00668 
00669 // outgoing control continuations
00670 extern ClassAllocator<OutgoingControl> outControlAllocator;
00671 
00672 // incoming control descriptors
00673 extern ClassAllocator<IncomingControl> inControlAllocator;
00674 
00675 #endif /* _P_ClusterHandler_h */

Generated by  doxygen 1.7.1