00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "P_ClusterCache.h"
00030
00031 #ifndef _P_ClusterInternal_h
00032 #define _P_ClusterInternal_h
00033
00034
00035
00036
// Compile-time feature switches for the cluster subsystem (all enabled, value 1).
// Kept as plain preprocessor macros; presumably consumed via #if / #ifdef in
// the cluster sources (TODO confirm).
#define CLUSTER_THREAD_STEALING 1
#define CLUSTER_TOMCAT 1
#define CLUSTER_STATS 1
00040
00041
// Round an address up to the next multiple of 8 bytes. The result has type
// uintptr_t, not a pointer type, so callers must cast it back.
#define ALIGN_DOUBLE(_p) ((((uintptr_t) (_p)) + 7) & ~7)
// alloca() _sz bytes plus 8 bytes of slack, then return the first 8-byte
// aligned address inside that region (as a uintptr_t). As with any alloca()
// result, the storage is only valid until the calling function returns.
#define ALLOCA_DOUBLE(_sz) ALIGN_DOUBLE(alloca((_sz) + 8))
00044
00045
00046
00047
00048
// Cluster I/O sizing and locking constants.
// NOTE(review): what MAX_TCOUNT bounds is not visible in this header — confirm at use sites.
#define MAX_TCOUNT 128
// 128 KiB; presumably the size of the control-data area (TODO confirm).
#define CONTROL_DATA (128*1024)
// Read-bank buffers use DEFAULT_MAX_BUFFER_SIZE together with the last
// buffer size-class index (DEFAULT_BUFFER_SIZES-1).
#define READ_BANK_BUF_SIZE DEFAULT_MAX_BUFFER_SIZE
#define READ_BANK_BUF_INDEX (DEFAULT_BUFFER_SIZES-1)
#define ALLOC_DATA_MAGIC 0xA5 // 8 bits in size
// Presumably the number of try-lock attempts before a read/write lock is
// treated as missed (TODO confirm at call sites).
#define READ_LOCK_SPIN_COUNT 1
#define WRITE_LOCK_SPIN_COUNT 1
00056
00057
00058
00059
00060
00061
// Scheduler tuning for the cluster handler: number of scheduling buckets and
// the period between passes (semantics inferred from names — TODO confirm).
#define CLUSTER_BUCKETS 64
#define CLUSTER_PERIOD HRTIME_MSECONDS(10)

// Upper bound on time spent in one cluster-handler run (presumably per invocation).
#define CLUSTER_MAX_RUN_TIME HRTIME_MSECONDS(100)
// Upper bound on time spent stealing work while CLUSTER_THREAD_STEALING is enabled (presumably).
#define CLUSTER_MAX_THREAD_STEAL_TIME HRTIME_MSECONDS(10)
00069
00070
// Channel table sizing. Per the comment below, channel numbers occupy a
// 15-bit field of the message descriptor, which caps MAX_CHANNELS at 2^15 - 1.
#define MIN_CHANNELS 4096
#define MAX_CHANNELS ((32*1024) - 1) // 15 bits in Descriptor

// Channel 0 is the control channel and also the last (only) dedicated channel.
#define CLUSTER_CONTROL_CHANNEL 0
#define LAST_DEDICATED_CHANNEL 0

#define CLUSTER_PHASES 1

// Initial channel priority is defined in terms of CLUSTER_PHASES.
#define CLUSTER_INITIAL_PRIORITY CLUSTER_PHASES
00080
00081
// Priority bump step and membership delay (semantics inferred from names —
// TODO confirm at use sites).
#define CLUSTER_BUMP_LENGTH 1
#define CLUSTER_MEMBER_DELAY HRTIME_SECONDS(1)

// Connection timeout: 10 s normally. CLUSTER_TEST_DEBUG builds use 65536 s,
// presumably so pausing in a debugger does not trip it.
#ifdef CLUSTER_TEST_DEBUG
#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(65536)
#else
#define CLUSTER_CONNECT_TIMEOUT HRTIME_SECONDS(10)
#endif
#define CLUSTER_CONNECT_RETRY HRTIME_MSECONDS(20)
#define CLUSTER_RETRY HRTIME_MSECONDS(10)
#define CLUSTER_DELAY_BETWEEN_WRITES HRTIME_MSECONDS(10)

// Channel inactivity timeout: 10 minutes normally, 65536 times longer under
// CLUSTER_TEST_DEBUG.
#ifdef CLUSTER_TEST_DEBUG
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (65536 * HRTIME_SECONDS(60))
#else
#define CLUSTER_CHANNEL_INACTIVITY_TIMEOUT (10 * HRTIME_SECONDS(60))
#endif
00101
00102
// Completion-callback pacing: the callback period and, presumably, the
// maximum number of events handled per period (TODO confirm at use sites).
#define COMPLETION_CALLBACK_PERIOD HRTIME_MSECONDS(10)
#define MAX_COMPLETION_CALLBACK_EVENTS 16
00105
00106
// Cluster activity flags.
#define CLUSTER_ACTIVE 1
#define CLUSTER_NOT_ACTIVE 0

// Negative sentinel; presumably passed where a channel number is expected to
// force-close an open channel (TODO confirm). The replacement list is
// parenthesized so the negative literal expands safely in any expression.
#define FORCE_CLOSE_ON_OPEN_CHANNEL (-2)

// Selectors distinguishing machine-level from cluster-level configuration.
#define MACHINE_CONFIG 0
#define CLUSTER_CONFIG 1
00116
00117
// Debug tag strings for cluster diagnostics; presumably passed as the tag
// argument of the Debug()/diagnostics macros (TODO confirm).
#define CL_NOTE "cluster_note"
#define CL_WARN "cluster_warn"
#define CL_PROTO "cluster_proto"
#define CL_TRACE "cluster_trace"
00122
00123
00124
00125
// Size limit for "fast" control messages: 512 bytes minus 4 for the cluster
// function number and 4 for alignment. Messages up to SMALL_CONTROL_MESSAGE
// are presumably copied rather than referenced (per the trailing comment).
#define MAX_FAST_CONTROL_MESSAGE 504 // 512 - 4 (cluster func #) - 4 align
#define SMALL_CONTROL_MESSAGE MAX_FAST_CONTROL_MESSAGE // copied instead

// Negative sentinel meaning the write message has already been built.
// Parenthesized so the negative literal expands safely in any expression.
#define WRITE_MESSAGE_ALREADY_BUILT (-1)
00130
00131 #define MAGIC_COUNT(_x) \
00132 (0xBADBAD ^ ~(uint32_t)_x.msg.count \
00133 ^ ~(uint32_t)_x.msg.descriptor_cksum \
00134 ^ ~(uint32_t)_x.msg.control_bytes_cksum \
00135 ^ ~(uint32_t)_x.msg.unused \
00136 ^ ~((uint32_t)_x.msg.control_bytes << 16) ^_x.sequence_number)
00137
00138 #define DOUBLE_ALIGN(_x) ((((uintptr_t)_x)+7)&~7)
00139
00140
00141
00142
// Debug/test hook switches. All are 0 (disabled); set one to 1 to compile in
// the corresponding test code. Each switch is defined exactly once.
#define MISS_TEST 0
#define TEST_PARTIAL_WRITES 0
#define TEST_PARTIAL_READS 0
#define TEST_TIMING 0
#define TEST_READ_LOCKS_MISSED 0
#define TEST_WRITE_LOCKS_MISSED 0
#define TEST_ENTER_EXIT 0
00151
00152
00153
00154
#if TEST_TIMING
// Print a tagged timestamp (milliseconds modulo 1000) to stderr.
// _x must be a string literal; it is concatenated with the format string.
#define TTTEST(_x) \
  fprintf(stderr, _x " at: %u\n", \
          ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
// Like TTTEST, but also prints vc->channel; a variable named `vc` must be in
// scope at the expansion site.
#define TTEST(_x) \
  fprintf(stderr, _x " for: %d at: %u\n", vc->channel, \
          ((unsigned int)(ink_get_hrtime()/HRTIME_MSECOND)) % 1000)
// Dump selected int fields of the raw buffer _d, chosen by its leading int.
// Offsets are in units of _d's pointee type (presumably char, i.e. bytes).
// Wrapped in do { } while (0) so the expansion is a single statement and
// cannot capture a caller's trailing `else`.
#define TIMEOUT_TESTS(_s,_d) \
  do { \
    if (*(int*)(_d) == 8) \
      fprintf(stderr, _s " lookup %d\n", *(int*)((_d)+20)); \
    else if (*(int*)(_d) == 10) \
      fprintf(stderr, _s " op %d %d\n", *(int*)((_d)+36), \
              *(int*)((_d)+40)); \
    else if (*(int*)(_d) == 11) \
      fprintf(stderr, _s " rop %d %d\n", *(int*)((_d)+4), \
              *(int*)((_d)+8)); \
  } while (0)
#else
#define TTTEST(_x)
#define TTEST(_x)
#define TIMEOUT_TESTS(_x,_y)
#endif
00176
#if (TEST_READ_LOCKS_MISSED || TEST_WRITE_LOCKS_MISSED)
// rand_r() state for the lock-miss test hook. It is static, so each
// translation unit including this header gets its own copy.
static unsigned int test_cluster_locks_missed = 0;
// Test hook: returns nonzero for roughly 1 in 13 calls, simulating a failed
// try-lock. The return type is spelled out because implicit int is
// ill-formed in C++ and in C99 and later. `static inline` avoids unused-
// function warnings in translation units that never call it.
static inline int
test_cluster_lock_might_fail()
{
  return (!(rand_r(&test_cluster_locks_missed) % 13));
}
#endif
// Each hook evaluates to the test function when its switch is on, else false.
#if TEST_READ_LOCKS_MISSED
#define TEST_READ_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
#else
#define TEST_READ_LOCK_MIGHT_FAIL false
#endif
#if TEST_WRITE_LOCKS_MISSED
#define TEST_WRITE_LOCK_MIGHT_FAIL test_cluster_lock_might_fail()
#else
#define TEST_WRITE_LOCK_MIGHT_FAIL false
#endif
00195
#if TEST_ENTER_EXIT
// Scope guard for enter/exit accounting: increments *in when constructed and
// *out when destroyed.
struct enter_exit_class
{
  int *outv;
  enter_exit_class(int *in, int *out):outv(out)
  {
    (*in)++;
  }
  ~enter_exit_class()
  {
    (*outv)++;
  }
private:
  // Non-copyable (declared, never defined): a copy would bump *outv a second
  // time when it is destroyed.
  enter_exit_class(const enter_exit_class &);
  enter_exit_class & operator=(const enter_exit_class &);
};

// Declares a guard that lives until the end of the enclosing scope. The
// variable gets a distinctive name so it cannot shadow or clash with a
// caller's short-named locals.
#define enter_exit(_x,_y) enter_exit_class cluster_enter_exit_guard(_x,_y)
#else
#define enter_exit(_x,_y)
#endif
00214
// Expands to four comma-separated unsigned char values: the first four bytes
// of the object _x in memory order. _x must be an lvalue (its address is
// taken). Presumably used to print a 4-byte IPv4 address with a
// "%d.%d.%d.%d" format (TODO confirm at call sites).
#define DOT_SEPARATED(_x) \
  ((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1], \
  ((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]
00218
00219
00220
00221
00222 struct CloseMessage:public ClusterMessageHeader
00223 {
00224 uint32_t channel;
00225 int32_t status;
00226 int32_t lerrno;
00227 uint32_t sequence_number;
00228
00229 enum
00230 {
00231 MIN_VERSION = 1,
00232 MAX_VERSION = 1,
00233 CLOSE_CHAN_MESSAGE_VERSION = MAX_VERSION
00234 };
00235
00236 CloseMessage(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION)
00237 : ClusterMessageHeader(vers), channel(0), status(0), lerrno(0), sequence_number(0) {
00238 }
00239
00240 static int protoToVersion(int protoMajor)
00241 {
00242 (void) protoMajor;
00243 return CLOSE_CHAN_MESSAGE_VERSION;
00244 }
00245 static int sizeof_fixedlen_msg()
00246 {
00247 return sizeof(CloseMessage);
00248 }
00249 void init(uint16_t vers = CLOSE_CHAN_MESSAGE_VERSION) {
00250 _init(vers);
00251 }
00252 inline void SwapBytes()
00253 {
00254 if (NeedByteSwap()) {
00255 ats_swap32(&channel);
00256 ats_swap32((uint32_t *) & status);
00257 ats_swap32((uint32_t *) & lerrno);
00258 ats_swap32(&sequence_number);
00259 }
00260 }
00261
00262 };
00263
00264
00265
00266
00267 struct MachineListMessage:public ClusterMessageHeader
00268 {
00269 uint32_t n_ip;
00270 uint32_t ip[CLUSTER_MAX_MACHINES];
00271
00272 enum
00273 {
00274 MIN_VERSION = 1,
00275 MAX_VERSION = 1,
00276 MACHINE_LIST_MESSAGE_VERSION = MAX_VERSION
00277 };
00278
00279 MachineListMessage():ClusterMessageHeader(MACHINE_LIST_MESSAGE_VERSION), n_ip(0)
00280 {
00281 memset(ip, 0, sizeof(ip));
00282 }
00283
00284 static int protoToVersion(int protoMajor)
00285 {
00286 (void) protoMajor;
00287 return MACHINE_LIST_MESSAGE_VERSION;
00288 }
00289 static int sizeof_fixedlen_msg()
00290 {
00291 return sizeof(ClusterMessageHeader);
00292 }
00293 void init(uint16_t vers = MACHINE_LIST_MESSAGE_VERSION) {
00294 _init(vers);
00295 }
00296 inline void SwapBytes()
00297 {
00298 ats_swap32(&n_ip);
00299 }
00300
00301 };
00302
00303
00304
00305
00306 struct SetChanDataMessage:public ClusterMessageHeader
00307 {
00308 uint32_t channel;
00309 uint32_t sequence_number;
00310 uint32_t data_type;
00311 char data[4];
00312
00313 enum
00314 {
00315 MIN_VERSION = 1,
00316 MAX_VERSION = 1,
00317 SET_CHANNEL_DATA_MESSAGE_VERSION = MAX_VERSION
00318 };
00319
00320 SetChanDataMessage(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION)
00321 : ClusterMessageHeader(vers), channel(0), sequence_number(0), data_type(0) {
00322 memset(data, 0, sizeof(data));
00323 }
00324
00325 static int protoToVersion(int protoMajor)
00326 {
00327 (void) protoMajor;
00328 return SET_CHANNEL_DATA_MESSAGE_VERSION;
00329 }
00330 static int sizeof_fixedlen_msg()
00331 {
00332 SetChanDataMessage *p = 0;
00333 return (int) DOUBLE_ALIGN((int64_t) ((char *) &p->data[0] - (char *) p));
00334 }
00335 void init(uint16_t vers = SET_CHANNEL_DATA_MESSAGE_VERSION) {
00336 _init(vers);
00337 }
00338 inline void SwapBytes()
00339 {
00340 if (NeedByteSwap()) {
00341 ats_swap32(&channel);
00342 ats_swap32(&sequence_number);
00343 ats_swap32(&data_type);
00344 }
00345 }
00346
00347 };
00348
00349
00350
00351
00352 struct SetChanPinMessage:public ClusterMessageHeader
00353 {
00354 uint32_t channel;
00355 uint32_t sequence_number;
00356 uint32_t pin_time;
00357
00358 enum
00359 {
00360 MIN_VERSION = 1,
00361 MAX_VERSION = 1,
00362 SET_CHANNEL_PIN_MESSAGE_VERSION = MAX_VERSION
00363 };
00364
00365 SetChanPinMessage(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION)
00366 : ClusterMessageHeader(vers), channel(0), sequence_number(0), pin_time(0) {
00367 }
00368
00369 static int protoToVersion(int protoMajor)
00370 {
00371 (void) protoMajor;
00372 return SET_CHANNEL_PIN_MESSAGE_VERSION;
00373 }
00374 static int sizeof_fixedlen_msg()
00375 {
00376 return (int) sizeof(SetChanPinMessage);
00377 }
00378 void init(uint16_t vers = SET_CHANNEL_PIN_MESSAGE_VERSION) {
00379 _init(vers);
00380 }
00381 inline void SwapBytes()
00382 {
00383 if (NeedByteSwap()) {
00384 ats_swap32(&channel);
00385 ats_swap32(&sequence_number);
00386 ats_swap32(&pin_time);
00387 }
00388 }
00389
00390 };
00391
00392
00393
00394
00395 struct SetChanPriorityMessage:public ClusterMessageHeader
00396 {
00397 uint32_t channel;
00398 uint32_t sequence_number;
00399 uint32_t disk_priority;
00400
00401 enum
00402 {
00403 MIN_VERSION = 1,
00404 MAX_VERSION = 1,
00405 SET_CHANNEL_PRIORITY_MESSAGE_VERSION = MAX_VERSION
00406 };
00407
00408 SetChanPriorityMessage(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION)
00409 : ClusterMessageHeader(vers), channel(0), sequence_number(0), disk_priority(0) {
00410 }
00411
00412 static int protoToVersion(int protoMajor)
00413 {
00414 (void) protoMajor;
00415 return SET_CHANNEL_PRIORITY_MESSAGE_VERSION;
00416 }
00417 static int sizeof_fixedlen_msg()
00418 {
00419 return (int) sizeof(SetChanPriorityMessage);
00420 }
00421 void init(uint16_t vers = SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
00422 _init(vers);
00423 }
00424 inline void SwapBytes()
00425 {
00426 if (NeedByteSwap()) {
00427 ats_swap32(&channel);
00428 ats_swap32(&sequence_number);
00429 ats_swap32(&disk_priority);
00430 }
00431 }
00432
00433 };
00434
// Helpers that use the most significant bit of an int as a flag.
// The mask is built with unsigned arithmetic: `1 << (bits - 1)` on a signed
// int shifts into the sign bit, which is undefined behavior in C and in C++
// before C++14. Converting the unsigned result back to int relies on the
// usual two's-complement wrap (implementation-defined before C++20/C23).

// Set the high bit of *val.
inline void
SetHighBit(int *val)
{
  const unsigned int high_bit = ~(~0u >> 1);
  *val = (int) ((unsigned int) *val | high_bit);
}

// Clear the high bit of *val; all other bits are preserved.
inline void
ClearHighBit(int *val)
{
  const unsigned int low_bits = ~0u >> 1;
  *val = (int) ((unsigned int) *val & low_bits);
}

// Return the high bit of *val in place: nonzero (INT_MIN on two's-complement
// targets) when set, 0 otherwise. Intended to be tested as a boolean.
inline int
IsHighBitSet(int *val)
{
  const unsigned int high_bit = ~(~0u >> 1);
  return (int) ((unsigned int) *val & high_bit);
}
00452
00453
00454
00455
00456
// Continuation that, presumably, accepts incoming cluster connections on the
// cluster port. Member definitions are not in this header.
class ClusterAccept:public Continuation
{
public:
  // NOTE(review): argument order presumably matches the fields below
  // (port pointer, socket send buffer size, socket receive buffer size) —
  // confirm against the definition.
  ClusterAccept(int *, int, int);
  void Init();
  void ShutdownDelete();
  int ClusterAcceptEvent(int, void *);
  int ClusterAcceptMachine(NetVConnection *);

  ~ClusterAccept();
private:

  int *p_cluster_port;          // configured port, held by pointer (presumably re-read to notice changes)
  int socket_send_bufsize;
  int socket_recv_bufsize;
  int current_cluster_port;     // presumably the port currently being accepted on
  Action *accept_action;        // presumably the pending accept operation's handle
  Event *periodic_event;
};
00476
00477
// Forward declarations plus, for each class, a pointer-to-member-function
// type for handlers with the signature int (int, void *).
struct ClusterHandler;
typedef int (ClusterHandler::*ClusterContHandler) (int, void *);

struct OutgoingControl;
typedef int (OutgoingControl::*OutgoingCtrlHandler) (int, void *);

struct ClusterVConnection;
typedef int (ClusterVConnection::*ClusterVConnHandler) (int, void *);
00486
00487
00488 extern void cluster_set_priority(ClusterHandler *, ClusterVConnState *, int);
00489 extern void cluster_lower_priority(ClusterHandler *, ClusterVConnState *);
00490 extern void cluster_raise_priority(ClusterHandler *, ClusterVConnState *);
00491 extern void cluster_schedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
00492 extern void cluster_reschedule(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
00493 extern void cluster_reschedule_offset(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int);
00494 extern void cluster_disable(ClusterHandler *, ClusterVConnection *, ClusterVConnState *);
00495 extern void cluster_update_priority(ClusterHandler *, ClusterVConnection *, ClusterVConnState *, int64_t, int64_t);
00496 #define CLUSTER_BUMP_NO_REMOVE -1
00497 extern void cluster_bump(ClusterHandler *, ClusterVConnectionBase *, ClusterVConnState *, int);
00498
// Helpers over IOBufferBlock chains (defined elsewhere). Their exact
// semantics are not visible here; the names suggest clone / consume /
// byte-count operations — confirm against the definitions.
extern IOBufferBlock *clone_IOBufferBlockList(IOBufferBlock *, int, int, IOBufferBlock **);
extern IOBufferBlock *consume_IOBufferBlockList(IOBufferBlock *, int64_t);
extern int64_t bytes_IOBufferBlockList(IOBufferBlock *, int64_t);
00502
00503
// Allocators for cluster VConnections and byte-bank descriptors, defined in
// another translation unit, plus a free helper for ClusterVConnection objects.
extern void clusterVCAllocator_free(ClusterVConnection * vc);
extern ClassAllocator<ClusterVConnection> clusterVCAllocator;
extern ClassAllocator<ByteBankDescriptor> byteBankAllocator;
00507
00508
// Cluster port value (defined elsewhere).
extern int cluster_port;

// Machine-configuration change hooks. machine_config_change takes the
// (name, RecDataT, RecData, void *) arguments of a records callback,
// presumably registered for config updates (TODO confirm).
int machine_config_change(const char *, RecDataT, RecData, void *);
extern void do_machine_config_change(void *, const char *);
00513
00514
// Cluster plugin-API entry points (defined elsewhere): one-time init plus
// callouts for a machine going online or offline. The int argument
// presumably identifies the machine, e.g. its IP (TODO confirm).
extern void clusterAPI_init();
extern void machine_online_APIcallout(int);
extern void machine_offline_APIcallout(int);
00518 #endif