00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #include "P_ClusterCache.h"
00030 
00031 #ifndef _P_ClusterInternal_h
00032 #define _P_ClusterInternal_h
00033 
00034 
00035 
00036 
// Compile-time feature switches for the cluster code. All three are
// currently enabled (defined as 1).
#define CLUSTER_THREAD_STEALING         1
#define CLUSTER_TOMCAT                  1
#define CLUSTER_STATS                   1
00040 
00041 
// Round the pointer or integer _p up to the next multiple of 8. The result
// has type uintptr_t, so callers must cast it back to the pointer type they
// need.
#define ALIGN_DOUBLE(_p)   ((((uintptr_t) (_p)) + 7) & ~7)
// alloca() _sz + 8 bytes and round the returned address up to a multiple of
// 8; the 8 extra bytes leave room for the (at most 7 byte) adjustment.
// Like ALIGN_DOUBLE, this yields a uintptr_t.
#define ALLOCA_DOUBLE(_sz) ALIGN_DOUBLE(alloca((_sz) + 8))
00044 
00045 
00046 
00047 
00048 
// Sizing and tuning constants.
#define MAX_TCOUNT               128
#define CONTROL_DATA             (128*1024)      // 131072
// Read-bank buffer size/index are derived from the DEFAULT_* buffer
// constants, which are defined outside this header.
#define READ_BANK_BUF_SIZE       DEFAULT_MAX_BUFFER_SIZE
#define READ_BANK_BUF_INDEX      (DEFAULT_BUFFER_SIZES-1)
#define ALLOC_DATA_MAGIC         0xA5   // 8 bits in size
// NOTE(review): presumably the number of lock attempts made before giving
// up on a read/write lock -- confirm against the users of these macros.
#define READ_LOCK_SPIN_COUNT     1
#define WRITE_LOCK_SPIN_COUNT    1
00056 
00057 
00058 
00059 
00060   
00061   
// Scheduling constants. The time values are built with HRTIME_MSECONDS(),
// which is defined outside this header (presumably milliseconds expressed
// in the hrtime unit -- confirm).
#define CLUSTER_BUCKETS          64
#define CLUSTER_PERIOD           HRTIME_MSECONDS(10)

  
#define CLUSTER_MAX_RUN_TIME    HRTIME_MSECONDS(100)
  
#define CLUSTER_MAX_THREAD_STEAL_TIME   HRTIME_MSECONDS(10)
00069 
00070   
// Channel number limits. MAX_CHANNELS evaluates to 32767, the largest
// value that fits in 15 bits.
#define MIN_CHANNELS             4096
#define MAX_CHANNELS             ((32*1024) - 1)        // 15 bits in Descriptor

// Channel 0 is the control channel; LAST_DEDICATED_CHANNEL has the same
// value, so channel 0 is the only dedicated channel number defined here.
#define CLUSTER_CONTROL_CHANNEL  0
#define LAST_DEDICATED_CHANNEL   0

#define CLUSTER_PHASES           1

// The initial priority is defined to be equal to CLUSTER_PHASES.
#define CLUSTER_INITIAL_PRIORITY CLUSTER_PHASES
00080   
00081   
// Delay, retry and timeout constants. The HRTIME_SECONDS()/HRTIME_MSECONDS()
// helpers are defined outside this header.
#define CLUSTER_BUMP_LENGTH      1
#define CLUSTER_MEMBER_DELAY     HRTIME_SECONDS(1)
  
  
// Building with CLUSTER_TEST_DEBUG defined replaces the 10 second connect
// timeout with 65536 seconds.
#ifdef CLUSTER_TEST_DEBUG
#define CLUSTER_CONNECT_TIMEOUT  HRTIME_SECONDS(65536)
#else
#define CLUSTER_CONNECT_TIMEOUT  HRTIME_SECONDS(10)
#endif
#define CLUSTER_CONNECT_RETRY    HRTIME_MSECONDS(20)
#define CLUSTER_RETRY            HRTIME_MSECONDS(10)
#define CLUSTER_DELAY_BETWEEN_WRITES HRTIME_MSECONDS(10)

  
// Channel inactivity timeout: 10 * HRTIME_SECONDS(60) normally,
// 65536 * HRTIME_SECONDS(60) when CLUSTER_TEST_DEBUG is defined.
#ifdef CLUSTER_TEST_DEBUG
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (65536 * HRTIME_SECONDS(60))
#else
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (10 * HRTIME_SECONDS(60))
#endif
00101 
00102   
// Completion callback tuning.
#define COMPLETION_CALLBACK_PERIOD      HRTIME_MSECONDS(10)
#define MAX_COMPLETION_CALLBACK_EVENTS  16

  
// Two-valued activity flag (1 = active, 0 = not active).
#define CLUSTER_ACTIVE           1
#define CLUSTER_NOT_ACTIVE       0

  
// NOTE(review): presumably a special status value passed when closing a
// channel that is still being opened -- confirm against the users.
#define FORCE_CLOSE_ON_OPEN_CHANNEL     -2

  
// Identifiers distinguishing the machine configuration from the cluster
// configuration.
#define MACHINE_CONFIG          0
#define CLUSTER_CONFIG          1
00116 
00117 
// String tags for cluster diagnostics.
// NOTE(review): presumably these are debug-tag names handed to the logging
// facility -- confirm against the call sites.
#define CL_NOTE         "cluster_note"
#define CL_WARN         "cluster_warn"
#define CL_PROTO        "cluster_proto"
#define CL_TRACE        "cluster_trace"
00122 
00123 
00124 
00125 
// Control message size limits.
#define MAX_FAST_CONTROL_MESSAGE 504    // 512 - 4 (cluster func #) - 4 align
// Same value as MAX_FAST_CONTROL_MESSAGE.
// NOTE(review): the trailing "copied instead" remark looks truncated in
// this copy of the file; recover the rest from the upstream source.
#define SMALL_CONTROL_MESSAGE    MAX_FAST_CONTROL_MESSAGE       // copied instead
                                                           
// Negative sentinel (-1).
// NOTE(review): presumably passed in place of a byte count to signal that
// the outgoing message has already been assembled -- confirm.
#define WRITE_MESSAGE_ALREADY_BUILT -1
00130 
// Compute the check word for a message header object _x: 0xBADBAD XOR-ed
// with the bitwise complement of each of msg.count, msg.descriptor_cksum,
// msg.control_bytes_cksum, msg.unused and (msg.control_bytes << 16), and
// finally with sequence_number (each msg field is first cast to uint32_t).
//
// _x may be any expression yielding an object with those members, e.g. a
// plain variable or a dereferenced pointer (*p); it is parenthesized on
// every use so that member access always applies to the whole argument.
// The argument is evaluated several times, so it must be free of side
// effects.
#define MAGIC_COUNT(_x) \
(0xBADBAD ^ ~(uint32_t)(_x).msg.count \
 ^ ~(uint32_t)(_x).msg.descriptor_cksum \
 ^ ~(uint32_t)(_x).msg.control_bytes_cksum \
 ^ ~(uint32_t)(_x).msg.unused \
 ^ ~((uint32_t)(_x).msg.control_bytes << 16) ^ (_x).sequence_number)
00137 
// Round _x (a pointer or an integer) up to the next multiple of 8. The
// result has type uintptr_t. The argument is parenthesized so the cast
// applies to the whole expression, not just its first operand.
#define DOUBLE_ALIGN(_x)    ((((uintptr_t)(_x))+7)&~7)
00139 
00140 
00141 
00142 
// Switches for built-in fault injection and instrumentation. All are off
// (0); TEST_TIMING, TEST_READ_LOCKS_MISSED, TEST_WRITE_LOCKS_MISSED and
// TEST_ENTER_EXIT gate the conditional sections further down in this file.
#define MISS_TEST                0
#define TEST_PARTIAL_WRITES      0
#define TEST_PARTIAL_READS       0
#define TEST_TIMING              0
#define TEST_READ_LOCKS_MISSED   0
#define TEST_WRITE_LOCKS_MISSED  0
#define TEST_ENTER_EXIT          0
00151 
00152 
00153 
00154 
// Timing trace helpers, compiled in only when TEST_TIMING is non-zero;
// otherwise all three macros expand to nothing.
//
//   TTTEST(_x)          print the string literal _x and the current time
//                       (ink_get_hrtime()/HRTIME_MSECOND, modulo 1000).
//   TTEST(_x)           same, plus vc->channel; requires a variable named
//                       `vc` in scope at the point of use.
//   TIMEOUT_TESTS(_s,_d) read the int at _d and, for the values 8, 10 and
//                       11, print ints found at fixed offsets from _d. The
//                       offsets are added to _d as written, so they are in
//                       units of _d's pointee type.
//
// _x and _s must be string literals because they are concatenated with the
// format text. TIMEOUT_TESTS is wrapped in do { } while (0) so that it acts
// as a single statement (safe inside an unbraced if/else), and _d is
// parenthesized on every use.
#if TEST_TIMING
#define TTTEST(_x) \
fprintf(stderr, _x " at: %u\n", \
        ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
#define TTEST(_x) \
fprintf(stderr, _x " for: %d at: %u\n", vc->channel, \
        ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
#define TIMEOUT_TESTS(_s,_d) \
  do { \
    if (*(int*)(_d) == 8)  \
      fprintf(stderr,_s" lookup  %d\n", *(int*)((_d)+20)); \
    else if (*(int*)(_d) == 10) \
      fprintf(stderr,_s" op %d %d\n", *(int*)((_d)+36), \
              *(int*)((_d)+40)); \
    else if (*(int*)(_d) == 11) \
      fprintf(stderr,_s" rop %d %d\n", *(int*)((_d)+4), \
              *(int*)((_d)+8)); \
  } while (0)
#else
#define TTTEST(_x)
#define TTEST(_x)
#define TIMEOUT_TESTS(_x,_y)
#endif
00176 
// Lock-failure injection. When TEST_READ_LOCKS_MISSED and/or
// TEST_WRITE_LOCKS_MISSED is non-zero, the matching TEST_*_LOCK_MIGHT_FAIL
// macro calls test_cluster_lock_might_fail(); otherwise it is the constant
// false.
#if (TEST_READ_LOCKS_MISSED || TEST_WRITE_LOCKS_MISSED)
// rand_r() state. Being a static in a header, every translation unit that
// includes this file gets its own copy, shared by all callers in that unit.
static unsigned int test_cluster_locks_missed = 0;

// Returns non-zero whenever the next rand_r() value is a multiple of 13,
// zero otherwise. The return type is spelled out explicitly: a declaration
// with no type specifier is not valid C++.
static int
test_cluster_lock_might_fail()
{
  return (!(rand_r(&test_cluster_locks_missed) % 13));
}
#endif
#if TEST_READ_LOCKS_MISSED
#define TEST_READ_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
#else
#define TEST_READ_LOCK_MIGHT_FAIL false
#endif
#if TEST_WRITE_LOCKS_MISSED
#define TEST_WRITE_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
#else
#define TEST_WRITE_LOCK_MIGHT_FAIL false
#endif
00195 
// Entry/exit counting, compiled in only when TEST_ENTER_EXIT is non-zero.
#if TEST_ENTER_EXIT
// Increments *in when constructed and *out when destroyed, so a local
// instance counts how often a scope is entered and left.
struct enter_exit_class
{
  int *outv;                    // counter incremented by the destructor

  enter_exit_class(int *in, int *out)
    : outv(out)
  {
    ++(*in);
  }

  ~enter_exit_class()
  {
    ++(*outv);
  }
};

// Declares a local enter_exit_class object named `a`, so it can be used at
// most once per scope and the scope must not have another `a`.
#define enter_exit(_x,_y) enter_exit_class a(_x,_y)
#else
// Instrumentation disabled: expands to nothing.
#define enter_exit(_x,_y)
#endif
00214 
// Expands to four comma-separated unsigned char values: bytes 0, 1, 2 and 3
// of the object _x, in memory order. _x must be an lvalue (its address is
// taken) at least four bytes long. The expansion can serve, for example,
// as the argument list of a four-field printf format.
#define DOT_SEPARATED(_x)           \
  ((unsigned char*)&(_x))[0],       \
  ((unsigned char*)&(_x))[1],       \
  ((unsigned char*)&(_x))[2],       \
  ((unsigned char*)&(_x))[3]
00218 
00219 
00220 
00221 
00222 struct CloseMessage:public ClusterMessageHeader
00223 {
00224   uint32_t channel;
00225   int32_t status;
00226   int32_t lerrno;
00227   uint32_t sequence_number;
00228 
00229   enum
00230   {
00231     MIN_VERSION = 1,
00232     MAX_VERSION = 1,
00233     CLOSE_CHAN_MESSAGE_VERSION = MAX_VERSION
00234   };
00235 
00236     CloseMessage(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION)
00237 :  ClusterMessageHeader(vers), channel(0), status(0), lerrno(0), sequence_number(0) {
00238   }
00239 
00240   static int protoToVersion(int protoMajor)
00241   {
00242     (void) protoMajor;
00243     return CLOSE_CHAN_MESSAGE_VERSION;
00244   }
00245   static int sizeof_fixedlen_msg()
00246   {
00247     return sizeof(CloseMessage);
00248   }
00249   void init(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) {
00250     _init(vers);
00251   }
00252   inline void SwapBytes()
00253   {
00254     if (NeedByteSwap()) {
00255       ats_swap32(&channel);
00256       ats_swap32((uint32_t *) & status);
00257       ats_swap32((uint32_t *) & lerrno);
00258       ats_swap32(&sequence_number);
00259     }
00260   }
00261 
00262 };
00263 
00264 
00265 
00266 
00267 struct MachineListMessage:public ClusterMessageHeader
00268 {
00269   uint32_t n_ip;                  
00270   uint32_t ip[CLUSTER_MAX_MACHINES];      
00271 
00272   enum
00273   {
00274     MIN_VERSION = 1,
00275     MAX_VERSION = 1,
00276     MACHINE_LIST_MESSAGE_VERSION = MAX_VERSION
00277   };
00278 
00279     MachineListMessage():ClusterMessageHeader(MACHINE_LIST_MESSAGE_VERSION), n_ip(0)
00280   {
00281     memset(ip, 0, sizeof(ip));
00282   }
00283 
00284   static int protoToVersion(int protoMajor)
00285   {
00286     (void) protoMajor;
00287     return MACHINE_LIST_MESSAGE_VERSION;
00288   }
00289   static int sizeof_fixedlen_msg()
00290   {
00291     return sizeof(ClusterMessageHeader);
00292   }
00293   void init(uint16_t vers = MACHINE_LIST_MESSAGE_VERSION) {
00294     _init(vers);
00295   }
00296   inline void SwapBytes()
00297   {
00298     ats_swap32(&n_ip);
00299   }
00300 
00301 };
00302 
00303 
00304 
00305 
00306 struct SetChanDataMessage:public ClusterMessageHeader
00307 {
00308   uint32_t channel;
00309   uint32_t sequence_number;
00310   uint32_t data_type;             
00311   char data[4];
00312 
00313   enum
00314   {
00315     MIN_VERSION = 1,
00316     MAX_VERSION = 1,
00317     SET_CHANNEL_DATA_MESSAGE_VERSION = MAX_VERSION
00318   };
00319 
00320     SetChanDataMessage(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION)
00321 :  ClusterMessageHeader(vers), channel(0), sequence_number(0), data_type(0) {
00322     memset(data, 0, sizeof(data));
00323   }
00324 
00325   static int protoToVersion(int protoMajor)
00326   {
00327     (void) protoMajor;
00328     return SET_CHANNEL_DATA_MESSAGE_VERSION;
00329   }
00330   static int sizeof_fixedlen_msg()
00331   {
00332     SetChanDataMessage *p = 0;
00333     return (int) DOUBLE_ALIGN((int64_t) ((char *) &p->data[0] - (char *) p));
00334   }
00335   void init(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) {
00336     _init(vers);
00337   }
00338   inline void SwapBytes()
00339   {
00340     if (NeedByteSwap()) {
00341       ats_swap32(&channel);
00342       ats_swap32(&sequence_number);
00343       ats_swap32(&data_type);
00344     }
00345   }
00346 
00347 };
00348 
00349 
00350 
00351 
00352 struct SetChanPinMessage:public ClusterMessageHeader
00353 {
00354   uint32_t channel;
00355   uint32_t sequence_number;
00356   uint32_t pin_time;
00357 
00358   enum
00359   {
00360     MIN_VERSION = 1,
00361     MAX_VERSION = 1,
00362     SET_CHANNEL_PIN_MESSAGE_VERSION = MAX_VERSION
00363   };
00364 
00365     SetChanPinMessage(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION)
00366 :  ClusterMessageHeader(vers), channel(0), sequence_number(0), pin_time(0) {
00367   }
00368 
00369   static int protoToVersion(int protoMajor)
00370   {
00371     (void) protoMajor;
00372     return SET_CHANNEL_PIN_MESSAGE_VERSION;
00373   }
00374   static int sizeof_fixedlen_msg()
00375   {
00376     return (int) sizeof(SetChanPinMessage);
00377   }
00378   void init(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) {
00379     _init(vers);
00380   }
00381   inline void SwapBytes()
00382   {
00383     if (NeedByteSwap()) {
00384       ats_swap32(&channel);
00385       ats_swap32(&sequence_number);
00386       ats_swap32(&pin_time);
00387     }
00388   }
00389 
00390 };
00391 
00392 
00393 
00394 
00395 struct SetChanPriorityMessage:public ClusterMessageHeader
00396 {
00397   uint32_t channel;
00398   uint32_t sequence_number;
00399   uint32_t disk_priority;
00400 
00401   enum
00402   {
00403     MIN_VERSION = 1,
00404     MAX_VERSION = 1,
00405     SET_CHANNEL_PRIORITY_MESSAGE_VERSION = MAX_VERSION
00406   };
00407 
00408     SetChanPriorityMessage(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION)
00409 :  ClusterMessageHeader(vers), channel(0), sequence_number(0), disk_priority(0) {
00410   }
00411 
00412   static int protoToVersion(int protoMajor)
00413   {
00414     (void) protoMajor;
00415     return SET_CHANNEL_PRIORITY_MESSAGE_VERSION;
00416   }
00417   static int sizeof_fixedlen_msg()
00418   {
00419     return (int) sizeof(SetChanPriorityMessage);
00420   }
00421   void init(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
00422     _init(vers);
00423   }
00424   inline void SwapBytes()
00425   {
00426     if (NeedByteSwap()) {
00427       ats_swap32(&channel);
00428       ats_swap32(&sequence_number);
00429       ats_swap32(&disk_priority);
00430     }
00431   }
00432 
00433 };
00434 
// Set the most significant (sign) bit of *val in place; all other bits are
// left unchanged.
//
// The mask is built and applied in unsigned arithmetic: shifting a signed 1
// into the sign bit is not portable, whereas 1u << (width - 1) is well
// defined.
inline void
SetHighBit(int *val)
{
  const unsigned int high_bit = 1u << ((sizeof(int) * 8) - 1);
  *val = (int) ((unsigned int) *val | high_bit);
}
00440 
// Clear the most significant (sign) bit of *val in place; all other bits
// are left unchanged.
//
// The mask is built and applied in unsigned arithmetic: shifting a signed 1
// into the sign bit is not portable, whereas 1u << (width - 1) is well
// defined.
inline void
ClearHighBit(int *val)
{
  const unsigned int high_bit = 1u << ((sizeof(int) * 8) - 1);
  *val = (int) ((unsigned int) *val & ~high_bit);
}
00446 
// Test the most significant (sign) bit of *val. Returns 0 when the bit is
// clear and a non-zero value (the isolated high bit, converted back to int)
// when it is set; *val itself is not modified.
//
// The mask is built and applied in unsigned arithmetic: shifting a signed 1
// into the sign bit is not portable, whereas 1u << (width - 1) is well
// defined.
inline int
IsHighBitSet(int *val)
{
  const unsigned int high_bit = 1u << ((sizeof(int) * 8) - 1);
  return (int) ((unsigned int) *val & high_bit);
}
00452 
00453 
00454 
00455 
00456 
// Continuation handling cluster accepts. Only the declaration is visible
// here; the member functions are defined elsewhere.
// NOTE(review): going by the member names, the three constructor arguments
// are presumably the cluster port pointer and the socket send/receive
// buffer sizes -- confirm against the definition.
class ClusterAccept:public Continuation
{
public:
  ClusterAccept(int *, int, int);
  void Init();
  void ShutdownDelete();
  // Event handler with the (int event, void *data) signature.
  int ClusterAcceptEvent(int, void *);
  int ClusterAcceptMachine(NetVConnection *);

   ~ClusterAccept();
private:

  int *p_cluster_port;          // pointer to a port value owned elsewhere
  int socket_send_bufsize;
  int socket_recv_bufsize;
  int current_cluster_port;     // presumably the port currently in use -- confirm
  Action *accept_action;
  Event *periodic_event;
};
00476 
00477 
// Forward declarations, each followed by a pointer-to-member-function type
// for a handler of that class taking (int, void *) and returning int.
struct ClusterHandler;
typedef int (ClusterHandler::*ClusterContHandler) (int, void *);

struct OutgoingControl;
typedef int (OutgoingControl::*OutgoingCtrlHandler) (int, void *);

struct ClusterVConnection;
typedef int (ClusterVConnection::*ClusterVConnHandler) (int, void *);
00486 
00487 
// Priority and scheduling helpers operating on a ClusterHandler and a
// ClusterVConnState; all are defined elsewhere.
extern void cluster_set_priority(ClusterHandler *, ClusterVConnState *, int);
extern void cluster_lower_priority(ClusterHandler *, ClusterVConnState *);
extern void cluster_raise_priority(ClusterHandler *, ClusterVConnState *);
extern void cluster_schedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
extern void cluster_reschedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
extern void cluster_reschedule_offset(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int);
extern void cluster_disable(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
extern void cluster_update_priority(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int64_t, int64_t);
// NOTE(review): presumably a sentinel for the int argument of
// cluster_bump() -- confirm against its definition.
#define CLUSTER_BUMP_NO_REMOVE    -1
extern void cluster_bump(ClusterHandler *, ClusterVConnectionBase *, ClusterVConnState *, int);
00498 
// Helpers operating on IOBufferBlock lists; defined elsewhere. Parameter
// meanings are not visible from this header.
extern IOBufferBlock *clone_IOBufferBlockList(IOBufferBlock *, int, int, IOBufferBlock **);
extern IOBufferBlock *consume_IOBufferBlockList(IOBufferBlock *, int64_t);
extern int64_t bytes_IOBufferBlockList(IOBufferBlock *, int64_t);
00502 
00503 
// Allocators for ClusterVConnection and ByteBankDescriptor objects, plus
// the release function for a ClusterVConnection; all defined elsewhere.
extern void clusterVCAllocator_free(ClusterVConnection * vc);
extern ClassAllocator<ClusterVConnection> clusterVCAllocator;
extern ClassAllocator<ByteBankDescriptor> byteBankAllocator;
00507 
00508 
// Cluster port variable; defined elsewhere.
extern int cluster_port;

// Machine configuration change hooks; defined elsewhere.
// NOTE(review): the (const char *, RecDataT, RecData, void *) signature
// looks like a records-configuration update callback -- confirm.
int machine_config_change(const char *, RecDataT, RecData, void *);
extern void do_machine_config_change(void *, const char *);
00513 
00514 
// Cluster API initialization and machine online/offline callouts; defined
// elsewhere. The meaning of the int argument is not visible from this
// header.
extern void clusterAPI_init();
extern void machine_online_APIcallout(int);
extern void machine_offline_APIcallout(int);
00518 #endif