• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

P_ClusterInternal.h

Go to the documentation of this file.
00001 /** @file
00002 
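00003   Internal definitions for the cluster subsystem: compile-time options,
00003   tunable parameters, RPC message structures, and shared declarations.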
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 /****************************************************************************
00026 
00027   P_ClusterInternal.h
00028 ****************************************************************************/
00029 #include "P_ClusterCache.h"
00030 
00031 #ifndef _P_ClusterInternal_h
00032 #define _P_ClusterInternal_h
00033 
/*************************************************************************/
// Compilation Options
/*************************************************************************/
// Compile-time feature switches; each one is set to 1 here.
#define CLUSTER_THREAD_STEALING 1
#define CLUSTER_TOMCAT 1
#define CLUSTER_STATS 1

// ALIGN_DOUBLE(_p):   convert _p to uintptr_t and round it up to the next
//                     multiple of 8 (values already on a multiple of 8
//                     are returned unchanged).
// ALLOCA_DOUBLE(_sz): alloca() _sz bytes plus 8 bytes of slack and round
//                     the returned address up with ALIGN_DOUBLE.  The
//                     result is a uintptr_t, not a pointer.
#define ALIGN_DOUBLE(_p) (((uintptr_t)(_p) + 7) & ~7)
#define ALLOCA_DOUBLE(_sz) ALIGN_DOUBLE(alloca((_sz) + 8))
00044 
/*************************************************************************/
// Configuration Parameters
/*************************************************************************/
#define MAX_TCOUNT 128 // must stay a power of 2
#define CONTROL_DATA (128 * 1024)
#define READ_BANK_BUF_SIZE DEFAULT_MAX_BUFFER_SIZE
#define READ_BANK_BUF_INDEX (DEFAULT_BUFFER_SIZES - 1)
#define ALLOC_DATA_MAGIC 0xA5 // value is 8 bits wide
#define READ_LOCK_SPIN_COUNT 1
#define WRITE_LOCK_SPIN_COUNT 1

// Unix specific optimizations (left disabled)
// #define CLUSTER_IMMEDIATE_NETIO       1

// Bucket count and period; see ClusterHandler::mainClusterEvent.
// This is equivalent to a max of 0.7 seconds.
#define CLUSTER_BUCKETS 64
#define CLUSTER_PERIOD HRTIME_MSECONDS(10)

// Maximum time, per instance, allotted to the cluster thread
#define CLUSTER_MAX_RUN_TIME HRTIME_MSECONDS(100)
// Maximum time, per instance, allotted to thread stealing
#define CLUSTER_MAX_THREAD_STEAL_TIME HRTIME_MSECONDS(10)

// Minimum number of channels to allocate
#define MIN_CHANNELS 4096
// Upper bound on channels: the Descriptor holds the channel in 15 bits
#define MAX_CHANNELS ((32 * 1024) - 1)

#define CLUSTER_CONTROL_CHANNEL 0
#define LAST_DEDICATED_CHANNEL 0

#define CLUSTER_PHASES 1

#define CLUSTER_INITIAL_PRIORITY CLUSTER_PHASES
// How often to retry connecting to machines which are supposed to be in
// the cluster
#define CLUSTER_BUMP_LENGTH 1
#define CLUSTER_MEMBER_DELAY HRTIME_SECONDS(1)
// How long an unconnected ClusterVConnection is left waiting.
// Note: assumes (CLUSTER_CONNECT_TIMEOUT == 2 * CACHE_CLUSTER_TIMEOUT)
#if defined(CLUSTER_TEST_DEBUG)
#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(65536)
#else
#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(10)
#endif
#define CLUSTER_CONNECT_RETRY HRTIME_MSECONDS(20)
#define CLUSTER_RETRY HRTIME_MSECONDS(10)
#define CLUSTER_DELAY_BETWEEN_WRITES HRTIME_MSECONDS(10)

// A cluster channel is force closed when no activity is detected within
// this interval
#if defined(CLUSTER_TEST_DEBUG)
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (65536 * HRTIME_SECONDS(60))
#else
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (10 * HRTIME_SECONDS(60))
#endif

// Settings for work deferred to ET_NET threads
#define COMPLETION_CALLBACK_PERIOD HRTIME_MSECONDS(10)
#define MAX_COMPLETION_CALLBACK_EVENTS 16

// Thread active state for ClusterHandler::mainClusterEvent()
#define CLUSTER_ACTIVE 1
#define CLUSTER_NOT_ACTIVE 0

// Value used with ClusterHandler::remote_closed
#define FORCE_CLOSE_ON_OPEN_CHANNEL -2

// Selectors for machine_config_change()
#define MACHINE_CONFIG 0
#define CLUSTER_CONFIG 1

// Debug interface category names
#define CL_NOTE "cluster_note"
#define CL_WARN "cluster_warn"
#define CL_PROTO "cluster_proto"
#define CL_TRACE "cluster_trace"
00122 
/*************************************************************************/
// Constants
/*************************************************************************/
#define MAX_FAST_CONTROL_MESSAGE 504    // 512 - 4 (cluster func #) - 4 align
// Messages up to this size are copied instead of vectored
#define SMALL_CONTROL_MESSAGE    MAX_FAST_CONTROL_MESSAGE
#define WRITE_MESSAGE_ALREADY_BUILT (-1)

// MAGIC_COUNT(_x): 32-bit value obtained by XOR-ing 0xBADBAD with the
// complemented msg.count, msg.descriptor_cksum, msg.control_bytes_cksum,
// msg.unused and (msg.control_bytes << 16) fields of _x, and with
// _x.sequence_number.  _x is evaluated several times, so it must be free
// of side effects.  The argument is parenthesized so that expressions
// such as `*ptr` can be passed.
#define MAGIC_COUNT(_x) \
(0xBADBAD ^ ~(uint32_t)(_x).msg.count \
 ^ ~(uint32_t)(_x).msg.descriptor_cksum \
 ^ ~(uint32_t)(_x).msg.control_bytes_cksum \
 ^ ~(uint32_t)(_x).msg.unused \
 ^ ~((uint32_t)(_x).msg.control_bytes << 16) ^ (_x).sequence_number)

// DOUBLE_ALIGN(_x): convert _x to uintptr_t and round it up to the next
// multiple of 8.  The argument is parenthesized so the cast applies to
// the whole expression (e.g. `ptr + n` is scaled before the cast).
#define DOUBLE_ALIGN(_x)    ((((uintptr_t)(_x))+7)&~7)
00139 
/*************************************************************************/
// Testing Defines
/*************************************************************************/
// Test and instrumentation switches; all are 0 here.  TEST_TIMING,
// TEST_READ_LOCKS_MISSED, TEST_WRITE_LOCKS_MISSED and TEST_ENTER_EXIT gate
// the conditional test helpers defined further down in this file.
#define MISS_TEST                0
#define TEST_PARTIAL_WRITES      0
#define TEST_PARTIAL_READS       0
#define TEST_TIMING              0
#define TEST_READ_LOCKS_MISSED   0
#define TEST_WRITE_LOCKS_MISSED  0
#define TEST_ENTER_EXIT          0
00151 
//
// Timing testing
//
// All three macros expand to nothing unless TEST_TIMING is nonzero.
//
// TTTEST(_x)  - print string literal _x to stderr followed by
//               (ink_get_hrtime() / HRTIME_MSECOND) % 1000.
// TTEST(_x)   - as TTTEST, and also prints vc->channel; a variable named
//               `vc` must be in scope where the macro is expanded.
// TIMEOUT_TESTS(_s,_d)
//             - read the int at _d; for the values 8, 10 and 11 print
//               string literal _s plus ints read at fixed offsets from
//               _d.  Offsets are added to _d before the cast, so they
//               scale with the pointee type of _d (presumably a byte
//               pointer - confirm against callers).  Wrapped in
//               do { } while (0) so the if/else chain cannot capture a
//               following `else` at the call site.
//
#if TEST_TIMING
#define TTTEST(_x) \
fprintf(stderr, _x " at: %u\n", \
        ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
#define TTEST(_x) \
fprintf(stderr, _x " for: %d at: %u\n", vc->channel, \
        ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
#define TIMEOUT_TESTS(_s,_d) \
  do { \
    if (*(int*)(_d) == 8) \
      fprintf(stderr,_s" lookup  %d\n", *(int*)((_d)+20)); \
    else if (*(int*)(_d) == 10) \
      fprintf(stderr,_s" op %d %d\n", *(int*)((_d)+36), \
              *(int*)((_d)+40)); \
    else if (*(int*)(_d) == 11) \
      fprintf(stderr,_s" rop %d %d\n", *(int*)((_d)+4), \
              *(int*)((_d)+8)); \
  } while (0)
#else
#define TTTEST(_x)
#define TTEST(_x)
#define TIMEOUT_TESTS(_x,_y)
#endif
00176 
#if (TEST_READ_LOCKS_MISSED || TEST_WRITE_LOCKS_MISSED)
// State handed to rand_r() by test_cluster_lock_might_fail().
static unsigned int test_cluster_locks_missed = 0;

// Returns 1 when the next rand_r() value is a multiple of 13, else 0.
// Used to make lock acquisition appear to fail at random in test builds.
static int
test_cluster_lock_might_fail()
{
  return (!(rand_r(&test_cluster_locks_missed) % 13));
}
#endif

// TEST_READ_LOCK_MIGHT_FAIL / TEST_WRITE_LOCK_MIGHT_FAIL evaluate to
// `false` unless the matching TEST_*_LOCKS_MISSED switch is nonzero, in
// which case they call test_cluster_lock_might_fail().
#if TEST_READ_LOCKS_MISSED
#define TEST_READ_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
#else
#define TEST_READ_LOCK_MIGHT_FAIL false
#endif
#if TEST_WRITE_LOCKS_MISSED
#define TEST_WRITE_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
#else
#define TEST_WRITE_LOCK_MIGHT_FAIL false
#endif
00195 
#if TEST_ENTER_EXIT
// Scope object behind enter_exit(): increments *in when constructed and
// increments *out when destroyed.
struct enter_exit_class {
  int *outv; // counter incremented by the destructor

  enter_exit_class(int *in, int *out) : outv(out) { ++(*in); }

  ~enter_exit_class() { ++(*outv); }
};

// Declares a local enter_exit_class named `a`; expands to nothing when
// TEST_ENTER_EXIT is 0.
#define enter_exit(_x, _y) enter_exit_class a(_x, _y)
#else
#define enter_exit(_x, _y)
#endif
00214 
// DOT_SEPARATED(_x): expands to four comma-separated expressions, the
// first four bytes of object _x in memory order, each as an unsigned char.
// Intended as the argument list for a "%d.%d.%d.%d"-style format.
#define DOT_SEPARATED(_x)          \
  ((unsigned char *)&(_x))[0],     \
    ((unsigned char *)&(_x))[1],   \
    ((unsigned char *)&(_x))[2],   \
    ((unsigned char *)&(_x))[3]
00218 
00219 //
00220 // RPC message for CLOSE_CHANNEL_CLUSTER_FUNCTION
00221 //
00222 struct CloseMessage:public ClusterMessageHeader
00223 {
00224   uint32_t channel;
00225   int32_t status;
00226   int32_t lerrno;
00227   uint32_t sequence_number;
00228 
00229   enum
00230   {
00231     MIN_VERSION = 1,
00232     MAX_VERSION = 1,
00233     CLOSE_CHAN_MESSAGE_VERSION = MAX_VERSION
00234   };
00235 
00236     CloseMessage(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION)
00237 :  ClusterMessageHeader(vers), channel(0), status(0), lerrno(0), sequence_number(0) {
00238   }
00239   ////////////////////////////////////////////////////////////////////////////
00240   static int protoToVersion(int protoMajor)
00241   {
00242     (void) protoMajor;
00243     return CLOSE_CHAN_MESSAGE_VERSION;
00244   }
00245   static int sizeof_fixedlen_msg()
00246   {
00247     return sizeof(CloseMessage);
00248   }
00249   void init(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) {
00250     _init(vers);
00251   }
00252   inline void SwapBytes()
00253   {
00254     if (NeedByteSwap()) {
00255       ats_swap32(&channel);
00256       ats_swap32((uint32_t *) & status);
00257       ats_swap32((uint32_t *) & lerrno);
00258       ats_swap32(&sequence_number);
00259     }
00260   }
00261   ////////////////////////////////////////////////////////////////////////////
00262 };
00263 
00264 //
00265 // RPC message for MACHINE_LIST_CLUSTER_FUNCTION
00266 //
00267 struct MachineListMessage:public ClusterMessageHeader
00268 {
00269   uint32_t n_ip;                  // Valid entries in ip[]
00270   uint32_t ip[CLUSTER_MAX_MACHINES];      // variable length data
00271 
00272   enum
00273   {
00274     MIN_VERSION = 1,
00275     MAX_VERSION = 1,
00276     MACHINE_LIST_MESSAGE_VERSION = MAX_VERSION
00277   };
00278 
00279     MachineListMessage():ClusterMessageHeader(MACHINE_LIST_MESSAGE_VERSION), n_ip(0)
00280   {
00281     memset(ip, 0, sizeof(ip));
00282   }
00283   ////////////////////////////////////////////////////////////////////////////
00284   static int protoToVersion(int protoMajor)
00285   {
00286     (void) protoMajor;
00287     return MACHINE_LIST_MESSAGE_VERSION;
00288   }
00289   static int sizeof_fixedlen_msg()
00290   {
00291     return sizeof(ClusterMessageHeader);
00292   }
00293   void init(uint16_t vers = MACHINE_LIST_MESSAGE_VERSION) {
00294     _init(vers);
00295   }
00296   inline void SwapBytes()
00297   {
00298     ats_swap32(&n_ip);
00299   }
00300   ////////////////////////////////////////////////////////////////////////////
00301 };
00302 
00303 //
00304 // RPC message for SET_CHANNEL_DATA_CLUSTER_FUNCTION
00305 //
00306 struct SetChanDataMessage:public ClusterMessageHeader
00307 {
00308   uint32_t channel;
00309   uint32_t sequence_number;
00310   uint32_t data_type;             // enum CacheDataType
00311   char data[4];
00312 
00313   enum
00314   {
00315     MIN_VERSION = 1,
00316     MAX_VERSION = 1,
00317     SET_CHANNEL_DATA_MESSAGE_VERSION = MAX_VERSION
00318   };
00319 
00320     SetChanDataMessage(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION)
00321 :  ClusterMessageHeader(vers), channel(0), sequence_number(0), data_type(0) {
00322     memset(data, 0, sizeof(data));
00323   }
00324   ////////////////////////////////////////////////////////////////////////////
00325   static int protoToVersion(int protoMajor)
00326   {
00327     (void) protoMajor;
00328     return SET_CHANNEL_DATA_MESSAGE_VERSION;
00329   }
00330   static int sizeof_fixedlen_msg()
00331   {
00332     SetChanDataMessage *p = 0;
00333     return (int) DOUBLE_ALIGN((int64_t) ((char *) &p->data[0] - (char *) p));
00334   }
00335   void init(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) {
00336     _init(vers);
00337   }
00338   inline void SwapBytes()
00339   {
00340     if (NeedByteSwap()) {
00341       ats_swap32(&channel);
00342       ats_swap32(&sequence_number);
00343       ats_swap32(&data_type);
00344     }
00345   }
00346   ////////////////////////////////////////////////////////////////////////////
00347 };
00348 
00349 //
00350 // RPC message for SET_CHANNEL_PIN_CLUSTER_FUNCTION
00351 //
00352 struct SetChanPinMessage:public ClusterMessageHeader
00353 {
00354   uint32_t channel;
00355   uint32_t sequence_number;
00356   uint32_t pin_time;
00357 
00358   enum
00359   {
00360     MIN_VERSION = 1,
00361     MAX_VERSION = 1,
00362     SET_CHANNEL_PIN_MESSAGE_VERSION = MAX_VERSION
00363   };
00364 
00365     SetChanPinMessage(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION)
00366 :  ClusterMessageHeader(vers), channel(0), sequence_number(0), pin_time(0) {
00367   }
00368   ////////////////////////////////////////////////////////////////////////////
00369   static int protoToVersion(int protoMajor)
00370   {
00371     (void) protoMajor;
00372     return SET_CHANNEL_PIN_MESSAGE_VERSION;
00373   }
00374   static int sizeof_fixedlen_msg()
00375   {
00376     return (int) sizeof(SetChanPinMessage);
00377   }
00378   void init(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) {
00379     _init(vers);
00380   }
00381   inline void SwapBytes()
00382   {
00383     if (NeedByteSwap()) {
00384       ats_swap32(&channel);
00385       ats_swap32(&sequence_number);
00386       ats_swap32(&pin_time);
00387     }
00388   }
00389   ////////////////////////////////////////////////////////////////////////////
00390 };
00391 
00392 //
00393 // RPC message for SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION
00394 //
00395 struct SetChanPriorityMessage:public ClusterMessageHeader
00396 {
00397   uint32_t channel;
00398   uint32_t sequence_number;
00399   uint32_t disk_priority;
00400 
00401   enum
00402   {
00403     MIN_VERSION = 1,
00404     MAX_VERSION = 1,
00405     SET_CHANNEL_PRIORITY_MESSAGE_VERSION = MAX_VERSION
00406   };
00407 
00408     SetChanPriorityMessage(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION)
00409 :  ClusterMessageHeader(vers), channel(0), sequence_number(0), disk_priority(0) {
00410   }
00411   ////////////////////////////////////////////////////////////////////////////
00412   static int protoToVersion(int protoMajor)
00413   {
00414     (void) protoMajor;
00415     return SET_CHANNEL_PRIORITY_MESSAGE_VERSION;
00416   }
00417   static int sizeof_fixedlen_msg()
00418   {
00419     return (int) sizeof(SetChanPriorityMessage);
00420   }
00421   void init(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
00422     _init(vers);
00423   }
00424   inline void SwapBytes()
00425   {
00426     if (NeedByteSwap()) {
00427       ats_swap32(&channel);
00428       ats_swap32(&sequence_number);
00429       ats_swap32(&disk_priority);
00430     }
00431   }
00432   ////////////////////////////////////////////////////////////////////////////
00433 };
00434 
// Set the most significant bit of *val, leaving all other bits unchanged.
// The mask is built in unsigned arithmetic so that no signed value is
// shifted into the sign bit; the conversion back to int relies on the
// usual two's-complement wraparound.  Assumes 8-bit bytes.
static inline void
SetHighBit(int *val)
{
  const unsigned int high_bit = 1u << ((sizeof(int) * 8) - 1);
  *val = (int) ((unsigned int) *val | high_bit);
}
00440 
// Clear the most significant bit of *val, leaving all other bits
// unchanged.  The mask is built in unsigned arithmetic so that no signed
// value is shifted into the sign bit.  Assumes 8-bit bytes.
static inline void
ClearHighBit(int *val)
{
  const unsigned int high_bit = 1u << ((sizeof(int) * 8) - 1);
  *val = (int) ((unsigned int) *val & ~high_bit);
}
00446 
// Return a nonzero value (the high bit itself, i.e. INT_MIN on
// two's-complement targets) when the most significant bit of *val is set,
// and 0 otherwise.  *val is not modified.  The mask is built in unsigned
// arithmetic so that no signed value is shifted into the sign bit.
// Assumes 8-bit bytes.
static inline int
IsHighBitSet(int *val)
{
  const unsigned int high_bit = 1u << ((sizeof(int) * 8) - 1);
  return (int) ((unsigned int) *val & high_bit);
}
00452 
00453 /////////////////////////////////////////////////////////////////
00454 // ClusterAccept -- Handle cluster connect events from peer
00455 //                  cluster nodes.
00456 /////////////////////////////////////////////////////////////////
00457 class ClusterAccept:public Continuation
00458 {
00459 public:
00460   ClusterAccept(int *, int, int);
00461   void Init();
00462   void ShutdownDelete();
00463   int ClusterAcceptEvent(int, void *);
00464   int ClusterAcceptMachine(NetVConnection *);
00465 
00466    ~ClusterAccept();
00467 private:
00468 
00469   int *p_cluster_port;
00470   int socket_send_bufsize;
00471   int socket_recv_bufsize;
00472   int current_cluster_port;
00473   Action *accept_action;
00474   Event *periodic_event;
00475 };
00476 
// Forward declarations, each followed by the pointer-to-member-function
// type used for that class's (int, void *) handlers.
// (Originally marked "VC++ 5.0 special".)
struct ClusterHandler;
typedef int (ClusterHandler::*ClusterContHandler)(int, void *);

struct OutgoingControl;
typedef int (OutgoingControl::*OutgoingCtrlHandler)(int, void *);

struct ClusterVConnection;
typedef int (ClusterVConnection::*ClusterVConnHandler)(int, void *);
00486 
00487 // Library  declarations
00488 extern void cluster_set_priority(ClusterHandler *, ClusterVConnState *, int);
00489 extern void cluster_lower_priority(ClusterHandler *, ClusterVConnState *);
00490 extern void cluster_raise_priority(ClusterHandler *, ClusterVConnState *);
00491 extern void cluster_schedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
00492 extern void cluster_reschedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
00493 extern void cluster_reschedule_offset(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int);
00494 extern void cluster_disable(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
00495 extern void cluster_update_priority(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int64_t, int64_t);
00496 #define CLUSTER_BUMP_NO_REMOVE    -1
00497 extern void cluster_bump(ClusterHandler *, ClusterVConnectionBase *, ClusterVConnState *, int);
00498 
00499 extern IOBufferBlock *clone_IOBufferBlockList(IOBufferBlock *, int, int, IOBufferBlock **);
00500 extern IOBufferBlock *consume_IOBufferBlockList(IOBufferBlock *, int64_t);
00501 extern int64_t bytes_IOBufferBlockList(IOBufferBlock *, int64_t);
00502 
00503 // ClusterVConnection declarations
00504 extern void clusterVCAllocator_free(ClusterVConnection * vc);
00505 extern ClassAllocator<ClusterVConnection> clusterVCAllocator;
00506 extern ClassAllocator<ByteBankDescriptor> byteBankAllocator;
00507 
00508 // Cluster configuration declarations
00509 extern int cluster_port;
00510 // extern void * machine_config_change(void *, void *);
00511 int machine_config_change(const char *, RecDataT, RecData, void *);
00512 extern void do_machine_config_change(void *, const char *);
00513 
00514 // Cluster API support functions
00515 extern void clusterAPI_init();
00516 extern void machine_online_APIcallout(int);
00517 extern void machine_offline_APIcallout(int);
00518 #endif /* _P_ClusterInternal_h */

Generated by  doxygen 1.7.1