00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #include "P_Cluster.h"
00030 
#ifdef DEBUG
#define CLUSTER_TEST_DEBUG      1
#endif

#ifdef ENABLE_TIME_TRACE
// Bucketed time-distribution histograms and event counters, compiled in
// only when time tracing is enabled.
int callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int cache_callbacks = 0;

int rmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int rmt_cache_callbacks = 0;

int lkrmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int lkrmt_cache_callbacks = 0;

int cntlck_acquire_time_dist[TIME_DIST_BUCKETS_SIZE];
int cntlck_acquire_events = 0;

int open_delay_time_dist[TIME_DIST_BUCKETS_SIZE];
int open_delay_events = 0;

#endif // ENABLE_TIME_TRACE

// Nonzero enables migrate-on-demand behavior (read elsewhere in the module).
int cache_migrate_on_demand = false;

// Allocator for CacheContinuation objects.
static ClassAllocator<CacheContinuation> cacheContAllocator("cacheContAllocator");

// Outstanding remote cache operations, hashed by
// FOLDHASH(target ip, sequence number); one mutex per bucket.
static Queue<CacheContinuation> remoteCacheContQueue[REMOTE_CONNECT_HASH];
static Ptr<ProxyMutex> remoteCacheContQueueMutex[REMOTE_CONNECT_HASH];

// Sequence number 0 marks a message that expects no response
// (see init_from_long()/init_from_short() and do_op()).
#define CACHE_NO_RESPONSE            0
static int cluster_sequence_number = 1;

#ifdef CLUSTER_TEST_DEBUG
// Effectively disable remote-op timeouts while debugging.
static ink_hrtime cache_cluster_timeout = HRTIME_SECONDS(65536);
#else
static ink_hrtime cache_cluster_timeout = CACHE_CLUSTER_TIMEOUT;
#endif

static CacheContinuation *find_cache_continuation(unsigned int, unsigned int);

static unsigned int new_cache_sequence_number();

// Expand an IPv4 address into four printf byte arguments.
#define DOT_SEPARATED(_x)                             \
((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1],   \
  ((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]

#define ET_CACHE_CONT_SM        ET_NET
#define ALLOW_THREAD_STEAL      true
00088 
00089 #ifdef CACHE_MSG_TRACE
00090 
00091 
00092 
00093 
00094 
00095 
#define MAX_TENTRIES    4096
// One recorded cache-op message (ring-buffer slot) for debug tracing.
struct traceEntry
{
  unsigned int seqno;           // message sequence number
  int op;                       // cache opcode
  char *type;                   // caller-supplied tag string
};
struct traceEntry recvTraceTable[MAX_TENTRIES];   // received messages
struct traceEntry sndTraceTable[MAX_TENTRIES];    // sent messages

// Explicit 'int' added: the original implicit-int declarations are
// invalid in C99 and C++.
static int recvTraceTable_index = 0;
static int sndTraceTable_index = 0;
00108 
00109 void
00110 log_cache_op_msg(unsigned int seqno, int op, char *type)
00111 {
00112   int t = ink_atomic_increment(&recvTraceTable_index, 1);
00113   int n = recvTraceTable_index % MAX_TENTRIES;
00114   recvTraceTable[n].seqno = seqno;
00115   recvTraceTable[n].op = op;
00116   recvTraceTable[n].type = type;
00117 }
00118 
00119 void
00120 log_cache_op_sndmsg(unsigned int seqno, int op, char *type)
00121 {
00122   int t = ink_atomic_increment(&sndTraceTable_index, 1);
00123   int n = sndTraceTable_index % MAX_TENTRIES;
00124   sndTraceTable[n].seqno = seqno;
00125   sndTraceTable[n].op = op;
00126   sndTraceTable[n].type = type;
00127 }
00128 
00129 void
00130 dump_recvtrace_table()
00131 {
00132   int n;
00133   printf("\n");
00134   for (n = 0; n < MAX_TENTRIES; ++n)
00135     printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno,
00136            recvTraceTable[n].op, recvTraceTable[n].type ? recvTraceTable[n].type : "");
00137 }
00138 
00139 void
00140 dump_sndtrace_table()
00141 {
00142   int n;
00143   printf("\n");
00144   for (n = 0; n < MAX_TENTRIES; ++n)
00145     printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno,
00146            sndTraceTable[n].op, sndTraceTable[n].type ? sndTraceTable[n].type : "");
00147 }
00148 
00149 
00150 #endif // CACHE_MSG_TRACE
00151 
00152 
00153 
00154 
00155 
00156 
00157 
00158 
00159 
00160 
00161 
00162 
00163 
00164 
// Hash cache of open-for-write ClusterVConnections keyed by URL MD5.
// Entries are added via insert(), consumed (removed) via lookup(), and
// idle entries are reclaimed by a per-bucket periodic scavenger
// (ClusterVConnectionCacheEvent, started in init()).
class ClusterVConnectionCache
{
public:
  ClusterVConnectionCache()
  {
    memset(hash_event, 0, sizeof(hash_event));
  }
  // Create bucket mutexes and schedule one scavenger event per bucket.
  void init();
  // Map an MD5 key to a bucket index in [0, MAX_TABLE_ENTRIES).
  int MD5ToIndex(INK_MD5 * p);
  // Returns 1 on success, 0 on bucket-lock miss (caller retries).
  int insert(INK_MD5 *, ClusterVConnection *);
  // Returns the VC on a hit (entry is removed), 0 on bucket-lock miss
  // (caller retries), or (ClusterVConnection *)-1 on a definitive miss.
  ClusterVConnection *lookup(INK_MD5 *);

public:
  struct Entry
  {
    LINK(Entry, link);
    bool mark_for_delete;       // set on one scavenger pass; purged on the next
    INK_MD5 key;                // URL MD5 this VC was opened for
    ClusterVConnection *vc;

      Entry():mark_for_delete(0), vc(0)
    {
    }
     ~Entry()
    {
    }
  };

  enum
  { MAX_TABLE_ENTRIES = 256,    // must stay a power of two (index is masked)
    SCAN_INTERVAL = 10          // seconds between scavenger passes
  };
  Queue<Entry> hash_table[MAX_TABLE_ENTRIES];
  Ptr<ProxyMutex> hash_lock[MAX_TABLE_ENTRIES];
  Event *hash_event[MAX_TABLE_ENTRIES];
};
00201 
// Allocator for ClusterVConnectionCache hash-table entries.
static ClassAllocator <
  ClusterVConnectionCache::Entry >
ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry");

// Process-wide open-write VC cache; created in CacheContinuation::init().
ClusterVConnectionCache *GlobalOpenWriteVCcache = 0;
00207 
00208 
00209 
00210 
// Periodic scavenger continuation for a single ClusterVConnectionCache
// bucket; scheduled by ClusterVConnectionCache::init().
class ClusterVConnectionCacheEvent:public Continuation
{
public:
  ClusterVConnectionCacheEvent(ClusterVConnectionCache * c, int n)
  : Continuation(new_ProxyMutex()), cache(c), hash_index(n)
  {
    SET_HANDLER(&ClusterVConnectionCacheEvent::eventHandler);
  }
  // Scans the bucket: marks unmarked entries, purges marked ones.
  int eventHandler(int, Event *);

private:
  ClusterVConnectionCache * cache;  // owning cache
  int hash_index;                   // bucket this event services
};
00225 
00226 void
00227 ClusterVConnectionCache::init()
00228 {
00229   int n;
00230   ClusterVConnectionCacheEvent *eh;
00231 
00232   for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
00233     hash_lock[n] = new_ProxyMutex();
00234   }
00235   for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
00236     
00237 
00238     eh = new ClusterVConnectionCacheEvent(this, n);
00239     hash_event[n] =
00240       eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
00241   }
00242 }
00243 inline int
00244 ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p)
00245 {
00246   uint64_t i = p->fold();
00247   int32_t h, l;
00248 
00249   h = i >> 32;
00250   l = i & 0xFFFFFFFF;
00251   return ((h ^ l) % MAX_TABLE_ENTRIES) & (MAX_TABLE_ENTRIES - 1);
00252 }
00253 
00254 int
00255 ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc)
00256 {
00257   int index = MD5ToIndex(key);
00258   Entry *e;
00259   EThread *thread = this_ethread();
00260   ProxyMutex *mutex = thread->mutex;
00261 
00262   MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
00263   if (!lock) {
00264     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
00265     return 0;                   
00266 
00267   } else {
00268     
00269 
00270     e = ClusterVCCacheEntryAlloc.alloc();
00271     e->key = *key;
00272     e->vc = vc;
00273     hash_table[index].enqueue(e);
00274     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
00275   }
00276   return 1;                     
00277 }
00278 
00279 ClusterVConnection *
00280 ClusterVConnectionCache::lookup(INK_MD5 * key)
00281 {
00282   int index = MD5ToIndex(key);
00283   Entry *e;
00284   ClusterVConnection *vc = 0;
00285   EThread *thread = this_ethread();
00286   ProxyMutex *mutex = thread->mutex;
00287 
00288   MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
00289   if (!lock) {
00290     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
00291     return vc;                  
00292 
00293   } else {
00294     e = hash_table[index].head;
00295     while (e) {
00296       if (*key == e->key) {     
00297         vc = e->vc;
00298         hash_table[index].remove(e);
00299         ClusterVCCacheEntryAlloc.free(e);
00300         CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_HITS_STAT);
00301         return vc;
00302 
00303       } else {
00304         e = e->link.next;
00305       }
00306     }
00307   }
00308   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
00309   return (ClusterVConnection *) - 1;    
00310 }
00311 
00312 int
00313 ClusterVConnectionCacheEvent::eventHandler(int , Event * e)
00314 {
00315   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
00316   MUTEX_TRY_LOCK(lock, cache->hash_lock[hash_index], this_ethread());
00317   if (!lock) {
00318     CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT);
00319     e->schedule_in(HRTIME_MSECONDS(10));
00320     return EVENT_DONE;
00321   }
00322   
00323 
00324   ClusterVConnectionCache::Entry * entry;
00325   ClusterVConnectionCache::Entry * next_entry;
00326   entry = cache->hash_table[hash_index].head;
00327 
00328   while (entry) {
00329     if (entry->mark_for_delete) {
00330       next_entry = entry->link.next;
00331 
00332       cache->hash_table[hash_index].remove(entry);
00333       entry->vc->allow_remote_close();
00334       entry->vc->do_io(VIO::CLOSE);
00335 
00336       ClusterVCCacheEntryAlloc.free(entry);
00337       entry = next_entry;
00338       CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_PURGES_STAT);
00339 
00340     } else {
00341       entry->mark_for_delete = true;
00342       entry = entry->link.next;
00343     }
00344   }
00345 
00346   
00347 
00348   e->schedule_in(HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
00349   return EVENT_DONE;
00350 }
00351 
00352 
00353 
00354 
00355 
00356 
00357 
00358 int
00359 CacheContinuation::init()
00360 {
00361   int n;
00362   for (n = 0; n < REMOTE_CONNECT_HASH; ++n)
00363     remoteCacheContQueueMutex[n] = new_ProxyMutex();
00364 
00365   GlobalOpenWriteVCcache = new ClusterVConnectionCache;
00366   GlobalOpenWriteVCcache->init();
00367   return 0;
00368 }
00369 
00370 
00371 
00372 
00373 
// Initiate a cache operation on the remote node 'mp'.
// Marshals the request described by 'args' into 'data' (or a stack buffer
// for short open messages), optionally allocates a CacheContinuation to
// match the eventual reply when a callback continuation 'c' is supplied,
// and sends the message over the cluster.
// Returns the cancellable Action when c != 0, otherwise NULL.
Action *
CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
                         int user_opcode, char *data, int data_len, int nbytes, MIOBuffer * b)
{
  CacheContinuation *cc = 0;
  Action *act = 0;
  char *msg = 0;
  ClusterHandler *ch = mp->pop_ClusterHandler();

  // The READ_BUFFER opcodes travel on the wire as their plain
  // counterparts (buffer variants are rejected below anyway).
  int opcode = user_opcode;
  switch (opcode) {
  case CACHE_OPEN_READ_BUFFER:
    opcode = CACHE_OPEN_READ;
    break;
  case CACHE_OPEN_READ_BUFFER_LONG:
    opcode = CACHE_OPEN_READ_LONG;
    break;
  default:
    break;
  }

  // No cluster handler for this machine: nothing can be sent.
  if (!ch)
    goto no_send_exit;

  if (c) {
    // Caller expects a reply: set up a continuation to track it.
    cc = cacheContAllocator_alloc();
    cc->ch = ch;
    cc->target_machine = mp;
    cc->request_opcode = opcode;
    cc->mutex = c->mutex;
    cc->action = c;
    cc->action.cancelled = false;
    cc->start_time = ink_get_hrtime();
    cc->from = mp;
    cc->result = op_failure(opcode);
    SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
                             & CacheContinuation::remoteOpEvent);
    act = &cc->action;

    // Identify the request by (target ip, sequence number).
    cc->target_ip = mp->ip;
    cc->seq_number = new_cache_sequence_number();

    // Register on the outstanding-ops queue so the reply can find us.
    unsigned int hash = FOLDHASH(cc->target_ip, cc->seq_number);
    MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
    if (!queuelock) {

      // Bucket lock missed: schedule a short retry; the EVENT_INTERVAL
      // path in localVCsetupEvent() will enqueue us later.
      cc->timeout = eventProcessor.schedule_in(cc, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
    } else {
      remoteCacheContQueue[hash].enqueue(cc);
      MUTEX_RELEASE(queuelock);
      cc->timeout = eventProcessor.schedule_in(cc, cache_cluster_timeout, ET_CACHE_CONT_SM);
    }
  }

  Debug("cache_msg",
        "do_op opcode=%d seqno=%d Machine=%p data=%p datalen=%d mio=%p",
        opcode, (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b);

  // Marshal the request according to opcode.
  switch (opcode) {
  case CACHE_OPEN_WRITE_BUFFER:
  case CACHE_OPEN_WRITE_BUFFER_LONG:
    {
      ink_release_assert(!"write buffer not supported");
      break;
    }
  case CACHE_OPEN_READ_BUFFER:
  case CACHE_OPEN_READ_BUFFER_LONG:
    {
      ink_release_assert(!"read buffer not supported");
      break;
    }
  case CACHE_OPEN_WRITE:
  case CACHE_OPEN_READ:
    {
      ink_release_assert(c > 0);

      // Short-form open: build the message in a stack buffer if the
      // caller did not provide one.
      if (!data) {
        data_len = op_to_sizeof_fixedlen_msg(opcode);
        data = (char *) ALLOCA_DOUBLE(data_len);
      }
      msg = (char *) data;
      CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
      m->init();
      m->opcode = opcode;
      m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
      m->md5 = *((CacheOpArgs_General *) args)->url_md5;
      cc->url_md5 = m->md5;
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
      m->frag_type = ((CacheOpArgs_General *) args)->frag_type;
      if (opcode == CACHE_OPEN_WRITE) {
        m->nbytes = nbytes;
        // 'data' field doubles as pin_in_cache for writes.
        m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
      } else {
        m->nbytes = 0;
        m->data = 0;
      }

      if (opcode == CACHE_OPEN_READ) {
        // Advertise how much buffer space the reader has so the remote
        // side can size its response.
        m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
      } else {
        m->buffer_size = 0;
      }

      // Establish the local VC; the message may be sent now (>0),
      // already failed (0), or deferred (-1).
      int res = setup_local_vc(msg, data_len, cc, mp, &act);
      if (!res) {

        // Open failed; failure callback already issued via 'act'.
        cc->remove_and_delete(0, (Event *) 0);
        return act;

      } else if (res != -1) {

        // VC ready: fall through to send the message.
        break;

      } else {

        // Deferred open; localVCsetupEvent() sends the message later.
        goto no_send_exit;
      }
    }

  case CACHE_OPEN_READ_LONG:
  case CACHE_OPEN_WRITE_LONG:
    {
      ink_release_assert(c > 0);

      // Long-form open: caller supplies the (variable-length) buffer.
      msg = data;
      CacheOpMsg_long *m = (CacheOpMsg_long *) msg;
      m->init();
      m->opcode = opcode;
      m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
      m->url_md5 = *((CacheOpArgs_General *) args)->url_md5;
      cc->url_md5 = m->url_md5;
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
      m->nbytes = nbytes;
      // 'data' field carries pin_in_cache for long opens.
      m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
      m->frag_type = (uint32_t) ((CacheOpArgs_General *) args)->frag_type;

      if (opcode == CACHE_OPEN_READ_LONG) {
        // Advertise reader buffer space, as in the short-form case.
        m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
      } else {
        m->buffer_size = 0;
      }

      // Establish the local VC (same tri-state protocol as above).
      int res = setup_local_vc(msg, data_len, cc, mp, &act);
      if (!res) {

        // Open failed; failure callback already issued via 'act'.
        cc->remove_and_delete(0, (Event *) 0);
        return act;

      } else if (res != -1) {

        // VC ready: fall through to send the message.
        break;

      } else {

        // Deferred open; localVCsetupEvent() sends the message later.
        goto no_send_exit;
      }
    }
  case CACHE_UPDATE:
  case CACHE_REMOVE:
  case CACHE_DEREF:
    {

      // Fire-and-forget style short message; no local VC needed.
      msg = data;
      CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
      m->init();
      m->opcode = opcode;
      m->frag_type = ((CacheOpArgs_Deref *) args)->frag_type;
      m->cfl_flags = ((CacheOpArgs_Deref *) args)->cfl_flags;
      if (opcode == CACHE_DEREF)
        m->md5 = *((CacheOpArgs_Deref *) args)->md5;
      else
        m->md5 = *((CacheOpArgs_General *) args)->url_md5;
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
      break;
    }
  case CACHE_LINK:
    {

      // Link carries two MD5s (from, to).
      msg = data;
      CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *) msg;
      m->init();
      m->opcode = opcode;
      m->cfl_flags = ((CacheOpArgs_Link *) args)->cfl_flags;
      m->md5_1 = *((CacheOpArgs_Link *) args)->from;
      m->md5_2 = *((CacheOpArgs_Link *) args)->to;
      m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
      m->frag_type = ((CacheOpArgs_Link *) args)->frag_type;
      break;
    }
  default:
    msg = 0;
    break;
  }
#ifdef CACHE_MSG_TRACE
  log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op");
#endif
  // Ship the marshalled request to the remote node.
  clusterProcessor.invoke_remote(ch,
                                 op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION
                                 : CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len);

no_send_exit:
  if (c) {
    return act;
  } else {
    return (Action *) 0;
  }
}
00628 
// Create (or reuse) the local ClusterVConnection for an outgoing cache op
// and patch its channel/token into the marshalled message 'data'.
// Returns:  1  - VC ready, message updated, caller may send now;
//           0  - open failed, failure callback already issued via *act;
//          -1  - open deferred, localVCsetupEvent() finishes the setup.
int
CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action ** act)
{
  bool read_op = op_is_read(cc->request_opcode);
  bool short_msg = op_is_shortform(cc->request_opcode);

  // Keep a copy of the marshalled request; a deferred open needs to send
  // it later from localVCsetupEvent().
  cc->setMsgBufferLen(data_len);
  cc->allocMsgBuffer();
  memcpy(cc->getMsgBuffer(), data, data_len);

  SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
                           & CacheContinuation::localVCsetupEvent);

  if (short_msg) {
    Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
  } else {
    Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
  }

  // Obtain the VC; long open-write ops may reuse a cached open-write VC.
  ClusterVConnection *vc;

  if (!read_op && (cc->request_opcode == CACHE_OPEN_WRITE_LONG)) {
    // Always returns CLUSTER_DELAYED_OPEN (result arrives via events).
    vc = cc->lookupOpenWriteVC();

  } else {
    vc = clusterProcessor.open_local(cc, mp, cc->open_local_token,
                                     (CLUSTER_OPT_ALLOW_IMMEDIATE |
                                      (read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE)));
  }
  if (!vc) {
    // Immediate failure: release resources and fire the failure callback.
    if (short_msg) {
      Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d",
            (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
    } else {
      Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d",
            (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
    }
    cc->freeMsgBuffer();
    if (cc->timeout)
      cc->timeout->cancel();
    cc->timeout = NULL;

    // Report the failure to the caller's continuation.
    *act = callback_failure(&cc->action, (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
    return 0;

  } else if (vc != CLUSTER_DELAYED_OPEN) {
    // Open completed immediately: record the VC and patch its channel
    // and token into the outgoing message.
    if (read_op) {
      cc->read_cluster_vc = vc;
    } else {
      cc->write_cluster_vc = vc;
    }
    cc->cluster_vc_channel = vc->channel;
    vc->current_cont = cc;

    if (short_msg) {
      CacheOpMsg_short *ms = (CacheOpMsg_short *) data;
      ms->channel = vc->channel;
      ms->token = cc->open_local_token;
      Debug("cache_proto",
            "0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
            (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
    } else {
      CacheOpMsg_long *ml = (CacheOpMsg_long *) data;
      ml->channel = vc->channel;
      ml->token = cc->open_local_token;
      Debug("cache_proto",
            "1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
            (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
    }
    // The saved copy is no longer needed; replies go to remoteOpEvent.
    cc->freeMsgBuffer();
    SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
                             & CacheContinuation::remoteOpEvent);
    return 1;

  } else {

    // Open deferred; completion arrives via localVCsetupEvent().
    return -1;
  }
}
00716 
// Probe the global open-write VC cache for a VC matching this request's
// URL MD5.  Three outcomes from the cache:
//   0 (lock miss)  - retry later via a scheduled lookupOpenWriteVCEvent;
//   valid VC (hit) - deliver CLUSTER_EVENT_OPEN_EXISTS and synthesize the
//                    CACHE_EVENT_OPEN_WRITE reply locally;
//   -1 (miss)      - fall back to a normal clusterProcessor.open_local().
// Always returns CLUSTER_DELAYED_OPEN; the real result is delivered to
// localVCsetupEvent() asynchronously.
ClusterVConnection *
CacheContinuation::lookupOpenWriteVC()
{
  ClusterVConnection *vc;
  CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();

  vc = GlobalOpenWriteVCcache->lookup(&ml->url_md5);

  if (vc == ((ClusterVConnection *) 0)) {
    // Bucket lock miss: retry the lookup from an immediate event.
    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
                             & CacheContinuation::lookupOpenWriteVCEvent);
    lookup_open_write_vc_event = eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);

  } else if (vc != ((ClusterVConnection *) - 1)) {
    // Cache hit: reuse the existing open-write VC.

    vc->action_ = action;       // hand the caller's action to the VC

    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
                             & CacheContinuation::localVCsetupEvent);
    this->handleEvent(CLUSTER_EVENT_OPEN_EXISTS, vc);

    // Synthesize the remote OPEN_WRITE reply and process it locally, as
    // if it had arrived from the peer.
    CacheOpReplyMsg msg;
    int msglen;

    msglen = CacheOpReplyMsg::sizeof_fixedlen_msg();
    msg.result = CACHE_EVENT_OPEN_WRITE;
    msg.seq_number = seq_number;
    msg.token = vc->token;

    cache_op_result_ClusterFunction(ch, (void *) &msg, msglen);

  } else {
    // Definitive miss: perform a normal local open for write.

    SET_CONTINUATION_HANDLER(this, (CacheContHandler)
                             & CacheContinuation::localVCsetupEvent);
    vc = clusterProcessor.open_local(this, from, open_local_token,
                                     (CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
    if (!vc) {
      this->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);

    } else if (vc != CLUSTER_DELAYED_OPEN) {
      this->handleEvent(CLUSTER_EVENT_OPEN, vc);
    }
  }
  return CLUSTER_DELAYED_OPEN;  // result always delivered via events
}
00778 
00779 int
00780 CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e)
00781 {
00782   if (event == EVENT_IMMEDIATE) {
00783     
00784     lookupOpenWriteVC();
00785 
00786   } else {
00787     lookup_open_write_vc_event->cancel();
00788     SET_CONTINUATION_HANDLER(this, (CacheContHandler)
00789                              & CacheContinuation::localVCsetupEvent);
00790     this->handleEvent(event, e);
00791   }
00792   return EVENT_DONE;
00793 }
00794 
00795 int
00796 CacheContinuation::remove_and_delete(int , Event * e)
00797 {
00798   unsigned int hash = FOLDHASH(target_ip, seq_number);
00799   MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
00800   if (queuelock) {
00801     if (remoteCacheContQueue[hash].in(this)) {
00802       remoteCacheContQueue[hash].remove(this);
00803     }
00804     MUTEX_RELEASE(queuelock);
00805     if (use_deferred_callback)
00806       callback_failure(&action, result, result_error, this);
00807     else
00808       cacheContAllocator_free(this);
00809 
00810   } else {
00811     SET_HANDLER((CacheContHandler) & CacheContinuation::remove_and_delete);
00812     if (!e) {
00813       timeout = eventProcessor.schedule_in(this, cache_cluster_timeout, ET_CACHE_CONT_SM);
00814     } else {
00815       e->schedule_in(cache_cluster_timeout);
00816     }
00817   }
00818   return EVENT_DONE;
00819 }
00820 
// Completion handler for a deferred local VC open (see setup_local_vc()).
// Handles three situations:
//  - EVENT_INTERVAL: the retry/timeout event fired — either finish the
//    deferred enqueue onto the outstanding-ops queue, or treat it as an
//    operation timeout;
//  - CLUSTER_EVENT_OPEN / CLUSTER_EVENT_OPEN_EXISTS: the VC is ready —
//    patch channel/token into the saved message and (for OPEN) send it;
//  - anything else (or an op already timed out): fail/cleanup.
// NOTE: 'timeout' is pointer-tagged — the value (Event *)1 (low bit set)
// marks an operation that has already timed out.
int
CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
{
  ink_assert(magicno == (int) MagicNo);
  ink_assert(getMsgBuffer());
  bool short_msg = op_is_shortform(request_opcode);
  bool read_op = op_is_read(request_opcode);

  if (event == EVENT_INTERVAL) {
    // For EVENT_INTERVAL the second argument is really the Event.
    Event *e = (Event *) vc;
    unsigned int hash = FOLDHASH(target_ip, seq_number);

    MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
    if (!queuelock) {
      // Queue bucket busy; retry shortly.
      e->schedule_in(CACHE_RETRY_PERIOD);
      return EVENT_CONT;
    }

    if (!remoteCacheContQueue[hash].in(this)) {

      // Deferred enqueue from do_op()'s lock-miss path: register now and
      // convert this event into the real operation timeout.
      remoteCacheContQueue[hash].enqueue(this);
      ink_assert(timeout == e);
      MUTEX_RELEASE(queuelock);
      e->schedule_in(cache_cluster_timeout);
      return EVENT_CONT;

    } else {

      // Already queued, so this interval is the operation timeout.
      remoteCacheContQueue[hash].remove(this);
      MUTEX_RELEASE(queuelock);
      Debug("cluster_timeout", "0cluster op timeout %d", seq_number);
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
      timeout = (Event *) 1;    // tag: operation timed out

      // Tell the caller the open failed; the continuation itself is
      // cleaned up when the (late) open result arrives below.
      if (!action.cancelled)
        action.continuation->handleEvent((read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
      return EVENT_DONE;
    }

  } else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS))
             && (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0)) {
    // Open succeeded and the op has not timed out: record stats and
    // finish what setup_local_vc() started.
    ink_hrtime now;
    now = ink_get_hrtime();
    CLUSTER_SUM_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT, now - start_time);
    LOG_EVENT_TIME(start_time, open_delay_time_dist, open_delay_events);
    if (read_op) {
      read_cluster_vc = vc;
    } else {
      write_cluster_vc = vc;
    }
    cluster_vc_channel = vc->channel;
    vc->current_cont = this;

    // Patch channel/token into the saved copy of the request message.
    if (short_msg) {
      CacheOpMsg_short *ms = (CacheOpMsg_short *) getMsgBuffer();
      ms->channel = vc->channel;
      ms->token = open_local_token;

      Debug("cache_proto",
            "2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
            (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);

    } else {
      CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
      ml->channel = vc->channel;
      ml->token = open_local_token;

      Debug("cache_proto",
            "3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
            (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
    }
    SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent);

    if (event != CLUSTER_EVENT_OPEN_EXISTS) {
      // Newly opened VC: send the deferred request now.
      clusterProcessor.invoke_remote(ch,
                                     (op_needs_marshalled_coi(request_opcode) ?
                                      CACHE_OP_MALLOCED_CLUSTER_FUNCTION :
                                      CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen());
    }

  } else {
    // Open failed, or the op already timed out before the open completed.
    int send_failure_callback = 1;

    if (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0) {
      // Genuine open failure (no timeout yet).
      if (short_msg) {
        Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d",
              (read_op ? "R" : "W"), ((CacheOpMsg_short *) getMsgBuffer())->seq_number);
      } else {
        Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d",
              (read_op ? "R" : "W"), ((CacheOpMsg_long *) getMsgBuffer())->seq_number);
      }

    } else {
      // The op timed out earlier; the caller was already notified.
      Debug("cache_proto", "4open_local cancelled due to timeout, seqno=%d", seq_number);
      this->timeout = 0;        // clear the timed-out tag

      // A VC that opened after the timeout is no longer wanted: close it.
      if (event == CLUSTER_EVENT_OPEN) {
        vc->pending_remote_fill = 0;
        vc->remote_closed = 1;  
        vc->do_io(VIO::CLOSE);
      }
      send_failure_callback = 0;        // caller already got the failure
    }

    if (this->timeout)
      this->timeout->cancel();
    this->timeout = NULL;

    freeMsgBuffer();
    if (send_failure_callback) {
      // Deliver the failure via the deferred-callback path so the
      // continuation is dequeued and freed safely.
      this->use_deferred_callback = true;
      this->result = (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED);
      this->result_error = 0;
      remove_and_delete(0, (Event *) 0);

    } else {
      cacheContAllocator_free(this);
    }
    return EVENT_DONE;
  }
  // Success path: the saved message copy is no longer needed.
  freeMsgBuffer();

  return EVENT_DONE;
}
00962 
00963 
00964 
00965 
00966 
00967 
00968 
00969 
00970 
00971 
00972 inline CacheOpMsg_long *
00973 unmarshal_CacheOpMsg_long(void *data, int NeedByteSwap)
00974 {
00975   if (NeedByteSwap)
00976     ((CacheOpMsg_long *) data)->SwapBytes();
00977   return (CacheOpMsg_long *) data;
00978 }
00979 
00980 inline CacheOpMsg_short *
00981 unmarshal_CacheOpMsg_short(void *data, int NeedByteSwap)
00982 {
00983   if (NeedByteSwap)
00984     ((CacheOpMsg_short *) data)->SwapBytes();
00985   return (CacheOpMsg_short *) data;
00986 }
00987 
00988 inline CacheOpMsg_short_2 *
00989 unmarshal_CacheOpMsg_short_2(void *data, int NeedByteSwap)
00990 {
00991   if (NeedByteSwap)
00992     ((CacheOpMsg_short_2 *) data)->SwapBytes();
00993   return (CacheOpMsg_short_2 *) data;
00994 }
00995 
00996 
00997 inline void
00998 init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * m)
00999 {
01000   cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
01001   cont->seq_number = msg->seq_number;
01002   cont->cfl_flags = msg->cfl_flags;
01003   cont->from = m;
01004   cont->url_md5 = msg->url_md5;
01005   cont->cluster_vc_channel = msg->channel;
01006   cont->frag_type = (CacheFragType) msg->frag_type;
01007   if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG)
01008       || (cont->request_opcode == CACHE_OPEN_READ_LONG)) {
01009     cont->pin_in_cache = (time_t) msg->data;
01010   } else {
01011     cont->pin_in_cache = 0;
01012   }
01013   cont->token = msg->token;
01014   cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
01015 
01016   if (cont->request_opcode == CACHE_OPEN_READ_LONG) {
01017     cont->caller_buf_freebytes = msg->buffer_size;
01018   } else {
01019     cont->caller_buf_freebytes = 0;
01020   }
01021 }
01022 
01023 
01024 inline void
01025 init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine * m)
01026 {
01027   cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
01028   cont->seq_number = msg->seq_number;
01029   cont->cfl_flags = msg->cfl_flags;
01030   cont->from = m;
01031   cont->url_md5 = msg->md5;
01032   cont->cluster_vc_channel = msg->channel;
01033   cont->token = msg->token;
01034   cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
01035   cont->frag_type = (CacheFragType) msg->frag_type;
01036 
01037   if (cont->request_opcode == CACHE_OPEN_WRITE) {
01038     cont->pin_in_cache = (time_t) msg->data;
01039   } else {
01040     cont->pin_in_cache = 0;
01041   }
01042 
01043   if (cont->request_opcode == CACHE_OPEN_READ) {
01044     cont->caller_buf_freebytes = msg->buffer_size;
01045   } else {
01046     cont->caller_buf_freebytes = 0;
01047   }
01048 }
01049 
01050 
01051 inline void
01052 init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMachine * m)
01053 {
01054   cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
01055   cont->seq_number = msg->seq_number;
01056   cont->cfl_flags = msg->cfl_flags;
01057   cont->from = m;
01058   cont->url_md5 = msg->md5_1;
01059   cont->frag_type = (CacheFragType) msg->frag_type;
01060 }
01061 
void
cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
{
  // Cluster RPC entry point: a peer node has sent us a cache operation
  // request (open_read/open_write/remove/link/deref).  Decode the
  // message, allocate a CacheContinuation to track the operation, and
  // issue the corresponding operation against the local cache.  The
  // result is returned to the peer via CacheContinuation::replyOpEvent.
  EThread *thread = this_ethread();
  ProxyMutex *mutex = thread->mutex;  // thread mutex; required by the DYN_STAT macros

  CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);

  int opcode;
  ClusterMessageHeader *mh = (ClusterMessageHeader *) data;

  if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {  
    // Protocol version mismatch is unrecoverable; down-rev translation
    // is not implemented.
    ink_release_assert(!"cache_op_ClusterFunction() bad msg version");
  }
  // The opcode is at the same offset in all CacheOpMsg_* layouts.
  opcode = ((CacheOpMsg_long *) data)->opcode;

  // Allocate a continuation to process the request.  The mutex is
  // freshly created, so the try-lock below always succeeds; it is taken
  // so downstream code runs with the continuation mutex held.
  CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
  c->mutex = new_ProxyMutex();
  MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
  c->request_opcode = opcode;
  c->token.clear();
  c->start_time = ink_get_hrtime();
  c->ch = ch;
  SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                           & CacheContinuation::replyOpEvent);

  switch (opcode) {
  case CACHE_OPEN_WRITE_BUFFER:
  case CACHE_OPEN_WRITE_BUFFER_LONG:
    ink_release_assert(!"cache_op_ClusterFunction WRITE_BUFFER not supported");
    break;

  case CACHE_OPEN_READ_BUFFER:
  case CACHE_OPEN_READ_BUFFER_LONG:
    ink_release_assert(!"cache_op_ClusterFunction READ_BUFFER not supported");
    break;

  case CACHE_OPEN_READ:
    {
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
      init_from_short(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
      // Establish the local side of the cluster channel the requester
      // pre-allocated; object data will be returned on it.
      c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
                                                           &c->token,
                                                           c->cluster_vc_channel,
                                                           (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
      if (!c->write_cluster_vc) {
        // Channel already in use; fail the open_read back to the sender.
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
        Debug("chan_inuse",
              "1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);

        c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
        break;

      } else {
        c->write_cluster_vc->current_cont = c;
      }
      ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
      ink_release_assert((opcode == CACHE_OPEN_READ)
                         || c->write_cluster_vc->pending_remote_fill);

      SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                               & CacheContinuation::setupVCdataRead);
      Debug("cache_proto",
            "0read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
            msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_read");
#endif
      CacheKey key(msg->md5);

      // An optional variable-length hostname follows the fixed message.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }
      Cache *call_cache = caches[c->frag_type];
      c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len);
      break;
    }
  case CACHE_OPEN_READ_LONG:
    {
      // Copy the message into the continuation: the HTTP request header
      // is unmarshalled in place and must outlive this call.
      c->setMsgBufferLen(len);
      c->allocMsgBuffer();
      memcpy(c->getMsgBuffer(), (char *) data, len);

      int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
      CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
      init_from_long(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
#endif
      // Establish the local side of the pre-allocated cluster channel;
      // object data will be returned on it.
      c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
                                                           &c->token,
                                                           c->cluster_vc_channel,
                                                           (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
      if (!c->write_cluster_vc) {
        // Channel already in use; fail the open_read back to the sender.
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
        Debug("chan_inuse",
              "2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);

        c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
        break;

      } else {
        c->write_cluster_vc->current_cont = c;
      }
      ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
      ink_release_assert((opcode == CACHE_OPEN_READ_LONG)
                         || c->write_cluster_vc->pending_remote_fill);

      SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                               & CacheContinuation::setupReadWriteVC);
      Debug("cache_proto",
            "1read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
            msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);

      // Variable-length payload: marshalled HTTP request header, then
      // the cache lookup parameters, then (optionally) the hostname.
      const char *p = (const char *) msg + flen;
      int moi_len = len - flen;
      int res;

      ink_assert(moi_len > 0);

      // Unmarshal the HTTP request header (in place, in our copy).
      res = c->ic_request.unmarshal((char *) p, moi_len, NULL);
      ink_assert(res > 0);
      ink_assert(c->ic_request.valid());
      c->request_purge = c->ic_request.method_get_wksidx() == HTTP_WKSIDX_PURGE || c->ic_request.method_get_wksidx() == HTTP_WKSIDX_DELETE;
      moi_len -= res;
      p += res;
      ink_assert(moi_len > 0);
      // Unmarshal the cache lookup parameters.
      c->ic_params = new(CacheLookupHttpConfigAllocator.alloc())
        CacheLookupHttpConfig();
      res = c->ic_params->unmarshal(&c->ic_arena, (const char *) p, moi_len);
      ink_assert(res > 0);

      moi_len -= res;
      p += res;

      CacheKey key(msg->url_md5);

      char *hostname = NULL;
      int host_len = 0;

      if (moi_len) {
        hostname = (char *) p;
        host_len = moi_len;

        // Keep a private copy of the hostname; it may be needed later
        // for an open_write retry (see setupReadWriteVC).
        c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len));
        c->ic_hostname_len = host_len;

        memcpy(c->ic_hostname->data(), hostname, host_len);
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->open_read(c, &key, &c->ic_request,
                                        c->ic_params,
                                        c->frag_type, hostname, host_len);
      // Save the action only while the operation is still pending.
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_OPEN_WRITE:
    {
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
      init_from_short(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
#endif
      // Establish the local side of the cluster channel; object data
      // will arrive on it from the requester.
      c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
                                                          &c->token,
                                                          c->cluster_vc_channel,
                                                          (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
      if (!c->read_cluster_vc) {
        // Channel already in use; fail the open_write back to the sender.
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
        Debug("chan_inuse",
              "3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);

        c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
        break;

      } else {
        c->read_cluster_vc->current_cont = c;
      }
      ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);

      CacheKey key(msg->md5);

      // An optional variable-length hostname follows the fixed message.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->open_write(c, &key, c->frag_type,
                                         !!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE),
                                         c->pin_in_cache, hostname, host_len);
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_OPEN_WRITE_LONG:
    {
      // Copy the message into the continuation: the old HTTP info is
      // unmarshalled in place and must outlive this call.
      c->setMsgBufferLen(len);
      c->allocMsgBuffer();
      memcpy(c->getMsgBuffer(), (char *) data, len);

      int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
      CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
      init_from_long(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
#endif
      // Establish the local side of the pre-allocated cluster channel;
      // object data will arrive on it from the requester.
      c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
                                                          &c->token,
                                                          c->cluster_vc_channel,
                                                          (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
      if (!c->read_cluster_vc) {
        // Channel already in use; fail the open_write back to the sender.
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
        Debug("chan_inuse",
              "4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
              c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);

        c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
        break;

      } else {
        c->read_cluster_vc->current_cont = c;
      }
      ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);

      CacheHTTPInfo *ci = 0;
      const char *p = (const char *) msg + flen;
      int res = 0;
      int moi_len = len - flen;

      // Optional marshalled old HTTP info precedes the optional hostname.
      if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) {

        // Unmarshal the old info in place within our copy of the message.
        res = HTTPInfo::unmarshal((char *) p, moi_len, NULL);
        ink_assert(res > 0);
        c->ic_old_info.get_handle((char *) p, moi_len);
        ink_assert(c->ic_old_info.valid());
        ci = &c->ic_old_info;
      }
      if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) {
        ink_assert(!ci);
        ci = (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES;
      }
      moi_len -= res;
      p += res;

      CacheKey key(msg->url_md5);
      char *hostname = NULL;

      if (moi_len) {
        hostname = (char *) p;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache,
                                         NULL, c->frag_type, hostname, moi_len);
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_REMOVE:
    {
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
      init_from_short(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
#endif
      CacheKey key(msg->md5);

      // An optional variable-length hostname follows the fixed message.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->remove(c, &key, c->frag_type,
                                     !!(c->cfl_flags & CFL_REMOVE_USER_AGENTS),
                                     !!(c->cfl_flags & CFL_REMOVE_LINK),
                                     hostname, host_len);
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_LINK:
    {
      CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
      init_from_short_2(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_link");
#endif

      CacheKey key1(msg->md5_1);
      CacheKey key2(msg->md5_2);

      // An optional variable-length hostname follows the fixed message.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->link(c, &key1, &key2, c->frag_type,
                                   hostname, host_len);
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }
  case CACHE_DEREF:
    {
      CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
      init_from_short(c, msg, ch->machine);
      Debug("cache_msg",
            "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
      log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
#endif

      CacheKey key(msg->md5);

      // An optional variable-length hostname follows the fixed message.
      char *hostname = NULL;
      int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
      if (host_len) {
        hostname = (char *) msg->moi.byte;
      }

      Cache *call_cache = caches[c->frag_type];
      Action *a = call_cache->deref(c, &key, c->frag_type,
                                    hostname, host_len);
      if (a != ACTION_RESULT_DONE) {
        c->cache_action = a;
      }
      break;
    }

  default:
    {
      // Unknown opcode from a peer is a protocol error.
      ink_release_assert(0);
    }
  }                             // end switch
}
01462 
void
cache_op_malloc_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
  // Variant of cache_op_ClusterFunction() for messages whose data
  // buffer was malloc'ed by the cluster layer: process the op, then
  // return the buffer.
  cache_op_ClusterFunction(ch, data, len);
  // "data" is block malloc'ed; release it through the cluster processor.
  clusterProcessor.free_remote_data((char *) data, len);
}
01470 
01471 int
01472 CacheContinuation::setupVCdataRead(int event, VConnection * vc)
01473 {
01474   ink_assert(magicno == (int) MagicNo);
01475   
01476   
01477   
01478   
01479   if (event == CACHE_EVENT_OPEN_READ) {
01480 
01481     
01482 
01483     Debug("cache_proto", "setupVCdataRead CACHE_EVENT_OPEN_READ seqno=%d", seq_number);
01484     ink_release_assert(caller_buf_freebytes);
01485     SET_HANDLER((CacheContHandler) & CacheContinuation::VCdataRead);
01486 
01487     int64_t size_index = iobuffer_size_to_index(caller_buf_freebytes);
01488     MIOBuffer *buf = new_MIOBuffer(size_index);
01489     readahead_reader = buf->alloc_reader();
01490 
01491     MUTEX_TRY_LOCK(lock, mutex, this_ethread());        
01492     readahead_vio = vc->do_io_read(this, caller_buf_freebytes, buf);
01493     return EVENT_DONE;
01494 
01495   } else {
01496     
01497     SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
01498     return handleEvent(event, vc);
01499   }
01500 }
01501 
int
CacheContinuation::VCdataRead(int event, VIO * target_vio)
{
  // Handle completion of the read-ahead started in setupVCdataRead():
  // clone the data read so far for transmission with the reply, decide
  // whether the whole object is already in hand, then hand off to
  // replyOpEvent().
  ink_release_assert(magicno == (int) MagicNo);
  ink_release_assert(readahead_vio == target_vio);

  VConnection *vc = target_vio->vc_server;
  int reply = CACHE_EVENT_OPEN_READ;
  int32_t object_size;

  switch (event) {
  case VC_EVENT_EOS:
    {
      if (!target_vio->ndone) {
        // EOS with no data read at all: treat as a read failure.
        goto read_failed;
      }
      // EOS with data: process what was read.
      /* fall through */
    }
  case VC_EVENT_READ_READY:
  case VC_EVENT_READ_COMPLETE:
    {
      int clone_bytes;
      int current_ndone = target_vio->ndone;

      ink_assert(current_ndone);
      ink_assert(current_ndone <= readahead_reader->read_avail());

      object_size = getObjectSize(vc, request_opcode, &cache_vc_info);
      // The whole object is in hand when it fits in the caller's free
      // space and the read-ahead produced exactly object_size bytes.
      have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone));

      // Clamp the bytes cloned into the reply to the caller's free space.

      clone_bytes = current_ndone;
      if (!have_all_data) {
        if (current_ndone > caller_buf_freebytes) {
          clone_bytes = caller_buf_freebytes;
        }
      }
      // Clone the read-ahead block list for transmission with the reply.

      IOBufferBlock *tail;
      readahead_data = clone_IOBufferBlockList(readahead_reader->get_current_block(),
                                               readahead_reader->start_offset, clone_bytes, &tail);

      if (have_all_data) {
        // Entire object captured: the cache VC and read buffer are no
        // longer needed (no residual-data tunnel will be built).
        MIOBuffer *mbuf = target_vio->buffer.writer();
        vc->do_io(VIO::CLOSE);
        free_MIOBuffer(mbuf);
        readahead_vio = 0;
      }
      SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
      handleEvent(reply, vc);
      return EVENT_CONT;
    }
  case VC_EVENT_ERROR:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case VC_EVENT_ACTIVE_TIMEOUT:
  default:
    {
    read_failed:
      // Read failed: release the cache VC and its buffer, then report
      // the open_read as failed to the requester.
      MIOBuffer * mbuf = target_vio->buffer.writer();
      vc->do_io(VIO::CLOSE);
      free_MIOBuffer(mbuf);
      readahead_vio = 0;
      reply = CACHE_EVENT_OPEN_READ_FAILED;

      SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
      handleEvent(reply, (VConnection *) - ECLUSTER_ORB_DATA_READ);
      return EVENT_DONE;
    }
  }                             // end switch
}
01579 
01580 int
01581 CacheContinuation::setupReadWriteVC(int event, VConnection * vc)
01582 {
01583   
01584 
01585   switch (event) {
01586   case CACHE_EVENT_OPEN_READ:
01587     {
01588       
01589 
01590       SET_HANDLER((CacheContHandler) & CacheContinuation::setupVCdataRead);
01591       return handleEvent(event, vc);
01592       break;
01593     }
01594   case CACHE_EVENT_OPEN_READ_FAILED:
01595     {
01596       if (frag_type == CACHE_FRAG_TYPE_HTTP && !request_purge) {
01597         
01598         
01599 
01600         CacheKey key(url_md5);
01601 
01602         Cache *call_cache = caches[frag_type];
01603         Action *a = call_cache->open_write(this, &key, 0, pin_in_cache,
01604                                            NULL, frag_type, ic_hostname ? ic_hostname->data() : NULL,
01605                                            ic_hostname_len);
01606         if (a != ACTION_RESULT_DONE) {
01607           cache_action = a;
01608         }
01609       } else {
01610         SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
01611         return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
01612       }
01613       break;
01614     }
01615   case CACHE_EVENT_OPEN_WRITE:
01616     {
01617       
01618 
01619       ink_assert(!read_cluster_vc && write_cluster_vc);
01620       read_cluster_vc = write_cluster_vc;
01621       read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
01622       write_cluster_vc = 0;
01623 
01624       SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
01625       return handleEvent(event, vc);
01626       break;
01627     }
01628   case CACHE_EVENT_OPEN_WRITE_FAILED:
01629   default:
01630     {
01631       SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
01632       return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
01633       break;
01634     }
01635   }                             
01636 
01637   return EVENT_DONE;
01638 }
01639 
01640 
01641 
01642 
01643 
int
CacheContinuation::replyOpEvent(int event, VConnection * cvc)
{
  // Send the result of a locally-executed cache operation back to the
  // requesting cluster node.  Successful opens also set up the data
  // path (read-ahead tunnel or one-way write tunnel); failures tear
  // down the pre-established cluster channel.  The continuation is
  // freed once all expected completions (reply send and, for opens,
  // data transfer) have occurred.
  ink_assert(magicno == (int) MagicNo);
  Debug("cache_proto", "replyOpEvent(this=%p,event=%d,VC=%p)", this, event, cvc);
  ink_hrtime now;
  now = ink_get_hrtime();
  CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
  LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
  ink_release_assert(expect_cache_callback);
  expect_cache_callback = false;        // only one cache callback is expected


  result = event;
  bool open = event_is_open(event);
  bool read_op = op_is_read(request_opcode);
  bool open_read_now_open_write = false;

  // Start with a stack reply; it may be replaced below by an alloca'ed
  // copy with a variable-length payload appended.
  CacheOpReplyMsg rmsg;
  CacheOpReplyMsg *msg = &rmsg;
  msg->result = event;

  if ((request_opcode == CACHE_OPEN_READ_LONG)
      && cvc && (event == CACHE_EVENT_OPEN_WRITE)) {

    // The open_read miss was converted into an open_write for cache
    // fill (see setupReadWriteVC); the requester is still told the
    // open_read failed.

    msg->result = CACHE_EVENT_OPEN_READ_FAILED;
    open_read_now_open_write = true;
  }

  msg->seq_number = seq_number;
  int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();    // fixed-length reply size
  int len = 0;
  int vers = 0;

  // Number of outstanding completions before this can be freed.
  int results_expected = 1;

  if (no_reply_message)         // requester asked for no reply
    goto free_exit;

  if (open) {

    // Successful open: a data-transfer completion is also expected
    // before this continuation can be freed.

    results_expected = 2;
    cache_vc = cvc;
    cache_read = (event == CACHE_EVENT_OPEN_READ);

    if (read_op && !open_read_now_open_write) {
      ink_release_assert(write_cluster_vc->pending_remote_fill);
      ink_assert(have_all_data || (readahead_vio == &((CacheVC *) cache_vc)->vio));
      Debug("cache_proto", "connect_local success seqno=%d have_all_data=%d", seq_number, (have_all_data ? 1 : 0));

      if (have_all_data) {
        msg->token.clear();     // no cluster connection required
        write_cluster_vc->type = VC_CLUSTER_WRITE;
      } else {
        msg->token = token;     // connection required for residual data
        setupReadBufTunnel(cache_vc, write_cluster_vc);
      }

    } else {
      Debug("cache_proto", "cache_open [%s] success seqno=%d", (cache_read ? "R" : "W"), seq_number);
      msg->token = token;       // connection required

      // Writes (including read->write conversions) are serviced by a
      // one-way tunnel from the cluster VC into the cache VC; the
      // tunnel owns the data transfer, so only the reply completion
      // remains outstanding.
      OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
      pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex);
      read_cluster_vc->allow_remote_close();
      results_expected--;
    }

    // For open_read, append the marshalled object HTTP info to the reply.
    if (cache_read) {
      int res;

      msg->is_ram_cache_hit = ((CacheVC *)cache_vc)->is_ram_cache_hit();

      if (!cache_vc_info.valid()) {
        (void) getObjectSize(cache_vc, request_opcode, &cache_vc_info);
      }
      // Allocate a reply large enough for the marshalled object info.
      len = cache_vc_info.marshal_length();
      CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);

      // Copy the fixed portion of the reply.
      *reply = *msg;

      // Marshal the object info immediately after it.
      res = cache_vc_info.marshal((char *) reply + flen, len);
      ink_assert(res >= 0 && res <= len);

      // Send the combined message.
      msg = reply;
    }

  } else {
    Debug("cache_proto", "cache operation failed result=%d seqno=%d (this=%p)", event, seq_number, this);
    msg->token.clear();         // no connection established

    // Failure replies reserve space for a 32-bit value (written below
    // for all ops except CACHE_LINK).
    len += sizeof(int32_t);
    CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);

    // Copy the fixed portion of the reply.
    *reply = *msg;

    if (request_opcode != CACHE_LINK) {
      // Failed open: tear down the pre-established cluster channel.
      // Mark the VCs remotely closed first so closing them does not
      // generate close messages back to the peer.
      if (read_cluster_vc) {
        read_cluster_vc->remote_closed = 1;     // suppress remote close msg
        read_cluster_vc->do_io(VIO::CLOSE);
      }
      if (write_cluster_vc) {
        write_cluster_vc->pending_remote_fill = 0;
        write_cluster_vc->remote_closed = 1;    // suppress remote close msg
        write_cluster_vc->do_io(VIO::CLOSE);
      }
      reply->moi.u32 = (int32_t) ((uintptr_t) cvc & 0xffffffff);    // low 32 bits of the error value passed as the VC
    }
    // Send the combined message.
    msg = reply;
  }
  CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);

  // Transmit the reply using the requester's protocol version.
  // (Only the current version is supported; see the else branch.)
#ifdef CACHE_MSG_TRACE
  log_cache_op_sndmsg(msg->seq_number, 0, "replyOpEvent");
#endif
  vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
  if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
    if (read_op) {
      // Send the reply together with the read-ahead data.
      Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64,
            seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
      clusterProcessor.invoke_remote_data(ch,
                                          CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                          (void *) msg, (flen + len),
                                          readahead_data,
                                          cluster_vc_channel, &token,
                                          &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
    } else {
      Debug("cache_proto", "Sending reply seqno=%d, (this=%p)", seq_number, this);
      clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                     (void *) msg, (flen + len), CLUSTER_OPT_STEAL);
    }

  } else {

    // Down-rev message translation is not implemented.

    ink_release_assert(!"replyOpEvent() bad msg version");
  }

free_exit:
  results_expected--;
  if (results_expected <= 0) {
    Debug("cache_proto", "replyOpEvent: freeing this=%p", this);
    cacheContAllocator_free(this);
  }
  return EVENT_DONE;
}
01813 
01814 void
01815 CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * cluster_write_vc)
01816 {
01817 
01818   
01819   
01820 
01821   tunnel_cont = cacheContAllocator_alloc();
01822   tunnel_cont->mutex = this->mutex;
01823   SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler)
01824                            & CacheContinuation::tunnelClosedEvent);
01825   int64_t ravail = bytes_IOBufferBlockList(readahead_data, 1);
01826 
01827   tunnel_mutex = tunnel_cont->mutex;
01828   tunnel_closed = false;
01829 
01830   tunnel = OneWayTunnel::OneWayTunnel_alloc();
01831   readahead_reader->consume(ravail);    
01832   tunnel->init(cache_read_vc, cluster_write_vc, tunnel_cont, readahead_vio, readahead_reader);
01833   tunnel_cont->action = this;
01834   tunnel_cont->tunnel = tunnel;
01835   tunnel_cont->tunnel_cont = tunnel_cont;
01836 
01837   
01838   ((ClusterVConnection *) cluster_write_vc)->write.enabled = 0;
01839 
01840   
01841   readahead_vio->nbytes = readahead_vio->ndone;
01842 
01843 
01844   
01845   
01846   
01847 
01848 }
01849 
01850 
01851 
01852 
01853 int
01854 CacheContinuation::tunnelClosedEvent(int , void *c)
01855 {
01856   ink_assert(magicno == (int) MagicNo);
01857   
01858   CacheContinuation *tc = (CacheContinuation *) c;
01859   ink_release_assert(tc->tunnel_cont == tc);
01860   CacheContinuation *real_cc = (CacheContinuation *) tc->action.continuation;
01861 
01862   if (real_cc) {
01863     
01864     real_cc->tunnel = 0;
01865     real_cc->tunnel_cont = 0;
01866     real_cc->tunnel_closed = true;
01867   }
01868   OneWayTunnel::OneWayTunnel_free(tc->tunnel);
01869   cacheContAllocator_free(tc);
01870 
01871   return EVENT_DONE;
01872 }
01873 
01874 
01875 
01876 
01877 struct retryDisposeOfDataBuffer;
01878 typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler) (int, void *);
01879 struct retryDisposeOfDataBuffer:public Continuation
01880 {
01881   CacheContinuation *c;
01882 
01883   int handleRetryEvent(int event, Event * e)
01884   {
01885     if (CacheContinuation::handleDisposeEvent(event, c) == EVENT_DONE) {
01886       delete this;
01887         return EVENT_DONE;
01888     } else
01889     {
01890       e->schedule_in(HRTIME_MSECONDS(10));
01891       return EVENT_CONT;
01892     }
01893   }
01894   retryDisposeOfDataBuffer(CacheContinuation * cont)
01895 :  Continuation(new_ProxyMutex()), c(cont) {
01896     SET_HANDLER((rtryDisOfDBufHandler)
01897                 & retryDisposeOfDataBuffer::handleRetryEvent);
01898   }
01899 };
01900 
01901 
01902 
01903 
01904 
01905 void
01906 CacheContinuation::disposeOfDataBuffer(void *d)
01907 {
01908   ink_assert(d);
01909   CacheContinuation *cc = (CacheContinuation *) d;
01910   ink_assert(cc->have_all_data || cc->readahead_vio);
01911   ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *) cc->cache_vc)->vio));
01912 
01913   if (cc->have_all_data) {
01914     
01915     
01916     
01917     
01918     
01919     
01920     cc->write_cluster_vc->pending_remote_fill = 0;
01921     cc->write_cluster_vc->remote_closed = 1;
01922     cc->write_cluster_vc->do_io(VIO::CLOSE);
01923     cc->readahead_data = 0;
01924 
01925     cacheContAllocator_free(cc);
01926 
01927   } else {
01928     cc->write_cluster_vc->pending_remote_fill = 0;
01929     cc->write_cluster_vc->allow_remote_close();
01930     if (handleDisposeEvent(0, cc) == EVENT_CONT) {
01931       
01932       retryDisposeOfDataBuffer *retryCont = new retryDisposeOfDataBuffer(cc);
01933       eventProcessor.schedule_in(retryCont, HRTIME_MSECONDS(10), ET_CALL);
01934     }
01935   }
01936 }
01937 
int
CacheContinuation::handleDisposeEvent(int , CacheContinuation * cc)
{
  // Enable the residual-data tunnel built by setupReadBufTunnel() now
  // that the reply (with the read-ahead data) has been sent.  Returns
  // EVENT_DONE on success, EVENT_CONT if the tunnel mutex could not be
  // acquired (the caller must retry).
  ink_assert(cc->magicno == (int) MagicNo);
  MUTEX_TRY_LOCK(lock, cc->tunnel_mutex, this_ethread());
  if (lock) {
    // Push the residual object data into the tunnel.

    if (!cc->tunnel_closed) {
      // Set the source VIO to the full object length and kick the source.

      cc->tunnel->vioSource->nbytes = getObjectSize(cc->tunnel->vioSource->vc_server, cc->request_opcode, 0);
      cc->tunnel->vioSource->reenable_re();

      // reenable_re() may run the tunnel inline and close it, so
      // re-check before touching the target side.
      if (!cc->tunnel_closed) {
        cc->tunnel->vioTarget->reenable();

        // Tell the tunnel-close handler not to call us back.
        cc->tunnel_cont->action.continuation = 0;
      }
    }
    cacheContAllocator_free(cc);
    return EVENT_DONE;

  } else {
    // Lock miss; have the caller retry later.
    return EVENT_CONT;
  }
}
01969 
01970 
01971 
01972 
01973 
01974 
// Cluster RPC handler: a peer is reporting the result of a cache
// operation this node initiated.  Copies the reply, optionally
// unmarshals attached CacheHTTPInfo, then routes the reply to the
// matching local CacheContinuation — or stages it in a retry
// continuation when the required locks cannot be taken inline.
void
cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l)
{

  // Copy the reply out of the cluster-owned buffer 'd'; 'iob' also
  // backs any CacheHTTPInfo handle created below.
  Ptr<IOBufferData> iob = make_ptr(new_IOBufferData(iobuffer_size_to_index(l)));
  memcpy(iob->data(), (char *) d, l);
  char *data = iob->data();
  int flen, len = l;
  CacheHTTPInfo ci;
  CacheOpReplyMsg *msg = (CacheOpReplyMsg *) data;
  int32_t op_result_error = 0;
  ClusterMessageHeader *mh = (ClusterMessageHeader *) data;

  if (mh->GetMsgVersion() != CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) { 
    // Only the current message version is supported.
    ink_release_assert(!"cache_op_result_ClusterFunction() bad msg version");
  }

  flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
  if (mh->NeedByteSwap())
    msg->SwapBytes();

  Debug("cluster_cache", "received cache op result, seqno=%d result=%d", msg->seq_number, msg->result);

  // Decode optional trailing data (moi) keyed off the result code.
  if ((len > flen) && event_reply_may_have_moi(msg->result)) {
    switch (msg->result) {
    case CACHE_EVENT_OPEN_READ:
      {
        char *p = (char *) msg + flen;
        int res;

        // Unmarshal the alternate's CacheHTTPInfo in place.
        res = HTTPInfo::unmarshal(p, len, NULL);
        ci.get_handle(p, len);
        ink_assert(res > 0);
        ink_assert(ci.valid());
        break;
      }
    case CACHE_EVENT_LINK:
    case CACHE_EVENT_LINK_FAILED:
      break;
    case CACHE_EVENT_OPEN_READ_FAILED:
    case CACHE_EVENT_OPEN_WRITE_FAILED:
    case CACHE_EVENT_REMOVE_FAILED:
    case CACHE_EVENT_UPDATE_FAILED:
    case CACHE_EVENT_DEREF_FAILED:
      {
        // Failure replies carry a 32-bit error code; negate it to the
        // local convention (negative errors).
        ink_assert(((len - flen) == sizeof(int32_t)));
        op_result_error = msg->moi.u32;
        if (mh->NeedByteSwap())
          ats_swap32((uint32_t *) & op_result_error);
        op_result_error = -op_result_error;
        break;
      }
    default:
      {
        ink_release_assert(!"invalid moi data for received msg");
        break;
      }
    }                           // end switch on result
  }

  // Route the reply to the continuation waiting on (ip, seq_number).
  unsigned int hash = FOLDHASH(ch->machine->ip, msg->seq_number);
  EThread *thread = this_ethread();
  ProxyMutex *mutex = thread->mutex;
  if (MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], thread)) {

    // Bucket lock held: look for the registered waiter.
    CacheContinuation *c = find_cache_continuation(msg->seq_number,
                                                   ch->machine->ip);
    if (!c) {
      // Waiter already timed out and deregistered itself.
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
      Debug("cluster_timeout", "0cache reply timeout: %d", msg->seq_number);
      CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
      if (ci.valid())
        ci.destroy();
      return;
    }

    // Propagate the ram-cache-hit flag for open-read results.
    if (msg->result == CACHE_EVENT_OPEN_READ)
      c->read_cluster_vc->set_ram_cache_hit(msg->is_ram_cache_hit);

    // We also need the waiter's own mutex before handing it the reply.
    MUTEX_TRY_LOCK(lock, c->mutex, thread);

    if (!lock) {
      // Missed the waiter's lock: fall back to the deferred path below.
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
      goto Lretry;
    }
    c->result_error = op_result_error;

    // Hand over the reply buffer and any unmarshalled info; the waiter
    // (remoteOpEvent) will drop the bucket lock itself.
    c->freeMsgBuffer();
    if (ci.valid()) {
      // Buffer is still referenced by 'ci'; pass ownership along.
      c->setMsgBufferLen(len, iob);
      c->ic_new_info = ci;
    }
    msg->seq_number = len;      // HACK: seq_number reused to carry the reply length
    c->handleEvent(CACHE_EVENT_RESPONSE_MSG, data);

  } else {

    // Could not take the bucket lock inline: stage the reply in a
    // fresh continuation and let handleReplyEvent() deliver it later.

  Lretry:
    CacheContinuation * c = CacheContinuation::cacheContAllocator_alloc();
    c->mutex = new_ProxyMutex();
    c->seq_number = msg->seq_number;
    c->target_ip = ch->machine->ip;
    SET_CONTINUATION_HANDLER(c, (CacheContHandler)
                             & CacheContinuation::handleReplyEvent);
    c->start_time = ink_get_hrtime();
    c->result = msg->result;
    if (event_is_open(msg->result))
      c->token = msg->token;
    if (ci.valid()) {
      // Transfer the reply buffer and unmarshalled info to the carrier.
      c->setMsgBufferLen(len, iob);
      c->ic_new_info = ci;
    }
    c->result_error = op_result_error;
    eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
  }
}
02115 
02116 
02117 
02118 
02119 
02120 
// Deferred-delivery handler used when cache_op_result_ClusterFunction()
// could not take the required locks.  'this' is a temporary carrier
// continuation holding the staged reply; replay it into the waiting
// CacheContinuation and free the carrier.
int
CacheContinuation::handleReplyEvent(int event, Event * e)
{
  (void) event;

  // Serialize on the pending-op bucket for (target_ip, seq_number).
  EThread *t = e->ethread;
  unsigned int hash = FOLDHASH(target_ip, seq_number);

  if (!MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], t)) {
    e->schedule_in(CACHE_RETRY_PERIOD);
    return EVENT_CONT;
  }

  LOG_EVENT_TIME(start_time, cntlck_acquire_time_dist, cntlck_acquire_events);

  // Find the continuation that is waiting for this reply.
  CacheContinuation *c = find_cache_continuation(seq_number, target_ip);
  if (c) {

    // We also need the waiter's own mutex.

    MUTEX_TRY_LOCK(lock, c->mutex, e->ethread);
    if (!lock) {

      // Missed; drop the bucket lock and retry the whole sequence.

      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
      e->schedule_in(CACHE_RETRY_PERIOD);
      return EVENT_CONT;
    }

    // Transfer any unmarshalled CacheHTTPInfo and its backing buffer.

    if (ic_new_info.valid()) {
      c->freeMsgBuffer();
      c->setMsgBufferLen(getMsgBufferLen(), getMsgBufferIOBData());
      c->ic_new_info = ic_new_info;
      ic_new_info.clear();
    }
    // Deliver the reply; the waiter (remoteOpEvent) dequeues itself
    // and releases the bucket lock.

    c->handleEvent(CACHE_EVENT_RESPONSE, this);

  } else {
    // Waiter is gone; it must have timed out already.
    MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
    Debug("cluster_timeout", "cache reply timeout: %d", seq_number);
    CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
  }

  // The carrier's job is done either way.

  cacheContAllocator_free(this);
  return EVENT_DONE;
}
02178 
02179 
02180 
02181 
02182 
02183 
02184 
02185 
02186 
02187 
02188 
// Main handler for a remote cache operation on the originating node.
// Handles (a) EVENT_INTERVAL: deferred enqueue or operation timeout,
// (b) CACHE_EVENT_RESPONSE / _RESPONSE_MSG: a reply delivered via a
// carrier continuation or a raw CacheOpReplyMsg (these deliberately
// fall through into the common reply-processing case), and
// (c) completion back to the user continuation.
int
CacheContinuation::remoteOpEvent(int event_code, Event * e)
{
  ink_assert(magicno == (int) MagicNo);
  int event = event_code;
  ink_hrtime now;
  // First event after issue: record callback-latency stats once.
  if (start_time) {
    int res;
    if (event != EVENT_INTERVAL) {
      if (event == CACHE_EVENT_RESPONSE) {
        // 'e' is a carrier CacheContinuation (see handleReplyEvent).
        CacheContinuation *ccont = (CacheContinuation *) e;
        res = ccont->result;
      } else {
        // 'e' is a raw reply message.
        CacheOpReplyMsg *rmsg = (CacheOpReplyMsg *) e;
        res = rmsg->result;
      }
      if ((res == CACHE_EVENT_LOOKUP) || (res == CACHE_EVENT_LOOKUP_FAILED)) {
        now = ink_get_hrtime();
        CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, now - start_time);
        LOG_EVENT_TIME(start_time, lkrmt_callback_time_dist, lkrmt_cache_callbacks);
      } else {
        now = ink_get_hrtime();
        CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, now - start_time);
        LOG_EVENT_TIME(start_time, rmt_callback_time_dist, rmt_cache_callbacks);
      }
    }
    start_time = 0;
  }
  
  intptr_t return_error = 0;
  ClusterVCToken *pToken = NULL;

retry:

  switch (event) {
  default:
    ink_assert(!"bad case");
    return EVENT_DONE;

  case EVENT_INTERVAL:{
      // Either the deferred enqueue armed by do_remote_lookup(), or
      // the operation timeout firing.
      unsigned int hash = FOLDHASH(target_ip, seq_number);

      MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
      if (!queuelock) {
        e->schedule_in(CACHE_RETRY_PERIOD);
        return EVENT_CONT;
      }
      // Not queued yet: this is the deferred enqueue; now arm the real
      // operation timeout.

      if (!remoteCacheContQueue[hash].in(this)) {
        remoteCacheContQueue[hash].enqueue(this);
        ink_assert(timeout == e);
        MUTEX_RELEASE(queuelock);
        e->schedule_in(cache_cluster_timeout);
        return EVENT_CONT;
      }
      // Already queued: the operation timed out.

      if (find_cache_continuation(seq_number, target_ip)) {
        // Still registered: report the timeout to the user now, but
        // stay queued so the (late) reply can be reclaimed.
        MUTEX_RELEASE(queuelock);

        Debug("cluster_timeout", "cluster op timeout %d", seq_number);
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
        request_timeout = true;
        timeout = 0;
        // Notify the user once; the eventual reply only cleans up
        // (callback suppressed from here on via action.cancelled).
        
        
        
        
        if (!action.cancelled)
          action.continuation->handleEvent(result, (void *) -ECLUSTER_OP_TIMEOUT);
        action.cancelled = 1;

        if (target_machine->dead) {
          // Peer is gone: no reply will ever arrive; synthesize the
          // reply path immediately.
          event = CACHE_EVENT_RESPONSE_MSG;
          goto retry;
        } else {
          timeout = e;
          e->schedule_in(cache_cluster_timeout);
          return EVENT_DONE;
        }

      } else {
        // Not found in the queue: nothing left to clean up.
        MUTEX_RELEASE(queuelock);
        Debug("cluster_timeout", "unknown cluster op timeout %d", seq_number);
        Note("Unexpected CacheCont timeout, [%u.%u.%u.%u] seqno=%d", DOT_SEPARATED(target_ip), seq_number);
        CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
        return EVENT_DONE;
      }
    }

  case CACHE_EVENT_RESPONSE:
  case CACHE_EVENT_RESPONSE_MSG:{

      // Genuine reply: cancel the pending timeout and deregister.

      if (timeout) {
        timeout->cancel();
        timeout = 0;
      }
      
      unsigned int hash = FOLDHASH(target_ip, seq_number);

      remoteCacheContQueue[hash].remove(this);
      // The bucket lock was acquired by our caller
      // (cache_op_result_ClusterFunction / handleReplyEvent).
      MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], this_ethread());
      /* deliberate fall through to common reply processing */
    }

  case CACHE_EVENT_RESPONSE_RETRY:{

      // Extract result/token; 'e' is a carrier continuation for
      // CACHE_EVENT_RESPONSE, a raw message for _RESPONSE_MSG.

      CacheContinuation *c = (CacheContinuation *) e;
      CacheOpReplyMsg *msg = (CacheOpReplyMsg *) e;
      if (event == CACHE_EVENT_RESPONSE_MSG) {
        result = (request_timeout ? result : msg->result);
        pToken = (request_timeout ? &token : &msg->token);
      } else if (event == CACHE_EVENT_RESPONSE) {
        result = (request_timeout ? result : c->result);
        pToken = &c->token;
      } else if (event == CACHE_EVENT_RESPONSE_RETRY) {
        pToken = &token;
      } else {
        ink_release_assert(!"remoteOpEvent bad event code");
      }

      // Successful lookups and opens complete here.

      if (result == CACHE_EVENT_LOOKUP) {
        callback_user(result, 0);
        return EVENT_DONE;

      } else if (event_is_open(result)) {
        bool read_op = ((request_opcode == CACHE_OPEN_READ)
                        || (request_opcode == CACHE_OPEN_READ_LONG));
        if (read_op) {
          ink_release_assert(read_cluster_vc->pending_remote_fill > 1);
          read_cluster_vc->pending_remote_fill = 0;

          have_all_data = pToken->is_clear();   // clear token: presumably the object arrived with the reply
          if (have_all_data) {
            read_cluster_vc->have_all_data = 1;
          } else {
            read_cluster_vc->have_all_data = 0;
          }
          // Move the alternate info and its buffer onto the cluster VC.
          read_cluster_vc->marshal_buf = this->getMsgBufferIOBData();
          read_cluster_vc->alternate = this->ic_new_info;
          this->ic_new_info.clear();
          ink_release_assert(read_cluster_vc->alternate.object_size_get());

          if (!action.cancelled) {
            ClusterVConnection *target_vc = read_cluster_vc;
            callback_user(result, target_vc);   // callback_user() may free 'this'
            target_vc->allow_remote_close();
          } else {
            read_cluster_vc->allow_remote_close();
            read_cluster_vc->do_io(VIO::ABORT);
            cacheContAllocator_free(this);
          }

        } else {
          ink_assert(result == CACHE_EVENT_OPEN_WRITE);
          ink_assert(!pToken->is_clear());

          ClusterVConnection *result_vc = write_cluster_vc;
          if (!action.cancelled) {
            callback_user(result, result_vc);
            result_vc->allow_remote_close();
          } else {
            result_vc->allow_remote_close();
            result_vc->do_io(VIO::ABORT);
            cacheContAllocator_free(this);
          }
        }
        return EVENT_DONE;
      }
      break;
    }                           // end reply case
  }                             // end switch

  // Common failure / lookup-miss processing.

  if (result == CACHE_EVENT_LOOKUP_FAILED) {


    // Remote lookup missed; depending on configuration, probe the
    // next machine or fall back to the local cache.

    ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));

    // The hostname (if any) was stashed in the message buffer by
    // do_remote_lookup().
    
    
    
    int len = getMsgBufferLen();
    char *hostname = (len ? getMsgBuffer() : 0);

    if (!m || PROBE_LOCAL_CACHE_LAST) {
      SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
      CacheKey key(url_md5);

      Cache *call_cache = caches[frag_type];
      call_cache->lookup(this, &key, frag_type, hostname, len);
      return EVENT_DONE;
    }
    if (PROBE_LOCAL_CACHE_FIRST) {
      callback_user(CACHE_EVENT_LOOKUP_FAILED, 0);
    } else {
      SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
      CacheKey key(url_md5);

      Cache *call_cache = caches[frag_type];
      call_cache->lookup(this, &key, frag_type, hostname, len);
    }
    return EVENT_DONE;

  } else {
    // Failure result for a non-lookup operation.

    ClusterVConnection *cacheable_vc = 0;
    if ((request_opcode == CACHE_OPEN_READ_LONG) && !pToken->is_clear()) {
      ink_assert(read_cluster_vc && !write_cluster_vc);
      // The open-read failed but a token was returned: repurpose the
      // read VC as a write VC and try to park it in the global
      // open-write VC cache for reuse.
      
      
      
      
      
      
      
      
      
      read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
      // Clear the VC's action via the assignment operator (NOTE(review):
      // presumably so reference counts are released — confirm Action::operator=).
      
      {
        Continuation *temp = NULL;
        read_cluster_vc->action_ = temp;
      }
      if (!GlobalOpenWriteVCcache->insert(&url_md5, read_cluster_vc)) {
        // Immediate insert failed; use the deferred insert path below.
        cacheable_vc = read_cluster_vc;
      }
      read_cluster_vc = 0;
    }
    if (read_cluster_vc) {
      read_cluster_vc->remote_closed = 0;       // not closed by peer; abort will propagate
      read_cluster_vc->allow_remote_close();
      read_cluster_vc->do_io(VIO::ABORT);
      read_cluster_vc = 0;
    }
    if (write_cluster_vc) {
      write_cluster_vc->remote_closed = 0;      // not closed by peer; abort will propagate
      write_cluster_vc->allow_remote_close();
      write_cluster_vc->do_io(VIO::ABORT);
      write_cluster_vc = 0;
    }
    if (!request_timeout) {
      if (!return_error) {
        return_error = result_error;
      }
      if (cacheable_vc) {
        insert_cache_callback_user(cacheable_vc, result, (void *) return_error);
      } else {
        callback_user(result, (void *) return_error);
      }
    } else {
      // User was already notified at timeout time; just clean up.
      if (cacheable_vc) {
        cacheable_vc->allow_remote_close();
        cacheable_vc->do_io(VIO::CLOSE);
        cacheable_vc = 0;
      }
      cacheContAllocator_free(this);
    }
    return EVENT_DONE;
  }
}
02473 
02474 
02475 
02476 
02477 
02478 
02479 int
02480 CacheContinuation::probeLookupEvent(int event, void * )
02481 {
02482   ink_assert(magicno == (int) MagicNo);
02483   callback_user(event, 0);
02484   return EVENT_DONE;
02485 }
02486 
02487 
02488 
02489 
02490 
02491 int
02492 CacheContinuation::lookupEvent(int , void * )
02493 {
02494   ink_release_assert(!"Invalid call CacheContinuation::lookupEvent");
02495   return EVENT_DONE;
02496 
02497 }
02498 
02499 
02500 
02501 
02502 
02503 
02504 
02505 
02506 
02507 Action *
02508 CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
02509                                     CacheContinuation * c, CacheFragType ft, char *hostname, int hostname_len)
02510 {
02511   int probe_depth = 0;
02512   ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH] = { 0 };
02513   int mlen = op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP) + ((hostname && hostname_len) ? hostname_len : 0);
02514   CacheLookupMsg *msg = (CacheLookupMsg *) ALLOCA_DOUBLE(mlen);
02515   msg->init();
02516 
02517 
02518   if (key) {
02519     msg->url_md5 = *key;
02520   } else {
02521     ink_assert(c);
02522     msg->url_md5 = c->url_md5;
02523   }
02524 
02525   ClusterMachine *m = NULL;
02526 
02527   if (cache_migrate_on_demand) {
02528     m = cluster_machine_at_depth(cache_hash(msg->url_md5),
02529                                  c ? &c->probe_depth : &probe_depth, c ? c->past_probes : past_probes);
02530   } else {
02531 
02532     
02533 
02534     if (c && c->probe_depth)
02535       return (Action *) 0;
02536     m = cluster_machine_at_depth(cache_hash(msg->url_md5));
02537     if (c)
02538       c->probe_depth = 1;
02539   }
02540 
02541   if (!m)
02542     return (Action *) 0;
02543   ClusterHandler *ch = m->pop_ClusterHandler();
02544   if (!ch)
02545     return (Action *) 0;
02546 
02547   
02548 
02549   if (!c) {
02550     c = cacheContAllocator_alloc();
02551     c->mutex = cont->mutex;
02552     c->probe_depth = probe_depth;
02553     memcpy(c->past_probes, past_probes, sizeof(past_probes));
02554   }
02555   c->ch = ch;
02556   
02557   if (hostname && hostname_len) {
02558     
02559     c->setMsgBufferLen(hostname_len);
02560     c->allocMsgBuffer();
02561     memcpy(c->getMsgBuffer(), hostname, hostname_len);
02562   }
02563 
02564   c->url_md5 = msg->url_md5;
02565   c->action.cancelled = false;
02566   c->action = cont;
02567   c->start_time = ink_get_hrtime();
02568   SET_CONTINUATION_HANDLER(c, (CacheContHandler)
02569                            & CacheContinuation::remoteOpEvent);
02570   c->result = CACHE_EVENT_LOOKUP_FAILED;
02571 
02572   
02573 
02574   c->target_ip = m->ip;
02575   c->seq_number = new_cache_sequence_number();
02576   msg->seq_number = c->seq_number;
02577   c->frag_type = ft;
02578   msg->frag_type = ft;
02579 
02580   
02581 
02582   unsigned int hash = FOLDHASH(c->target_ip, c->seq_number);
02583   MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
02584   if (!queuelock) {
02585     
02586     c->timeout = eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
02587   } else {
02588     remoteCacheContQueue[hash].enqueue(c);
02589     MUTEX_RELEASE(queuelock);
02590     c->timeout = eventProcessor.schedule_in(c, cache_cluster_timeout, ET_CACHE_CONT_SM);
02591   }
02592 
02593   char *data;
02594   int len;
02595   int vers = CacheLookupMsg::protoToVersion(m->msg_proto_major);
02596 
02597   if (vers == CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {
02598     msg->seq_number = c->seq_number;
02599     data = (char *) msg;
02600     len = mlen;
02601     if (hostname && hostname_len) {
02602       memcpy(msg->moi.byte, hostname, hostname_len);
02603     }
02604   } else {
02605 
02606     
02607 
02608     ink_release_assert(!"CacheLookupMsg bad msg version");
02609   }
02610 
02611   
02612 
02613 #ifdef CACHE_MSG_TRACE
02614   log_cache_op_sndmsg(msg.seq_number, 0, "cache_lookup");
02615 #endif
02616   clusterProcessor.invoke_remote(c->ch, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
02617   return &c->action;
02618 }
02619 
02620 
02621 
02622 
02623 
02624 
02625 
02626 
02627 void
02628 cache_lookup_ClusterFunction(ClusterHandler *ch, void *data, int len)
02629 {
02630   (void) len;
02631   EThread *thread = this_ethread();
02632   ProxyMutex *mutex = thread->mutex;
02633 
02634   
02635 
02636 
02637   CacheLookupMsg *msg = (CacheLookupMsg *) data;
02638   ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
02639 
02640   if (mh->GetMsgVersion() != CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {    
02641     
02642 
02643     ink_release_assert(!"cache_lookup_ClusterFunction() bad msg version");
02644   }
02645 
02646   if (mh->NeedByteSwap())
02647     msg->SwapBytes();
02648 
02649   CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
02650 
02651   CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
02652   c->mutex = new_ProxyMutex();
02653   MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
02654   c->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
02655   c->seq_number = msg->seq_number;
02656   c->from = ch->machine;
02657   c->url_md5 = msg->url_md5;
02658   SET_CONTINUATION_HANDLER(c, (CacheContHandler)
02659                            & CacheContinuation::replyLookupEvent);
02660 
02661   CacheKey key(msg->url_md5);
02662 #ifdef CACHE_MSG_TRACE
02663   log_cache_op_msg(msg->seq_number, 0, "cache_lookup");
02664 #endif
02665 
02666   
02667 
02668   char *hostname;
02669   int hostname_len = len - op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP);
02670   hostname = (hostname_len ? (char *) msg->moi.byte : 0);
02671 
02672   
02673   Cache *call_cache = caches[msg->frag_type];
02674   call_cache->lookup(c, &key, (CacheFragType) msg->frag_type, hostname, hostname_len);
02675 }
02676 
02677 
02678 
02679 
02680 
02681 
02682 int
02683 CacheContinuation::replyLookupEvent(int event, void * )
02684 {
02685   ink_hrtime now;
02686   now = ink_get_hrtime();
02687   CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
02688   LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
02689 
02690   int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
02691   if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
02692     CacheOpReplyMsg *msg;
02693     int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
02694     msg = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen);
02695     msg->init();
02696     CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
02697     int len = flen - sizeof(msg->token);
02698 
02699     if (!no_reply_message) {
02700       msg->seq_number = seq_number;
02701       msg->result = event;
02702 #ifdef CACHE_MSG_TRACE
02703       log_cache_op_sndmsg(seq_number, event, "cache_result");
02704 #endif
02705       clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
02706     }
02707   } else {
02708 
02709     
02710 
02711     ink_release_assert(!"replyLookupEvent() bad msg version");
02712   }
02713 
02714   
02715 
02716   cacheContAllocator_free(this);
02717   return EVENT_DONE;
02718 }
02719 
// Return the object size for the VC backing a cache-op reply and, when
// 'ret_ci' is supplied and not yet valid, populate it with a writeable
// CacheHTTPInfo describing the object.
// NOTE(review): the return type is int32_t while sizes are computed as
// int64_t — objects larger than 2GB would be truncated here; confirm
// upstream limits before relying on this for large objects.
int32_t CacheContinuation::getObjectSize(VConnection * vc, int opcode, CacheHTTPInfo * ret_ci)
{
  CacheHTTPInfo *ci = 0;
  int64_t object_size = 0;

  if ((opcode == CACHE_OPEN_READ_LONG)
      || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
    // HTTP object: size comes from the alternate's HTTP info.
    ((CacheVC *) vc)->get_http_info(&ci);
    if (ci) {
      object_size = ci->object_size_get();

    } else {
      ci = 0;
      object_size = 0;
    }

  } else {
    // Non-HTTP object: ask the cache VC directly.
    object_size = ((CacheVC *)vc)->get_object_size();
  }

  if (ret_ci && !ret_ci->valid()) {
    // Build a fresh, writeable info block for the caller.
    CacheHTTPInfo
      new_ci;
    new_ci.create();
    if (ci) {
      // Copy the HTTP info rather than referencing the cache's copy.
      new_ci.copy(ci);
    } else {
      new_ci.object_size_set(object_size);
    }
    new_ci.m_alt->m_writeable = 1;
    ret_ci->copy_shallow(&new_ci);
  }
  ink_release_assert(object_size);
  return object_size;
}
02757 
02758 
02759 
02760 
02761 
02762 void
02763 CacheContinuation::insert_cache_callback_user(ClusterVConnection * vc, int res, void *e)
02764 {
02765   if (GlobalOpenWriteVCcache->insert(&url_md5, vc)) {
02766     
02767     callback_user(res, e);
02768 
02769   } else {
02770     
02771     result = res;
02772     callback_data = e;
02773     callback_data_2 = (void *) vc;
02774     SET_HANDLER((CacheContHandler) & CacheContinuation::insertCallbackEvent);
02775     eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
02776   }
02777 }
02778 
02779 int
02780 CacheContinuation::insertCallbackEvent(int , Event * )
02781 {
02782   if (GlobalOpenWriteVCcache->insert(&url_md5, (ClusterVConnection *)
02783                                      callback_data_2)) {
02784     
02785     callback_user(result, callback_data);
02786 
02787   } else {
02788     
02789     eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
02790   }
02791   return EVENT_DONE;
02792 }
02793 
02794 
02795 
02796 
02797 
02798 
02799 void
02800 CacheContinuation::callback_user(int res, void *e)
02801 {
02802   EThread *et = this_ethread();
02803 
02804   if (!is_ClusterThread(et)) {
02805     MUTEX_TRY_LOCK(lock, mutex, et);
02806     if (lock) {
02807       if (!action.cancelled) {
02808         action.continuation->handleEvent(res, e);
02809       }
02810       cacheContAllocator_free(this);
02811 
02812     } else {
02813       
02814       defer_callback_result(res, e);
02815     }
02816   } else {
02817     
02818     defer_callback_result(res, e);
02819   }
02820 }
02821 
02822 void
02823 CacheContinuation::defer_callback_result(int r, void *e)
02824 {
02825   result = r;
02826   callback_data = e;
02827   SET_HANDLER((CacheContHandler) & CacheContinuation::callbackResultEvent);
02828   eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
02829 }
02830 
02831 int
02832 CacheContinuation::callbackResultEvent(int , Event * )
02833 {
02834   if (!action.cancelled)
02835     action.continuation->handleEvent(result, callback_data);
02836   cacheContAllocator_free(this);
02837   return EVENT_DONE;
02838 }
02839 
02840 
02841 
02842 
02843 
02844 
02845 
02846 
02847 CacheContinuation *
02848 CacheContinuation::cacheContAllocator_alloc()
02849 {
02850   return cacheContAllocator.alloc();
02851 }
02852 
02853 
02854 
02855 
02856 
// Return a CacheContinuation to the class allocator after scrubbing
// fields that hold references (mutex, action, tunnel_mutex).
void
CacheContinuation::cacheContAllocator_free(CacheContinuation * c)
{
  ink_assert(c->magicno == (int) MagicNo);
  // Poison the magic number to catch use-after-free.
  c->magicno = -1;
#ifdef ENABLE_TIME_TRACE
  c->start_time = 0;
#endif
  c->free();
  c->mutex = NULL;
  // Clear the action via its assignment operator rather than direct
  // member writes — NOTE(review): presumably so Action::operator=
  // releases held references; confirm against Action's definition.
  {
    Continuation *temp = NULL;
    c->action = temp;
  }
  c->tunnel_mutex = NULL;
  cacheContAllocator.free(c);
}
02878 
02879 
02880 
02881 
02882 
02883 Action *
02884 CacheContinuation::callback_failure(Action * a, int result, int err, CacheContinuation * this_cc)
02885 {
02886   CacheContinuation *cc;
02887   if (!this_cc) {
02888     cc = cacheContAllocator_alloc();
02889     cc->mutex = a->mutex;
02890     cc->action = *a;
02891 
02892   } else {
02893     cc = this_cc;
02894   }
02895   cc->result = result;
02896   cc->result_error = err;
02897   SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
02898                            & CacheContinuation::callbackEvent);
02899   eventProcessor.schedule_imm(cc, ET_CACHE_CONT_SM);
02900   return &cc->action;
02901 }
02902 
02903 
02904 
02905 
02906 
02907 int
02908 CacheContinuation::callbackEvent(int , Event * )
02909 {
02910   if (!action.cancelled)
02911     action.continuation->handleEvent(result, (void *)(intptr_t)result_error);
02912   cacheContAllocator_free(this);
02913   return EVENT_DONE;
02914 }
02915 
02916 
02917 
02918 
02919 
02920 
02921 
02922 
02923 
02924 
02925 static CacheContinuation *
02926 find_cache_continuation(unsigned int seq_number, unsigned int from_ip)
02927 {
02928   unsigned int hash = FOLDHASH(from_ip, seq_number);
02929   CacheContinuation *c = NULL;
02930   CacheContinuation *lastc = NULL;
02931   for (c = (CacheContinuation *) remoteCacheContQueue[hash].head; c; c = (CacheContinuation *) c->link.next) {
02932     if (seq_number == c->seq_number && from_ip == c->target_ip) {
02933       if (lastc) {
02934         ink_release_assert(c->link.prev == lastc);
02935       } else {
02936         ink_release_assert(!c->link.prev);
02937       }
02938       break;
02939     }
02940 
02941     lastc = c;
02942   }
02943   return c;
02944 }
02945 
02946 
02947 
02948 
02949 
02950 static unsigned int
02951 new_cache_sequence_number()
02952 {
02953   unsigned int res = 0;
02954 
02955   do {
02956     res = (unsigned int) ink_atomic_increment(&cluster_sequence_number, 1);
02957   } while (!res);
02958 
02959 
02960 
02961   return res;
02962 }
02963 
02964 
02965 #ifdef OMIT
02966 
02967 
02968 
02969 
02970 
02971 
02972 
02973 
02974 
// NOTE: dead code — this region is compiled out under #ifdef OMIT.
// Record the outcome of the first open (cache side) in a forwarded
// read/write pair, then switch to forwardWaitEvent for the second.
int
CacheContinuation::forwardEvent(int event, VConnection * c)
{
  int ret = EVENT_CONT;
  cluster_vc = 0;

  cache_read = false;
  switch (event) {
  default:
    ink_assert(!"bad case");
    /* fall through */
  case CACHE_EVENT_OPEN_WRITE_FAILED:
    ret = EVENT_DONE;
    break;
  case CACHE_EVENT_OPEN_WRITE:
    cluster_vc = c;
    break;
  case CACHE_EVENT_OPEN_READ_FAILED:
    cache_read = true;
    ret = EVENT_DONE;
    break;
  case CACHE_EVENT_OPEN_READ:
    cache_read = true;
    cluster_vc = c;
    break;
  }
  SET_HANDLER((CacheContHandler) & CacheContinuation::forwardWaitEvent);
  return ret;
}
03003 
03004 
03005 
03006 
03007 
03008 
03009 
// NOTE: dead code — compiled out under #ifdef OMIT.
// Combine the two opens (cache + forwarded) into the final user
// callback, constructing a VCTee when both sides opened.
int
CacheContinuation::forwardWaitEvent(int event, VConnection * c)
{
  int ret = EVENT_CONT;
  int res = CACHE_EVENT_OPEN_READ_FAILED;
  void *res_data = NULL;
  VConnection *vc = NULL;

  switch (event) {
  default:
    ink_assert(!"bad case");
    /* fall through */
  case CACHE_EVENT_OPEN_WRITE_FAILED:
  case CACHE_EVENT_OPEN_READ_FAILED:
    ret = EVENT_DONE;
    break;
  case CACHE_EVENT_OPEN_WRITE:
  case CACHE_EVENT_OPEN_READ:
    vc = c;
    break;

  }
  VConnection *read_vc = (cache_read ? cluster_vc : vc);
  VConnection *write_vc = (!cache_read ? cluster_vc : vc);

  res = read_vc ? CACHE_EVENT_OPEN_READ : CACHE_EVENT_OPEN_READ_FAILED;
  res_data = read_vc;

  // Tee reads into the writer when both VCs are present.
  if (read_vc && write_vc) {
    res_data = new VCTee(read_vc, write_vc, vio);
    if (vio) {                  
      // NOTE(review): casting read_vc to VCTee here looks wrong — the
      // tee just created (res_data) was probably intended; confirm
      // before ever re-enabling this code.
      res = event;
      res_data = &((VCTee *) read_vc)->vio;
    }
  }
  
  // Forward the combined result to the requester.
  c->handleEvent(res, res_data);
  return ret;
}
03050 
03051 
03052 
03053 
03054 
03055 
///////////////////////////////////////////////////////////////////////
// tunnelEvent()
//   Handles the result of opening a cluster channel for a cache
//   operation on the remote (cache-owning) side.  On CLUSTER_EVENT_OPEN
//   it wires a OneWayTunnel between the cache VC and the cluster VC and
//   sends a CacheOpReplyMsg back to the requesting node; on
//   CLUSTER_EVENT_OPEN_FAILED it closes the cache VC and sends a
//   failure reply.  Frees `this` unless a further reply is expected.
//   NOTE(review): read_buf is release-asserted false below, so every
//   `if (read_buf)` branch in this function is currently dead code.
///////////////////////////////////////////////////////////////////////
int
CacheContinuation::tunnelEvent(int event, VConnection * vc)
{
  int ret = EVENT_DONE;
  int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();    // fixed reply header length
  int len = 0;                                          // variable (marshalled) payload length
  bool read_buf = ((request_opcode == CACHE_OPEN_READ_BUFFER)
                   || (request_opcode == CACHE_OPEN_READ_BUFFER_LONG));
  ink_release_assert(!read_buf);   // buffered-read opcodes must not reach this path

  // Build the reply on the stack; header fields come from this continuation.
  CacheOpReplyMsg rmsg;
  CacheOpReplyMsg *msg = &rmsg;
  msg->result = result;
  msg->seq_number = seq_number;
  msg->token = token;
  int expect_reply = 1;            // decremented when no further callback is expected

  if (event == CLUSTER_EVENT_OPEN) {
    if (cache_read) {
      if (read_buf) {
        // Dead branch (read_buf asserted false above).
        ink_assert(have_all_data || (readahead_vio == &((CacheVConnection *) cluster_vc)->vio));
        write_cluster_vc = (ClusterVConnection *) vc;

        if (have_all_data) {
          msg->token.clear();   // no channel needed; data rides in the reply
        } else {
          msg->token = token;   // keep the channel open for the readahead tunnel
          setupReadBufTunnel(cluster_vc, vc);
        }

      } else {
        // Stream cache -> cluster: tunnel the object to the requester.
        OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
        pOWT->init(cluster_vc, vc, NULL, nbytes, this->mutex);
        --expect_reply;
      }


      // Attach the object's marshalled HTTP info to the reply.

      int res;
      CacheHTTPInfo *ci;

      if (!cache_vc_info) {
        // Lazily pull the object info off the cache VC.
        (void) getObjectSize(cluster_vc, request_opcode, &cache_vc_info);
      }
      ci = cache_vc_info;

      // Allocate a reply large enough for header + marshalled info.
      len = ci->marshal_length();
      CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);

      // Copy the fixed header into the stack-allocated reply.
      *reply = *msg;

      // Marshal object info directly after the fixed header.
      res = ci->marshal((char *) reply->moi.byte, len);
      ink_assert(res > 0);

      // Send the enlarged reply instead of the bare header.
      msg = reply;

    } else {
      // Cache write: tunnel cluster -> cache (requester sends the body).
      OneWayTunnel *pOWT = OneWayTunnelAllocator.alloc();
      pOWT->init(vc, cluster_vc, NULL, nbytes, this->mutex);
      --expect_reply;
    }
    ret = EVENT_CONT;
  } else {
    ink_release_assert(event == CLUSTER_EVENT_OPEN_FAILED);
    msg->result = CACHE_EVENT_SET_FAILED(result);

    if (read_buf) {
      // Dead branch (read_buf asserted false above).
      Debug("cluster_timeout", "unable to make cluster connection2");
      initial_buf = 0;          // drop ownership of the readahead buffer
      initial_bufsize = 0;

      if (!have_all_data) {
        // Close the cache VC and release the readahead MIOBuffer.
        MIOBuffer *mbuf = readahead_vio->buffer.writer();
        cluster_vc->do_io(VIO::CLOSE);
        free_MIOBuffer(mbuf);
      }
    } else {
      Debug("cluster_timeout", "unable to make cluster connection2A");
      cluster_vc->do_io(VIO::CLOSE);
    }
    // Negative length signals the token field is to be dropped from the wire.
    len = 0 - (int) sizeof(msg->token);
    --expect_reply;
  }

  // Only one wire version of the reply message is supported.
  int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
  if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
    if (read_buf) {
      // Dead branch (read_buf asserted false above).
      clusterProcessor.invoke_remote_data(from,
                                          CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                          (void *) msg, (flen + len),
                                          initial_buf, initial_bufsize,
                                          cluster_vc_channel, &token,
                                          &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);

    } else {
      clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
                                     (void *) msg, (flen + len), CLUSTER_OPT_STEAL);
    }
  } else {

    // Never downgrade the reply; an unknown peer version is fatal here.

    ink_release_assert(!"tunnelEvent() bad msg version");
  }
  // No further events expected for this continuation: free it.
  if (expect_reply <= 0)
    cacheContAllocator_free(this);
  return ret;
}
03172 
03173 
03174 
03175 
03176 
03177 
03178 int
03179 CacheContinuation::remoteConnectEvent(int event, VConnection * cvc)
03180 {
03181   ClusterVConnection *vc = (ClusterVConnection *) cvc;
03182 
03183   if (event == CLUSTER_EVENT_OPEN) {
03184     if (result == CACHE_EVENT_OPEN_READ) {
03185       
03186       vc->alternate = this->ic_new_info;
03187       this->ic_new_info.clear();
03188     }
03189     callback_user(result, vc);
03190     return EVENT_CONT;
03191   } else {
03192     Debug("cluster_cache", "unable to make cluster connection");
03193     callback_user(CACHE_EVENT_SET_FAILED(result), vc);
03194     return EVENT_DONE;
03195   }
03196 }
03197 
03198 
03199 #endif // OMIT
03200 
03201 
03202