• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ClusterCache.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026   ClusterCache.cc
00027 ****************************************************************************/
00028 
00029 #include "P_Cluster.h"
00030 
#ifdef DEBUG
#define CLUSTER_TEST_DEBUG      1
#endif

#ifdef ENABLE_TIME_TRACE
// Latency histograms + event counters, compiled in only when
// ENABLE_TIME_TRACE is defined.  Each *_time_dist array buckets elapsed
// times; the companion counter tracks total samples.

// Local cache callback latency.
int callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int cache_callbacks = 0;

// Remote cache callback latency.
int rmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int rmt_cache_callbacks = 0;

// Locked remote cache callback latency.
int lkrmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int lkrmt_cache_callbacks = 0;

// Continuation-lock acquisition latency.
int cntlck_acquire_time_dist[TIME_DIST_BUCKETS_SIZE];
int cntlck_acquire_events = 0;

// Open-delay (deferred open_local) latency.
int open_delay_time_dist[TIME_DIST_BUCKETS_SIZE];
int open_delay_events = 0;

#endif // ENABLE_TIME_TRACE

// default will be read from config
int cache_migrate_on_demand = false;

/////////////////
// Static Data //
/////////////////
static ClassAllocator<CacheContinuation> cacheContAllocator("cacheContAllocator");

// Pending remote cache operations, hashed (via FOLDHASH) by target IP
// and sequence number; one queue plus one guarding mutex per bucket.
static Queue<CacheContinuation> remoteCacheContQueue[REMOTE_CONNECT_HASH];
static Ptr<ProxyMutex> remoteCacheContQueueMutex[REMOTE_CONNECT_HASH];

// 0 is an illegal sequence number
#define CACHE_NO_RESPONSE            0
static int cluster_sequence_number = 1;

#ifdef CLUSTER_TEST_DEBUG
// Effectively "never time out" while debugging.
static ink_hrtime cache_cluster_timeout = HRTIME_SECONDS(65536);
#else
static ink_hrtime cache_cluster_timeout = CACHE_CLUSTER_TIMEOUT;
#endif

///////////////////
// Declarations  //
///////////////////
static CacheContinuation *find_cache_continuation(unsigned int, unsigned int);

static unsigned int new_cache_sequence_number();

// Expand an IPv4 address (network byte order) into its four bytes for
// "%d.%d.%d.%d" style printf formatting.
#define DOT_SEPARATED(_x)                             \
((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1],   \
  ((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]

// Thread pool used for cache-continuation state machine events.
#define ET_CACHE_CONT_SM        ET_NET
#define ALLOW_THREAD_STEAL      true
00088 /**********************************************************************/
00089 #ifdef CACHE_MSG_TRACE
00090 /**********************************************************************/
00091 
00092 /**********************************************************************/
00093 // Debug trace support for cache RPC messages
00094 /**********************************************************************/
00095 
#define MAX_TENTRIES    4096
// One record per traced cache RPC message.
struct traceEntry
{
  unsigned int seqno;           // message sequence number
  int op;                       // cache opcode
  char *type;                   // caller-supplied tag (string literal in practice)
};
struct traceEntry recvTraceTable[MAX_TENTRIES];
struct traceEntry sndTraceTable[MAX_TENTRIES];

// Next free ring slot (mod MAX_TENTRIES), bumped atomically by the
// loggers below.  The original declarations relied on implicit int
// ("static recvTraceTable_index = 0;"), which is invalid C++ and was
// removed from C in C99; the type is now explicit.
static int recvTraceTable_index = 0;
static int sndTraceTable_index = 0;
00108 
00109 void
00110 log_cache_op_msg(unsigned int seqno, int op, char *type)
00111 {
00112   int t = ink_atomic_increment(&recvTraceTable_index, 1);
00113   int n = recvTraceTable_index % MAX_TENTRIES;
00114   recvTraceTable[n].seqno = seqno;
00115   recvTraceTable[n].op = op;
00116   recvTraceTable[n].type = type;
00117 }
00118 
00119 void
00120 log_cache_op_sndmsg(unsigned int seqno, int op, char *type)
00121 {
00122   int t = ink_atomic_increment(&sndTraceTable_index, 1);
00123   int n = sndTraceTable_index % MAX_TENTRIES;
00124   sndTraceTable[n].seqno = seqno;
00125   sndTraceTable[n].op = op;
00126   sndTraceTable[n].type = type;
00127 }
00128 
00129 void
00130 dump_recvtrace_table()
00131 {
00132   int n;
00133   printf("\n");
00134   for (n = 0; n < MAX_TENTRIES; ++n)
00135     printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno,
00136            recvTraceTable[n].op, recvTraceTable[n].type ? recvTraceTable[n].type : "");
00137 }
00138 
00139 void
00140 dump_sndtrace_table()
00141 {
00142   int n;
00143   printf("\n");
00144   for (n = 0; n < MAX_TENTRIES; ++n)
00145     printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno,
00146            sndTraceTable[n].op, sndTraceTable[n].type ? sndTraceTable[n].type : "");
00147 }
00148 
00149 /**********************************************************************/
00150 #endif // CACHE_MSG_TRACE
00151 /**********************************************************************/
00152 
00153 ///////////////////////////////////////////////////////////////////////
00154 // Cluster write VC cache.
00155 ///////////////////////////////////////////////////////////////////////
00156 //
00157 // In the event that a remote open read fails (HTTP only), an
00158 // open write is issued and if successful a open write connection
00159 // is returned for the open read.  We cache the open write VC and
00160 // resolve the subsequent open write locally from the write VC cache
00161 // using the INK_MD5 of the URL.
00162 // Note that this is a global per node cache.
00163 ///////////////////////////////////////////////////////////////////////
00164 
// Node-global cache of open-write ClusterVConnections keyed by URL MD5.
// Entries are single-use: lookup() removes a hit from the table.
// Unreferenced entries are purged by periodic per-bucket scan events
// (see ClusterVConnectionCacheEvent).
class ClusterVConnectionCache
{
public:
  ClusterVConnectionCache()
  {
    memset(hash_event, 0, sizeof(hash_event));
  }
  // Create the per-bucket mutexes and schedule the periodic purge events.
  void init();
  // Map an MD5 key to a hash_table/hash_lock bucket index.
  int MD5ToIndex(INK_MD5 * p);
  // Add (key, vc).  Returns 1 on success, 0 on a bucket lock miss
  // (caller should retry later).
  int insert(INK_MD5 *, ClusterVConnection *);
  // Remove and return the VC for key.  Returns 0 on a bucket lock miss
  // (retry later) and (ClusterVConnection *)-1 on a true miss.
  ClusterVConnection *lookup(INK_MD5 *);

public:
  // One cached open-write VC.
  struct Entry
  {
    LINK(Entry, link);
    bool mark_for_delete;       // set by the purge scan; swept on the next pass
    INK_MD5 key;
    ClusterVConnection *vc;

      Entry():mark_for_delete(0), vc(0)
    {
    }
     ~Entry()
    {
    }
  };

  enum
  { MAX_TABLE_ENTRIES = 256,    // must be power of 2
    SCAN_INTERVAL = 10          // seconds
  };
  Queue<Entry> hash_table[MAX_TABLE_ENTRIES];
  Ptr<ProxyMutex> hash_lock[MAX_TABLE_ENTRIES];
  Event *hash_event[MAX_TABLE_ENTRIES];   // periodic purge event per bucket
};
00201 
// Allocator for ClusterVConnectionCache hash-table entries.
static ClassAllocator <
  ClusterVConnectionCache::Entry >
ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry");

// The node-wide open-write VC cache; created in CacheContinuation::init().
ClusterVConnectionCache *GlobalOpenWriteVCcache = 0;
00207 
00208 /////////////////////////////////////////////////////////////////
00209 // Perform periodic purges of ClusterVConnectionCache entries
00210 /////////////////////////////////////////////////////////////////
// Periodic continuation that purges unreferenced entries from one
// ClusterVConnectionCache hash bucket (scheduled by init() for each
// bucket at SCAN_INTERVAL seconds).
class ClusterVConnectionCacheEvent:public Continuation
{
public:
  ClusterVConnectionCacheEvent(ClusterVConnectionCache * c, int n)
  : Continuation(new_ProxyMutex()), cache(c), hash_index(n)
  {
    SET_HANDLER(&ClusterVConnectionCacheEvent::eventHandler);
  }
  // Mark-and-sweep purge of the bucket; reschedules itself.
  int eventHandler(int, Event *);

private:
  ClusterVConnectionCache * cache;      // cache being scanned
  int hash_index;                       // bucket this event owns
};
00225 
00226 void
00227 ClusterVConnectionCache::init()
00228 {
00229   int n;
00230   ClusterVConnectionCacheEvent *eh;
00231 
00232   for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
00233     hash_lock[n] = new_ProxyMutex();
00234   }
00235   for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
00236     // Setup up periodic purge events on each hash list
00237 
00238     eh = new ClusterVConnectionCacheEvent(this, n);
00239     hash_event[n] =
00240       eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
00241   }
00242 }
00243 inline int
00244 ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p)
00245 {
00246   uint64_t i = p->fold();
00247   int32_t h, l;
00248 
00249   h = i >> 32;
00250   l = i & 0xFFFFFFFF;
00251   return ((h ^ l) % MAX_TABLE_ENTRIES) & (MAX_TABLE_ENTRIES - 1);
00252 }
00253 
// Insert (key, vc) into the bucket selected by MD5ToIndex(key).
// Non-blocking: returns 0 if the bucket lock could not be acquired
// (caller retries later), 1 on success.
int
ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc)
{
  int index = MD5ToIndex(key);
  Entry *e;
  EThread *thread = this_ethread();
  // NOTE(review): "mutex" looks unused, but the CLUSTER_*_DYN_STAT
  // macros presumably reference it implicitly — confirm before removing.
  ProxyMutex *mutex = thread->mutex;

  MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
  if (!lock) {
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
    return 0;                   // lock miss, retry later

  } else {
    // Add entry to list

    e = ClusterVCCacheEntryAlloc.alloc();
    e->key = *key;
    e->vc = vc;
    hash_table[index].enqueue(e);
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
  }
  return 1;                     // Success
}
00278 
// Look up (and remove — entries are single-use) the VC cached under key.
// Returns:
//   the VC                      - hit (entry removed from the table)
//   0                           - bucket lock miss, caller retries later
//   (ClusterVConnection *)-1    - true miss
ClusterVConnection *
ClusterVConnectionCache::lookup(INK_MD5 * key)
{
  int index = MD5ToIndex(key);
  Entry *e;
  ClusterVConnection *vc = 0;
  EThread *thread = this_ethread();
  // NOTE(review): "mutex" looks unused, but the CLUSTER_*_DYN_STAT
  // macros presumably reference it implicitly — confirm before removing.
  ProxyMutex *mutex = thread->mutex;

  MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
  if (!lock) {
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
    return vc;                  // lock miss, retry later

  } else {
    // Linear scan of the bucket chain.
    e = hash_table[index].head;
    while (e) {
      if (*key == e->key) {     // Hit
        vc = e->vc;
        hash_table[index].remove(e);
        ClusterVCCacheEntryAlloc.free(e);
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_HITS_STAT);
        return vc;

      } else {
        e = e->link.next;
      }
    }
  }
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
  return (ClusterVConnection *) - 1;    // Miss
}
00311 
// Periodic mark-and-sweep purge of one hash bucket.  On the first scan
// an entry is only marked; if it is still present (i.e. no lookup()
// claimed it) on the next scan, it is removed and its VC closed.  So a
// purged VC has been unreferenced for at least one full SCAN_INTERVAL.
int
ClusterVConnectionCacheEvent::eventHandler(int /* event ATS_UNUSED */, Event * e)
{
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
  MUTEX_TRY_LOCK(lock, cache->hash_lock[hash_index], this_ethread());
  if (!lock) {
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT);
    // Bucket is busy; retry shortly instead of blocking the thread.
    e->schedule_in(HRTIME_MSECONDS(10));
    return EVENT_DONE;
  }
  // Perform purge action on unreferenced VC(s).

  ClusterVConnectionCache::Entry * entry;
  ClusterVConnectionCache::Entry * next_entry;
  entry = cache->hash_table[hash_index].head;

  while (entry) {
    if (entry->mark_for_delete) {
      // Second time we have seen this entry: sweep it.  Capture the
      // successor before unlinking so traversal stays valid.
      next_entry = entry->link.next;

      cache->hash_table[hash_index].remove(entry);
      entry->vc->allow_remote_close();
      entry->vc->do_io(VIO::CLOSE);

      ClusterVCCacheEntryAlloc.free(entry);
      entry = next_entry;
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_PURGES_STAT);

    } else {
      // First sighting: mark for deletion on the next scan.
      entry->mark_for_delete = true;
      entry = entry->link.next;
    }
  }

  // Setup for next purge event

  e->schedule_in(HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
  return EVENT_DONE;
}
00351 
00352 ///////////////////////////////////////////////////////////////////////
00353 
00354 ////////////////////////////////////////////////////
00355 // init()
00356 //   Global initializations for CacheContinuation
00357 ////////////////////////////////////////////////////
00358 int
00359 CacheContinuation::init()
00360 {
00361   int n;
00362   for (n = 0; n < REMOTE_CONNECT_HASH; ++n)
00363     remoteCacheContQueueMutex[n] = new_ProxyMutex();
00364 
00365   GlobalOpenWriteVCcache = new ClusterVConnectionCache;
00366   GlobalOpenWriteVCcache->init();
00367   return 0;
00368 }
00369 
00370 ///////////////////////////////////////////////////////////////////////
00371 // do_op()
00372 //   Main function to do a cluster cache operation
00373 ///////////////////////////////////////////////////////////////////////
// Build and send the "Over The Wire" (OTW) message for a cluster cache
// operation targeted at machine "mp".
//
//   c           - caller continuation; if NULL the op is fire-and-forget
//                 (no CacheContinuation is allocated, no response routed)
//   args        - opcode-specific argument block (CacheOpArgs_General /
//                 CacheOpArgs_Deref / CacheOpArgs_Link)
//   data        - preallocated message buffer; may be NULL only for the
//                 short-form open read/write case, which allocates one
//   data_len    - length of "data"
//   nbytes      - object size hint forwarded for open_write
//   b           - used only for debug logging here
//
// Returns the Action the caller can cancel, or NULL when c is NULL.
Action *
CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
                         int user_opcode, char *data, int data_len, int nbytes, MIOBuffer * b)
{
  CacheContinuation *cc = 0;
  Action *act = 0;
  char *msg = 0;
  ClusterHandler *ch = mp->pop_ClusterHandler();

  /////////////////////////////////////////////////////////////////////
  // Unconditionally map open read buffer interfaces to open read.
  // open read buffer interfaces are now deprecated.
  /////////////////////////////////////////////////////////////////////
  int opcode = user_opcode;
  switch (opcode) {
  case CACHE_OPEN_READ_BUFFER:
    opcode = CACHE_OPEN_READ;
    break;
  case CACHE_OPEN_READ_BUFFER_LONG:
    opcode = CACHE_OPEN_READ_LONG;
    break;
  default:
    break;
  }

  // No cluster handler for the target machine: nothing can be sent.
  if (!ch)
    goto no_send_exit;

  if (c) {
    // Allocate and initialize the continuation that will track the
    // remote operation and route its reply back to "c".
    cc = cacheContAllocator_alloc();
    cc->ch = ch;
    cc->target_machine = mp;
    cc->request_opcode = opcode;
    cc->mutex = c->mutex;
    cc->action = c;
    cc->action.cancelled = false;
    cc->start_time = ink_get_hrtime();
    cc->from = mp;
    cc->result = op_failure(opcode);
    SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
                             & CacheContinuation::remoteOpEvent);
    act = &cc->action;

    // set up sequence number so we can find this continuation

    cc->target_ip = mp->ip;
    cc->seq_number = new_cache_sequence_number();

    // establish timeout for cache op

    unsigned int hash = FOLDHASH(cc->target_ip, cc->seq_number);
    MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
    if (!queuelock) {

      // failed to acquire lock: no problem, retry later
      cc->timeout = eventProcessor.schedule_in(cc, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
    } else {
      remoteCacheContQueue[hash].enqueue(cc);
      MUTEX_RELEASE(queuelock);
      cc->timeout = eventProcessor.schedule_in(cc, cache_cluster_timeout, ET_CACHE_CONT_SM);
    }
  }
  //
  // Determine the type of the "Over The Wire" (OTW) message header and
  //   initialize it.
  //
  Debug("cache_msg",
        "do_op opcode=%d seqno=%d Machine=%p data=%p datalen=%d mio=%p",
        opcode, (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b);

  switch (opcode) {
  case CACHE_OPEN_WRITE_BUFFER:
  case CACHE_OPEN_WRITE_BUFFER_LONG:
    {
      ink_release_assert(!"write buffer not supported");
      break;
    }
  case CACHE_OPEN_READ_BUFFER:
  case CACHE_OPEN_READ_BUFFER_LONG:
    {
      // Unreachable: these opcodes were remapped above.
      ink_release_assert(!"read buffer not supported");
      break;
    }
  case CACHE_OPEN_WRITE:
  case CACHE_OPEN_READ:
    {
      ink_release_assert(c > 0);
      //////////////////////
      // Use short format //
      //////////////////////
      if (!data) {
        data_len = op_to_sizeof_fixedlen_msg(opcode);
        data = (char *) ALLOCA_DOUBLE(data_len);
      }
      msg = (char *) data;
      CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
      m->init();
      m->opcode = opcode;
      m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
      m->md5 = *((CacheOpArgs_General *) args)->url_md5;
      cc->url_md5 = m->md5;
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
      m->frag_type = ((CacheOpArgs_General *) args)->frag_type;
      if (opcode == CACHE_OPEN_WRITE) {
        m->nbytes = nbytes;
        m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
      } else {
        m->nbytes = 0;
        m->data = 0;
      }

      if (opcode == CACHE_OPEN_READ) {
        //
        // Set upper limit on initial data received with response
        // for open read response
        //
        m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
      } else {
        m->buffer_size = 0;
      }

      //
      // Establish the local VC
      //
      int res = setup_local_vc(msg, data_len, cc, mp, &act);
      if (!res) {
        /////////////////////////////////////////////////////
        // Unable to setup local VC, request aborted.
        // Remove request from pending list and deallocate.
        /////////////////////////////////////////////////////
        cc->remove_and_delete(0, (Event *) 0);
        return act;

      } else if (res != -1) {
        ///////////////////////////////////////
        // VC established, send request
        ///////////////////////////////////////
        break;

      } else {
        //////////////////////////////////////////////////////
        // Unable to setup VC, delay required, await callback
        //////////////////////////////////////////////////////
        goto no_send_exit;
      }
    }

  case CACHE_OPEN_READ_LONG:
  case CACHE_OPEN_WRITE_LONG:
    {
      ink_release_assert(c > 0);
      //////////////////////
      // Use long format  //
      //////////////////////
      msg = data;
      CacheOpMsg_long *m = (CacheOpMsg_long *) msg;
      m->init();
      m->opcode = opcode;
      m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
      m->url_md5 = *((CacheOpArgs_General *) args)->url_md5;
      cc->url_md5 = m->url_md5;
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
      m->nbytes = nbytes;
      m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
      m->frag_type = (uint32_t) ((CacheOpArgs_General *) args)->frag_type;

      if (opcode == CACHE_OPEN_READ_LONG) {
        //
        // Set upper limit on initial data received with response
        // for open read response
        //
        m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
      } else {
        m->buffer_size = 0;
      }
      //
      // Establish the local VC
      //
      int res = setup_local_vc(msg, data_len, cc, mp, &act);
      if (!res) {
        /////////////////////////////////////////////////////
        // Unable to setup local VC, request aborted.
        // Remove request from pending list and deallocate.
        /////////////////////////////////////////////////////
        cc->remove_and_delete(0, (Event *) 0);
        return act;

      } else if (res != -1) {
        ///////////////////////////////////////
        // VC established, send request
        ///////////////////////////////////////
        break;

      } else {
        //////////////////////////////////////////////////////
        // Unable to setup VC, delay required, await callback
        //////////////////////////////////////////////////////
        goto no_send_exit;
      }
    }
  case CACHE_UPDATE:
  case CACHE_REMOVE:
  case CACHE_DEREF:
    {
      //////////////////////
      // Use short format //
      //////////////////////
      msg = data;
      CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
      m->init();
      m->opcode = opcode;
      m->frag_type = ((CacheOpArgs_Deref *) args)->frag_type;
      m->cfl_flags = ((CacheOpArgs_Deref *) args)->cfl_flags;
      if (opcode == CACHE_DEREF)
        m->md5 = *((CacheOpArgs_Deref *) args)->md5;
      else
        m->md5 = *((CacheOpArgs_General *) args)->url_md5;
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
      break;
    }
  case CACHE_LINK:
    {
      ////////////////////////
      // Use short_2 format //
      ////////////////////////
      msg = data;
      CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *) msg;
      m->init();
      m->opcode = opcode;
      m->cfl_flags = ((CacheOpArgs_Link *) args)->cfl_flags;
      m->md5_1 = *((CacheOpArgs_Link *) args)->from;
      m->md5_2 = *((CacheOpArgs_Link *) args)->to;
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
      m->frag_type = ((CacheOpArgs_Link *) args)->frag_type;
      break;
    }
  default:
    msg = 0;
    break;
  }
#ifdef CACHE_MSG_TRACE
  log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op");
#endif
  // Ship the marshalled message to the remote node.
  clusterProcessor.invoke_remote(ch,
                                 op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION
                                 : CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len);

no_send_exit:
  if (c) {
    return act;
  } else {
    return (Action *) 0;
  }
}
00628 
// Establish the local ClusterVConnection for a cache op request and
// fill in the channel/token fields of the marshalled message "data".
//
// Returns:
//    1 - VC established; caller proceeds to send the request
//    0 - failed; a failure callback was posted and *act replaced
//   -1 - open is delayed; completion arrives via localVCsetupEvent
int
CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action ** act)
{
  bool read_op = op_is_read(cc->request_opcode);
  bool short_msg = op_is_shortform(cc->request_opcode);

  // Alloc buffer, copy message and attach to continuation
  cc->setMsgBufferLen(data_len);
  cc->allocMsgBuffer();
  memcpy(cc->getMsgBuffer(), data, data_len);

  SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
                           & CacheContinuation::localVCsetupEvent);

  if (short_msg) {
    Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
  } else {
    Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
  }

  // Create local VC
  ClusterVConnection *vc;

  if (!read_op && (cc->request_opcode == CACHE_OPEN_WRITE_LONG)) {
    // Determine if the open_write has already been established.
    vc = cc->lookupOpenWriteVC();

  } else {
    vc = clusterProcessor.open_local(cc, mp, cc->open_local_token,
                                     (CLUSTER_OPT_ALLOW_IMMEDIATE |
                                      (read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE)));
  }
  if (!vc) {
    // Error, abort request
    if (short_msg) {
      Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d",
            (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
    } else {
      Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d",
            (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
    }
    cc->freeMsgBuffer();
    if (cc->timeout)
      cc->timeout->cancel();
    cc->timeout = NULL;

    // Post async failure callback on a different continuation.
    *act = callback_failure(&cc->action, (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
    return 0;

  } else if (vc != CLUSTER_DELAYED_OPEN) {
    // We have established the VC
    if (read_op) {
      cc->read_cluster_vc = vc;
    } else {
      cc->write_cluster_vc = vc;
    }
    cc->cluster_vc_channel = vc->channel;
    vc->current_cont = cc;

    // Record the channel and token in the outbound message so the
    // remote side can attach to this VC.
    if (short_msg) {
      CacheOpMsg_short *ms = (CacheOpMsg_short *) data;
      ms->channel = vc->channel;
      ms->token = cc->open_local_token;
      Debug("cache_proto",
            "0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
            (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
    } else {
      CacheOpMsg_long *ml = (CacheOpMsg_long *) data;
      ml->channel = vc->channel;
      ml->token = cc->open_local_token;
      Debug("cache_proto",
            "1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
            (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
    }
    cc->freeMsgBuffer();
    SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
                             & CacheContinuation::remoteOpEvent);
    return 1;

  } else {
    //////////////////////////////////////////////////////
    // Unable to setup VC, delay required, await callback
    //////////////////////////////////////////////////////
    return -1;
  }
}
00716 
// Resolve an open_write against the node-global open-write VC cache.
// Always returns CLUSTER_DELAYED_OPEN; the real completion is always
// delivered asynchronously through one of the handlers set below.
ClusterVConnection *
CacheContinuation::lookupOpenWriteVC()
{
  ///////////////////////////////////////////////////////////////
  // See if we already have an open_write ClusterVConnection
  // which was established in a previous remote open_read which
  // failed.
  ///////////////////////////////////////////////////////////////
  ClusterVConnection *vc;
  CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();

  vc = GlobalOpenWriteVCcache->lookup(&ml->url_md5);

  if (vc == ((ClusterVConnection *) 0)) {
    // Cache bucket lock miss: retry lookup
    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
                             & CacheContinuation::lookupOpenWriteVCEvent);
    //
    // Note: In the lookupOpenWriteVCEvent handler, we use EVENT_IMMEDIATE
    //       to distinguish the lookup retry from a request timeout
    //       which uses EVENT_INTERVAL.
    //
    lookup_open_write_vc_event = eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);

  } else if (vc != ((ClusterVConnection *) - 1)) {
    // Hit, found open_write VC in cache.
    // Post open_write completion by simulating a
    // remote cache op result message.

    vc->action_ = action;       // establish new continuation

    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
                             & CacheContinuation::localVCsetupEvent);
    this->handleEvent(CLUSTER_EVENT_OPEN_EXISTS, vc);

    CacheOpReplyMsg msg;
    int msglen;

    msglen = CacheOpReplyMsg::sizeof_fixedlen_msg();
    msg.result = CACHE_EVENT_OPEN_WRITE;
    msg.seq_number = seq_number;
    msg.token = vc->token;

    // Feed the synthesized reply through the normal result path.
    cache_op_result_ClusterFunction(ch, (void *) &msg, msglen);

  } else {
    // Miss, establish local VC and send remote open_write request

    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
                             & CacheContinuation::localVCsetupEvent);
    vc = clusterProcessor.open_local(this, from, open_local_token,
                                     (CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
    if (!vc) {
      this->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);

    } else if (vc != CLUSTER_DELAYED_OPEN) {
      this->handleEvent(CLUSTER_EVENT_OPEN, vc);
    }
  }
  return CLUSTER_DELAYED_OPEN;  // force completion in callback
}
00778 
00779 int
00780 CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e)
00781 {
00782   if (event == EVENT_IMMEDIATE) {
00783     // Retry open_write VC lookup
00784     lookupOpenWriteVC();
00785 
00786   } else {
00787     lookup_open_write_vc_event->cancel();
00788     SET_CONTINUATION_HANDLER(this, (CacheContHandler)
00789                              & CacheContinuation::localVCsetupEvent);
00790     this->handleEvent(event, e);
00791   }
00792   return EVENT_DONE;
00793 }
00794 
// Remove this continuation from its pending-operation queue and free it
// (or post the deferred failure callback).  If the queue lock cannot be
// acquired the whole removal is rescheduled for later.
int
CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event * e)
{
  unsigned int hash = FOLDHASH(target_ip, seq_number);
  MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
  if (queuelock) {
    if (remoteCacheContQueue[hash].in(this)) {
      remoteCacheContQueue[hash].remove(this);
    }
    MUTEX_RELEASE(queuelock);
    if (use_deferred_callback)
      // Failure must be reported asynchronously; callback_failure also
      // takes ownership of (frees) this continuation.
      callback_failure(&action, result, result_error, this);
    else
      cacheContAllocator_free(this);

  } else {
    // Lock miss: make this handler the continuation's handler and try
    // the removal again after a delay.
    SET_HANDLER((CacheContHandler) & CacheContinuation::remove_and_delete);
    if (!e) {
      timeout = eventProcessor.schedule_in(this, cache_cluster_timeout, ET_CACHE_CONT_SM);
    } else {
      e->schedule_in(cache_cluster_timeout);
    }
  }
  return EVENT_DONE;
}
00820 
//////////////////////////////////////////////////////////////////////////
// localVCsetupEvent()
//   Callback on the requesting node for the local side of an open_local()
//   ClusterVConnection setup.  Handles three situations:
//     1) EVENT_INTERVAL - queue the operation on the outstanding-ops list
//        on first fire, or declare a timeout on a subsequent fire.
//     2) CLUSTER_EVENT_OPEN / CLUSTER_EVENT_OPEN_EXISTS - the local VC was
//        established; record it, patch the channel/token into the already
//        marshalled request and send it to the remote node.
//     3) anything else - open_local() failed; clean up and (possibly)
//        deliver the failure callback to the user.
//   NOTE: 'timeout' is overloaded - its low bit set ((Event *) 1) marks
//   that a timeout already occurred and the failure callback was sent.
//////////////////////////////////////////////////////////////////////////
int
CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
{
  ink_assert(magicno == (int) MagicNo);
  ink_assert(getMsgBuffer());
  bool short_msg = op_is_shortform(request_opcode);
  bool read_op = op_is_read(request_opcode);

  if (event == EVENT_INTERVAL) {
    // For interval events, 'vc' is actually the Event that fired.
    Event *e = (Event *) vc;
    unsigned int hash = FOLDHASH(target_ip, seq_number);

    MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
    if (!queuelock) {
      // Could not acquire the outstanding-ops queue lock; retry shortly.
      e->schedule_in(CACHE_RETRY_PERIOD);
      return EVENT_CONT;
    }

    if (!remoteCacheContQueue[hash].in(this)) {
      ////////////////////////////////////////////////////
      // Not yet queued on outstanding operations list
      ////////////////////////////////////////////////////
      remoteCacheContQueue[hash].enqueue(this);
      ink_assert(timeout == e);
      MUTEX_RELEASE(queuelock);
      // Re-arm the event as the operation timeout.
      e->schedule_in(cache_cluster_timeout);
      return EVENT_CONT;

    } else {
      /////////////////////////////////////////////////////
      // Timeout occurred
      /////////////////////////////////////////////////////
      remoteCacheContQueue[hash].remove(this);
      MUTEX_RELEASE(queuelock);
      Debug("cluster_timeout", "0cluster op timeout %d", seq_number);
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
      timeout = (Event *) 1;    // Note timeout (tag value, not a real Event)
      /////////////////////////////////////////////////////////////////
      // Note: Failure callback is sent now, but the deallocation of
      //       the CacheContinuation is deferred until we receive the
      //       open_local() callback.
      /////////////////////////////////////////////////////////////////
      if (!action.cancelled)
        action.continuation->handleEvent((read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
      return EVENT_DONE;
    }

  } else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS))
             && (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0)) {
    // VC established and no timeout has fired yet.
    ink_hrtime now;
    now = ink_get_hrtime();
    CLUSTER_SUM_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT, now - start_time);
    LOG_EVENT_TIME(start_time, open_delay_time_dist, open_delay_events);
    if (read_op) {
      read_cluster_vc = vc;
    } else {
      write_cluster_vc = vc;
    }
    cluster_vc_channel = vc->channel;
    vc->current_cont = this;

    // Patch the now-known channel and token into the marshalled request.
    if (short_msg) {
      CacheOpMsg_short *ms = (CacheOpMsg_short *) getMsgBuffer();
      ms->channel = vc->channel;
      ms->token = open_local_token;

      Debug("cache_proto",
            "2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
            (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);

    } else {
      CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
      ml->channel = vc->channel;
      ml->token = open_local_token;

      Debug("cache_proto",
            "3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
            (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
    }
    // Subsequent events (the remote reply) are handled by remoteOpEvent.
    SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent);

    if (event != CLUSTER_EVENT_OPEN_EXISTS) {
      // Send request message
      clusterProcessor.invoke_remote(ch,
                                     (op_needs_marshalled_coi(request_opcode) ?
                                      CACHE_OP_MALLOCED_CLUSTER_FUNCTION :
                                      CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen());
    }

  } else {
    // open_local() failed, or it succeeded after a timeout already fired.
    int send_failure_callback = 1;

    if (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0) {
      // No timeout yet: straightforward open failure.
      if (short_msg) {
        Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d",
              (read_op ? "R" : "W"), ((CacheOpMsg_short *) getMsgBuffer())->seq_number);
      } else {
        Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d",
              (read_op ? "R" : "W"), ((CacheOpMsg_long *) getMsgBuffer())->seq_number);
      }

    } else {
      // Timeout already fired and the failure callback was already sent.
      Debug("cache_proto", "4open_local cancelled due to timeout, seqno=%d", seq_number);
      this->timeout = 0;

      // Deallocate VC if successfully acquired

      if (event == CLUSTER_EVENT_OPEN) {
        vc->pending_remote_fill = 0;
        vc->remote_closed = 1;  // avoid remote close msg
        vc->do_io(VIO::CLOSE);
      }
      send_failure_callback = 0;        // already sent.
    }

    // Cancel any pending timeout event before tearing down.
    if (this->timeout)
      this->timeout->cancel();
    this->timeout = NULL;

    freeMsgBuffer();
    if (send_failure_callback) {
      //
      // Action corresponding to "this" already sent back to user,
      //   use "this" to establish the failure callback after
      //   removing ourselves from the active list.
      //
      this->use_deferred_callback = true;
      this->result = (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED);
      this->result_error = 0;
      remove_and_delete(0, (Event *) 0);

    } else {
      cacheContAllocator_free(this);
    }
    return EVENT_DONE;
  }
  // Free message
  freeMsgBuffer();

  return EVENT_DONE;
}
00962 
00963 ///////////////////////////////////////////////////////////////////////////
00964 // cache_op_ClusterFunction()
00965 //   On the receiving side, handle a general cluster cache operation
00966 ///////////////////////////////////////////////////////////////////////////
00967 
00968 ////////////////////////////////////////////////////////////////////////
00969 // Marshaling functions for OTW message headers
00970 ////////////////////////////////////////////////////////////////////////
00971 
00972 inline CacheOpMsg_long *
00973 unmarshal_CacheOpMsg_long(void *data, int NeedByteSwap)
00974 {
00975   if (NeedByteSwap)
00976     ((CacheOpMsg_long *) data)->SwapBytes();
00977   return (CacheOpMsg_long *) data;
00978 }
00979 
00980 inline CacheOpMsg_short *
00981 unmarshal_CacheOpMsg_short(void *data, int NeedByteSwap)
00982 {
00983   if (NeedByteSwap)
00984     ((CacheOpMsg_short *) data)->SwapBytes();
00985   return (CacheOpMsg_short *) data;
00986 }
00987 
00988 inline CacheOpMsg_short_2 *
00989 unmarshal_CacheOpMsg_short_2(void *data, int NeedByteSwap)
00990 {
00991   if (NeedByteSwap)
00992     ((CacheOpMsg_short_2 *) data)->SwapBytes();
00993   return (CacheOpMsg_short_2 *) data;
00994 }
00995 
00996 // init_from_long() support routine for cache_op_ClusterFunction()
00997 inline void
00998 init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * m)
00999 {
01000   cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
01001   cont->seq_number = msg->seq_number;
01002   cont->cfl_flags = msg->cfl_flags;
01003   cont->from = m;
01004   cont->url_md5 = msg->url_md5;
01005   cont->cluster_vc_channel = msg->channel;
01006   cont->frag_type = (CacheFragType) msg->frag_type;
01007   if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG)
01008       || (cont->request_opcode == CACHE_OPEN_READ_LONG)) {
01009     cont->pin_in_cache = (time_t) msg->data;
01010   } else {
01011     cont->pin_in_cache = 0;
01012   }
01013   cont->token = msg->token;
01014   cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
01015 
01016   if (cont->request_opcode == CACHE_OPEN_READ_LONG) {
01017     cont->caller_buf_freebytes = msg->buffer_size;
01018   } else {
01019     cont->caller_buf_freebytes = 0;
01020   }
01021 }
01022 
01023 // init_from_short() support routine for cache_op_ClusterFunction()
01024 inline void
01025 init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine * m)
01026 {
01027   cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
01028   cont->seq_number = msg->seq_number;
01029   cont->cfl_flags = msg->cfl_flags;
01030   cont->from = m;
01031   cont->url_md5 = msg->md5;
01032   cont->cluster_vc_channel = msg->channel;
01033   cont->token = msg->token;
01034   cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
01035   cont->frag_type = (CacheFragType) msg->frag_type;
01036 
01037   if (cont->request_opcode == CACHE_OPEN_WRITE) {
01038     cont->pin_in_cache = (time_t) msg->data;
01039   } else {
01040     cont->pin_in_cache = 0;
01041   }
01042 
01043   if (cont->request_opcode == CACHE_OPEN_READ) {
01044     cont->caller_buf_freebytes = msg->buffer_size;
01045   } else {
01046     cont->caller_buf_freebytes = 0;
01047   }
01048 }
01049 
01050 // init_from_short_2() support routine for cache_op_ClusterFunction()
01051 inline void
01052 init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMachine * m)
01053 {
01054   cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
01055   cont->seq_number = msg->seq_number;
01056   cont->cfl_flags = msg->cfl_flags;
01057   cont->from = m;
01058   cont->url_md5 = msg->md5_1;
01059   cont->frag_type = (CacheFragType) msg->frag_type;
01060 }
01061 
///////////////////////////////////////////////////////////////////////////
// cache_op_ClusterFunction()
//   Receiving-side entry point for a general cluster cache operation.
//   Validates the message version, allocates a CacheContinuation that will
//   reflect the result back to the requesting node (via replyOpEvent), and
//   dispatches on the opcode to the corresponding local cache call
//   (open_read / open_write / remove / link / deref).
///////////////////////////////////////////////////////////////////////////
void
cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
{
  EThread *thread = this_ethread();
  ProxyMutex *mutex = thread->mutex;
  ////////////////////////////////////////////////////////
  // Note: we are running on the ET_CLUSTER thread
  ////////////////////////////////////////////////////////
  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);

  int opcode;
  ClusterMessageHeader *mh = (ClusterMessageHeader *) data;

  if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
    ////////////////////////////////////////////////
    // Convert from old to current message format
    ////////////////////////////////////////////////
    ink_release_assert(!"cache_op_ClusterFunction() bad msg version");
  }
  opcode = ((CacheOpMsg_long *) data)->opcode;

  // If necessary, create a continuation to reflect the response back

  CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
  c->mutex = new_ProxyMutex();
  // Take the freshly created mutex for the duration of the setup; a
  // try-lock on a brand-new mutex always succeeds.
  MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
  c->request_opcode = opcode;
  c->token.clear();
  c->start_time = ink_get_hrtime();
  c->ch = ch;
  SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                           & CacheContinuation::replyOpEvent);

  switch (opcode) {
  case CACHE_OPEN_WRITE_BUFFER:
  case CACHE_OPEN_WRITE_BUFFER_LONG:
    ink_release_assert(!"cache_op_ClusterFunction WRITE_BUFFER not supported");
    break;

  case CACHE_OPEN_READ_BUFFER:
  case CACHE_OPEN_READ_BUFFER_LONG:
    ink_release_assert(!"cache_op_ClusterFunction READ_BUFFER not supported");
    break;

  case CACHE_OPEN_READ:
    {
      // Short-form (non-HTTP) open read.
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
      init_from_short(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
      //
      // Establish the remote side of the ClusterVConnection
      //
      c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
                                                           &c->token,
                                                           c->cluster_vc_channel,
                                                           (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
      if (!c->write_cluster_vc) {
        // Unable to setup channel, abort processing.
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
        Debug("chan_inuse",
              "1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);

        // Send cluster op failed reply
        c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
        break;

      } else {
        c->write_cluster_vc->current_cont = c;
      }
      ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
      ink_release_assert((opcode == CACHE_OPEN_READ)
                         || c->write_cluster_vc->pending_remote_fill);

      // setupVCdataRead performs the readahead once the local read opens.
      SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                               & CacheContinuation::setupVCdataRead);
      Debug("cache_proto",
            "0read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
            msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_read");
#endif
      CacheKey key(msg->md5);

      // Any bytes beyond the fixed-length header are the hostname.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }
      Cache *call_cache = caches[c->frag_type];
      c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len);
      break;
    }
  case CACHE_OPEN_READ_LONG:
    {
      // Long-form (HTTP) open read, carrying a marshalled request header
      // and lookup config after the fixed-length message.
      // Cache needs message data, copy it.
      c->setMsgBufferLen(len);
      c->allocMsgBuffer();
      memcpy(c->getMsgBuffer(), (char *) data, len);

      int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
      CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
      init_from_long(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
#endif
      //
      // Establish the remote side of the ClusterVConnection
      //
      c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
                                                           &c->token,
                                                           c->cluster_vc_channel,
                                                           (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
      if (!c->write_cluster_vc) {
        // Unable to setup channel, abort processing.
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
        Debug("chan_inuse",
              "2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);

        // Send cluster op failed reply
        c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
        break;

      } else {
        c->write_cluster_vc->current_cont = c;
      }
      ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
      ink_release_assert((opcode == CACHE_OPEN_READ_LONG)
                         || c->write_cluster_vc->pending_remote_fill);

      // setupReadWriteVC can convert a read miss into an open_write.
      SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                               & CacheContinuation::setupReadWriteVC);
      Debug("cache_proto",
            "1read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
            msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);

      // Walk the variable-length tail: CacheHTTPHdr, then
      // CacheLookupHttpConfig, then an optional hostname.
      const char *p = (const char *) msg + flen;
      int moi_len = len - flen;
      int res;

      ink_assert(moi_len > 0);

      // Unmarshal CacheHTTPHdr
      res = c->ic_request.unmarshal((char *) p, moi_len, NULL);
      ink_assert(res > 0);
      ink_assert(c->ic_request.valid());
      c->request_purge = c->ic_request.method_get_wksidx() == HTTP_WKSIDX_PURGE || c->ic_request.method_get_wksidx() == HTTP_WKSIDX_DELETE;
      moi_len -= res;
      p += res;
      ink_assert(moi_len > 0);
      // Unmarshal CacheLookupHttpConfig
      c->ic_params = new(CacheLookupHttpConfigAllocator.alloc())
        CacheLookupHttpConfig();
      res = c->ic_params->unmarshal(&c->ic_arena, (const char *) p, moi_len);
      ink_assert(res > 0);

      moi_len -= res;
      p += res;

      CacheKey key(msg->url_md5);

      char *hostname = NULL;
      int host_len = 0;

      if (moi_len) {
        hostname = (char *) p;
        host_len = moi_len;

        // Save hostname and attach it to the continuation since we may
        //  need it if we convert this to an open_write.

        c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len));
        c->ic_hostname_len = host_len;

        memcpy(c->ic_hostname->data(), hostname, host_len);
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->open_read(c, &key, &c->ic_request,
                                        c->ic_params,
                                        c->frag_type, hostname, host_len);
      // Get rid of purify warnings since 'c' can be freed by open_read.
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_OPEN_WRITE:
    {
      // Short-form (non-HTTP) open write.
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
      init_from_short(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
#endif
      //
      // Establish the remote side of the ClusterVConnection
      //
      c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
                                                          &c->token,
                                                          c->cluster_vc_channel,
                                                          (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
      if (!c->read_cluster_vc) {
        // Unable to setup channel, abort processing.
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
        Debug("chan_inuse",
              "3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);

        // Send cluster op failed reply
        c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
        break;

      } else {
        c->read_cluster_vc->current_cont = c;
      }
      ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);

      CacheKey key(msg->md5);

      // Any bytes beyond the fixed-length header are the hostname.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->open_write(c, &key, c->frag_type,
                                         !!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE),
                                         c->pin_in_cache, hostname, host_len);
      // 'c' may have been freed by a synchronous open_write callback.
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_OPEN_WRITE_LONG:
    {
      // Long-form (HTTP) open write, optionally carrying the old
      // CacheHTTPInfo followed by a hostname.
      // Cache needs message data, copy it.
      c->setMsgBufferLen(len);
      c->allocMsgBuffer();
      memcpy(c->getMsgBuffer(), (char *) data, len);

      int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
      CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
      init_from_long(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
#endif
      //
      // Establish the remote side of the ClusterVConnection
      //
      c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
                                                          &c->token,
                                                          c->cluster_vc_channel,
                                                          (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
      if (!c->read_cluster_vc) {
        // Unable to setup channel, abort processing.
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
        Debug("chan_inuse",
              "4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);

        // Send cluster op failed reply
        c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
        break;

      } else {
        c->read_cluster_vc->current_cont = c;
      }
      ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);

      CacheHTTPInfo *ci = 0;
      const char *p = (const char *) msg + flen;
      int res = 0;
      int moi_len = len - flen;

      if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) {

        // Unmarshal old CacheHTTPInfo
        res = HTTPInfo::unmarshal((char *) p, moi_len, NULL);
        ink_assert(res > 0);
        c->ic_old_info.get_handle((char *) p, moi_len);
        ink_assert(c->ic_old_info.valid());
        ci = &c->ic_old_info;
      }
      if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) {
        ink_assert(!ci);
        // Sentinel pointer value understood by the cache, not a real object.
        ci = (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES;
      }
      moi_len -= res;
      p += res;

      CacheKey key(msg->url_md5);
      char *hostname = NULL;

      // Remaining bytes (if any) are the hostname.
      if (moi_len) {
        hostname = (char *) p;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache,
                                         NULL, c->frag_type, hostname, moi_len);
      // 'c' may have been freed by a synchronous open_write callback.
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_REMOVE:
    {
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
      init_from_short(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
#endif
      CacheKey key(msg->md5);

      // Any bytes beyond the fixed-length header are the hostname.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->remove(c, &key, c->frag_type,
                                     !!(c->cfl_flags & CFL_REMOVE_USER_AGENTS),
                                     !!(c->cfl_flags & CFL_REMOVE_LINK),
                                     hostname, host_len);
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_LINK:
    {
      // Two-key message: link key1 to key2.
      CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
      init_from_short_2(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_link");
#endif

      CacheKey key1(msg->md5_1);
      CacheKey key2(msg->md5_2);

      // Any bytes beyond the fixed-length header are the hostname.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->link(c, &key1, &key2, c->frag_type,
                                   hostname, host_len);
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_DEREF:
    {
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
      init_from_short(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
#endif

      CacheKey key(msg->md5);

      // Any bytes beyond the fixed-length header are the hostname.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->deref(c, &key, c->frag_type,
                                    hostname, host_len);
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }

  default:
    {
      // Unknown opcode: protocol violation.
      ink_release_assert(0);
    }
  }                             // End of switch
}
01462 
01463 void
01464 cache_op_malloc_ClusterFunction(ClusterHandler *ch, void *data, int len)
01465 {
01466   cache_op_ClusterFunction(ch, data, len);
01467   // We own the message data, free it back to the Cluster subsystem
01468   clusterProcessor.free_remote_data((char *) data, len);
01469 }
01470 
//////////////////////////////////////////////////////////////////////////
// setupVCdataRead()
//   After a local open_read succeeds, start the readahead of the initial
//   object data, which is piggybacked onto the reply message.  On any
//   other event, deflect to replyOpEvent to report the failure.
//////////////////////////////////////////////////////////////////////////
int
CacheContinuation::setupVCdataRead(int event, VConnection * vc)
{
  ink_assert(magicno == (int) MagicNo);
  //
  // Setup the initial data read for the given Cache VC.
  // This data is sent back in the response message.
  //
  if (event == CACHE_EVENT_OPEN_READ) {
    //////////////////////////////////////////
    // Allocate buffer and initiate read.
    //////////////////////////////////////////
    Debug("cache_proto", "setupVCdataRead CACHE_EVENT_OPEN_READ seqno=%d", seq_number);
    ink_release_assert(caller_buf_freebytes);
    SET_HANDLER((CacheContHandler) & CacheContinuation::VCdataRead);

    // Size the readahead buffer to the requester's available buffer space.
    int64_t size_index = iobuffer_size_to_index(caller_buf_freebytes);
    MIOBuffer *buf = new_MIOBuffer(size_index);
    readahead_reader = buf->alloc_reader();

    // Hold our own mutex so do_io_read cannot call VCdataRead back before
    // readahead_vio is assigned (lock is released at end of scope).
    MUTEX_TRY_LOCK(lock, mutex, this_ethread());        // prevent immediate callback
    readahead_vio = vc->do_io_read(this, caller_buf_freebytes, buf);
    return EVENT_DONE;

  } else {
    // Error case, deflect processing to replyOpEvent.
    SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
    return handleEvent(event, vc);
  }
}
01501 
//////////////////////////////////////////////////////////////////////////
// VCdataRead()
//   Readahead completion handler.  Clones up to caller_buf_freebytes of
//   the data read so far into readahead_data, then deflects to
//   replyOpEvent to send the (success or failure) reply.  If the whole
//   object fit in the readahead, the cache VC and its buffer are torn
//   down here.
//////////////////////////////////////////////////////////////////////////
int
CacheContinuation::VCdataRead(int event, VIO * target_vio)
{
  ink_release_assert(magicno == (int) MagicNo);
  ink_release_assert(readahead_vio == target_vio);

  VConnection *vc = target_vio->vc_server;
  int reply = CACHE_EVENT_OPEN_READ;
  int32_t object_size;

  switch (event) {
  case VC_EVENT_EOS:
    {
      if (!target_vio->ndone) {
        // Doc with zero byte body, handle as read failure
        goto read_failed;
      }
      // Fall through
    }
  case VC_EVENT_READ_READY:
  case VC_EVENT_READ_COMPLETE:
    {
      int clone_bytes;
      int current_ndone = target_vio->ndone;

      ink_assert(current_ndone);
      ink_assert(current_ndone <= readahead_reader->read_avail());

      object_size = getObjectSize(vc, request_opcode, &cache_vc_info);
      // All data is in hand only if the object fits in the caller's buffer
      // AND the readahead actually consumed it all.
      have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone));

      // Use no more than the caller's max buffer limit

      clone_bytes = current_ndone;
      if (!have_all_data) {
        if (current_ndone > caller_buf_freebytes) {
          clone_bytes = caller_buf_freebytes;
        }
      }
      // Clone data

      IOBufferBlock *tail;
      readahead_data = clone_IOBufferBlockList(readahead_reader->get_current_block(),
                                               readahead_reader->start_offset, clone_bytes, &tail);

      if (have_all_data) {
        // Close VC, since no more data and also to avoid VC_EVENT_EOS

        // Grab the write buffer before close so we can free it afterwards.
        MIOBuffer *mbuf = target_vio->buffer.writer();
        vc->do_io(VIO::CLOSE);
        free_MIOBuffer(mbuf);
        readahead_vio = 0;
      }
      SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
      handleEvent(reply, vc);
      return EVENT_CONT;
    }
  case VC_EVENT_ERROR:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case VC_EVENT_ACTIVE_TIMEOUT:
  default:
    {
    read_failed:
      // Read failed, deflect to replyOpEvent.

      // Tear down the cache VC and readahead buffer before replying.
      MIOBuffer * mbuf = target_vio->buffer.writer();
      vc->do_io(VIO::CLOSE);
      free_MIOBuffer(mbuf);
      readahead_vio = 0;
      reply = CACHE_EVENT_OPEN_READ_FAILED;

      SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
      handleEvent(reply, (VConnection *) - ECLUSTER_ORB_DATA_READ);
      return EVENT_DONE;
    }
  }                             // End of switch
}
01579 
01580 int
01581 CacheContinuation::setupReadWriteVC(int event, VConnection * vc)
01582 {
01583   // Only handles OPEN_READ_LONG processing.
01584 
01585   switch (event) {
01586   case CACHE_EVENT_OPEN_READ:
01587     {
01588       // setup readahead
01589 
01590       SET_HANDLER((CacheContHandler) & CacheContinuation::setupVCdataRead);
01591       return handleEvent(event, vc);
01592       break;
01593     }
01594   case CACHE_EVENT_OPEN_READ_FAILED:
01595     {
01596       if (frag_type == CACHE_FRAG_TYPE_HTTP && !request_purge) {
01597         // HTTP open read failed, attempt open write now to avoid an additional
01598         //  message round trip
01599 
01600         CacheKey key(url_md5);
01601 
01602         Cache *call_cache = caches[frag_type];
01603         Action *a = call_cache->open_write(this, &key, 0, pin_in_cache,
01604                                            NULL, frag_type, ic_hostname ? ic_hostname->data() : NULL,
01605                                            ic_hostname_len);
01606         if (a != ACTION_RESULT_DONE) {
01607           cache_action = a;
01608         }
01609       } else {
01610         SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
01611         return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
01612       }
01613       break;
01614     }
01615   case CACHE_EVENT_OPEN_WRITE:
01616     {
01617       // Convert from read to write connection
01618 
01619       ink_assert(!read_cluster_vc && write_cluster_vc);
01620       read_cluster_vc = write_cluster_vc;
01621       read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
01622       write_cluster_vc = 0;
01623 
01624       SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
01625       return handleEvent(event, vc);
01626       break;
01627     }
01628   case CACHE_EVENT_OPEN_WRITE_FAILED:
01629   default:
01630     {
01631       SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
01632       return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
01633       break;
01634     }
01635   }                             // end of switch
01636 
01637   return EVENT_DONE;
01638 }
01639 
01640 /////////////////////////////////////////////////////////////////////////
01641 // replyOpEvent()
01642 //   Reflect the (local) reply back to the (remote) requesting node.
01643 /////////////////////////////////////////////////////////////////////////
int
CacheContinuation::replyOpEvent(int event, VConnection * cvc)
{
  ink_assert(magicno == (int) MagicNo);
  Debug("cache_proto", "replyOpEvent(this=%p,event=%d,VC=%p)", this, event, cvc);
  ink_hrtime now;
  now = ink_get_hrtime();
  CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
  LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
  ink_release_assert(expect_cache_callback);
  expect_cache_callback = false;        // make sure we are called back exactly once


  result = event;
  bool open = event_is_open(event);
  bool read_op = op_is_read(request_opcode);
  bool open_read_now_open_write = false;

  // Reply message initializations
  CacheOpReplyMsg rmsg;
  CacheOpReplyMsg *msg = &rmsg;
  msg->result = event;

  if ((request_opcode == CACHE_OPEN_READ_LONG)
      && cvc && (event == CACHE_EVENT_OPEN_WRITE)) {
    //////////////////////////////////////////////////////////////////////////
    // open read failed, but open write succeeded, set result to
    // CACHE_EVENT_OPEN_READ_FAILED and make result token non zero to
    // signal to the remote node that we have established a write connection.
    //////////////////////////////////////////////////////////////////////////
    msg->result = CACHE_EVENT_OPEN_READ_FAILED;
    open_read_now_open_write = true;
  }

  msg->seq_number = seq_number;
  int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();    // include token
  int len = 0;                  // additional (marshalled) payload length
  int vers = 0;

  // Number of decrements before "this" may be freed (see free_exit below).
  int results_expected = 1;

  if (no_reply_message)         // CACHE_NO_RESPONSE request
    goto free_exit;

  if (open) {

    // prepare for CACHE_OPEN_EVENT

    // Open events need a second completion before "this" can be freed,
    // so start the expected count at 2 (free_exit decrements once).
    results_expected = 2;
    cache_vc = cvc;
    cache_read = (event == CACHE_EVENT_OPEN_READ);

    if (read_op && !open_read_now_open_write) {
      ink_release_assert(write_cluster_vc->pending_remote_fill);
      ink_assert(have_all_data || (readahead_vio == &((CacheVC *) cache_vc)->vio));
      Debug("cache_proto", "connect_local success seqno=%d have_all_data=%d", seq_number, (have_all_data ? 1 : 0));

      if (have_all_data) {
        // Entire object fits in the readahead buffer; it travels in the
        // reply message, so no data connection is needed.
        msg->token.clear();     // Tell sender no conn established
        write_cluster_vc->type = VC_CLUSTER_WRITE;
      } else {
        msg->token = token;     // Tell sender conn established
        // Remaining data moves through a OneWayTunnel set up here.
        setupReadBufTunnel(cache_vc, write_cluster_vc);
      }

    } else {
      Debug("cache_proto", "cache_open [%s] success seqno=%d", (cache_read ? "R" : "W"), seq_number);
      msg->token = token;       // Tell sender conn established

      // Tunnel moves data between the cluster read VC and the cache VC;
      // the tunnel now owns completion, so expect one fewer result.
      OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
      pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex);
      read_cluster_vc->allow_remote_close();
      results_expected--;
    }

    // For cache reads, marshal the associated CacheHTTPInfo in the reply
    if (cache_read) {
      int res;

      msg->is_ram_cache_hit = ((CacheVC *)cache_vc)->is_ram_cache_hit();

      if (!cache_vc_info.valid()) {
        (void) getObjectSize(cache_vc, request_opcode, &cache_vc_info);
      }
      // Determine data length and allocate
      len = cache_vc_info.marshal_length();
      // NOTE(review): ALLOCA-style buffer — presumably stack-allocated and
      // only valid until this function returns; the message is sent below
      // within this call.
      CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);

      // Initialize reply message header
      *reply = *msg;

      // Marshal response data into reply message
      res = cache_vc_info.marshal((char *) reply + flen, len);
      ink_assert(res >= 0 && res <= len);

      // Make reply message the current message
      msg = reply;
    }

  } else {
    Debug("cache_proto", "cache operation failed result=%d seqno=%d (this=%p)", event, seq_number, this);
    msg->token.clear();         // Tell sender no conn established

    // Reallocate reply message, allowing for marshalled data
    len += sizeof(int32_t);
    CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);

    // Initialize reply message header
    *reply = *msg;

    if (request_opcode != CACHE_LINK) {
      //
      // open read/write failed, close preallocated VC
      //
      if (read_cluster_vc) {
        read_cluster_vc->remote_closed = 1;     // avoid remote close msg
        read_cluster_vc->do_io(VIO::CLOSE);
      }
      if (write_cluster_vc) {
        write_cluster_vc->pending_remote_fill = 0;
        write_cluster_vc->remote_closed = 1;    // avoid remote close msg
        write_cluster_vc->do_io(VIO::CLOSE);
      }
      // On failure paths the "cvc" argument actually carries a negative
      // error code, e.g. handleEvent(reply, (VConnection *) -ECLUSTER_...).
      reply->moi.u32 = (int32_t) ((uintptr_t) cvc & 0xffffffff);    // code describing failure
    }
    // Make reply message the current message
    msg = reply;
  }
  CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);

  //
  // Send reply message
  //
#ifdef CACHE_MSG_TRACE
  log_cache_op_sndmsg(msg->seq_number, 0, "replyOpEvent");
#endif
  vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
  if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
    if (read_op) {
      // Transmit reply message and object data in same cluster message
      Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64,
            seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
      clusterProcessor.invoke_remote_data(ch,
                                          CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                          (void *) msg, (flen + len),
                                          readahead_data,
                                          cluster_vc_channel, &token,
                                          &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
    } else {
      Debug("cache_proto", "Sending reply seqno=%d, (this=%p)", seq_number, this);
      clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                     (void *) msg, (flen + len), CLUSTER_OPT_STEAL);
    }

  } else {
    //////////////////////////////////////////////////////////////
    // Create the specified down rev version of this message
    //////////////////////////////////////////////////////////////
    ink_release_assert(!"replyOpEvent() bad msg version");
  }

free_exit:
  // Free "this" only after all expected completions have been counted.
  results_expected--;
  if (results_expected <= 0) {
    Debug("cache_proto", "replyOpEvent: freeing this=%p", this);
    cacheContAllocator_free(this);
  }
  return EVENT_DONE;
}
01813 
void
CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * cluster_write_vc)
{
  ////////////////////////////////////////////////////////////
  // Setup OneWayTunnel and tunnel close event handler.
  // Used in readahead processing on open read connections.
  ////////////////////////////////////////////////////////////
  tunnel_cont = cacheContAllocator_alloc();
  tunnel_cont->mutex = this->mutex;
  SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler)
                           & CacheContinuation::tunnelClosedEvent);
  // Bytes already captured in readahead_data; these are shipped with the
  // initial reply message, not through the tunnel.
  int64_t ravail = bytes_IOBufferBlockList(readahead_data, 1);

  tunnel_mutex = tunnel_cont->mutex;
  tunnel_closed = false;

  tunnel = OneWayTunnel::OneWayTunnel_alloc();
  readahead_reader->consume(ravail);    // allow for bytes sent in initial reply
  tunnel->init(cache_read_vc, cluster_write_vc, tunnel_cont, readahead_vio, readahead_reader);
  tunnel_cont->action = this;
  tunnel_cont->tunnel = tunnel;
  // Self-pointer: tunnelClosedEvent() release-asserts on it to verify it
  // was handed the tunnel continuation rather than the owner.
  tunnel_cont->tunnel_cont = tunnel_cont;

  // Disable cluster_write_vc
  ((ClusterVConnection *) cluster_write_vc)->write.enabled = 0;

  // Disable cache read VC
  readahead_vio->nbytes = readahead_vio->ndone;

  /////////////////////////////////////////////////////////////////////
  // At this point, the OneWayTunnel is blocked awaiting a reenable
  // on both the source and target VCs. Reenable occurs after the
  // message containing the initial data and open read reply are sent.
  /////////////////////////////////////////////////////////////////////
}
01849 
01850 ///////////////////////////////////////////////////////////////////////
01851 // Tunnnel exited event handler, used for readahead on open read.
01852 ///////////////////////////////////////////////////////////////////////
01853 int
01854 CacheContinuation::tunnelClosedEvent(int /* event ATS_UNUSED */, void *c)
01855 {
01856   ink_assert(magicno == (int) MagicNo);
01857   // Note: We are called with the tunnel_mutex held.
01858   CacheContinuation *tc = (CacheContinuation *) c;
01859   ink_release_assert(tc->tunnel_cont == tc);
01860   CacheContinuation *real_cc = (CacheContinuation *) tc->action.continuation;
01861 
01862   if (real_cc) {
01863     // Notify the real continuation of the tunnel closed event
01864     real_cc->tunnel = 0;
01865     real_cc->tunnel_cont = 0;
01866     real_cc->tunnel_closed = true;
01867   }
01868   OneWayTunnel::OneWayTunnel_free(tc->tunnel);
01869   cacheContAllocator_free(tc);
01870 
01871   return EVENT_DONE;
01872 }
01873 
01874 ////////////////////////////////////////////////////////////
01875 // Retry DisposeOfDataBuffer continuation
01876 ////////////////////////////////////////////////////////////
01877 struct retryDisposeOfDataBuffer;
01878 typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler) (int, void *);
01879 struct retryDisposeOfDataBuffer:public Continuation
01880 {
01881   CacheContinuation *c;
01882 
01883   int handleRetryEvent(int event, Event * e)
01884   {
01885     if (CacheContinuation::handleDisposeEvent(event, c) == EVENT_DONE) {
01886       delete this;
01887         return EVENT_DONE;
01888     } else
01889     {
01890       e->schedule_in(HRTIME_MSECONDS(10));
01891       return EVENT_CONT;
01892     }
01893   }
01894   retryDisposeOfDataBuffer(CacheContinuation * cont)
01895 :  Continuation(new_ProxyMutex()), c(cont) {
01896     SET_HANDLER((rtryDisOfDBufHandler)
01897                 & retryDisposeOfDataBuffer::handleRetryEvent);
01898   }
01899 };
01900 
01901 //////////////////////////////////////////////////////////////////
01902 // Callback from cluster to dispose of data passed in
01903 // call to invoke_remote_data().
01904 //////////////////////////////////////////////////////////////////
void
CacheContinuation::disposeOfDataBuffer(void *d)
{
  // Cluster-layer callback: the message built by replyOpEvent() has been
  // sent; release (or hand off) the readahead data it referenced.
  ink_assert(d);
  CacheContinuation *cc = (CacheContinuation *) d;
  ink_assert(cc->have_all_data || cc->readahead_vio);
  ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *) cc->cache_vc)->vio));

  if (cc->have_all_data) {
    //
    // All object data resides in the buffer, no OneWayTunnel
    // started and the Cache VConnection has already been closed.
    // Close write_cluster_vc and set remote close to avoid send of
    // close message to remote node.
    //
    cc->write_cluster_vc->pending_remote_fill = 0;
    cc->write_cluster_vc->remote_closed = 1;
    cc->write_cluster_vc->do_io(VIO::CLOSE);
    cc->readahead_data = 0;

    cacheContAllocator_free(cc);

  } else {
    // Only part of the object was sent in the reply; the OneWayTunnel set
    // up earlier must be started to move the remainder.
    cc->write_cluster_vc->pending_remote_fill = 0;
    cc->write_cluster_vc->allow_remote_close();
    if (handleDisposeEvent(0, cc) == EVENT_CONT) {
      // Setup retry continuation.
      retryDisposeOfDataBuffer *retryCont = new retryDisposeOfDataBuffer(cc);
      eventProcessor.schedule_in(retryCont, HRTIME_MSECONDS(10), ET_CALL);
    }
  }
}
01937 
int
CacheContinuation::handleDisposeEvent(int /* event ATS_UNUSED */, CacheContinuation * cc)
{
  // Start the readahead tunnel for "cc" once its tunnel mutex can be
  // acquired.  Returns EVENT_DONE when handled (cc is freed) or
  // EVENT_CONT when the lock attempt failed and the caller must retry.
  // Called without a meaningful "this" (see retryDisposeOfDataBuffer).
  ink_assert(cc->magicno == (int) MagicNo);
  MUTEX_TRY_LOCK(lock, cc->tunnel_mutex, this_ethread());
  if (lock) {
    // Write of initial object data is complete.

    if (!cc->tunnel_closed) {
      // Start tunnel by reenabling source and target VCs.

      cc->tunnel->vioSource->nbytes = getObjectSize(cc->tunnel->vioSource->vc_server, cc->request_opcode, 0);
      cc->tunnel->vioSource->reenable_re();

      // Tunnel may be closed by vioSource->reenable_re(),
      // we should check it again here:
      if (!cc->tunnel_closed) {
        cc->tunnel->vioTarget->reenable();

        // Tell tunnel event we are gone
        cc->tunnel_cont->action.continuation = 0;
      }
    }
    cacheContAllocator_free(cc);
    return EVENT_DONE;

  } else {
    // Lock acquire failed, retry operation.
    return EVENT_CONT;
  }
}
01969 
01970 /////////////////////////////////////////////////////////////////////////////
01971 // cache_op_result_ClusterFunction()
01972 //   Invoked on the machine which initiated a remote op, this
01973 //   unmarshals the result and calls a continuation in the requesting thread.
01974 /////////////////////////////////////////////////////////////////////////////
void
cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l)
{
  ////////////////////////////////////////////////////////
  // Note: we are running on the ET_CACHE_CONT_SM thread
  ////////////////////////////////////////////////////////

  // Copy reply message data
  Ptr<IOBufferData> iob = make_ptr(new_IOBufferData(iobuffer_size_to_index(l)));
  memcpy(iob->data(), (char *) d, l);
  char *data = iob->data();
  int flen, len = l;
  CacheHTTPInfo ci;
  CacheOpReplyMsg *msg = (CacheOpReplyMsg *) data;
  int32_t op_result_error = 0;
  ClusterMessageHeader *mh = (ClusterMessageHeader *) data;

  if (mh->GetMsgVersion() != CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
    ////////////////////////////////////////////////
    // Convert from old to current message format
    ////////////////////////////////////////////////
    ink_release_assert(!"cache_op_result_ClusterFunction() bad msg version");
  }

  flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
  if (mh->NeedByteSwap())
    msg->SwapBytes();

  Debug("cluster_cache", "received cache op result, seqno=%d result=%d", msg->seq_number, msg->result);

  // If applicable, unmarshal any response data
  if ((len > flen) && event_reply_may_have_moi(msg->result)) {
    switch (msg->result) {
    case CACHE_EVENT_OPEN_READ:
      {
        char *p = (char *) msg + flen;
        int res;

        // Unmarshal CacheHTTPInfo
        // NOTE(review): "len" here is the full message length, not
        // (len - flen) — presumably unmarshal stops at the marshalled
        // info's own length; verify against HTTPInfo::unmarshal.
        res = HTTPInfo::unmarshal(p, len, NULL);
        ci.get_handle(p, len);
        ink_assert(res > 0);
        ink_assert(ci.valid());
        break;
      }
    case CACHE_EVENT_LINK:
    case CACHE_EVENT_LINK_FAILED:
      break;
    case CACHE_EVENT_OPEN_READ_FAILED:
    case CACHE_EVENT_OPEN_WRITE_FAILED:
    case CACHE_EVENT_REMOVE_FAILED:
    case CACHE_EVENT_UPDATE_FAILED:
    case CACHE_EVENT_DEREF_FAILED:
      {
        // Unmarshal the error code
        ink_assert(((len - flen) == sizeof(int32_t)));
        op_result_error = msg->moi.u32;
        if (mh->NeedByteSwap())
          ats_swap32((uint32_t *) & op_result_error);
        // Negate so callers receive a conventional negative error code.
        op_result_error = -op_result_error;
        break;
      }
    default:
      {
        ink_release_assert(!"invalid moi data for received msg");
        break;
      }
    }                           // end of switch
  }
  // See if this response is still expected (expected case == yes)

  unsigned int hash = FOLDHASH(ch->machine->ip, msg->seq_number);
  EThread *thread = this_ethread();
  ProxyMutex *mutex = thread->mutex;
  if (MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], thread)) {

    // Find it in pending list

    CacheContinuation *c = find_cache_continuation(msg->seq_number,
                                                   ch->machine->ip);
    if (!c) {
      // Reply took to long, response no longer expected.
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
      Debug("cluster_timeout", "0cache reply timeout: %d", msg->seq_number);
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
      if (ci.valid())
        ci.destroy();
      return;
    }

    // Update remote ram cache hit flag
    if (msg->result == CACHE_EVENT_OPEN_READ)
      c->read_cluster_vc->set_ram_cache_hit(msg->is_ram_cache_hit);

    // Try to send the message

    MUTEX_TRY_LOCK(lock, c->mutex, thread);

    // Failed to acquire lock, defer

    if (!lock) {
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
      goto Lretry;
    }
    c->result_error = op_result_error;

    // send message, release lock

    c->freeMsgBuffer();
    if (ci.valid()) {
      // Unmarshaled CacheHTTPInfo contained in reply message, copy it.
      c->setMsgBufferLen(len, iob);
      c->ic_new_info = ci;
    }
    // HACK ALERT: reusing variable — seq_number now carries the message
    // length; presumably recovered by the CACHE_EVENT_RESPONSE_MSG
    // handler (remoteOpEvent) — verify there.
    msg->seq_number = len;
    c->handleEvent(CACHE_EVENT_RESPONSE_MSG, data);

  } else {

    // Failed to wake it up, defer by creating a timed continuation

  Lretry:
    CacheContinuation * c = CacheContinuation::cacheContAllocator_alloc();
    c->mutex = new_ProxyMutex();
    c->seq_number = msg->seq_number;
    c->target_ip = ch->machine->ip;
    SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                             & CacheContinuation::handleReplyEvent);
    c->start_time = ink_get_hrtime();
    c->result = msg->result;
    if (event_is_open(msg->result))
      c->token = msg->token;
    if (ci.valid()) {
      // Unmarshaled CacheHTTPInfo contained in reply message, copy it.
      c->setMsgBufferLen(len, iob);
      c->ic_new_info = ci;
    }
    c->result_error = op_result_error;
    eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
  }
}
02115 
02116 ////////////////////////////////////////////////////////////////////////
02117 // handleReplyEvent()
02118 //   If we cannot acquire any of the locks to handle the response
02119 //   inline, it is defered and later handled by this function.
02120 ////////////////////////////////////////////////////////////////////////
int
CacheContinuation::handleReplyEvent(int event, Event * e)
{
  (void) event;

  // take lock on outstanding message queue

  EThread *t = e->ethread;
  unsigned int hash = FOLDHASH(target_ip, seq_number);

  if (!MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], t)) {
    // Queue mutex busy; reschedule this deferred delivery.
    e->schedule_in(CACHE_RETRY_PERIOD);
    return EVENT_CONT;
  }

  LOG_EVENT_TIME(start_time, cntlck_acquire_time_dist, cntlck_acquire_events);

  // See if this response is still expected

  CacheContinuation *c = find_cache_continuation(seq_number, target_ip);
  if (c) {

    // Acquire the lock to the continuation mutex

    MUTEX_TRY_LOCK(lock, c->mutex, e->ethread);
    if (!lock) {

      // If we fail to acquire the lock, reschedule

      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
      e->schedule_in(CACHE_RETRY_PERIOD);
      return EVENT_CONT;
    }

    // If unmarshalled CacheHTTPInfo exists, pass it along

    if (ic_new_info.valid()) {
      c->freeMsgBuffer();
      c->setMsgBufferLen(getMsgBufferLen(), getMsgBufferIOBData());
      c->ic_new_info = ic_new_info;
      ic_new_info.clear();      // ownership transferred to "c"
    }
    // send message, release lock

    c->handleEvent(CACHE_EVENT_RESPONSE, this);

  } else {
    // Original requester gave up waiting; drop the reply.
    MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
    Debug("cluster_timeout", "cache reply timeout: %d", seq_number);
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
  }

  // Free this continuation

  cacheContAllocator_free(this);
  return EVENT_DONE;
}
02178 
02179 //////////////////////////////////////////////////////////////////////////
02180 // remoteOpEvent()
02181 //   On the requesting node, handle the timeout and response to the user.
02182 //   There may be two CacheContinuations involved:
02183 //    1) One waiting to respond to the user.
02184 //       This case is CACHE_EVENT_RESPONSE_MSG which is handled
02185 //       inline (without delay).
02186 //    2) One which is carrying the response from the remote machine which
02187 //       has been delayed for a lock.  This case is CACHE_EVENT_RESPONSE.
02188 //////////////////////////////////////////////////////////////////////////
02189 int
02190 CacheContinuation::remoteOpEvent(int event_code, Event * e)
02191 {
02192   ink_assert(magicno == (int) MagicNo);
02193   int event = event_code;
02194   ink_hrtime now;
02195   if (start_time) {
02196     int res;
02197     if (event != EVENT_INTERVAL) {
02198       if (event == CACHE_EVENT_RESPONSE) {
02199         CacheContinuation *ccont = (CacheContinuation *) e;
02200         res = ccont->result;
02201       } else {
02202         CacheOpReplyMsg *rmsg = (CacheOpReplyMsg *) e;
02203         res = rmsg->result;
02204       }
02205       if ((res == CACHE_EVENT_LOOKUP) || (res == CACHE_EVENT_LOOKUP_FAILED)) {
02206         now = ink_get_hrtime();
02207         CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, now - start_time);
02208         LOG_EVENT_TIME(start_time, lkrmt_callback_time_dist, lkrmt_cache_callbacks);
02209       } else {
02210         now = ink_get_hrtime();
02211         CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, now - start_time);
02212         LOG_EVENT_TIME(start_time, rmt_callback_time_dist, rmt_cache_callbacks);
02213       }
02214     }
02215     start_time = 0;
02216   }
02217   // for CACHE_EVENT_RESPONSE/XXX the lock was acquired at the higher level
02218   intptr_t return_error = 0;
02219   ClusterVCToken *pToken = NULL;
02220 
02221 retry:
02222 
02223   switch (event) {
02224   default:
02225     ink_assert(!"bad case");
02226     return EVENT_DONE;
02227 
02228   case EVENT_INTERVAL:{
02229 
02230       unsigned int hash = FOLDHASH(target_ip, seq_number);
02231 
02232       MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
02233       if (!queuelock) {
02234         e->schedule_in(CACHE_RETRY_PERIOD);
02235         return EVENT_CONT;
02236       }
02237       // we are not yet enqueued on the list of outstanding operations
02238 
02239       if (!remoteCacheContQueue[hash].in(this)) {
02240         remoteCacheContQueue[hash].enqueue(this);
02241         ink_assert(timeout == e);
02242         MUTEX_RELEASE(queuelock);
02243         e->schedule_in(cache_cluster_timeout);
02244         return EVENT_CONT;
02245       }
02246       // a timeout has occurred
02247 
02248       if (find_cache_continuation(seq_number, target_ip)) {
02249         // Valid timeout
02250         MUTEX_RELEASE(queuelock);
02251 
02252         Debug("cluster_timeout", "cluster op timeout %d", seq_number);
02253         CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
02254         request_timeout = true;
02255         timeout = 0;
02256         //
02257         // Post error completion now and defer deallocation of
02258         // the continuation until we receive the reply or the
02259         // target node goes down.
02260         //
02261         if (!action.cancelled)
02262           action.continuation->handleEvent(result, (void *) -ECLUSTER_OP_TIMEOUT);
02263         action.cancelled = 1;
02264 
02265         if (target_machine->dead) {
02266           event = CACHE_EVENT_RESPONSE_MSG;
02267           goto retry;
02268         } else {
02269           timeout = e;
02270           e->schedule_in(cache_cluster_timeout);
02271           return EVENT_DONE;
02272         }
02273 
02274       } else {
02275         // timeout not expected for continuation; log and ignore
02276         MUTEX_RELEASE(queuelock);
02277         Debug("cluster_timeout", "unknown cluster op timeout %d", seq_number);
02278         Note("Unexpected CacheCont timeout, [%u.%u.%u.%u] seqno=%d", DOT_SEPARATED(target_ip), seq_number);
02279         CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
02280         return EVENT_DONE;
02281       }
02282     }
02283 
02284   case CACHE_EVENT_RESPONSE:
02285   case CACHE_EVENT_RESPONSE_MSG:{
02286 
02287       // the response has arrived, cancel timeout
02288 
02289       if (timeout) {
02290         timeout->cancel();
02291         timeout = 0;
02292       }
02293       // remove from the pending queue
02294       unsigned int hash = FOLDHASH(target_ip, seq_number);
02295 
02296       remoteCacheContQueue[hash].remove(this);
02297       MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], this_ethread());
02298       // Fall through
02299     }
02300 
02301   case CACHE_EVENT_RESPONSE_RETRY:{
02302 
02303       // determine result code
02304 
02305       CacheContinuation *c = (CacheContinuation *) e;
02306       CacheOpReplyMsg *msg = (CacheOpReplyMsg *) e;
02307       if (event == CACHE_EVENT_RESPONSE_MSG) {
02308         result = (request_timeout ? result : msg->result);
02309         pToken = (request_timeout ? &token : &msg->token);
02310       } else if (event == CACHE_EVENT_RESPONSE) {
02311         result = (request_timeout ? result : c->result);
02312         pToken = &c->token;
02313       } else if (event == CACHE_EVENT_RESPONSE_RETRY) {
02314         pToken = &token;
02315       } else {
02316         ink_release_assert(!"remoteOpEvent bad event code");
02317       }
02318 
02319       // handle response
02320 
02321       if (result == CACHE_EVENT_LOOKUP) {
02322         callback_user(result, 0);
02323         return EVENT_DONE;
02324 
02325       } else if (event_is_open(result)) {
02326         bool read_op = ((request_opcode == CACHE_OPEN_READ)
02327                         || (request_opcode == CACHE_OPEN_READ_LONG));
02328         if (read_op) {
02329           ink_release_assert(read_cluster_vc->pending_remote_fill > 1);
02330           read_cluster_vc->pending_remote_fill = 0;
02331 
02332           have_all_data = pToken->is_clear();   // no conn implies all data
02333           if (have_all_data) {
02334             read_cluster_vc->have_all_data = 1;
02335           } else {
02336             read_cluster_vc->have_all_data = 0;
02337           }
02338           // Move CacheHTTPInfo reply data into VC
02339           read_cluster_vc->marshal_buf = this->getMsgBufferIOBData();
02340           read_cluster_vc->alternate = this->ic_new_info;
02341           this->ic_new_info.clear();
02342           ink_release_assert(read_cluster_vc->alternate.object_size_get());
02343 
02344           if (!action.cancelled) {
02345             ClusterVConnection *target_vc = read_cluster_vc;
02346             callback_user(result, target_vc);   // "this" is deallocated
02347             target_vc->allow_remote_close();
02348           } else {
02349             read_cluster_vc->allow_remote_close();
02350             read_cluster_vc->do_io(VIO::ABORT);
02351             cacheContAllocator_free(this);
02352           }
02353 
02354         } else {
02355           ink_assert(result == CACHE_EVENT_OPEN_WRITE);
02356           ink_assert(!pToken->is_clear());
02357 
02358           ClusterVConnection *result_vc = write_cluster_vc;
02359           if (!action.cancelled) {
02360             callback_user(result, result_vc);
02361             result_vc->allow_remote_close();
02362           } else {
02363             result_vc->allow_remote_close();
02364             result_vc->do_io(VIO::ABORT);
02365             cacheContAllocator_free(this);
02366           }
02367         }
02368         return EVENT_DONE;
02369       }
02370       break;
02371     }                           // End of case
02372   }                             // End of switch
02373 
02374   // Handle failure cases
02375 
02376   if (result == CACHE_EVENT_LOOKUP_FAILED) {
02377 
02378 
02379     // check for local probes
02380 
02381     ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
02382 
02383     // if the current configuration indicates that this
02384     // machine is the master (or the owner machine has failed), go to
02385     // the local machine.  Also if PROBE_LOCAL_CACHE_LAST.
02386     //
02387     int len = getMsgBufferLen();
02388     char *hostname = (len ? getMsgBuffer() : 0);
02389 
02390     if (!m || PROBE_LOCAL_CACHE_LAST) {
02391       SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
02392       CacheKey key(url_md5);
02393 
02394       Cache *call_cache = caches[frag_type];
02395       call_cache->lookup(this, &key, frag_type, hostname, len);
02396       return EVENT_DONE;
02397     }
02398     if (PROBE_LOCAL_CACHE_FIRST) {
02399       callback_user(CACHE_EVENT_LOOKUP_FAILED, 0);
02400     } else {
02401       SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
02402       CacheKey key(url_md5);
02403 
02404       Cache *call_cache = caches[frag_type];
02405       call_cache->lookup(this, &key, frag_type, hostname, len);
02406     }
02407     return EVENT_DONE;
02408 
02409   } else {
02410     // Handle failure of all ops except for lookup
02411 
02412     ClusterVConnection *cacheable_vc = 0;
02413     if ((request_opcode == CACHE_OPEN_READ_LONG) && !pToken->is_clear()) {
02414       ink_assert(read_cluster_vc && !write_cluster_vc);
02415       //
02416       // OPEN_READ_LONG has failed, but the remote node was able to
02417       // establish an OPEN_WRITE_LONG connection.
02418       // Convert the cluster read VC to a write VC and insert it
02419       // into the global write VC cache.  This will allow us to
02420       // locally resolve the subsequent OPEN_WRITE_LONG request.
02421       //
02422 
02423       // Note: We do not allow remote close on this VC while
02424       //       it resides in cache
02425       //
02426       read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
02427       // FIX ME. ajitb 12/21/99
02428       // Compiler bug in CC: WorkShop Compilers 5.0 98/12/15 C++ 5.0
02429       // Does not accept assignment of ((Continuation *) NULL)
02430       {
02431         Continuation *temp = NULL;
02432         read_cluster_vc->action_ = temp;
02433       }
02434       if (!GlobalOpenWriteVCcache->insert(&url_md5, read_cluster_vc)) {
02435         // Unable to insert VC into cache, try later
02436         cacheable_vc = read_cluster_vc;
02437       }
02438       read_cluster_vc = 0;
02439     }
02440     if (read_cluster_vc) {
02441       read_cluster_vc->remote_closed = 0;       // send remote close
02442       read_cluster_vc->allow_remote_close();
02443       read_cluster_vc->do_io(VIO::ABORT);
02444       read_cluster_vc = 0;
02445     }
02446     if (write_cluster_vc) {
02447       write_cluster_vc->remote_closed = 0;      // send remote close
02448       write_cluster_vc->allow_remote_close();
02449       write_cluster_vc->do_io(VIO::ABORT);
02450       write_cluster_vc = 0;
02451     }
02452     if (!request_timeout) {
02453       if (!return_error) {
02454         return_error = result_error;
02455       }
02456       if (cacheable_vc) {
02457         insert_cache_callback_user(cacheable_vc, result, (void *) return_error);
02458       } else {
02459         callback_user(result, (void *) return_error);
02460       }
02461     } else {
02462       // callback already made at timeout, just free continuation
02463       if (cacheable_vc) {
02464         cacheable_vc->allow_remote_close();
02465         cacheable_vc->do_io(VIO::CLOSE);
02466         cacheable_vc = 0;
02467       }
02468       cacheContAllocator_free(this);
02469     }
02470     return EVENT_DONE;
02471   }
02472 }
02473 
02474 //////////////////////////////////////////////////////////////////////////
02475 // probeLookupEvent()
02476 //   After a local probe, return the response to the client and cleanup.
02477 //////////////////////////////////////////////////////////////////////////
02478 
02479 int
02480 CacheContinuation::probeLookupEvent(int event, void * /* d ATS_UNUSED */)
02481 {
02482   ink_assert(magicno == (int) MagicNo);
02483   callback_user(event, 0);
02484   return EVENT_DONE;
02485 }
02486 
02487 ///////////////////////////////////////////////////////////
02488 // lookupEvent()
02489 //   Result of a local lookup for PROBE_LOCAL_CACHE_FIRST
02490 ///////////////////////////////////////////////////////////
02491 int
02492 CacheContinuation::lookupEvent(int /* event ATS_UNUSED */, void * /* d ATS_UNUSED */)
02493 {
02494   ink_release_assert(!"Invalid call CacheContinuation::lookupEvent");
02495   return EVENT_DONE;
02496 
02497 }
02498 
02499 
02500 
02501 //////////////////////////////////////////////////////////////////////////
02502 // do_remote_lookup()
02503 //   If the object is supposed to be on a remote machine, probe there.
02504 //   Returns: Non zero (Action *) if a probe was initiated
02505 //            Zero (Action *) if no probe
02506 //////////////////////////////////////////////////////////////////////////
02507 Action *
02508 CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
02509                                     CacheContinuation * c, CacheFragType ft, char *hostname, int hostname_len)
02510 {
02511   int probe_depth = 0;
02512   ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH] = { 0 };
02513   int mlen = op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP) + ((hostname && hostname_len) ? hostname_len : 0);
02514   CacheLookupMsg *msg = (CacheLookupMsg *) ALLOCA_DOUBLE(mlen);
02515   msg->init();
02516 
02517 
02518   if (key) {
02519     msg->url_md5 = *key;
02520   } else {
02521     ink_assert(c);
02522     msg->url_md5 = c->url_md5;
02523   }
02524 
02525   ClusterMachine *m = NULL;
02526 
02527   if (cache_migrate_on_demand) {
02528     m = cluster_machine_at_depth(cache_hash(msg->url_md5),
02529                                  c ? &c->probe_depth : &probe_depth, c ? c->past_probes : past_probes);
02530   } else {
02531 
02532     // If migrate-on-demand is off, do not probe beyond one level.
02533 
02534     if (c && c->probe_depth)
02535       return (Action *) 0;
02536     m = cluster_machine_at_depth(cache_hash(msg->url_md5));
02537     if (c)
02538       c->probe_depth = 1;
02539   }
02540 
02541   if (!m)
02542     return (Action *) 0;
02543   ClusterHandler *ch = m->pop_ClusterHandler();
02544   if (!ch)
02545     return (Action *) 0;
02546 
02547   // If we do not have a continuation, build one
02548 
02549   if (!c) {
02550     c = cacheContAllocator_alloc();
02551     c->mutex = cont->mutex;
02552     c->probe_depth = probe_depth;
02553     memcpy(c->past_probes, past_probes, sizeof(past_probes));
02554   }
02555   c->ch = ch;
02556   // Save hostname data in case we need to do a local lookup.
02557   if (hostname && hostname_len) {
02558     // Alloc buffer, copy hostname data and attach to continuation
02559     c->setMsgBufferLen(hostname_len);
02560     c->allocMsgBuffer();
02561     memcpy(c->getMsgBuffer(), hostname, hostname_len);
02562   }
02563 
02564   c->url_md5 = msg->url_md5;
02565   c->action.cancelled = false;
02566   c->action = cont;
02567   c->start_time = ink_get_hrtime();
02568   SET_CONTINUATION_HANDLER(c, (CacheContHandler)
02569                            & CacheContinuation::remoteOpEvent);
02570   c->result = CACHE_EVENT_LOOKUP_FAILED;
02571 
02572   // set up sequence number so we can find this continuation
02573 
02574   c->target_ip = m->ip;
02575   c->seq_number = new_cache_sequence_number();
02576   msg->seq_number = c->seq_number;
02577   c->frag_type = ft;
02578   msg->frag_type = ft;
02579 
02580   // establish timeout for lookup
02581 
02582   unsigned int hash = FOLDHASH(c->target_ip, c->seq_number);
02583   MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
02584   if (!queuelock) {
02585     // failed to acquire lock: no problem, retry later
02586     c->timeout = eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
02587   } else {
02588     remoteCacheContQueue[hash].enqueue(c);
02589     MUTEX_RELEASE(queuelock);
02590     c->timeout = eventProcessor.schedule_in(c, cache_cluster_timeout, ET_CACHE_CONT_SM);
02591   }
02592 
02593   char *data;
02594   int len;
02595   int vers = CacheLookupMsg::protoToVersion(m->msg_proto_major);
02596 
02597   if (vers == CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {
02598     msg->seq_number = c->seq_number;
02599     data = (char *) msg;
02600     len = mlen;
02601     if (hostname && hostname_len) {
02602       memcpy(msg->moi.byte, hostname, hostname_len);
02603     }
02604   } else {
02605     //////////////////////////////////////////////////////////////
02606     // Create the specified down rev version of this message
02607     //////////////////////////////////////////////////////////////
02608     ink_release_assert(!"CacheLookupMsg bad msg version");
02609   }
02610 
02611   // send the message
02612 
02613 #ifdef CACHE_MSG_TRACE
02614   log_cache_op_sndmsg(msg.seq_number, 0, "cache_lookup");
02615 #endif
02616   clusterProcessor.invoke_remote(c->ch, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
02617   return &c->action;
02618 }
02619 
02620 
02621 ////////////////////////////////////////////////////////////////////////////
02622 // cache_lookup_ClusterFunction()
02623 //   This function is invoked on a remote machine to do a remote lookup.
02624 //   It unmarshals the URL and does a local lookup, with its own
02625 //   continuation set to CacheContinuation::replyLookupEvent()
02626 ////////////////////////////////////////////////////////////////////////////
02627 void
02628 cache_lookup_ClusterFunction(ClusterHandler *ch, void *data, int len)
02629 {
02630   (void) len;
02631   EThread *thread = this_ethread();
02632   ProxyMutex *mutex = thread->mutex;
02633   ////////////////////////////////////////////////////////
02634   // Note: we are running on the ET_CLUSTER thread
02635   ////////////////////////////////////////////////////////
02636 
02637   CacheLookupMsg *msg = (CacheLookupMsg *) data;
02638   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
02639 
02640   if (mh->GetMsgVersion() != CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {    ////////////////////////////////////////////////
02641     // Convert from old to current message format
02642     ////////////////////////////////////////////////
02643     ink_release_assert(!"cache_lookup_ClusterFunction() bad msg version");
02644   }
02645 
02646   if (mh->NeedByteSwap())
02647     msg->SwapBytes();
02648 
02649   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
02650 
02651   CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
02652   c->mutex = new_ProxyMutex();
02653   MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
02654   c->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
02655   c->seq_number = msg->seq_number;
02656   c->from = ch->machine;
02657   c->url_md5 = msg->url_md5;
02658   SET_CONTINUATION_HANDLER(c, (CacheContHandler)
02659                            & CacheContinuation::replyLookupEvent);
02660 
02661   CacheKey key(msg->url_md5);
02662 #ifdef CACHE_MSG_TRACE
02663   log_cache_op_msg(msg->seq_number, 0, "cache_lookup");
02664 #endif
02665 
02666   // Extract hostname data if passed.
02667 
02668   char *hostname;
02669   int hostname_len = len - op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP);
02670   hostname = (hostname_len ? (char *) msg->moi.byte : 0);
02671 
02672   // Note: Hostname data invalid after return from lookup
02673   Cache *call_cache = caches[msg->frag_type];
02674   call_cache->lookup(c, &key, (CacheFragType) msg->frag_type, hostname, hostname_len);
02675 }
02676 
02677 /////////////////////////////////////////////////////////////////////////
02678 // replyLookupEvent()
02679 //   This function handles the result of a lookup on a remote machine.
02680 //   It packages up the result and sends it back to the calling machine.
02681 /////////////////////////////////////////////////////////////////////////
02682 int
02683 CacheContinuation::replyLookupEvent(int event, void * /* d ATS_UNUSED */)
02684 {
02685   ink_hrtime now;
02686   now = ink_get_hrtime();
02687   CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
02688   LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
02689 
02690   int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
02691   if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
02692     CacheOpReplyMsg *msg;
02693     int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
02694     msg = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen);
02695     msg->init();
02696     CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
02697     int len = flen - sizeof(msg->token);
02698 
02699     if (!no_reply_message) {
02700       msg->seq_number = seq_number;
02701       msg->result = event;
02702 #ifdef CACHE_MSG_TRACE
02703       log_cache_op_sndmsg(seq_number, event, "cache_result");
02704 #endif
02705       clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
02706     }
02707   } else {
02708     //////////////////////////////////////////////////////////////
02709     // Create the specified down rev version of this message
02710     //////////////////////////////////////////////////////////////
02711     ink_release_assert(!"replyLookupEvent() bad msg version");
02712   }
02713 
02714   // Free up everything
02715 
02716   cacheContAllocator_free(this);
02717   return EVENT_DONE;
02718 }
02719 
02720 int32_t CacheContinuation::getObjectSize(VConnection * vc, int opcode, CacheHTTPInfo * ret_ci)
02721 {
02722   CacheHTTPInfo *ci = 0;
02723   int64_t object_size = 0;
02724 
02725   if ((opcode == CACHE_OPEN_READ_LONG)
02726       || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
02727 
02728     ((CacheVC *) vc)->get_http_info(&ci);
02729     if (ci) {
02730       object_size = ci->object_size_get();
02731 
02732     } else {
02733       ci = 0;
02734       object_size = 0;
02735     }
02736 
02737   } else {
02738     object_size = ((CacheVC *)vc)->get_object_size();
02739   }
02740 
02741   if (ret_ci && !ret_ci->valid()) {
02742     CacheHTTPInfo
02743       new_ci;
02744     new_ci.create();
02745     if (ci) {
02746       // Initialize copy
02747       new_ci.copy(ci);
02748     } else {
02749       new_ci.object_size_set(object_size);
02750     }
02751     new_ci.m_alt->m_writeable = 1;
02752     ret_ci->copy_shallow(&new_ci);
02753   }
02754   ink_release_assert(object_size);
02755   return object_size;
02756 }
02757 
02758 //////////////////////////////////////////////////////////////////////////
02759 // insert_cache_callback_user()
02760 //  Insert write VC into global cache prior to performing user callback.
02761 //////////////////////////////////////////////////////////////////////////
02762 void
02763 CacheContinuation::insert_cache_callback_user(ClusterVConnection * vc, int res, void *e)
02764 {
02765   if (GlobalOpenWriteVCcache->insert(&url_md5, vc)) {
02766     // Inserted
02767     callback_user(res, e);
02768 
02769   } else {
02770     // Unable to insert, try later
02771     result = res;
02772     callback_data = e;
02773     callback_data_2 = (void *) vc;
02774     SET_HANDLER((CacheContHandler) & CacheContinuation::insertCallbackEvent);
02775     eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
02776   }
02777 }
02778 
02779 int
02780 CacheContinuation::insertCallbackEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
02781 {
02782   if (GlobalOpenWriteVCcache->insert(&url_md5, (ClusterVConnection *)
02783                                      callback_data_2)) {
02784     // Inserted
02785     callback_user(result, callback_data);
02786 
02787   } else {
02788     // Unable to insert, try later
02789     eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
02790   }
02791   return EVENT_DONE;
02792 }
02793 
02794 ///////////////////////////////////////////////////////////////////
02795 // callback_user()
02796 //  Invoke handleEvent on the given continuation (cont) with
02797 //    considerations for Action.
02798 ///////////////////////////////////////////////////////////////////
void
CacheContinuation::callback_user(int res, void *e)
{
  // Deliver (res, e) to the user's continuation, honoring the Action:
  // the callback is skipped if cancelled, and is never made directly
  // from an ET_CLUSTER thread.
  EThread *et = this_ethread();

  if (!is_ClusterThread(et)) {
    // MUTEX_TRY_LOCK scopes the lock to this block; on success the
    // callback runs inline on this thread.
    MUTEX_TRY_LOCK(lock, mutex, et);
    if (lock) {
      if (!action.cancelled) {
        action.continuation->handleEvent(res, e);
      }
      // Continuation is done once the callback is resolved, cancelled or not.
      cacheContAllocator_free(this);

    } else {
      // Unable to acquire lock, retry later
      defer_callback_result(res, e);
    }
  } else {
    // Can not post completion on ET_CLUSTER thread.
    defer_callback_result(res, e);
  }
}
02821 
02822 void
02823 CacheContinuation::defer_callback_result(int r, void *e)
02824 {
02825   result = r;
02826   callback_data = e;
02827   SET_HANDLER((CacheContHandler) & CacheContinuation::callbackResultEvent);
02828   eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
02829 }
02830 
02831 int
02832 CacheContinuation::callbackResultEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
02833 {
02834   if (!action.cancelled)
02835     action.continuation->handleEvent(result, callback_data);
02836   cacheContAllocator_free(this);
02837   return EVENT_DONE;
02838 }
02839 
02840 //-----------------------------------------------------------------
02841 // CacheContinuation static member functions
02842 //-----------------------------------------------------------------
02843 
02844 ///////////////////////////////////////////////////////////////////////
02845 // cacheContAllocator_alloc()
02846 ///////////////////////////////////////////////////////////////////////
02847 CacheContinuation *
02848 CacheContinuation::cacheContAllocator_alloc()
02849 {
02850   return cacheContAllocator.alloc();
02851 }
02852 
02853 
02854 ///////////////////////////////////////////////////////////////////////
02855 // cacheContAllocator_free()
02856 ///////////////////////////////////////////////////////////////////////
void
CacheContinuation::cacheContAllocator_free(CacheContinuation * c)
{
  // Tear down a CacheContinuation and return it to the class allocator.
  ink_assert(c->magicno == (int) MagicNo);
//  ink_assert(!c->cache_op_ClusterFunction);
  c->magicno = -1;              // poison magic so reuse-after-free asserts
#ifdef ENABLE_TIME_TRACE
  c->start_time = 0;
#endif
  c->free();
  c->mutex = NULL;
  // FIX ME. ajitb 12/21/99
  // Compiler bug in CC: WorkShop Compilers 5.0 98/12/15 C++ 5.0
  // Does not accept assignment of ((Continuation *) NULL)
  {
    Continuation *temp = NULL;
    c->action = temp;           // drop the action's continuation reference
  }
  c->tunnel_mutex = NULL;
  cacheContAllocator.free(c);
}
02878 
02879 /////////////////////////////////////////////////////////////////////////
02880 // callback_failure()
02881 //   Post error completion using a continuation.
02882 /////////////////////////////////////////////////////////////////////////
02883 Action *
02884 CacheContinuation::callback_failure(Action * a, int result, int err, CacheContinuation * this_cc)
02885 {
02886   CacheContinuation *cc;
02887   if (!this_cc) {
02888     cc = cacheContAllocator_alloc();
02889     cc->mutex = a->mutex;
02890     cc->action = *a;
02891 
02892   } else {
02893     cc = this_cc;
02894   }
02895   cc->result = result;
02896   cc->result_error = err;
02897   SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
02898                            & CacheContinuation::callbackEvent);
02899   eventProcessor.schedule_imm(cc, ET_CACHE_CONT_SM);
02900   return &cc->action;
02901 }
02902 
02903 ///////////////////////////////////////////////////////////////////////
02904 // callbackEvent()
02905 //  Invoke callback and deallocate continuation.
02906 ///////////////////////////////////////////////////////////////////////
02907 int
02908 CacheContinuation::callbackEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
02909 {
02910   if (!action.cancelled)
02911     action.continuation->handleEvent(result, (void *)(intptr_t)result_error);
02912   cacheContAllocator_free(this);
02913   return EVENT_DONE;
02914 }
02915 
02916 //------------------------------------------------------------------
02917 // File static functions
02918 //------------------------------------------------------------------
02919 
02920 ////////////////////////////////////////////////////////////////////////
02921 // find_cache_continuation()
02922 //   Find a currently pending cache continuation expecting a response.
02923 //   Requires taking the lock on the remoteCacheContQueueMutex first.
02924 ////////////////////////////////////////////////////////////////////////
02925 static CacheContinuation *
02926 find_cache_continuation(unsigned int seq_number, unsigned int from_ip)
02927 {
02928   unsigned int hash = FOLDHASH(from_ip, seq_number);
02929   CacheContinuation *c = NULL;
02930   CacheContinuation *lastc = NULL;
02931   for (c = (CacheContinuation *) remoteCacheContQueue[hash].head; c; c = (CacheContinuation *) c->link.next) {
02932     if (seq_number == c->seq_number && from_ip == c->target_ip) {
02933       if (lastc) {
02934         ink_release_assert(c->link.prev == lastc);
02935       } else {
02936         ink_release_assert(!c->link.prev);
02937       }
02938       break;
02939     }
02940 
02941     lastc = c;
02942   }
02943   return c;
02944 }
02945 
02946 /////////////////////////////////////////////////////////////////////////////
02947 // new_cache_sequence_number()
02948 //  Generate unique request sequence numbers
02949 /////////////////////////////////////////////////////////////////////////////
02950 static unsigned int
02951 new_cache_sequence_number()
02952 {
02953   unsigned int res = 0;
02954 
02955   do {
02956     res = (unsigned int) ink_atomic_increment(&cluster_sequence_number, 1);
02957   } while (!res);
02958 
02959 
02960 
02961   return res;
02962 }
02963 
02964 /***************************************************************************/
02965 #ifdef OMIT
02966 /***************************************************************************/
02967 /////////////////////////////////////////////////////////////////////////////
02968 // forwardEvent()
02969 //   for migrate-on-demand, make a connection between the
02970 //   the node which has the object and the node which should have it.
02971 //
02972 //   prepared for either OPEN_READ (from current owner)
02973 //   or OPEN_WRITE (from new owner)
02974 /////////////////////////////////////////////////////////////////////////////
int
CacheContinuation::forwardEvent(int event, VConnection * c)
{
  // (Compiled out under #ifdef OMIT.) First half of the migrate-on-demand
  // handshake: record which side (read or write) opened and whether it
  // succeeded, then wait for the peer open in forwardWaitEvent().
  int ret = EVENT_CONT;
  cluster_vc = 0;

  cache_read = false;
  switch (event) {
  default:
    ink_assert(!"bad case");
    // falls through to OPEN_WRITE_FAILED (debug builds abort above)
  case CACHE_EVENT_OPEN_WRITE_FAILED:
    ret = EVENT_DONE;
    break;
  case CACHE_EVENT_OPEN_WRITE:
    cluster_vc = c;
    break;
  case CACHE_EVENT_OPEN_READ_FAILED:
    cache_read = true;
    ret = EVENT_DONE;
    break;
  case CACHE_EVENT_OPEN_READ:
    cache_read = true;
    cluster_vc = c;
    break;
  }
  SET_HANDLER((CacheContHandler) & CacheContinuation::forwardWaitEvent);
  return ret;
}
03003 
03004 ////////////////////////////////////////////////////////////////////////
03005 // forwardWaitEvent()
03006 //   For migrate-on-demand, make a connection as above (forwardEvent)
03007 //   second either OPEN_READ or OPEN_WRITE,
03008 //   the data for the first is stored in (cluster_vc,cache_read)
03009 ////////////////////////////////////////////////////////////////////////
int
CacheContinuation::forwardWaitEvent(int event, VConnection * c)
{
  // (Compiled out under #ifdef OMIT.) Second open of the migrate-on-demand
  // pair; combined with the VC saved by forwardEvent(). When both opens
  // succeeded, the read side is teed into the write side via VCTee.
  int ret = EVENT_CONT;
  int res = CACHE_EVENT_OPEN_READ_FAILED;
  void *res_data = NULL;
  VConnection *vc = NULL;

  switch (event) {
  default:
    ink_assert(!"bad case");
  case CACHE_EVENT_OPEN_WRITE_FAILED:
  case CACHE_EVENT_OPEN_READ_FAILED:
    ret = EVENT_DONE;
    break;
  case CACHE_EVENT_OPEN_WRITE:
  case CACHE_EVENT_OPEN_READ:
    vc = c;
    break;

  }
  // cache_read records which role the first (saved) VC played.
  VConnection *read_vc = (cache_read ? cluster_vc : vc);
  VConnection *write_vc = (!cache_read ? cluster_vc : vc);

  res = read_vc ? CACHE_EVENT_OPEN_READ : CACHE_EVENT_OPEN_READ_FAILED;
  res_data = read_vc;

  // if the read and write are sucessful, tunnel the read to the write
  if (read_vc && write_vc) {
    res_data = new VCTee(read_vc, write_vc, vio);
    if (vio) {                  // CACHE_EVENT_OPEN_READ_VIO
      res = event;
      // NOTE(review): this casts read_vc, not the VCTee just created —
      // it looks like it should reference the new tee's vio; confirm
      // before ever reviving this OMIT-ed code.
      res_data = &((VCTee *) read_vc)->vio;
    }
  }
  // if the read is sucessful return it to the user
  //
  c->handleEvent(res, res_data);
  return ret;
}
03050 
03051 /////////////////////////////////////////////////////////////////////
03052 // tunnelEvent()
03053 //   If the reply requires data, tunnel the data from the cache
03054 //   to the cluster.
03055 /////////////////////////////////////////////////////////////////////
int
CacheContinuation::tunnelEvent(int event, VConnection * vc)
{
  // (Compiled out under #ifdef OMIT.) On CLUSTER_EVENT_OPEN, tunnel object
  // data between the cache VC and the new cluster VC and send the reply
  // message; on OPEN_FAILED, tear down and report failure to the peer.
  int ret = EVENT_DONE;
  int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();    // include token
  int len = 0;
  bool read_buf = ((request_opcode == CACHE_OPEN_READ_BUFFER)
                   || (request_opcode == CACHE_OPEN_READ_BUFFER_LONG));
  // The read_buf branches below are dead in this build: the assert above
  // guarantees read_buf is false.
  ink_release_assert(!read_buf);

  // Stack-allocated reply header; may be replaced by a larger alloca'd
  // message when CacheHTTPInfo must be marshalled in.
  CacheOpReplyMsg rmsg;
  CacheOpReplyMsg *msg = &rmsg;
  msg->result = result;
  msg->seq_number = seq_number;
  msg->token = token;
  int expect_reply = 1;

  if (event == CLUSTER_EVENT_OPEN) {
    if (cache_read) {
      if (read_buf) {
        ink_assert(have_all_data || (readahead_vio == &((CacheVConnection *) cluster_vc)->vio));
        write_cluster_vc = (ClusterVConnection *) vc;

        if (have_all_data) {
          msg->token.clear();   // Tell sender no conn established
        } else {
          msg->token = token;   // Tell sender conn established
          setupReadBufTunnel(cluster_vc, vc);
        }

      } else {
        // Stream object data cache -> cluster with a one-way tunnel.
        OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
        pOWT->init(cluster_vc, vc, NULL, nbytes, this->mutex);
        --expect_reply;
      }

      ////////////////////////////////////////////////////////
      // cache_read requires CacheHTTPInfo in reply message.
      ////////////////////////////////////////////////////////
      int res;
      CacheHTTPInfo *ci;

      if (!cache_vc_info) {
        // OPEN_READ case
        (void) getObjectSize(cluster_vc, request_opcode, &cache_vc_info);
      }
      ci = cache_vc_info;

      // Determine data length and allocate
      len = ci->marshal_length();
      CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);

      // Initialize reply message header
      *reply = *msg;

      // Marshal response data into reply message
      res = ci->marshal((char *) reply->moi.byte, len);
      ink_assert(res > 0);

      // Make reply message the current message
      msg = reply;

    } else {
      // Write case: stream data cluster -> cache.
      OneWayTunnel *pOWT = OneWayTunnelAllocator.alloc();
      pOWT->init(vc, cluster_vc, NULL, nbytes, this->mutex);
      --expect_reply;
    }
    ret = EVENT_CONT;
  } else {
    ink_release_assert(event == CLUSTER_EVENT_OPEN_FAILED);
    msg->result = CACHE_EVENT_SET_FAILED(result);

    if (read_buf) {
      Debug("cluster_timeout", "unable to make cluster connection2");
      initial_buf = 0;          // Do not send data
      initial_bufsize = 0;

      if (!have_all_data) {
        // Shutdown cache connection and free MIOBuffer
        MIOBuffer *mbuf = readahead_vio->buffer.writer();
        cluster_vc->do_io(VIO::CLOSE);
        free_MIOBuffer(mbuf);
      }
    } else {
      Debug("cluster_timeout", "unable to make cluster connection2A");
      cluster_vc->do_io(VIO::CLOSE);
    }
    // Negative len trims the token from the fixed-length send below.
    len = 0 - (int) sizeof(msg->token);
    --expect_reply;
  }

  int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
  if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
    if (read_buf) {
      // Transmit reply message and object data in same cluster message
      clusterProcessor.invoke_remote_data(from,
                                          CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                          (void *) msg, (flen + len),
                                          initial_buf, initial_bufsize,
                                          cluster_vc_channel, &token,
                                          &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);

    } else {
      clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                     (void *) msg, (flen + len), CLUSTER_OPT_STEAL);
    }
  } else {
    //////////////////////////////////////////////////////////////
    // Create the specified down rev version of this message
    //////////////////////////////////////////////////////////////
    ink_release_assert(!"tunnelEvent() bad msg version");
  }
  // No further reply expected: this continuation's work is complete.
  if (expect_reply <= 0)
    cacheContAllocator_free(this);
  return ret;
}
03172 
03173 /////////////////////////////////////////////////////////////////////
03174 // remoteConnectEvent()
03175 //   If this was an open, make a connection on this side before
03176 //   responding to the user.
03177 /////////////////////////////////////////////////////////////////////
int
CacheContinuation::remoteConnectEvent(int event, VConnection * cvc)
{
  // (Compiled out under #ifdef OMIT.) Completes a remote open by attaching
  // the local cluster VC and posting the result to the user.
  ClusterVConnection *vc = (ClusterVConnection *) cvc;

  if (event == CLUSTER_EVENT_OPEN) {
    if (result == CACHE_EVENT_OPEN_READ) {
      // Move CacheHTTPInfo reply data into VC
      vc->alternate = this->ic_new_info;
      this->ic_new_info.clear();
    }
    callback_user(result, vc);
    return EVENT_CONT;
  } else {
    // Connection setup failed: report the failed variant of the result.
    Debug("cluster_cache", "unable to make cluster connection");
    callback_user(CACHE_EVENT_SET_FAILED(result), vc);
    return EVENT_DONE;
  }
}
03197 
03198 /***************************************************************************/
03199 #endif // OMIT
03200 /***************************************************************************/
03201 
03202 // End of ClusterCache.cc

Generated by  doxygen 1.7.1