00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "P_Cache.h"
00025 
00026 #ifdef HTTP_CACHE
00027 #include "HttpCacheSM.h"      
00028 #endif
00029 
00030 #define READ_WHILE_WRITER 1
00031 extern int cache_config_compatibility_4_2_0_fixup;
00032 
/**
 * Start a cache read for `key` (variant without HTTP request/params).
 *
 * @param cont      continuation that is called back with the outcome; its
 *                  mutex is the one used for the try-lock and the stats.
 * @param key       cache key to look up; copied into the new CacheVC.
 * @param type      fragment type; checked for readiness and stored in the VC.
 * @param hostname  passed to key_to_vol() to select the volume.
 * @param host_len  passed to key_to_vol() together with hostname.
 * @return ACTION_RESULT_DONE when the request completed (or failed) before
 *         returning, otherwise the address of the new CacheVC's _action.
 *
 * Failure is reported through cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED)
 * with -ECACHE_NOT_READY (cache not ready) or -ECACHE_NO_DOC (nothing found).
 */
Action *
Cache::open_read(Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }
  ink_assert(caches[type] == this);

  Vol *vol = key_to_vol(key, hostname, host_len);
  Dir result, *last_collision = NULL;
  // NOTE(review): `mutex` looks unused but is presumably referenced by the
  // lock/stat macros below -- confirm before renaming.
  ProxyMutex *mutex = cont->mutex;
  OpenDirEntry *od = NULL;
  CacheVC *c = NULL;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    // Short-circuit order matters: the lookups only run when the lock was
    // taken. A CacheVC is created on a lock miss, when the volume reports an
    // OpenDirEntry for the key, or when the directory probe finds an entry.
    if (!lock || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
      c = new_CacheVC(cont);
      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
      c->vio.op = VIO::READ;
      c->base_stat = cache_read_active_stat;
      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
      c->first_key = c->key = c->earliest_key = *key;
      c->vol = vol;
      c->frag_type = type;
      c->od = od;
    }
    // c is still NULL only when the lock was held and both lookups failed.
    if (!c)
      goto Lmiss;
    if (!lock) {
      // Lock miss: presumably reschedules c so that openReadStartHead (set
      // above) runs later -- the macro is defined elsewhere.
      CONT_SCHED_LOCK_RETRY(c);
      return &c->_action;
    }
    // An OpenDirEntry was found: continue through openReadFromWriter.
    if (c->od)
      goto Lwriter;
    c->dir = result;
    c->last_collision = last_collision;
    switch(c->do_read_call(&c->key)) {
      case EVENT_DONE: return ACTION_RESULT_DONE;
      case EVENT_RETURN: goto Lcallreturn;
      default: return &c->_action;
    }
  }
Lmiss:
  // Nothing found for the key: count the failure and tell the caller.
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
  return ACTION_RESULT_DONE;
Lwriter:
  // Reached after the locked scope above has been left.
  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
  if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
    return ACTION_RESULT_DONE;
  return &c->_action;
Lcallreturn:
  // do_read_call() returned EVENT_RETURN: drive the handler directly.
  if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
    return ACTION_RESULT_DONE;
  return &c->_action;
}
00090 
00091 #ifdef HTTP_CACHE
/**
 * Start a cache read for `key` on behalf of an HTTP request.
 *
 * @param cont      continuation that is called back with the outcome.
 * @param key       cache key to look up; copied into the new CacheVC.
 * @param request   request header; shallow-copied into the CacheVC.
 * @param params    lookup configuration; the pointer is stored in the CacheVC.
 * @param type      fragment type, used for the readiness check and the
 *                  caches[] assertion (the VC itself is always tagged
 *                  CACHE_FRAG_TYPE_HTTP).
 * @param hostname  passed to key_to_vol() to select the volume.
 * @param host_len  passed to key_to_vol() together with hostname.
 * @return ACTION_RESULT_DONE when the request completed (or failed) before
 *         returning, otherwise the address of the new CacheVC's _action.
 *
 * Failure is reported through cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED)
 * with -ECACHE_NOT_READY or -ECACHE_NO_DOC.
 */
Action *
Cache::open_read(Continuation * cont, CacheKey * key, CacheHTTPHdr * request,
                 CacheLookupHttpConfig * params, CacheFragType type, char *hostname, int host_len)
{

  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }
  ink_assert(caches[type] == this);

  Vol *vol = key_to_vol(key, hostname, host_len);
  Dir result, *last_collision = NULL;
  // NOTE(review): presumably referenced by the lock/stat macros below.
  ProxyMutex *mutex = cont->mutex;
  OpenDirEntry *od = NULL;
  CacheVC *c = NULL;

  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    // The lookups only run when the lock was taken (short-circuit on !lock).
    // A CacheVC is created on a lock miss, when an OpenDirEntry exists for
    // the key, or when the directory probe finds an entry.
    if (!lock || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
      c = new_CacheVC(cont);
      c->first_key = c->key = c->earliest_key = *key;
      c->vol = vol;
      c->vio.op = VIO::READ;
      c->base_stat = cache_read_active_stat;
      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
      c->request.copy_shallow(request);
      c->frag_type = CACHE_FRAG_TYPE_HTTP;
      c->params = params;
      c->od = od;
    }
    if (!lock) {
      // c is never NULL here: a lock miss always takes the branch above.
      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
      CONT_SCHED_LOCK_RETRY(c);
      return &c->_action;
    }
    if (!c)
      goto Lmiss;
    if (c->od)
      goto Lwriter;
    
    // Plain directory hit: remember the entry as both current and first dir.
    c->dir = c->first_dir = result;
    c->last_collision = last_collision;
    SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
    switch(c->do_read_call(&c->key)) {
      case EVENT_DONE: return ACTION_RESULT_DONE;
      case EVENT_RETURN: goto Lcallreturn;
      default: return &c->_action;
    }
  }
Lmiss:
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
  return ACTION_RESULT_DONE;
Lwriter:
  
  // NOTE(review): unchecked C-style cast -- this assumes every caller of the
  // HTTP overload passes an HttpCacheSM as `cont`; confirm against callers.
  ((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
  if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
    return ACTION_RESULT_DONE;
  return &c->_action;
Lcallreturn:
  // do_read_call() returned EVENT_RETURN: drive the handler directly.
  if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
    return ACTION_RESULT_DONE;
  return &c->_action;
}
00158 #endif
00159 
00160 uint32_t
00161 CacheVC::load_http_info(CacheHTTPInfoVector* info, Doc* doc, RefCountObj * block_ptr)
00162 {
00163   uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
00164   if (cache_config_compatibility_4_2_0_fixup &&
00165       vol->header->version.ink_major == 23 && vol->header->version.ink_minor == 0
00166     ) {
00167     for ( int i = info->xcount - 1 ; i >= 0 ; --i ) {
00168       info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
00169       info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
00170     }
00171   }
00172   return zret;
00173 }
00174 
00175 int
00176 CacheVC::openReadFromWriterFailure(int event, Event * e)
00177 {
00178 
00179   od = NULL;
00180   vector.clear(false);
00181   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00182   CACHE_INCREMENT_DYN_STAT(cache_read_busy_failure_stat);
00183   _action.continuation->handleEvent(event, e);
00184   free_CacheVC(this);
00185   return EVENT_DONE;
00186 }
00187 
00188 int
00189 CacheVC::openReadChooseWriter(int , Event * )
00190 {
00191   intptr_t err = ECACHE_DOC_BUSY;
00192   CacheVC *w = NULL;
00193 
00194   ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == NULL);
00195 
00196   if (!od)
00197     return EVENT_RETURN;
00198 
00199   if (frag_type != CACHE_FRAG_TYPE_HTTP) {
00200     ink_assert(od->num_writers == 1);
00201     w = od->writers.head;
00202     if (w->start_time > start_time || w->closed < 0) {
00203       od = NULL;
00204       return EVENT_RETURN;
00205     }
00206     if (!w->closed)
00207       return -err;
00208     write_vc = w;
00209   }
00210 #ifdef HTTP_CACHE
00211   else {
00212     write_vector = &od->vector;
00213     int write_vec_cnt = write_vector->count();
00214     for (int c = 0; c < write_vec_cnt; c++)
00215       vector.insert(write_vector->get(c));
00216     
00217     
00218     for (w = (CacheVC *) od->writers.head; w; w = (CacheVC *) w->opendir_link.next) {
00219       if (w->start_time > start_time || w->closed < 0)
00220         continue;
00221       if (!w->closed && !cache_config_read_while_writer) {
00222         return -err;
00223       }
00224       if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT)
00225         continue;
00226 
00227       if (!w->closed && !w->alternate.valid()) {
00228         od = NULL;
00229         ink_assert(!write_vc);
00230         vector.clear(false);
00231         return EVENT_CONT;
00232       }
00233       
00234       int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
00235       if (w->f.update) {
00236         
00237         alt_ndx = get_alternate_index(&vector, w->update_key);
00238         
00239         if (!w->alternate.valid()) {
00240           if (alt_ndx >= 0)
00241             vector.remove(alt_ndx, false);
00242           continue;
00243         }
00244       }
00245       ink_assert(w->alternate.valid());
00246       if (w->alternate.valid())
00247         vector.insert(&w->alternate, alt_ndx);
00248     }
00249 
00250     if (!vector.count()) {
00251       if (od->reading_vec) {
00252        
00253         
00254         
00255         od = NULL;
00256         return EVENT_RETURN;
00257       }
00258       return -ECACHE_NO_DOC;
00259     }
00260     if (cache_config_select_alternate) {
00261       alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
00262       if (alternate_index < 0)
00263         return -ECACHE_ALT_MISS;
00264     } else
00265       alternate_index = 0;
00266     CacheHTTPInfo *obj = vector.get(alternate_index);
00267     for (w = (CacheVC *) od->writers.head; w; w = (CacheVC *) w->opendir_link.next) {
00268       if (obj->m_alt == w->alternate.m_alt) {
00269         write_vc = w;
00270         break;
00271       }
00272     }
00273     vector.clear(false);
00274     if (!write_vc) {
00275       DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
00276       od = NULL;
00277       return EVENT_RETURN;
00278     }
00279 
00280     DDebug("cache_read_agg",
00281           "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p",
00282           this, first_key.slice32(1), write_vc->earliest_key.slice32(1),
00283           vector.count(), alternate_index, od->num_writers, write_vc);
00284   }
00285 #endif //HTTP_CACHE
00286   return EVENT_NONE;
00287 }
00288 
/**
 * Handler used when an OpenDirEntry exists for the key, i.e. the object is
 * (or recently was) being written.
 *
 * Depending on the writer's state the VC either
 *  - falls back to a normal read via openReadStartHead,
 *  - fails with CACHE_EVENT_OPEN_READ_FAILED through openReadFromWriterFailure,
 *  - reschedules itself through one of the VC_SCHED_*_RETRY macros,
 *  - continues with openReadStartEarliest to read the writer's fragments, or
 *  - serves the data straight from the writer's buffers
 *    (openReadFromWriterMain) after calling the user with CACHE_EVENT_OPEN_READ.
 *
 * NOTE(review): the VC_SCHED_*_RETRY macros are defined elsewhere; the code
 * following each use only makes sense if they return from this function.
 *
 * @param event  event code, forwarded to the handlers this function chains to.
 * @param e      event data, forwarded likewise.
 */
int
CacheVC::openReadFromWriter(int event, Event * e)
{
  // First entry only: reset collision state and record when this reader
  // started (openReadChooseWriter compares it with the writers' start_time).
  if (!f.read_from_writer_called) {
    
    
    last_collision = NULL;
    
    
    
    
    
    start_time = ink_get_hrtime();
    f.read_from_writer_called = 1;
  }
  cancel_trigger();
  intptr_t err = ECACHE_DOC_BUSY;
  DDebug("cache_read_agg", "%p: key: %X In openReadFromWriter", this, first_key.slice32(1));
#ifndef READ_WHILE_WRITER
  return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) -err);
#else
  if (_action.cancelled) {
    od = NULL; 
    return free_CacheVC(this);
  }
  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
  if (!lock)
    VC_SCHED_LOCK_RETRY();
  // Re-fetch the OpenDirEntry under the volume lock.
  od = vol->open_read(&first_key); 
  if (!od) {
    // No open entry any more: continue as a normal read.
    MUTEX_RELEASE(lock);
    write_vc = NULL;
    SET_HANDLER(&CacheVC::openReadStartHead);
    return openReadStartHead(event, e);
  } else
    ink_assert(od == vol->open_read(&first_key));
  if (!write_vc) {
    // No writer chosen yet: see openReadChooseWriter for the result codes.
    int ret = openReadChooseWriter(event, e);
    if (ret < 0) {
      // Negative values are error codes; they are passed on as event data.
      MUTEX_RELEASE(lock);
      SET_HANDLER(&CacheVC::openReadFromWriterFailure);
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *> (ret));
    } else if (ret == EVENT_RETURN) {
      MUTEX_RELEASE(lock);
      SET_HANDLER(&CacheVC::openReadStartHead);
      return openReadStartHead(event, e);
    } else if (ret == EVENT_CONT) {
      ink_assert(!write_vc);
      VC_SCHED_WRITER_RETRY();
    } else
      ink_assert(write_vc);
  } else {
    // A writer was chosen on an earlier pass; check whether it is gone.
    if (writer_done()) {
      MUTEX_RELEASE(lock);
      DDebug("cache_read_agg",
            "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
      od = NULL;
      write_vc = NULL;
      SET_HANDLER(&CacheVC::openReadStartHead);
      return openReadStartHead(event, e);
    }
  }
#ifdef HTTP_CACHE
  // Keep a local copy: od is cleared next but the entry is consulted below.
  OpenDirEntry *cod = od;
#endif
  od = NULL;
  
  // closed < 0: the writer was aborted, so read from the volume instead.
  if (write_vc->closed < 0) {
    MUTEX_RELEASE(lock);
    write_vc = NULL;
    
    SET_HANDLER(&CacheVC::openReadStartHead);
    return openReadStartHead(EVENT_IMMEDIATE, 0);
  }
  
  ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
  // Writer still open and `fragment` still zero.
  if (!write_vc->closed && !write_vc->fragment) {
    if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP) {
      MUTEX_RELEASE(lock);
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
    }
    DDebug("cache_read_agg",
          "%p: key: %X writer: closed:%d, fragment:%d, retry: %d",
          this, first_key.slice32(1), write_vc->closed, write_vc->fragment, writer_lock_retry);
    VC_SCHED_WRITER_RETRY();
  }

  // Take the writer's own mutex before looking at its buffers; the volume
  // lock is released only after the writer lock has been obtained.
  CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
  if (!writer_lock) {
    DDebug("cache_read_agg", "%p: key: %X lock miss", this, first_key.slice32(1));
    VC_SCHED_LOCK_RETRY();
  }
  MUTEX_RELEASE(lock);

  if (!write_vc->io.ok())
    return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
#ifdef HTTP_CACHE
  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
    DDebug("cache_read_agg",
          "%p: key: %X http passed stage 1, closed: %d, frag: %d",
          this, first_key.slice32(1), write_vc->closed, write_vc->fragment);
    if (!write_vc->alternate.valid())
      return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
    // Take a copy of the writer's alternate and mark the writer as having readers.
    alternate.copy(&write_vc->alternate);
    vector.insert(&alternate);
    alternate.object_key_get(&key);
    write_vc->f.readers = 1;
    if (!(write_vc->f.update && write_vc->total_len == 0)) {
      // The writer carries body data: read starting at its earliest key.
      key = write_vc->earliest_key;
      // While the writer is open its vio.nbytes is used as the object size,
      // afterwards the length it actually wrote.
      if (!write_vc->closed)
        alternate.object_size_set(write_vc->vio.nbytes);
      else
        alternate.object_size_set(write_vc->total_len);
    } else {
      // Update with no body bytes written (f.update set, total_len == 0).
      key = write_vc->update_key;
      ink_assert(write_vc->closed);
      DDebug("cache_read_agg", "%p: key: %X writer header update", this, first_key.slice32(1));
      
      doc_len = alternate.object_size_get();
      if (write_vc->update_key == cod->single_doc_key &&
          (cod->move_resident_alt || write_vc->f.rewrite_resident_alt) && write_vc->first_buf._ptr()) {
        
        
        
        // The body is available in the writer's first_buf: serve it from
        // there as a single fragment.
        Doc *doc = (Doc *) write_vc->first_buf->data();
        writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
        MUTEX_RELEASE(writer_lock);
        ink_assert(doc_len == doc->data_len());
        length = doc_len;
        f.single_fragment = 1;
        doc_pos = 0;
        earliest_key = key;
        dir_clean(&first_dir);
        dir_clean(&earliest_dir);
        SET_HANDLER(&CacheVC::openReadFromWriterMain);
        CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
        return callcont(CACHE_EVENT_OPEN_READ);
      }
      
      
      // Otherwise read the document stored under update_key.
      last_collision = NULL;
      MUTEX_RELEASE(writer_lock);
      SET_HANDLER(&CacheVC::openReadStartEarliest);
      return openReadStartEarliest(event, e);
    }
  } else {
#endif //HTTP_CACHE
    DDebug("cache_read_agg", "%p: key: %X non-http passed stage 1", this, first_key.slice32(1));
    key = write_vc->earliest_key;
#ifdef HTTP_CACHE
  }
#endif
  if (write_vc->fragment) {
    // Writer's `fragment` is non-zero: go through openReadStartEarliest.
    doc_len = write_vc->vio.nbytes;
    last_collision = NULL;
    DDebug("cache_read_agg",
          "%p: key: %X closed: %d, fragment: %d, len: %d starting first fragment",
          this, first_key.slice32(1), write_vc->closed, write_vc->fragment, (int)doc_len);
    MUTEX_RELEASE(writer_lock);
    
    SET_HANDLER(&CacheVC::openReadStartEarliest);
    return openReadStartEarliest(event, e);
  }
  // Writer's `fragment` is zero: take over its buffer chain, offset and length.
  writer_buf = write_vc->blocks;
  writer_offset = write_vc->offset;
  length = write_vc->length;
  
  f.single_fragment = !write_vc->fragment;        
  doc_pos = 0;
  earliest_key = write_vc->earliest_key;
  ink_assert(earliest_key == key);
  doc_len = write_vc->total_len;
  dir_clean(&first_dir);
  dir_clean(&earliest_dir);
  DDebug("cache_read_agg", "%p: key: %X %X: single fragment read", this, first_key.slice32(1), key.slice32(0));
  MUTEX_RELEASE(writer_lock);
  SET_HANDLER(&CacheVC::openReadFromWriterMain);
  CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
  return callcont(CACHE_EVENT_OPEN_READ);
#endif //READ_WHILE_WRITER
}
00470 
00471 int
00472 CacheVC::openReadFromWriterMain(int , Event * )
00473 {
00474   cancel_trigger();
00475   if (seek_to) {
00476     vio.ndone = seek_to;
00477     seek_to = 0;
00478   }
00479   IOBufferBlock *b = NULL;
00480   int64_t ntodo = vio.ntodo();
00481   if (ntodo <= 0)
00482     return EVENT_CONT;
00483   if (length < ((int64_t)doc_len) - vio.ndone) {
00484     DDebug("cache_read_agg", "truncation %X", first_key.slice32(1));
00485     if (is_action_tag_set("cache")) {
00486       ink_release_assert(false);
00487     }
00488     Warning("Document %X truncated at %d of %d, reading from writer", first_key.slice32(1), (int)vio.ndone, (int)doc_len);
00489     return calluser(VC_EVENT_ERROR);
00490   }
00491   
00492 
00493   if (length > ((int64_t)doc_len) - vio.ndone) {
00494     int64_t skip_bytes = length - (doc_len - vio.ndone);
00495     iobufferblock_skip(writer_buf, &writer_offset, &length, skip_bytes);
00496   }
00497   int64_t bytes = length;
00498   if (bytes > vio.ntodo())
00499     bytes = vio.ntodo();
00500   if (vio.ndone >= (int64_t)doc_len) {
00501     ink_assert(bytes <= 0);
00502     
00503     return calluser(VC_EVENT_EOS);
00504   }
00505   b = iobufferblock_clone(writer_buf, writer_offset, bytes);
00506   writer_buf = iobufferblock_skip(writer_buf, &writer_offset, &length, bytes);
00507   vio.buffer.writer()->append_block(b);
00508   vio.ndone += bytes;
00509   if (vio.ntodo() <= 0)
00510     return calluser(VC_EVENT_READ_COMPLETE);
00511   else
00512     return calluser(VC_EVENT_READ_READY);
00513 }
00514 
00515 int
00516 CacheVC::openReadClose(int event, Event * )
00517 {
00518   cancel_trigger();
00519   if (is_io_in_progress()) {
00520     if (event != AIO_EVENT_DONE)
00521       return EVENT_CONT;
00522     set_io_not_in_progress();
00523   }
00524   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00525   if (!lock)
00526     VC_SCHED_LOCK_RETRY();
00527   if (f.hit_evacuate && dir_valid(vol, &first_dir) && closed > 0) {
00528     if (f.single_fragment)
00529       vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
00530     else if (dir_valid(vol, &earliest_dir)) {
00531       vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
00532       vol->force_evacuate_head(&earliest_dir, dir_pinned(&earliest_dir));
00533     }
00534   }
00535   vol->close_read(this);
00536   return free_CacheVC(this);
00537 }
00538 
/**
 * Completion handler for the read of a follow-on fragment issued by
 * openReadMain.
 *
 * Validates the buffer that was read. When it holds the fragment for `key`
 * the VC moves on to the next key and hands control back to openReadMain.
 * Otherwise the directory is probed again (collision chain); if nothing is
 * found and a writer is attached, the VC either waits for the writer or
 * decides between end-of-stream and error once the writer is done.
 *
 * @param event  EVENT_IMMEDIATE is ignored (returns EVENT_CONT);
 *               AIO_EVENT_DONE triggers the io.ok() check.
 * @param e      forwarded to openReadMain on success.
 */
int
CacheVC::openReadReadDone(int event, Event * e)
{
  Doc *doc = NULL;

  cancel_trigger();
  if (event == EVENT_IMMEDIATE)
    return EVENT_CONT;
  set_io_not_in_progress();
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock)
      VC_SCHED_LOCK_RETRY();
    // Failed disk read: drop the directory entry of the earliest fragment.
    if (event == AIO_EVENT_DONE && !io.ok()) {
      dir_delete(&earliest_key, vol, &earliest_dir);
      goto Lerror;
    }
    // Only look at the buffer when a probe result is pending and the
    // directory entry that was read is still valid.
    if (last_collision &&         
        dir_valid(vol, &dir))    
    {
      doc = (Doc *) buf->data();
      if (doc->magic != DOC_MAGIC) {
        char tmpstring[100];
        if (doc->magic == DOC_CORRUPT)
          Warning("Middle: Doc checksum does not match for %s", key.toHexStr(tmpstring));
        else
          Warning("Middle: Doc magic does not match for %s", key.toHexStr(tmpstring));
#if TS_USE_INTERIM_CACHE == 1
        // Bad copy in the interim cache: delete that entry and probe again.
        if (dir_ininterim(&dir)) {
          dir_delete(&key, vol, &dir);
          goto Lread;
        }
#endif
        goto Lerror;
      }
      // The buffer holds the fragment that was asked for.
      if (doc->key == key)
        goto LreadMain;
#if TS_USE_INTERIM_CACHE == 1
      else if (dir_ininterim(&dir)) {
          dir_delete(&key, vol, &dir);
          last_collision = NULL;
        }
#endif
    }
    // Restart the probe from the beginning when `dir` no longer refers to the
    // same offset as the remembered collision entry.
#if TS_USE_INTERIM_CACHE == 1
    if (last_collision && dir_get_offset(&dir) != dir_get_offset(last_collision))
      last_collision = 0;
Lread:
#else
    if (last_collision && dir_offset(&dir) != dir_offset(last_collision))
      last_collision = 0;       
#endif
    if (dir_probe(&key, vol, &dir, &last_collision)) {
      // Another candidate entry: read it; this handler runs again on completion.
      int ret = do_read_call(&key);
      if (ret == EVENT_RETURN)
        goto Lcallreturn;
      return EVENT_CONT;
    } else if (write_vc) {
      if (writer_done()) {
        // The writer is gone. If the earliest fragment is still in the
        // directory at the offset recorded in earliest_dir, treat what was
        // read so far as the whole document; otherwise it is an error.
        last_collision = NULL;
        while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
#if TS_USE_INTERIM_CACHE == 1
          if (dir_get_offset(&dir) == dir_get_offset(&earliest_dir)) {
#else
          if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
#endif
            DDebug("cache_read_agg", "%p: key: %X ReadRead complete: %d",
                  this, first_key.slice32(1), (int)vio.ndone);
            doc_len = vio.ndone;
            goto Ldone;
          }
        }
        DDebug("cache_read_agg", "%p: key: %X ReadRead writer aborted: %d",
              this, first_key.slice32(1), (int)vio.ndone);
        goto Lerror;
      }
      // Writer still active: try again later.
      // NOTE(review): relies on the macro returning; otherwise control would
      // fall into Lerror below -- confirm against the macro definition.
      DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
      VC_SCHED_WRITER_RETRY(); 
    }
    
    // No directory entry and no writer: falls through to Lerror.
  }
Lerror:
  char tmpstring[100];
  Warning("Document %s truncated", earliest_key.toHexStr(tmpstring));
  return calluser(VC_EVENT_ERROR);
Ldone:
  return calluser(VC_EVENT_EOS);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, 0);
LreadMain:
  // Fragment accepted: advance the fragment counter and key, position
  // doc_pos after the fragment's prefix and resume in openReadMain.
  fragment++;
  doc_pos = doc->prefix_len();
  next_CacheKey(&key, &key);
  SET_HANDLER(&CacheVC::openReadMain);
  return openReadMain(event, e);
}
00635 
00636 int
00637 CacheVC::openReadMain(int , Event * )
00638 {
00639   cancel_trigger();
00640   Doc *doc = (Doc *) buf->data();
00641   int64_t ntodo = vio.ntodo();
00642   int64_t bytes = doc->len - doc_pos;
00643   IOBufferBlock *b = NULL;
00644   if (seek_to) { 
00645     if (seek_to >= doc_len) {
00646       vio.ndone = doc_len;
00647       return calluser(VC_EVENT_EOS);
00648     }
00649 #ifdef HTTP_CACHE
00650     HTTPInfo::FragOffset* frags = alternate.get_frag_table();
00651     if (is_debug_tag_set("cache_seek")) {
00652       char b[33], c[33];
00653       Debug("cache_seek", "Seek @ %" PRId64" in %s from #%d @ %" PRId64"/%d:%s",
00654             seek_to, first_key.toHexStr(b), fragment, doc_pos, doc->len, doc->key.toHexStr(c));
00655     }
00656     
00657 
00658 
00659 
00660     if (frags) {
00661       int target = 0;
00662       HTTPInfo::FragOffset next_off = frags[target];
00663       int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
00664       ink_assert(lfi >= 0); 
00665 
00666       
00667 
00668 
00669 
00670 
00671 
00672       if (fragment == 0 ||
00673           seek_to < frags[fragment-1] ||
00674           (fragment <= lfi && frags[fragment] <= seek_to)
00675         ) {
00676         
00677         while (seek_to >= next_off && target < lfi) {
00678           next_off = frags[++target];
00679         }
00680         if (target == lfi && seek_to >= next_off) ++target;
00681       } else { 
00682         target = fragment;
00683       }
00684       if (target != fragment) {
00685         
00686         
00687         int cfi = fragment;
00688         --target;
00689         while (target > fragment) {
00690           next_CacheKey(&key, &key);
00691           ++fragment;
00692         }
00693         while (target < fragment) {
00694           prev_CacheKey(&key, &key);
00695           --fragment;
00696         }
00697 
00698         if (is_debug_tag_set("cache_seek")) {
00699           char target_key_str[33];
00700           key.toHexStr(target_key_str);
00701           Debug("cache_seek", "Seek #%d @ %" PRId64" -> #%d @ %" PRId64":%s", cfi, doc_pos, target, seek_to, target_key_str);
00702         }
00703         goto Lread;
00704       }
00705     }
00706     doc_pos = doc->prefix_len() + seek_to;
00707     if (fragment) doc_pos -= static_cast<int64_t>(frags[fragment-1]);
00708     vio.ndone = 0;
00709     seek_to = 0;
00710     ntodo = vio.ntodo();
00711     bytes = doc->len - doc_pos;
00712     if (is_debug_tag_set("cache_seek")) {
00713       char target_key_str[33];
00714       key.toHexStr(target_key_str);
00715       Debug("cache_seek", "Read # %d @ %" PRId64"/%d for %" PRId64, fragment, doc_pos, doc->len, bytes);
00716     }
00717 #endif
00718   }
00719   if (ntodo <= 0)
00720     return EVENT_CONT;
00721   if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) 
00722     return EVENT_CONT;
00723   if ((bytes <= 0) && vio.ntodo() >= 0)
00724     goto Lread;
00725   if (bytes > vio.ntodo())
00726     bytes = vio.ntodo();
00727   b = new_IOBufferBlock(buf, bytes, doc_pos);
00728   b->_buf_end = b->_end;
00729   vio.buffer.writer()->append_block(b);
00730   vio.ndone += bytes;
00731   doc_pos += bytes;
00732   if (vio.ntodo() <= 0)
00733     return calluser(VC_EVENT_READ_COMPLETE);
00734   else {
00735     if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
00736       return EVENT_DONE;
00737     
00738     
00739     if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water())
00740       goto Lread;
00741     return EVENT_CONT;
00742   }
00743 Lread: {
00744     if (vio.ndone >= (int64_t)doc_len)
00745       
00746       return calluser(VC_EVENT_EOS);
00747     last_collision = 0;
00748     writer_lock_retry = 0;
00749     
00750     
00751     
00752     
00753     cancel_trigger();
00754     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00755     if (!lock) {
00756       SET_HANDLER(&CacheVC::openReadMain);
00757       VC_SCHED_LOCK_RETRY();
00758     }
00759     if (dir_probe(&key, vol, &dir, &last_collision)) {
00760       SET_HANDLER(&CacheVC::openReadReadDone);
00761       int ret = do_read_call(&key);
00762       if (ret == EVENT_RETURN)
00763         goto Lcallreturn;
00764       return EVENT_CONT;
00765     } else if (write_vc) {
00766       if (writer_done()) {
00767         last_collision = NULL;
00768         while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
00769           if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
00770             DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d",
00771                   this, first_key.slice32(1), (int)vio.ndone);
00772             doc_len = vio.ndone;
00773             goto Leos;
00774           }
00775         }
00776         DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d",
00777               this, first_key.slice32(1), (int)vio.ndone);
00778         goto Lerror;
00779       }
00780       DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
00781       SET_HANDLER(&CacheVC::openReadMain);
00782       VC_SCHED_WRITER_RETRY();
00783     }
00784     if (is_action_tag_set("cache"))
00785       ink_release_assert(false);
00786     Warning("Document %X truncated at %d of %d, missing fragment %X", first_key.slice32(1), (int)vio.ndone, (int)doc_len, key.slice32(1));
00787     
00788     dir_delete(&earliest_key, vol, &earliest_dir);
00789   }
00790 Lerror:
00791   return calluser(VC_EVENT_ERROR);
00792 Leos:
00793   return calluser(VC_EVENT_EOS);
00794 Lcallreturn:
00795   return handleEvent(AIO_EVENT_DONE, 0);
00796 }
00797 
00798 
00799 
00800 
00801 
/**
 * Locate and read the earliest fragment of the object identified by `key`.
 *
 * On first entry (no `buf` yet) the directory is probed and a read issued.
 * On re-entry after the read the buffer is validated; on success the VC
 * switches to openReadMain and calls the user with CACHE_EVENT_OPEN_READ.
 * When no entry can be found the user gets CACHE_EVENT_OPEN_READ_FAILED with
 * -ECACHE_NO_DOC. In HTTP_CACHE builds, a reader that did not come through
 * openReadFromWriter first tries to rewrite the alternate vector without the
 * alternate whose data is missing (handler openReadVecWrite).
 */
int
CacheVC::openReadStartEarliest(int , Event * )
{
  int ret = 0;
  Doc *doc = NULL;
  cancel_trigger();
  set_io_not_in_progress();
  if (_action.cancelled)
    return free_CacheVC(this);
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock)
      VC_SCHED_LOCK_RETRY();
    // No buffer yet means no read has been issued: go probe the directory.
    if (!buf)
      goto Lread;
    if (!io.ok())
      goto Ldone;
    
    
    // The entry that was read is no longer valid for the aggregation state:
    // probe again, restarting the collision chain if the entry is gone entirely.
    if (!dir_agg_valid(vol, &dir)) {
      
      if (!dir_valid(vol, &dir))
        last_collision = NULL;
      goto Lread;
    }
    doc = (Doc *) buf->data();
    if (doc->magic != DOC_MAGIC) {
      char tmpstring[100];
      if (is_action_tag_set("cache")) {
        ink_release_assert(false);
      }
      if (doc->magic == DOC_CORRUPT)
        Warning("Earliest: Doc checksum does not match for %s", key.toHexStr(tmpstring));
      else
        Warning("Earliest : Doc magic does not match for %s", key.toHexStr(tmpstring));
      
      // Remove the bad entry and restart the probe from the beginning.
      dir_delete(&key, vol, &dir);
      
      
      
      
      last_collision = NULL;
      goto Lread;
    }
    // Key mismatch (directory collision): keep probing.
    if (!(doc->key == key)) 
      goto Lread;
    
    // Found it: remember the key, position after the prefix and advance
    // `key` to the next fragment's key.
    earliest_key = key;
    doc_pos = doc->prefix_len();
    next_CacheKey(&key, &doc->key);
    vol->begin_read(this);
    // Flag the read for evacuation on close when the earliest dir is within
    // the hit-evacuate window and the document is within the size limit
    // (a limit of 0 disables the size check).
    if (vol->within_hit_evacuate_window(&earliest_dir) &&
        (!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
#if TS_USE_INTERIM_CACHE == 1
        && !dir_ininterim(&dir)
#endif
        ) {
      DDebug("cache_hit_evac", "dir: %" PRId64", write: %" PRId64", phase: %d",
            dir_offset(&earliest_dir), offset_to_vol_offset(vol, vol->header->write_pos), vol->header->phase);
      f.hit_evacuate = 1;
    }
    goto Lsuccess;
Lread:
    // Probe the directory, then the lookaside table, for the key.
    if (dir_probe(&key, vol, &earliest_dir, &last_collision) ||
        dir_lookaside_probe(&key, vol, &earliest_dir, NULL))
    {
      dir = earliest_dir;
#if TS_USE_INTERIM_CACHE == 1
      // An interim-cache entry is dropped when the alternate has more than
      // one fragment offset; probe again without it.
      if (dir_ininterim(&dir) && alternate.get_frag_offset_count() > 1) {
        dir_delete(&key, vol, &dir);
        last_collision = NULL;
        goto Lread;
      }
#endif
      if ((ret = do_read_call(&key)) == EVENT_RETURN)
        goto Lcallreturn;
      return ret;
    }
    
    
    // Nothing found for the earliest key.
#ifdef HTTP_CACHE
    if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
      
      // open_write() returning 0 is treated as success here.
      if (!vol->open_write(this, false, 1)) {
        Doc *doc1 = (Doc *) first_buf->data();
        uint32_t len = this->load_http_info(write_vector, doc1);
        ink_assert(len == doc1->hlen && write_vector->count() > 0);
        // Drop the alternate whose body could not be found.
        write_vector->remove(alternate_index, true);
        
        if (len != doc1->hlen || !write_vector->count()) {
          
          
          
          // Header unusable or no alternates left: delete the first dir entry.
          dir_delete(&first_key, vol, &first_dir);
        }
#if TS_USE_INTERIM_CACHE == 1
        else if (dir_ininterim(&first_dir))
          dir_delete(&first_key, vol, &first_dir);
#endif
        else {
          // Turn this VC into a writer that rewrites the remaining vector
          // under first_key.
          buf = NULL;
          last_collision = NULL;
          write_len = 0;
          header_len = write_vector->marshal_length();
          f.evac_vector = 1;
          f.use_first_key = 1;
          key = first_key;
          
          
          
          
          od->first_dir = first_dir;
          od->writing_vec = 1;
          earliest_key = zero_key;

          
          vio.op = VIO::WRITE;
          total_len = 0;
          f.update = 1;
          alternate_index = CACHE_ALT_REMOVED;

          
          

          // The first document also carries body data (total_len > 0):
          // record its key and dir entry in the OpenDirEntry and set
          // move_resident_alt.
          if (doc1->total_len > 0) {
            od->move_resident_alt = 1;
            od->single_doc_key = doc1->key;
            dir_assign(&od->single_doc_dir, &dir);
            dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
          }
          SET_HANDLER(&CacheVC::openReadVecWrite);
          if ((ret = do_write_call()) == EVENT_RETURN)
            goto Lcallreturn;
          return ret;
        }
      }
    }
#endif
    
  Ldone:
    // Release the open-write entry, if one is attached to this VC.
    if (od)
      vol->close_write(this);
  }
  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
  return free_CacheVC(this);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, 0); 
Lsuccess:
  // Reads that were following a writer count as "busy" successes.
  if (write_vc)
    CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
  SET_HANDLER(&CacheVC::openReadMain);
  return callcont(CACHE_EVENT_OPEN_READ);
}
00956 // Handler installed (via SET_HANDLER, just before do_write_call()) for the rewrite of the alternate vector.
00957 // On a successful write it puts the VC back into read mode, updates the directory and re-runs openReadStartHead
00958 // when an alternate still matches the request; otherwise it reports OPEN_READ_FAILED (-ECACHE_ALT_MISS).
00959 #ifdef HTTP_CACHE
00960 int
00961 CacheVC::openReadVecWrite(int /* event (unused) */, Event * /* e (unused) */)
00962 {
00963   cancel_trigger();
00964   set_io_not_in_progress();
00965   ink_assert(od);
00966   od->writing_vec = 0; // clear the flag that was set to 1 before the write was issued
00967   if (_action.cancelled) // caller cancelled: hand off to openWriteCloseDir instead of resuming the read
00968     return openWriteCloseDir(EVENT_IMMEDIATE, 0);
00969   {
00970     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00971     if (!lock)
00972       VC_SCHED_LOCK_RETRY(); // macro defined elsewhere; presumably reschedules this handler and returns
00973     if (io.ok()) {
00974       ink_assert(f.evac_vector);
00975       ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
00976       ink_assert(!buf.m_ptr);
00977       f.evac_vector = false; // from here to vio.op: undo the write-mode state set before the write was issued
00978       last_collision = NULL;
00979       f.update = 0;
00980       alternate_index = CACHE_ALT_INDEX_DEFAULT;
00981       f.use_first_key = 0;
00982       vio.op = VIO::READ;
00983       dir_overwrite(&first_key, vol, &dir, &od->first_dir); // presumably replaces od->first_dir with dir for first_key -- confirm
00984       if (od->move_resident_alt) // an entry was saved in od->single_doc_dir / od->single_doc_key: insert it
00985         dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
00986       int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
00987       vol->close_write(this);
00988       if (alt_ndx >= 0) {
00989         vector.clear();
00990         // An alternate in write_vector matches the request. Rather than reusing it directly,
00991         // the read is restarted from the head: buf is unset (asserted above), so
00992         // openReadStartHead probes the directory again and reloads `vector` from disk.
00993         goto Lrestart;
00994       }
00995     } else
00996       vol->close_write(this);
00997   }
00998   // Reached when the write failed or when no alternate matches the request.
00999   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
01000   _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_ALT_MISS);
01001   return free_CacheVC(this);
01002 Lrestart:
01003   SET_HANDLER(&CacheVC::openReadStartHead);
01004   return openReadStartHead(EVENT_IMMEDIATE, 0);
01005 }
01006 #endif
01007 // Head-read handler for an open-read or lookup. With buf unset it probes the directory for `key` and issues a
01008 // read; with buf set it validates the Doc in buf. Possible outcomes: LOOKUP / LOOKUP_FAILED for lookups,
01009 // OPEN_READ (handler becomes openReadMain) for single-fragment objects, a hand-off to openReadStartEarliest
01010 // for multi-fragment objects, a hand-off to openReadFromWriter when an open writer entry exists for the key,
01011 // or OPEN_READ_FAILED carrying -err. Every bad or foreign directory entry sends control back to Lread.
01012 int
01013 CacheVC::openReadStartHead(int event, Event * e)
01014 {
01015   intptr_t err = ECACHE_NO_DOC; // error reported at Ldone unless a more specific one is set below
01016   Doc *doc = NULL;
01017   cancel_trigger();
01018   set_io_not_in_progress();
01019   if (_action.cancelled)
01020     return free_CacheVC(this);
01021   {
01022     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01023     if (!lock)
01024       VC_SCHED_LOCK_RETRY(); // macro defined elsewhere; presumably reschedules this handler and returns
01025     if (!buf) // nothing has been read yet: go probe the directory
01026       goto Lread;
01027     if (!io.ok()) // the read failed: report err (still ECACHE_NO_DOC here)
01028       goto Ldone;
01029     // The directory entry the read was issued for must still pass dir_agg_valid();
01030     // if it does not, the buffer is not trusted and the directory is probed again.
01031     if (!dir_agg_valid(vol, &dir)) {
01032       // If the entry is not valid at all, forget the collision position so the next probe starts over.
01033       if (!dir_valid(vol, &dir))
01034         last_collision = NULL;
01035 #if TS_USE_INTERIM_CACHE == 1
01036       if (dir_ininterim(&dir)) {
01037         dir_delete(&key, vol, &dir);
01038         last_collision = NULL;
01039       }
01040 #endif
01041       goto Lread;
01042     }
01043     doc = (Doc *) buf->data();
01044     if (doc->magic != DOC_MAGIC) {
01045       char tmpstring[100];
01046       if (is_action_tag_set("cache")) { // with the "cache" action tag set, a bad Doc trips a release assert
01047         ink_release_assert(false);
01048       }
01049       if (doc->magic == DOC_CORRUPT)
01050         Warning("Head: Doc checksum does not match for %s", key.toHexStr(tmpstring));
01051       else
01052         Warning("Head : Doc magic does not match for %s", key.toHexStr(tmpstring));
01053       // Remove the directory entry that led to the bad Doc.
01054       dir_delete(&key, vol, &dir);
01055       // Then probe again from the start of the collision chain (last_collision = NULL):
01056       // presumably the deleted entry may have belonged to a different key that
01057       // collides with ours in the directory, so another entry for `key` may still
01058       // exist -- TODO confirm against the directory implementation.
01059       last_collision = NULL;
01060       goto Lread;
01061     }
01062     if (!(doc->first_key == key)) // Doc was written under another first_key: keep probing
01063 #if TS_USE_INTERIM_CACHE == 1
01064     {
01065       if (dir_ininterim(&dir)) {
01066         dir_delete(&key, vol, &dir);
01067         last_collision = NULL;
01068       }
01069       goto Lread;
01070     }
01071 #else
01072       goto Lread;
01073 #endif
01074     if (f.lookup) // a lookup stops here: a Doc with a matching first_key was found
01075       goto Lookup;
01076     earliest_dir = dir;
01077 #ifdef HTTP_CACHE
01078     CacheHTTPInfo *alternate_tmp;
01079     if (frag_type == CACHE_FRAG_TYPE_HTTP) {
01080       uint32_t uml; // length returned by load_http_info(); must equal doc->hlen
01081       ink_assert(doc->hlen);
01082       if (!doc->hlen) // fail the read if the header is empty (reached when the assert above does not stop execution)
01083         goto Ldone;
01084       if ((uml = this->load_http_info(&vector, doc)) != doc->hlen) {
01085         if (buf) {
01086           HTTPCacheAlt* alt = reinterpret_cast<HTTPCacheAlt*>(doc->hdr());
01087           int32_t alt_length = 0;
01088           // Sum m_unmarshal_len over the alternates present in `vector`, for the diagnostic below.
01089           // NOTE(review): the concatenated format string has no separator between "size=%d" and "first alt=".
01090           for ( int32_t i = 0 ; i < vector.count() ; ++i ) {
01091             CacheHTTPInfo* info = vector.get(i);
01092             if (info && info->m_alt) alt_length += info->m_alt->m_unmarshal_len;
01093           }
01094           Note("OpenReadHead failed for cachekey %X : vector inconsistency - "
01095                "unmarshalled %d expecting %d in %d (base=%d, ver=%d:%d) "
01096                "- vector n=%d size=%d"
01097                "first alt=%d[%s]"
01098                , key.slice32(0)
01099                , uml, doc->hlen, doc->len, sizeofDoc
01100                , doc->v_major, doc->v_minor
01101                , vector.count(), alt_length
01102                , alt->m_magic
01103                , (CACHE_ALT_MAGIC_ALIVE == alt->m_magic ? "alive"
01104                   : CACHE_ALT_MAGIC_MARSHALED == alt->m_magic ? "serial"
01105                   : CACHE_ALT_MAGIC_DEAD == alt->m_magic ? "dead"
01106                   : "bogus")
01107             );
01108           dir_delete(&key, vol, &dir);
01109         }
01110         err = ECACHE_BAD_META_DATA;
01111         goto Ldone;
01112       }
01113       if (cache_config_select_alternate) {
01114         alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
01115         if (alternate_index < 0) { // no alternate matches the request
01116           err = ECACHE_ALT_MISS;
01117           goto Ldone;
01118         }
01119       } else
01120         alternate_index = 0; // alternate selection disabled: take the first alternate
01121       alternate_tmp = vector.get(alternate_index);
01122       if (!alternate_tmp->valid()) {
01123         if (buf) {
01124           Note("OpenReadHead failed for cachekey %X : alternate inconsistency", key.slice32(0));
01125           dir_delete(&key, vol, &dir);
01126         }
01127         goto Ldone;
01128       }
01129 
01130       alternate.copy_shallow(alternate_tmp);
01131       alternate.object_key_get(&key); // from here on `key` is the selected alternate's object key
01132       doc_len = alternate.object_size_get();
01133       if (key == doc->key) {      // the alternate's object key is this Doc's own key
01134         f.single_fragment = doc->single_fragment();
01135         ink_assert(f.single_fragment);     // this branch expects the Doc to be single-fragment
01136         ink_assert(doc->hlen);
01137         doc_pos = doc->prefix_len();
01138         next_CacheKey(&key, &doc->key); // presumably derives the following fragment's key from doc->key -- confirm
01139       } else {
01140         f.single_fragment = false;
01141       }
01142     } else
01143 #endif
01144     {
01145       next_CacheKey(&key, &doc->key);
01146       f.single_fragment = doc->single_fragment();
01147       doc_pos = doc->prefix_len();
01148       doc_len = doc->total_len;
01149     }
01150 
01151     if (is_debug_tag_set("cache_read")) { // only format the hex keys when the debug tag is enabled
01152       char xt[33],yt[33];
01153       Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64" bytes, %d fragments",
01154             doc->key.toHexStr(xt), key.toHexStr(yt),
01155             f.single_fragment ? "single" : "multi",
01156             doc->len, doc->total_len,
01157 #ifdef HTTP_CACHE
01158             alternate.get_frag_offset_count()
01159 #else
01160             0
01161 #endif
01162             );
01163     }
01164     // Not a single-fragment object: continue in openReadStartEarliest (see Learliest)
01165     // instead of reporting CACHE_EVENT_OPEN_READ from here.
01166     if (!f.single_fragment)
01167       goto Learliest;
01168 
01169     if (vol->within_hit_evacuate_window(&dir) &&
01170         (!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
01171 #if TS_USE_INTERIM_CACHE == 1
01172         && !f.read_from_interim
01173 #endif
01174         ) {
01175       DDebug("cache_hit_evac", "dir: %" PRId64", write: %" PRId64", phase: %d",
01176             dir_offset(&dir), offset_to_vol_offset(vol, vol->header->write_pos), vol->header->phase);
01177       f.hit_evacuate = 1; // entry is inside the hit-evacuate window and within the size limit (a limit of 0 disables the size check)
01178     }
01179 
01180     first_buf = buf;
01181     vol->begin_read(this);
01182 
01183     goto Lsuccess;
01184 
01185   Lread:
01186     // Look for an open-directory entry for this key first (vol->open_read).
01187     // If one exists and read_from_writer has not been called yet:
01188     //   - a lookup does not read from the writer and fails with ECACHE_DOC_BUSY;
01189     //   - a read releases the volume lock and continues in openReadFromWriter.
01190     // Otherwise probe the directory for the next entry for `key` and issue the read.
01191     OpenDirEntry *cod = vol->open_read(&key);
01192     if (cod && !f.read_from_writer_called) {
01193       if (f.lookup) {
01194         err = ECACHE_DOC_BUSY;
01195         goto Ldone;
01196       }
01197       od = cod;
01198       MUTEX_RELEASE(lock);
01199       SET_HANDLER(&CacheVC::openReadFromWriter);
01200       return handleEvent(EVENT_IMMEDIATE, 0);
01201     }
01202     if (dir_probe(&key, vol, &dir, &last_collision)) {
01203       first_dir = dir;
01204       int ret = do_read_call(&key);
01205       if (ret == EVENT_RETURN) // re-enter the current handler right away via Lcallreturn
01206         goto Lcallreturn;
01207       return ret;
01208     }
01209   }
01210 Ldone: // failure exit: report -err to the continuation and free the VC
01211   if (!f.lookup) {
01212     CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
01213     _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -err);
01214   } else {
01215     CACHE_INCREMENT_DYN_STAT(cache_lookup_failure_stat);
01216     _action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *) -err);
01217   }
01218   return free_CacheVC(this);
01219 Lcallreturn:
01220   return handleEvent(AIO_EVENT_DONE, 0); // dispatches AIO_EVENT_DONE to the current handler
01221 Lsuccess:
01222   SET_HANDLER(&CacheVC::openReadMain);
01223   return callcont(CACHE_EVENT_OPEN_READ);
01224 Lookup: // lookup success: notify the continuation and free the VC
01225   CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
01226   _action.continuation->handleEvent(CACHE_EVENT_LOOKUP, 0);
01227   return free_CacheVC(this);
01228 Learliest: // keep the head Doc in first_buf and continue with `key` as the earliest key
01229   first_buf = buf;
01230   buf = NULL;
01231   earliest_key = key;
01232   last_collision = NULL;
01233   SET_HANDLER(&CacheVC::openReadStartEarliest);
01234   return openReadStartEarliest(event, e);
01235 }