• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

P_ClusterCache.h

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026   Cluster.h
00027 
00028 
00029 ****************************************************************************/
00030 
00031 #ifndef _P_Cluster_Cache_h
00032 #define _P_Cluster_Cache_h
00033 
00034 //*****************************************************************************
00035 // Initially derived from Cluster.h "1.77.2.11 1999/01/21 03:24:10"
00036 //*****************************************************************************
00037 
00038 /****************************************************************************/
00039 // #define LOCAL_CLUSTER_TEST_MODE 1
00040 // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
00041 //  Set the above #define to enable local clustering.  "Local clustering"
00042 //  is a test only mode where all cluster nodes reside on the same host.
00043 //
00044 //  Configuration notes:
00045 //   - For "cluster.config" entries, always use "127.0.0.1" as the IP
00046 //     address and select a host unique cluster port.
00047 //
00048 //  Restrictions:
00049 //   1) Does not work with the manager.  You must only run with the server
00050 //      and hand configure "cluster.config".
00051 //   2) Currently, this has only been tested in a two node configuration.
00052 //
00053 /****************************************************************************/
00054 
00055 #include "P_ClusterMachine.h"
00056 
00057 //
00058 // Cluster Processor
00059 //
00060 // - monitors the status of the cluster
00061 // - provides communication between machines in the cluster
00062 // - provides callbacks to other processors when the cluster configuration
00063 //   changes
00064 //
00065 #define CLUSTER_MAJOR_VERSION               3
00066 #define CLUSTER_MINOR_VERSION               2
00067 
00068 // Lowest supported major/minor cluster version
00069 #define MIN_CLUSTER_MAJOR_VERSION           CLUSTER_MAJOR_VERSION
00070 #define MIN_CLUSTER_MINOR_VERSION           CLUSTER_MINOR_VERSION
00071 
00072 
00073 #define DEFAULT_CLUSTER_PORT_NUMBER         0
00074 #define DEFAULT_NUMBER_OF_CLUSTER_THREADS   1
00075 #define DEFAULT_CLUSTER_HOST                ""
00076 
00077 #define MAX_CLUSTER_SEND_LENGTH             INT_MAX
00078 
00079 #define CLUSTER_MAX_MACHINES                256
00080 // less than 1% disparity at 255 machines, 32707 is prime less than 2^15
00081 #define CLUSTER_HASH_TABLE_SIZE             32707
00082 
00083 // after timeout the configuration is "dead"
00084 #define CLUSTER_CONFIGURATION_TIMEOUT       HRTIME_DAY
00085 // after zombie the configuration is deleted
00086 #define CLUSTER_CONFIGURATION_ZOMBIE        (HRTIME_DAY*2)
00087 
00088 
00089 // the number of configurations into the past we probe for data
00090 // one allows a new machine to come into or fall out of the
00091 // cluster without loss of data.  If the data is redistributed within
00092 // one day, no data will be lost.
00093 #define CONFIGURATION_HISTORY_PROBE_DEPTH   1
00094 
00095 
00096 // move these to a central event definition file (Event.h)
00097 #define CLUSTER_EVENT_CHANGE            (CLUSTER_EVENT_EVENTS_START)
00098 #define CLUSTER_EVENT_CONFIGURATION     (CLUSTER_EVENT_EVENTS_START+1)
00099 #define CLUSTER_EVENT_OPEN              (CLUSTER_EVENT_EVENTS_START+2)
00100 #define CLUSTER_EVENT_OPEN_EXISTS       (CLUSTER_EVENT_EVENTS_START+3)
00101 #define CLUSTER_EVENT_OPEN_FAILED       (CLUSTER_EVENT_EVENTS_START+4)
00102 
00103 // internal event code
00104 #define CLUSTER_EVENT_STEAL_THREAD      (CLUSTER_EVENT_EVENTS_START+50)
00105 
00106 //////////////////////////////////////////////////////////////
00107 // Miscellaneous byte swap routines
00108 //////////////////////////////////////////////////////////////
00109 inline void
00110 ats_swap16(uint16_t * d)
00111 {
00112   unsigned char *p = (unsigned char *) d;
00113   *d = ((p[1] << 8) | p[0]);
00114 }
00115 
00116 inline uint16_t
00117 ats_swap16(uint16_t d)
00118 {
00119   ats_swap16(&d);
00120   return d;
00121 }
00122 
00123 inline void
00124 ats_swap32(uint32_t * d)
00125 {
00126   unsigned char *p = (unsigned char *) d;
00127   *d = ((p[3] << 24) | (p[2] << 16) | (p[1] << 8) | p[0]);
00128 }
00129 
00130 inline uint32_t
00131 ats_swap32(uint32_t d)
00132 {
00133   ats_swap32(&d);
00134   return d;
00135 }
00136 
00137 inline void
00138 ats_swap64(uint64_t * d)
00139 {
00140   unsigned char *p = (unsigned char *) d;
00141   *d = (((uint64_t) p[7] << 56) | ((uint64_t) p[6] << 48) |
00142         ((uint64_t) p[5] << 40) | ((uint64_t) p[4] << 32) |
00143         ((uint64_t) p[3] << 24) | ((uint64_t) p[2] << 16) | ((uint64_t) p[1] << 8) | (uint64_t) p[0]);
00144 }
00145 
00146 inline uint64_t
00147 ats_swap64(uint64_t d)
00148 {
00149   ats_swap64(&d);
00150   return d;
00151 }
00152 
00153 //////////////////////////////////////////////////////////////
00154 
00155 struct ClusterConfiguration
00156 {
00157   int n_machines;
00158   ClusterMachine *machines[CLUSTER_MAX_MACHINES];
00159 
00160   ClusterMachine *machine_hash(unsigned int hash_value)
00161   {
00162     return machines[hash_table[hash_value % CLUSTER_HASH_TABLE_SIZE]];
00163   }
00164 
00165   ClusterMachine *find(unsigned int ip, int port = 0) {
00166     for (int i = 0; i < n_machines; i++)
00167       if (ip == machines[i]->ip && (!port || !machines[i]->cluster_port || machines[i]->cluster_port == port))
00168         return machines[i];
00169     return NULL;
00170   }
00171 
00172   //
00173   // Private
00174   //
00175   ClusterConfiguration();
00176   unsigned char hash_table[CLUSTER_HASH_TABLE_SIZE];
00177   ink_hrtime changed;
00178   SLINK(ClusterConfiguration, link);
00179 };
00180 
00181 inline bool
00182 machine_in_vector(ClusterMachine * m, ClusterMachine ** mm, int len)
00183 {
00184   for (int i = 0; i < len; i++)
00185     if (m == mm[i])
00186       return true;
00187   return false;
00188 }
00189 
00190 //
00191 // Returns either a machine or NULL.
00192 // Finds a machine starting at probe_depth going up to
00193 //    CONFIGURATION_HISTORY_PROBE_DEPTH
00194 // which is up, not the current machine and has not yet been probed.
00195 // Updates: probe_depth and past_probes.
00196 //
00197 inkcoreapi ClusterMachine *cluster_machine_at_depth(unsigned int hash, int *probe_depth = NULL,
00198                                                     ClusterMachine ** past_probes = NULL);
00199 
00200 //
00201 // Cluster
00202 //   A cluster of machines which act as a single cache.
00203 //
00204 struct Cluster
00205 {
00206   //
00207   // Public Interface
00208   //
00209 
00210   //
00211   // Cluster Hash Function
00212   //
00213 
00214   // Takes a hash value to a machine.  The hash function has the following
00215   // properties:
00216   //   1 - it divides input domain into the output range evenly (within 1%)
00217   //   2 - it tends to produce the same Machine for the same hash_value's
00218   //       for different configurations
00219   //   3 - it produces the hash same function for a given configuration of
00220   //       machines independent of the order they were added or removed
00221   //       from the cluster.  (it is a pure function of the configuration)
00222   //   Thread-safe
00223   //
00224   ClusterMachine *machine_hash(unsigned int hash_value)
00225   {
00226     return current_configuration()->machine_hash(hash_value);
00227   }
00228 
00229   //
00230   // Cluster Configuration
00231   //
00232 
00233   // Register callback for a cluster configuration change.
00234   // calls cont->handleEvent(EVENT_CLUSTER_CHANGE);
00235   //   Thread-safe
00236   //
00237   void cluster_change_callback(Continuation * cont);
00238 
00239   // Return the current configuration
00240   //   Thread-safe
00241   //
00242   ClusterConfiguration *current_configuration()
00243   {
00244     return configurations.head;
00245   }
00246 
00247   // Return the previous configuration.
00248   // Use from within the cluster_change_callback.
00249   //   Thread-safe
00250   //
00251   ClusterConfiguration *previous_configuration()
00252   {
00253     return configurations.head->link.next;
00254   }
00255 
00256   //
00257   // Private
00258   //
00259   // The configurations are updated only in the thread which is
00260   // accepting cluster connections.
00261   //
00262   SLL<ClusterConfiguration> configurations;
00263 
00264   Cluster();
00265 };
00266 
00267 //
00268 // ClusterVCToken
00269 //   An token passed between nodes to represent a virtualized connection.
00270 //   (see ClusterProcessor::alloc_remote() and attach_remote() below)
00271 //
00272 struct ClusterVCToken
00273 {
00274   //
00275   // Marshal this data to send the token across the cluster
00276   //
00277   uint32_t ip_created;
00278   uint32_t ch_id;
00279   uint32_t sequence_number;
00280 
00281   bool is_clear()
00282   {
00283     return !ip_created;
00284   }
00285   void clear()
00286   {
00287     ip_created = 0;
00288     sequence_number = 0;
00289   }
00290 
00291   ClusterVCToken(unsigned int aip = 0, unsigned int id = 0, unsigned int aseq = 0)
00292 :  ip_created(aip), ch_id(id), sequence_number(aseq) {
00293   }
00294   //
00295   // Private
00296   //
00297   void alloc();
00298 
00299   inline void SwapBytes()
00300   {
00301     ats_swap32(&ch_id);
00302     ats_swap32(&sequence_number);
00303   }
00304 };
00305 
00306 //
00307 // ClusterFunctionPtr
00308 //   A pointer to a procedure which can be invoked accross the cluster.
00309 //   This must be registered.
00310 //
00311 typedef void ClusterFunction(ClusterHandler * ch, void *data, int len);
00312 typedef ClusterFunction *ClusterFunctionPtr;
00313 
00314 struct ClusterVConnectionBase;
00315 
00316 struct ClusterVConnState
00317 {
00318   //
00319   // Private
00320   //
00321   volatile int enabled;
00322   // multiples of XXX_PERIOD, high = less often
00323   int priority;
00324   VIO vio;
00325   void *queue;
00326   int ifd;
00327   Event *delay_timeout;
00328   Link<ClusterVConnectionBase> link;
00329 
00330   // void enqueue(void * q, ClusterVConnection * vc);
00331   ClusterVConnState();
00332 };
00333 
00334 struct ClusterVConnectionBase: public CacheVConnection
00335 {
00336   //
00337   // Initiate an IO operation.
00338   // "data" is unused.
00339   // Only one READ and one WRITE may be active at one time.
00340   //   THREAD-SAFE, may be called when not handling an event from
00341   //                the ClusterVConnectionBase, or the ClusterVConnectionBase
00342   //                creation callback.
00343   //
00344 
00345   virtual VIO *do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf);
00346   virtual VIO *do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * buf, bool owner = false);
00347   virtual void do_io_shutdown(ShutdownHowTo_t howto)
00348   {
00349     (void) howto;
00350     ink_assert(!"shutdown of cluster connection");
00351   }
00352   virtual void do_io_close(int lerrno = -1);
00353   virtual VIO* do_io_pread(Continuation*, int64_t, MIOBuffer*, int64_t);
00354 
00355   // Set the timeouts associated with this connection.
00356   // active_timeout is for the total elasped time of the connection.
00357   // inactivity_timeout is the elapsed time *while an operation was
00358   //   enabled* during which the connection was unable to sink/provide data.
00359   // calling these functions repeatedly resets the timeout.
00360   //   NOT THREAD-SAFE, may only be called when handing an event from this
00361   //                    ClusterVConnectionBase, or the ClusterVConnectionBase
00362   //                    creation callback.
00363   //
00364   void set_active_timeout(ink_hrtime timeout_in);
00365   void set_inactivity_timeout(ink_hrtime timeout_in);
00366   void cancel_active_timeout();
00367   void cancel_inactivity_timeout();
00368 
00369   ClusterVConnectionBase();
00370 
00371 #ifdef DEBUG
00372   // Class static data
00373   static int enable_debug_trace;
00374 #endif
00375   Action action_;
00376   EThread *thread;
00377   volatile int closed;
00378   ClusterVConnState read;
00379   ClusterVConnState write;
00380   LINKM(ClusterVConnectionBase, read, link)
00381   LINKM(ClusterVConnectionBase, write, link)
00382   ink_hrtime inactivity_timeout_in;
00383   ink_hrtime active_timeout_in;
00384   Event *inactivity_timeout;
00385   Event *active_timeout;
00386 
00387   virtual void reenable(VIO *);
00388   virtual void reenable_re(VIO *);
00389 };
00390 
00391 inline void
00392 ClusterVConnectionBase::set_active_timeout(ink_hrtime timeout)
00393 {
00394   active_timeout_in = timeout;
00395   if (active_timeout) {
00396     ink_assert(!active_timeout->cancelled);
00397     if (active_timeout->ethread == this_ethread())
00398       active_timeout->schedule_in(timeout);
00399     else {
00400       active_timeout->cancel(this);
00401       active_timeout = thread->schedule_in(this, timeout);
00402     }
00403   } else {
00404     if (thread) {
00405       active_timeout = thread->schedule_in(this, timeout);
00406     }
00407   }
00408 }
00409 
00410 inline void
00411 ClusterVConnectionBase::set_inactivity_timeout(ink_hrtime timeout)
00412 {
00413   inactivity_timeout_in = timeout;
00414   if (inactivity_timeout) {
00415     ink_assert(!inactivity_timeout->cancelled);
00416     if (inactivity_timeout->ethread == this_ethread())
00417       inactivity_timeout->schedule_in(timeout);
00418     else {
00419       inactivity_timeout->cancel(this);
00420       inactivity_timeout = thread->schedule_in(this, timeout);
00421     }
00422   } else {
00423     if (thread) {
00424       inactivity_timeout = thread->schedule_in(this, timeout);
00425     }
00426   }
00427 }
00428 
00429 inline void
00430 ClusterVConnectionBase::cancel_active_timeout()
00431 {
00432   if (active_timeout) {
00433     active_timeout->cancel(this);
00434     active_timeout = NULL;
00435     active_timeout_in = 0;
00436   }
00437 }
00438 
00439 inline void
00440 ClusterVConnectionBase::cancel_inactivity_timeout()
00441 {
00442   if (inactivity_timeout) {
00443     inactivity_timeout->cancel(this);
00444     inactivity_timeout = NULL;
00445     inactivity_timeout_in = 0;
00446   }
00447 }
00448 
00449 // Data debt owed to VC which is deferred due to lock miss
00450 class ByteBankDescriptor
00451 {
00452 public:
00453   ByteBankDescriptor()
00454   {
00455   }
00456   IOBufferBlock *get_block()
00457   {
00458     return block;
00459   }
00460 
00461   static ByteBankDescriptor *ByteBankDescriptor_alloc(IOBufferBlock *);
00462   static void ByteBankDescriptor_free(ByteBankDescriptor *);
00463 
00464 public:
00465   LINK(ByteBankDescriptor, link);
00466 
00467 private:
00468   Ptr<IOBufferBlock> block;  // holder of bank bytes
00469 };
00470 
00471 enum TypeVConnection
00472 {
00473   VC_NULL,
00474   VC_CLUSTER,
00475   VC_CLUSTER_READ,
00476   VC_CLUSTER_WRITE,
00477   VC_CLUSTER_CLOSED
00478 };
00479 
00480 //
00481 // ClusterVConnection
00482 //
00483 struct ClusterVConnection: public ClusterVConnectionBase
00484 {
00485   //
00486   // Public Interface (included from ClusterVConnectionBase)
00487   //
00488   // Thread-safe  (see Net.h for details)
00489   //
00490   // virtual VIO * do_io(
00491   //   int                   op,
00492   //   Continuation        * c = NULL,
00493   //   int                   nbytes = INT64_MAX,
00494   //   MIOBuffer           * buf = 0,
00495   //   int                   whence = SEEK_CUR);
00496   //
00497   // NOT Thread-safe (see Net.h for details)
00498   //
00499   // void set_active_timeout(ink_hrtime timeout_in);
00500   // void set_inactivity_timeout(ink_hrtime timeout_in);
00501 
00502   //
00503   // Private
00504   //
00505 
00506   int startEvent(int event, Event * e);
00507   int mainEvent(int event, Event * e);
00508 
00509   // 0 on success -1 on failure
00510   int start(EThread * t);       // New connect protocol
00511 
00512   ClusterVConnection(int is_new_connect_read = 0);
00513   ~ClusterVConnection();
00514   void free();                  // Destructor actions (we are using ClassAllocator)
00515 
00516   virtual void do_io_close(int lerrno = -1);
00517   virtual VIO *do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf);
00518   virtual VIO *do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * buf, bool owner = false);
00519   virtual void reenable(VIO *vio);
00520 
00521   ClusterHandler *ch;
00522   //
00523   //  Read Channel: (new_connect_read == true)
00524   //     - open_local()    caller is reader
00525   //     - connect_local() caller is writer
00526   //
00527   //  Write Channel: (new_connect_read == false)
00528   //     - open_local()    caller is writer
00529   //     - connect_local() caller is reader
00530   //
00531   int new_connect_read;         // Data flow direction wrt origin node
00532   int remote_free;
00533   int last_local_free;
00534   int channel;
00535   ClusterVCToken token;
00536   volatile int close_disabled;
00537   volatile int remote_closed;
00538   volatile int remote_close_disabled;
00539   volatile int remote_lerrno;
00540   volatile uint32_t in_vcs;
00541   volatile uint32_t type;
00542   SLINK(ClusterVConnection, ready_alink);
00543   int was_closed();
00544   void allow_close();
00545   void disable_close();
00546   int was_remote_closed();
00547   void allow_remote_close();
00548   bool schedule_write();
00549   void set_type(int);
00550   ink_hrtime start_time;
00551   ink_hrtime last_activity_time;
00552   Queue<ByteBankDescriptor> byte_bank_q;   // done awaiting completion
00553   int n_set_data_msgs;          // # pending set_data() msgs on VC
00554   int n_recv_set_data_msgs;     // # set_data() msgs received on VC
00555   volatile int pending_remote_fill;     // Remote fill pending on connection
00556   Ptr<IOBufferBlock> read_block;   // Hold current data for open read
00557   bool remote_ram_cache_hit;    // Entire object was from remote ram cache
00558   bool have_all_data;           // All data in read_block
00559   int initial_data_bytes;       // bytes in open_read buffer
00560   Ptr<IOBufferBlock> remote_write_block;   // Write side data for remote fill
00561   void *current_cont;           // Track current continuation (debug)
00562 
00563 #define CLUSTER_IOV_NOT_OPEN               -2
00564 #define CLUSTER_IOV_NONE                   -1
00565   int iov_map;                  // which iov?
00566 
00567   Ptr<ProxyMutex> read_locked;
00568   Ptr<ProxyMutex> write_locked;
00569 
00570   // Data buffer for unmarshaled objects from remote node.
00571   Ptr<IOBufferData> marshal_buf;
00572 
00573   // Pending write data
00574   Ptr<IOBufferBlock> write_list;
00575   IOBufferBlock *write_list_tail;
00576   int write_list_bytes;
00577   int write_bytes_in_transit;
00578 
00579   CacheHTTPInfo alternate;
00580   time_t time_pin;
00581   int disk_io_priority;
00582   void set_remote_fill_action(Action *);
00583 
00584   // Indicates whether a cache hit was from an peering cluster cache
00585   bool is_ram_cache_hit() const { return remote_ram_cache_hit; };
00586   void set_ram_cache_hit(bool remote_hit) { remote_ram_cache_hit = remote_hit; }
00587 
00588   // For VC(s) established via OPEN_READ, we are passed a CacheHTTPInfo
00589   //  in the reply.
00590   virtual bool get_data(int id, void *data);    // backward compatibility
00591   virtual void get_http_info(CacheHTTPInfo **);
00592   virtual int64_t get_object_size();
00593   virtual bool is_pread_capable();
00594 
00595   // For VC(s) established via the HTTP version of OPEN_WRITE, additional
00596   //  data for the VC is passed in a second message.  This additional
00597   //  data has a lifetime equal to the cache VC
00598   virtual void set_http_info(CacheHTTPInfo *);
00599 
00600   virtual bool set_pin_in_cache(time_t time_pin);
00601   virtual time_t get_pin_in_cache();
00602   virtual bool set_disk_io_priority(int priority);
00603   virtual int get_disk_io_priority();
00604   virtual int get_header(void **ptr, int *len);
00605   virtual int set_header(void *ptr, int len);
00606   virtual int get_single_data(void **ptr, int *len);
00607 };
00608 
00609 //
00610 // Cluster operation options
00611 //
00612 #define CLUSTER_OPT_STEAL             0x0001    // allow thread stealing
00613 #define CLUSTER_OPT_IMMEDIATE         0x0002    // require immediate response
00614 #define CLUSTER_OPT_ALLOW_IMMEDIATE   0x0004    // allow immediate response
00615 #define CLUSTER_OPT_DELAY             0x0008    // require delayed response
00616 #define CLUSTER_OPT_CONN_READ         0x0010    // new conn read
00617 #define CLUSTER_OPT_CONN_WRITE        0x0020    // new conn write
00618 #define CLUSTER_OPT_DATA_IS_OCONTROL  0x0040    // data in OutgoingControl
00619 #define CLUSTER_FUNCTION_MALLOCED     -1
00620 
00621 struct ClusterRemoteDataHeader
00622 {
00623   int32_t cluster_function;
00624 };
00625 //
00626 // ClusterProcessor
00627 //
00628 class ClusterAccept;
00629 
00630 struct ClusterProcessor
00631 {
00632   //
00633   // Public Interface
00634   //
00635 
00636   // Invoke a function on a remote node
00637   //   marshal your own data, provide a continuation for timeouts and errors
00638   //
00639   // Options: CLUSTER_OPT_DELAY, CLUSTER_OPT_STEAL, CLUSTER_OPT_DATA_IS_OCONTROL
00640   // Returns: 1 for immediate send, 0 for delayed, -1 for error
00641 
00642   int invoke_remote(ClusterHandler *ch, int cluster_fn_index, void *data, int len, int options = CLUSTER_OPT_STEAL);
00643 
00644   int invoke_remote_data(ClusterHandler *ch, int cluster_fn_index,
00645                          void *data, int data_len,
00646                          IOBufferBlock * buf,
00647                          int logical_channel, ClusterVCToken * token,
00648                          void (*bufdata_free) (void *), void *bufdata_free_arg, int options = CLUSTER_OPT_STEAL);
00649 
00650   // Pass the data in as a malloc'ed block to be freed by callee
00651   int invoke_remote_malloced(ClusterHandler *ch, ClusterRemoteDataHeader * data, int len /* including header */ )
00652   {
00653     return invoke_remote(ch, CLUSTER_FUNCTION_MALLOCED, data, len);
00654   }
00655   void free_remote_data(char *data, int len);
00656 
00657   // Allocate the local side of a remote VConnection.
00658   // returns a token which can be passed to the remote side
00659   // through an existing link and passed to attach_remoteVC()
00660   // if CLUSTER_OPT_IMMEDIATE is set, CLUSTER_DELAYED_OPEN will not be returned
00661   //
00662   // Options: CLUSTER_OPT_IMMEDIATE, CLUSTER_OPT_ALLOW_IMMEDIATE
00663   // Returns: pointer for CLUSTER_OPT_IMMEDIATE
00664   //            or CLUSTER_DELAYED_OPEN on success,
00665   //          NULL on failure
00666   // calls:  cont->handleEvent( CLUSTER_EVENT_OPEN, ClusterVConnection *)
00667   //         on delayed success.
00668   //
00669   // NOTE: the CLUSTER_EVENT_OPEN may be called before "open/connect" returns
00670 
00671 #define CLUSTER_DELAYED_OPEN       ((ClusterVConnection*)-1)
00672 #define CLUSTER_NODE_DOWN          ((ClusterVConnection*)-2)
00673   ClusterVConnection *open_local(Continuation * cont, ClusterMachine * mp, ClusterVCToken & token, int options = 0);
00674 
00675   // Get the other side of a remote VConnection which was previously
00676   // allocated with open.
00677   //
00678   // Options: CLUSTER_OPT_IMMEDIATE, CLUSTER_OPT_ALLOW_IMMEDIATE
00679   // return a pointer or CLUSTER_DELAYED_OPEN success, NULL on failure
00680   //
00681   ClusterVConnection *connect_local(Continuation * cont, ClusterVCToken * token, int channel, int options = 0);
00682   inkcoreapi bool disable_remote_cluster_ops(ClusterMachine *);
00683 
00684   //
00685   // Processor interface
00686   //
00687   virtual int init();
00688   virtual int start();
00689 
00690   ClusterProcessor();
00691   virtual ~ ClusterProcessor();
00692 
00693   //
00694   // Private
00695   //
00696   ClusterAccept *accept_handler;
00697   Cluster *this_cluster;
00698   // Connect to a new cluster machine
00699   void connect(char *hostname, int16_t id = -1);
00700   void connect(unsigned int ip, int port = 0, int16_t id = -1, bool delay = false);
00701   // send the list of known machines to new machine
00702   void send_machine_list(ClusterMachine * m);
00703   void compute_cluster_mode();
00704   // Internal invoke_remote interface
00705   int internal_invoke_remote(ClusterHandler * m, int cluster_fn, void *data, int len, int options, void *cmsg);
00706 };
00707 
00708 inkcoreapi extern ClusterProcessor clusterProcessor;
00709 
00710 inline Cluster *
00711 this_cluster()
00712 {
00713   return clusterProcessor.this_cluster;
00714 }
00715 
00716 //
00717 // Set up a thread to receive events from the ClusterProcessor
00718 // This function should be called for all threads created to
00719 // accept such events by the EventProcesor.
00720 //
00721 void initialize_thread_for_cluster(EThread * thread);
00722 
00723 //
00724 // ClusterFunction Registry
00725 //
00726 //   Declare an instance of this class here to register
00727 //   a function.   In order to allow older versions of software
00728 //   to co-exist with newer versions, always add to the bottom
00729 //   of the list.
00730 //
00731 
00732 extern ClusterFunction *ptest_ClusterFunction;
00733 
00734 extern ClusterFunction test_ClusterFunction;
00735 extern ClusterFunction ping_ClusterFunction;
00736 extern ClusterFunction ping_reply_ClusterFunction;
00737 extern ClusterFunction machine_list_ClusterFunction;
00738 extern ClusterFunction close_channel_ClusterFunction;
00739 extern ClusterFunction get_hostinfo_ClusterFunction;
00740 extern ClusterFunction put_hostinfo_ClusterFunction;
00741 extern ClusterFunction cache_lookup_ClusterFunction;
00742 extern ClusterFunction cache_op_ClusterFunction;
00743 extern ClusterFunction cache_op_malloc_ClusterFunction;
00744 extern ClusterFunction cache_op_result_ClusterFunction;
00745 extern ClusterFunction set_channel_data_ClusterFunction;
00746 extern ClusterFunction post_setchan_send_ClusterFunction;
00747 extern ClusterFunction set_channel_pin_ClusterFunction;
00748 extern ClusterFunction post_setchan_pin_ClusterFunction;
00749 extern ClusterFunction set_channel_priority_ClusterFunction;
00750 extern ClusterFunction post_setchan_priority_ClusterFunction;
00751 extern ClusterFunction default_api_ClusterFunction;
00752 
00753 struct ClusterFunctionDescriptor
00754 {
00755   bool fMalloced;               // the function will free the data
00756   bool ClusterFunc;             // Process incoming message only
00757   //   in ET_CLUSTER thread.
00758   int q_priority;               // lower is higher priority
00759   ClusterFunctionPtr pfn;
00760   ClusterFunctionPtr post_pfn;  // msg queue/send callout
00761 };
00762 
00763 #define CLUSTER_CMSG_QUEUES             2
00764 #define CMSG_MAX_PRI                    0
00765 #define CMSG_LOW_PRI                    (CLUSTER_CMSG_QUEUES-1)
00766 
00767 #ifndef DEFINE_CLUSTER_FUNCTIONS
00768 extern
00769 #endif
00770 ClusterFunctionDescriptor clusterFunction[]
00771 #ifdef DEFINE_CLUSTER_FUNCTIONS
00772   = {
00773   {false, true, CMSG_LOW_PRI, test_ClusterFunction, 0},
00774   {false, true, CMSG_LOW_PRI, ping_ClusterFunction, 0},
00775   {false, true, CMSG_LOW_PRI, ping_reply_ClusterFunction, 0},
00776   {false, true, CMSG_LOW_PRI, machine_list_ClusterFunction, 0},
00777   {false, true, CMSG_LOW_PRI, close_channel_ClusterFunction, 0},
00778   {false, false, CMSG_LOW_PRI, get_hostinfo_ClusterFunction, 0},    // in HostDB.cc
00779   {false, false, CMSG_LOW_PRI, put_hostinfo_ClusterFunction, 0},    // in HostDB.cc
00780   {false, true, CMSG_LOW_PRI, cache_lookup_ClusterFunction, 0},    // in CacheCont.cc
00781   {true, true, CMSG_LOW_PRI, cache_op_malloc_ClusterFunction, 0},
00782   {false, true, CMSG_LOW_PRI, cache_op_ClusterFunction, 0},
00783   {false, false, CMSG_LOW_PRI, cache_op_result_ClusterFunction, 0},
00784   {false, false, CMSG_LOW_PRI, 0, 0},   // OBSOLETE
00785   {false, false, CMSG_LOW_PRI, 0, 0},   // OBSOLETE
00786   {false, false, CMSG_LOW_PRI, 0, 0},   // OBSOLETE
00787   {false, true, CMSG_MAX_PRI, set_channel_data_ClusterFunction, post_setchan_send_ClusterFunction},
00788   {false, true, CMSG_MAX_PRI, set_channel_pin_ClusterFunction, post_setchan_pin_ClusterFunction},
00789   {false, true, CMSG_MAX_PRI, set_channel_priority_ClusterFunction, post_setchan_priority_ClusterFunction},
00790    /********************************************
00791     * RESERVED for future cluster internal use *
00792     ********************************************/
00793   {false, false, CMSG_LOW_PRI, 0, 0},
00794   {false, false, CMSG_LOW_PRI, 0, 0},
00795   {false, false, CMSG_LOW_PRI, 0, 0},
00796   {false, false, CMSG_LOW_PRI, 0, 0},
00797   {false, false, CMSG_LOW_PRI, 0, 0},
00798   {false, false, CMSG_LOW_PRI, 0, 0},
00799   {false, false, CMSG_LOW_PRI, 0, 0},
00800   {false, false, CMSG_LOW_PRI, 0, 0},
00801   {false, false, CMSG_LOW_PRI, 0, 0},
00802   {false, false, CMSG_LOW_PRI, 0, 0},
00803   {false, false, CMSG_LOW_PRI, 0, 0},
00804   {false, false, CMSG_LOW_PRI, 0, 0},
00805   {false, false, CMSG_LOW_PRI, 0, 0},
00806   {false, false, CMSG_LOW_PRI, 0, 0},
00807   {false, false, CMSG_LOW_PRI, 0, 0},
00808   {false, false, CMSG_LOW_PRI, 0, 0},
00809   {false, false, CMSG_LOW_PRI, 0, 0},
00810   {false, false, CMSG_LOW_PRI, 0, 0},
00811   {false, false, CMSG_LOW_PRI, 0, 0},
00812   {false, false, CMSG_LOW_PRI, 0, 0},
00813   {false, false, CMSG_LOW_PRI, 0, 0},
00814   {false, false, CMSG_LOW_PRI, 0, 0},
00815   {false, false, CMSG_LOW_PRI, 0, 0},
00816   {false, false, CMSG_LOW_PRI, 0, 0},
00817   {false, false, CMSG_LOW_PRI, 0, 0},
00818   {false, false, CMSG_LOW_PRI, 0, 0},
00819   {false, false, CMSG_LOW_PRI, 0, 0},
00820   {false, false, CMSG_LOW_PRI, 0, 0},
00821   {false, false, CMSG_LOW_PRI, 0, 0},
00822   {false, false, CMSG_LOW_PRI, 0, 0},
00823   {false, false, CMSG_LOW_PRI, 0, 0},
00824   {false, false, CMSG_LOW_PRI, 0, 0},
00825   {false, false, CMSG_LOW_PRI, 0, 0},
00826   {false, false, CMSG_LOW_PRI, 0, 0},
00827 
00828    /*********************************************
00829     * RESERVED for Cluster RPC API use          *
00830     *********************************************/
00831   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00832   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00833   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00834   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00835   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00836   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00837   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00838   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00839   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00840   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00841   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00842   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00843   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00844   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00845   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00846   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00847   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00848   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00849   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00850   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00851   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00852   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00853   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00854   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00855   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00856   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00857   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00858   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00859   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0},
00860   {true, false, CMSG_LOW_PRI, default_api_ClusterFunction, 0}
00861   // ********** ADD NEW ENTRIES ABOVE THIS LINE ************
00862 }
00863 #endif
00864 
00865 ;
00866 extern unsigned SIZE_clusterFunction;        // clusterFunction[] entries
00867 
00868 //////////////////////////////////////////////////////////////
00869 // Map from Cluster Function code to send queue priority
00870 //////////////////////////////////////////////////////////////
00871 inline int
00872 ClusterFuncToQpri(int cluster_func)
00873 {
00874   if (cluster_func < 0) {
00875     return CMSG_LOW_PRI;
00876   } else {
00877     return clusterFunction[cluster_func].q_priority;
00878   }
00879 }
00880 
00881 //
00882 // This table had better match the above list
00883 //
00884 #define TEST_CLUSTER_FUNCTION                        0
00885 #define PING_CLUSTER_FUNCTION                        1
00886 #define PING_REPLY_CLUSTER_FUNCTION                  2
00887 #define MACHINE_LIST_CLUSTER_FUNCTION                3
00888 #define CLOSE_CHANNEL_CLUSTER_FUNCTION               4
00889 #define GET_HOSTINFO_CLUSTER_FUNCTION                5
00890 #define PUT_HOSTINFO_CLUSTER_FUNCTION                6
00891 #define CACHE_LOOKUP_CLUSTER_FUNCTION                7
00892 #define CACHE_OP_MALLOCED_CLUSTER_FUNCTION           8
00893 #define CACHE_OP_CLUSTER_FUNCTION                    9
00894 #define CACHE_OP_RESULT_CLUSTER_FUNCTION             10
00895 #define SET_CHANNEL_DATA_CLUSTER_FUNCTION            14
00896 #define SET_CHANNEL_PIN_CLUSTER_FUNCTION             15
00897 #define SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION        16
00898 
00899 /********************************************
00900  * RESERVED for future cluster internal use *
00901  ********************************************/
00902 #define INTERNAL_RESERVED1_CLUSTER_FUNCTION          17
00903 #define INTERNAL_RESERVED2_CLUSTER_FUNCTION          18
00904 #define INTERNAL_RESERVED3_CLUSTER_FUNCTION          19
00905 #define INTERNAL_RESERVED4_CLUSTER_FUNCTION          20
00906 #define INTERNAL_RESERVED5_CLUSTER_FUNCTION          21
00907 #define INTERNAL_RESERVED6_CLUSTER_FUNCTION          22
00908 #define INTERNAL_RESERVED7_CLUSTER_FUNCTION          23
00909 #define INTERNAL_RESERVED8_CLUSTER_FUNCTION          24
00910 #define INTERNAL_RESERVED9_CLUSTER_FUNCTION          25
00911 #define INTERNAL_RESERVED10_CLUSTER_FUNCTION         26
00912 #define INTERNAL_RESERVED11_CLUSTER_FUNCTION         27
00913 #define INTERNAL_RESERVED12_CLUSTER_FUNCTION         28
00914 #define INTERNAL_RESERVED13_CLUSTER_FUNCTION         29
00915 #define INTERNAL_RESERVED14_CLUSTER_FUNCTION         30
00916 #define INTERNAL_RESERVED15_CLUSTER_FUNCTION         31
00917 #define INTERNAL_RESERVED16_CLUSTER_FUNCTION         32
00918 #define INTERNAL_RESERVED17_CLUSTER_FUNCTION         33
00919 #define INTERNAL_RESERVED18_CLUSTER_FUNCTION         34
00920 #define INTERNAL_RESERVED19_CLUSTER_FUNCTION         35
00921 #define INTERNAL_RESERVED20_CLUSTER_FUNCTION         36
00922 #define INTERNAL_RESERVED21_CLUSTER_FUNCTION         37
00923 #define INTERNAL_RESERVED22_CLUSTER_FUNCTION         38
00924 #define INTERNAL_RESERVED23_CLUSTER_FUNCTION         39
00925 #define INTERNAL_RESERVED24_CLUSTER_FUNCTION         40
00926 #define INTERNAL_RESERVED25_CLUSTER_FUNCTION         41
00927 #define INTERNAL_RESERVED26_CLUSTER_FUNCTION         42
00928 #define INTERNAL_RESERVED27_CLUSTER_FUNCTION         43
00929 #define INTERNAL_RESERVED28_CLUSTER_FUNCTION         44
00930 #define INTERNAL_RESERVED29_CLUSTER_FUNCTION         45
00931 #define INTERNAL_RESERVED30_CLUSTER_FUNCTION         46
00932 #define INTERNAL_RESERVED31_CLUSTER_FUNCTION         47
00933 #define INTERNAL_RESERVED32_CLUSTER_FUNCTION         48
00934 #define INTERNAL_RESERVED33_CLUSTER_FUNCTION         49
00935 #define INTERNAL_RESERVED34_CLUSTER_FUNCTION         50
00936 
00937 /****************************************************************************
00938  * Cluster RPC API definitions.                                             *
00939  *                                                                          *
00940  ****************************************************************************
00941  * Note: All of the following must be kept in sync with INKClusterRPCKey_t  *
00942  *       definition in ts/ts.h and ts/experimental.h                        *
00943  ****************************************************************************/
00944 
00945 /************************************************
00946  * RESERVED for Wireless Group                  *
00947  ************************************************/
00948 #define API_F01_CLUSTER_FUNCTION                     51
00949 #define API_F02_CLUSTER_FUNCTION                     52
00950 #define API_F03_CLUSTER_FUNCTION                     53
00951 #define API_F04_CLUSTER_FUNCTION                     54
00952 #define API_F05_CLUSTER_FUNCTION                     55
00953 #define API_F06_CLUSTER_FUNCTION                     56
00954 #define API_F07_CLUSTER_FUNCTION                     57
00955 #define API_F08_CLUSTER_FUNCTION                     58
00956 #define API_F09_CLUSTER_FUNCTION                     59
00957 #define API_F10_CLUSTER_FUNCTION                     60
00958 
00959 /************************************************
00960  * RESERVED for future use                      *
00961  ************************************************/
00962 #define API_F11_CLUSTER_FUNCTION                     61
00963 #define API_F12_CLUSTER_FUNCTION                     62
00964 #define API_F13_CLUSTER_FUNCTION                     63
00965 #define API_F14_CLUSTER_FUNCTION                     64
00966 #define API_F15_CLUSTER_FUNCTION                     65
00967 #define API_F16_CLUSTER_FUNCTION                     66
00968 #define API_F17_CLUSTER_FUNCTION                     67
00969 #define API_F18_CLUSTER_FUNCTION                     68
00970 #define API_F19_CLUSTER_FUNCTION                     69
00971 #define API_F20_CLUSTER_FUNCTION                     70
00972 
00973 #define API_F21_CLUSTER_FUNCTION                     71
00974 #define API_F22_CLUSTER_FUNCTION                     72
00975 #define API_F23_CLUSTER_FUNCTION                     73
00976 #define API_F24_CLUSTER_FUNCTION                     74
00977 #define API_F25_CLUSTER_FUNCTION                     75
00978 #define API_F26_CLUSTER_FUNCTION                     76
00979 #define API_F27_CLUSTER_FUNCTION                     77
00980 #define API_F28_CLUSTER_FUNCTION                     78
00981 #define API_F29_CLUSTER_FUNCTION                     79
00982 #define API_F30_CLUSTER_FUNCTION                     80
00983 
00984 #define API_STARECT_CLUSTER_FUNCTION                 API_F01_CLUSTER_FUNCTION
00985 #define API_END_CLUSTER_FUNCTION                     API_F30_CLUSTER_FUNCTION
00986 
00987 #define UNDEFINED_CLUSTER_FUNCTION                   0xFDEFFDEF
00988 
00989 //////////////////////////////////////////////
00990 // Initial cluster connect exchange message
00991 //////////////////////////////////////////////
00992 struct ClusterHelloMessage
00993 {
00994   uint16_t _NativeByteOrder;
00995   uint16_t _major;
00996   uint16_t _minor;
00997   uint16_t _min_major;
00998   uint16_t _min_minor;
00999   int16_t _id;
01000 #ifdef LOCAL_CLUSTER_TEST_MODE
01001   int16_t _port;
01002   char _pad[114];               // pad out to 128 bytes
01003 #else
01004   char _pad[116];               // pad out to 128 bytes
01005 #endif
01006 
01007     ClusterHelloMessage():_NativeByteOrder(1)
01008   {
01009     _major = CLUSTER_MAJOR_VERSION;
01010     _minor = CLUSTER_MINOR_VERSION;
01011     _min_major = MIN_CLUSTER_MAJOR_VERSION;
01012     _min_minor = MIN_CLUSTER_MINOR_VERSION;
01013     memset(_pad, '\0', sizeof(_pad));
01014   }
01015   int NativeByteOrder()
01016   {
01017     return (_NativeByteOrder == 1);
01018   }
01019   void AdjustByteOrder()
01020   {
01021     if (!NativeByteOrder()) {
01022       ats_swap16(&_major);
01023       ats_swap16(&_minor);
01024       ats_swap16(&_min_major);
01025       ats_swap16(&_min_minor);
01026     }
01027   }
01028 };
01029 
01030 ///////////////////////////////////////////////////////////////////
01031 // Cluster message header definition.
01032 ///////////////////////////////////////////////////////////////////
01033 struct ClusterMessageHeader
01034 {
01035   uint16_t _InNativeByteOrder;    // always non-zero
01036   uint16_t _MsgVersion;           // always non-zero
01037 
01038   void _init(uint16_t msg_version)
01039   {
01040     _InNativeByteOrder = 1;
01041     _MsgVersion = msg_version;
01042   }
01043   ClusterMessageHeader():_InNativeByteOrder(0), _MsgVersion(0)
01044   {
01045   }
01046   ClusterMessageHeader(uint16_t msg_version) {
01047     _init(msg_version);
01048   }
01049   int MsgInNativeByteOrder()
01050   {
01051     return (_InNativeByteOrder == 1);
01052   }
01053   int NeedByteSwap()
01054   {
01055     return (_InNativeByteOrder != 1);
01056   }
01057   int GetMsgVersion()
01058   {
01059     if (NeedByteSwap()) {
01060       return ats_swap16(_MsgVersion);
01061     } else {
01062       return _MsgVersion;
01063     }
01064   }
01065 };
01066 
01067 ///////////////////////////////////////////////////////////////////
01068 
01069 //
01070 // cluster_ping
01071 //
01072 typedef void (*PingReturnFunction) (ClusterHandler *, void *data, int len);
01073 
01074 struct PingMessage:public ClusterMessageHeader
01075 {
01076   PingReturnFunction fn;        // Note: Pointer to a func
01077   char data[1];                 // start of data
01078 
01079   enum
01080   {
01081     MIN_VERSION = 1,
01082     MAX_VERSION = 1,
01083     PING_MESSAGE_VERSION = MAX_VERSION
01084   };
01085 
01086     PingMessage(uint16_t vers = PING_MESSAGE_VERSION)
01087 :  ClusterMessageHeader(vers), fn(NULL) {
01088     data[0] = '\0';
01089   }
01090   /////////////////////////////////////////////////////////////////////////////
01091   static int protoToVersion(int protoMajor)
01092   {
01093     (void) protoMajor;
01094     return PING_MESSAGE_VERSION;
01095   }
01096   static int sizeof_fixedlen_msg()
01097   {
01098     PingMessage *p = 0;
01099     // Maybe use offsetof here instead. /leif
01100     return (uintptr_t) (&p->data[0]);
01101   }
01102   void init(uint16_t vers = PING_MESSAGE_VERSION) {
01103     _init(vers);
01104   }
01105   inline void SwapBytes()
01106   {
01107   }                             // No action, message is always reflected back
01108   /////////////////////////////////////////////////////////////////////////////
01109 };
01110 
01111 inline void
01112 cluster_ping(ClusterHandler *ch, PingReturnFunction fn, void *data, int len)
01113 {
01114   PingMessage *msg = (PingMessage *)alloca(PingMessage::sizeof_fixedlen_msg() + len);
01115   msg->init();
01116   msg->fn = fn;
01117   memcpy(msg->data, data, len);
01118   clusterProcessor.invoke_remote(ch, PING_CLUSTER_FUNCTION, (void *) msg, (msg->sizeof_fixedlen_msg() + len));
01119 }
01120 
01121 // filled with 0's
01122 extern char channel_dummy_output[DEFAULT_MAX_BUFFER_SIZE];
01123 
01124 //
01125 // Private (for testing)
01126 //
01127 ClusterConfiguration *configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m);
01128 ClusterConfiguration *configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m);
01129 extern bool machineClusterHash;
01130 extern bool boundClusterHash;
01131 extern bool randClusterHash;
01132 
01133 void build_cluster_hash_table(ClusterConfiguration *);
01134 
01135 inline void
01136 ClusterVC_enqueue_read(Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link> &q, ClusterVConnectionBase * vc)
01137 {
01138   ClusterVConnState * cs = &vc->read;
01139   ink_assert(!cs->queue);
01140   cs->queue = &q;
01141   q.enqueue(vc);
01142 }
01143 
01144 inline void
01145 ClusterVC_enqueue_write(Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link> &q, ClusterVConnectionBase * vc)
01146 {
01147   ClusterVConnState * cs = &vc->write;
01148   ink_assert(!cs->queue);
01149   cs->queue = &q;
01150   q.enqueue(vc);
01151 }
01152 
01153 inline void
01154 ClusterVC_remove_read(ClusterVConnectionBase * vc)
01155 {
01156   ClusterVConnState * cs = &vc->read;
01157   ink_assert(cs->queue);
01158   ((Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link> *)cs->queue)->remove(vc);
01159   cs->queue = NULL;
01160 }
01161 
01162 inline void
01163 ClusterVC_remove_write(ClusterVConnectionBase * vc)
01164 {
01165   ClusterVConnState * cs = &vc->write;
01166   ink_assert(cs->queue);
01167   ((Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link> *)cs->queue)->remove(vc);
01168   cs->queue = NULL;
01169 }
01170 
01171 
01172 #endif /* _Cluster_h */

Generated by  doxygen 1.7.1