• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

CacheRead.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 #include "P_Cache.h"
00025 
00026 #ifdef HTTP_CACHE
00027 #include "HttpCacheSM.h"      //Added to get the scope of HttpCacheSM object.
00028 #endif
00029 
00030 #define READ_WHILE_WRITER 1
00031 extern int cache_config_compatibility_4_2_0_fixup;
00032 
00033 Action *
00034 Cache::open_read(Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
00035 {
00036   if (!CacheProcessor::IsCacheReady(type)) {
00037     cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
00038     return ACTION_RESULT_DONE;
00039   }
00040   ink_assert(caches[type] == this);
00041 
00042   Vol *vol = key_to_vol(key, hostname, host_len);
00043   Dir result, *last_collision = NULL;
00044   ProxyMutex *mutex = cont->mutex;
00045   OpenDirEntry *od = NULL;
00046   CacheVC *c = NULL;
00047   {
00048     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00049     if (!lock || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
00050       c = new_CacheVC(cont);
00051       SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
00052       c->vio.op = VIO::READ;
00053       c->base_stat = cache_read_active_stat;
00054       CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
00055       c->first_key = c->key = c->earliest_key = *key;
00056       c->vol = vol;
00057       c->frag_type = type;
00058       c->od = od;
00059     }
00060     if (!c)
00061       goto Lmiss;
00062     if (!lock) {
00063       CONT_SCHED_LOCK_RETRY(c);
00064       return &c->_action;
00065     }
00066     if (c->od)
00067       goto Lwriter;
00068     c->dir = result;
00069     c->last_collision = last_collision;
00070     switch(c->do_read_call(&c->key)) {
00071       case EVENT_DONE: return ACTION_RESULT_DONE;
00072       case EVENT_RETURN: goto Lcallreturn;
00073       default: return &c->_action;
00074     }
00075   }
00076 Lmiss:
00077   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00078   cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
00079   return ACTION_RESULT_DONE;
00080 Lwriter:
00081   SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
00082   if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
00083     return ACTION_RESULT_DONE;
00084   return &c->_action;
00085 Lcallreturn:
00086   if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
00087     return ACTION_RESULT_DONE;
00088   return &c->_action;
00089 }
00090 
00091 #ifdef HTTP_CACHE
00092 Action *
00093 Cache::open_read(Continuation * cont, CacheKey * key, CacheHTTPHdr * request,
00094                  CacheLookupHttpConfig * params, CacheFragType type, char *hostname, int host_len)
00095 {
00096 
00097   if (!CacheProcessor::IsCacheReady(type)) {
00098     cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
00099     return ACTION_RESULT_DONE;
00100   }
00101   ink_assert(caches[type] == this);
00102 
00103   Vol *vol = key_to_vol(key, hostname, host_len);
00104   Dir result, *last_collision = NULL;
00105   ProxyMutex *mutex = cont->mutex;
00106   OpenDirEntry *od = NULL;
00107   CacheVC *c = NULL;
00108 
00109   {
00110     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00111     if (!lock || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
00112       c = new_CacheVC(cont);
00113       c->first_key = c->key = c->earliest_key = *key;
00114       c->vol = vol;
00115       c->vio.op = VIO::READ;
00116       c->base_stat = cache_read_active_stat;
00117       CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
00118       c->request.copy_shallow(request);
00119       c->frag_type = CACHE_FRAG_TYPE_HTTP;
00120       c->params = params;
00121       c->od = od;
00122     }
00123     if (!lock) {
00124       SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
00125       CONT_SCHED_LOCK_RETRY(c);
00126       return &c->_action;
00127     }
00128     if (!c)
00129       goto Lmiss;
00130     if (c->od)
00131       goto Lwriter;
00132     // hit
00133     c->dir = c->first_dir = result;
00134     c->last_collision = last_collision;
00135     SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
00136     switch(c->do_read_call(&c->key)) {
00137       case EVENT_DONE: return ACTION_RESULT_DONE;
00138       case EVENT_RETURN: goto Lcallreturn;
00139       default: return &c->_action;
00140     }
00141   }
00142 Lmiss:
00143   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00144   cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
00145   return ACTION_RESULT_DONE;
00146 Lwriter:
00147   // this is a horrible violation of the interface and should be fixed (FIXME)
00148   ((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
00149   SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
00150   if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
00151     return ACTION_RESULT_DONE;
00152   return &c->_action;
00153 Lcallreturn:
00154   if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
00155     return ACTION_RESULT_DONE;
00156   return &c->_action;
00157 }
00158 #endif
00159 
00160 uint32_t
00161 CacheVC::load_http_info(CacheHTTPInfoVector* info, Doc* doc, RefCountObj * block_ptr)
00162 {
00163   uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
00164   if (cache_config_compatibility_4_2_0_fixup &&
00165       vol->header->version.ink_major == 23 && vol->header->version.ink_minor == 0
00166     ) {
00167     for ( int i = info->xcount - 1 ; i >= 0 ; --i ) {
00168       info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
00169       info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
00170     }
00171   }
00172   return zret;
00173 }
00174 
00175 int
00176 CacheVC::openReadFromWriterFailure(int event, Event * e)
00177 {
00178 
00179   od = NULL;
00180   vector.clear(false);
00181   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00182   CACHE_INCREMENT_DYN_STAT(cache_read_busy_failure_stat);
00183   _action.continuation->handleEvent(event, e);
00184   free_CacheVC(this);
00185   return EVENT_DONE;
00186 }
00187 
00188 int
00189 CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00190 {
00191   intptr_t err = ECACHE_DOC_BUSY;
00192   CacheVC *w = NULL;
00193 
00194   ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == NULL);
00195 
00196   if (!od)
00197     return EVENT_RETURN;
00198 
00199   if (frag_type != CACHE_FRAG_TYPE_HTTP) {
00200     ink_assert(od->num_writers == 1);
00201     w = od->writers.head;
00202     if (w->start_time > start_time || w->closed < 0) {
00203       od = NULL;
00204       return EVENT_RETURN;
00205     }
00206     if (!w->closed)
00207       return -err;
00208     write_vc = w;
00209   }
00210 #ifdef HTTP_CACHE
00211   else {
00212     write_vector = &od->vector;
00213     int write_vec_cnt = write_vector->count();
00214     for (int c = 0; c < write_vec_cnt; c++)
00215       vector.insert(write_vector->get(c));
00216     // check if all the writers who came before this reader have
00217     // set the http_info.
00218     for (w = (CacheVC *) od->writers.head; w; w = (CacheVC *) w->opendir_link.next) {
00219       if (w->start_time > start_time || w->closed < 0)
00220         continue;
00221       if (!w->closed && !cache_config_read_while_writer) {
00222         return -err;
00223       }
00224       if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT)
00225         continue;
00226 
00227       if (!w->closed && !w->alternate.valid()) {
00228         od = NULL;
00229         ink_assert(!write_vc);
00230         vector.clear(false);
00231         return EVENT_CONT;
00232       }
00233       // construct the vector from the writers.
00234       int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
00235       if (w->f.update) {
00236         // all Update cases. Need to get the alternate index.
00237         alt_ndx = get_alternate_index(&vector, w->update_key);
00238         // if its an alternate delete
00239         if (!w->alternate.valid()) {
00240           if (alt_ndx >= 0)
00241             vector.remove(alt_ndx, false);
00242           continue;
00243         }
00244       }
00245       ink_assert(w->alternate.valid());
00246       if (w->alternate.valid())
00247         vector.insert(&w->alternate, alt_ndx);
00248     }
00249 
00250     if (!vector.count()) {
00251       if (od->reading_vec) {
00252        // the writer(s) are reading the vector, so there is probably
00253         // an old vector. Since this reader came before any of the
00254         // current writers, we should return the old data
00255         od = NULL;
00256         return EVENT_RETURN;
00257       }
00258       return -ECACHE_NO_DOC;
00259     }
00260     if (cache_config_select_alternate) {
00261       alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
00262       if (alternate_index < 0)
00263         return -ECACHE_ALT_MISS;
00264     } else
00265       alternate_index = 0;
00266     CacheHTTPInfo *obj = vector.get(alternate_index);
00267     for (w = (CacheVC *) od->writers.head; w; w = (CacheVC *) w->opendir_link.next) {
00268       if (obj->m_alt == w->alternate.m_alt) {
00269         write_vc = w;
00270         break;
00271       }
00272     }
00273     vector.clear(false);
00274     if (!write_vc) {
00275       DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
00276       od = NULL;
00277       return EVENT_RETURN;
00278     }
00279 
00280     DDebug("cache_read_agg",
00281           "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p",
00282           this, first_key.slice32(1), write_vc->earliest_key.slice32(1),
00283           vector.count(), alternate_index, od->num_writers, write_vc);
00284   }
00285 #endif //HTTP_CACHE
00286   return EVENT_NONE;
00287 }
00288 
00289 int
00290 CacheVC::openReadFromWriter(int event, Event * e)
00291 {
00292   if (!f.read_from_writer_called) {
00293     // The assignment to last_collision as NULL was
00294     // made conditional after INKqa08411
00295     last_collision = NULL;
00296     // Let's restart the clock from here - the first time this a reader
00297     // gets in this state. Its possible that the open_read was called
00298     // before the open_write, but the reader could not get the volume
00299     // lock. If we don't reset the clock here, we won't choose any writer
00300     // and hence fail the read request.
00301     start_time = ink_get_hrtime();
00302     f.read_from_writer_called = 1;
00303   }
00304   cancel_trigger();
00305   intptr_t err = ECACHE_DOC_BUSY;
00306   DDebug("cache_read_agg", "%p: key: %X In openReadFromWriter", this, first_key.slice32(1));
00307 #ifndef READ_WHILE_WRITER
00308   return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) -err);
00309 #else
00310   if (_action.cancelled) {
00311     od = NULL; // only open for read so no need to close
00312     return free_CacheVC(this);
00313   }
00314   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00315   if (!lock)
00316     VC_SCHED_LOCK_RETRY();
00317   od = vol->open_read(&first_key); // recheck in case the lock failed
00318   if (!od) {
00319     MUTEX_RELEASE(lock);
00320     write_vc = NULL;
00321     SET_HANDLER(&CacheVC::openReadStartHead);
00322     return openReadStartHead(event, e);
00323   } else
00324     ink_assert(od == vol->open_read(&first_key));
00325   if (!write_vc) {
00326     int ret = openReadChooseWriter(event, e);
00327     if (ret < 0) {
00328       MUTEX_RELEASE(lock);
00329       SET_HANDLER(&CacheVC::openReadFromWriterFailure);
00330       return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *> (ret));
00331     } else if (ret == EVENT_RETURN) {
00332       MUTEX_RELEASE(lock);
00333       SET_HANDLER(&CacheVC::openReadStartHead);
00334       return openReadStartHead(event, e);
00335     } else if (ret == EVENT_CONT) {
00336       ink_assert(!write_vc);
00337       VC_SCHED_WRITER_RETRY();
00338     } else
00339       ink_assert(write_vc);
00340   } else {
00341     if (writer_done()) {
00342       MUTEX_RELEASE(lock);
00343       DDebug("cache_read_agg",
00344             "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
00345       od = NULL;
00346       write_vc = NULL;
00347       SET_HANDLER(&CacheVC::openReadStartHead);
00348       return openReadStartHead(event, e);
00349     }
00350   }
00351 #ifdef HTTP_CACHE
00352   OpenDirEntry *cod = od;
00353 #endif
00354   od = NULL;
00355   // someone is currently writing the document
00356   if (write_vc->closed < 0) {
00357     MUTEX_RELEASE(lock);
00358     write_vc = NULL;
00359     //writer aborted, continue as if there is no writer
00360     SET_HANDLER(&CacheVC::openReadStartHead);
00361     return openReadStartHead(EVENT_IMMEDIATE, 0);
00362   }
00363   // allow reading from unclosed writer for http requests only.
00364   ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
00365   if (!write_vc->closed && !write_vc->fragment) {
00366     if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP) {
00367       MUTEX_RELEASE(lock);
00368       return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
00369     }
00370     DDebug("cache_read_agg",
00371           "%p: key: %X writer: closed:%d, fragment:%d, retry: %d",
00372           this, first_key.slice32(1), write_vc->closed, write_vc->fragment, writer_lock_retry);
00373     VC_SCHED_WRITER_RETRY();
00374   }
00375 
00376   CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
00377   if (!writer_lock) {
00378     DDebug("cache_read_agg", "%p: key: %X lock miss", this, first_key.slice32(1));
00379     VC_SCHED_LOCK_RETRY();
00380   }
00381   MUTEX_RELEASE(lock);
00382 
00383   if (!write_vc->io.ok())
00384     return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
00385 #ifdef HTTP_CACHE
00386   if (frag_type == CACHE_FRAG_TYPE_HTTP) {
00387     DDebug("cache_read_agg",
00388           "%p: key: %X http passed stage 1, closed: %d, frag: %d",
00389           this, first_key.slice32(1), write_vc->closed, write_vc->fragment);
00390     if (!write_vc->alternate.valid())
00391       return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
00392     alternate.copy(&write_vc->alternate);
00393     vector.insert(&alternate);
00394     alternate.object_key_get(&key);
00395     write_vc->f.readers = 1;
00396     if (!(write_vc->f.update && write_vc->total_len == 0)) {
00397       key = write_vc->earliest_key;
00398       if (!write_vc->closed)
00399         alternate.object_size_set(write_vc->vio.nbytes);
00400       else
00401         alternate.object_size_set(write_vc->total_len);
00402     } else {
00403       key = write_vc->update_key;
00404       ink_assert(write_vc->closed);
00405       DDebug("cache_read_agg", "%p: key: %X writer header update", this, first_key.slice32(1));
00406       // Update case (b) : grab doc_len from the writer's alternate
00407       doc_len = alternate.object_size_get();
00408       if (write_vc->update_key == cod->single_doc_key &&
00409           (cod->move_resident_alt || write_vc->f.rewrite_resident_alt) && write_vc->first_buf._ptr()) {
00410         // the resident alternate is being updated and its a
00411         // header only update. The first_buf of the writer has the
00412         // document body.
00413         Doc *doc = (Doc *) write_vc->first_buf->data();
00414         writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
00415         MUTEX_RELEASE(writer_lock);
00416         ink_assert(doc_len == doc->data_len());
00417         length = doc_len;
00418         f.single_fragment = 1;
00419         doc_pos = 0;
00420         earliest_key = key;
00421         dir_clean(&first_dir);
00422         dir_clean(&earliest_dir);
00423         SET_HANDLER(&CacheVC::openReadFromWriterMain);
00424         CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
00425         return callcont(CACHE_EVENT_OPEN_READ);
00426       }
00427       // want to snarf the new headers from the writer
00428       // and then continue as if nothing happened
00429       last_collision = NULL;
00430       MUTEX_RELEASE(writer_lock);
00431       SET_HANDLER(&CacheVC::openReadStartEarliest);
00432       return openReadStartEarliest(event, e);
00433     }
00434   } else {
00435 #endif //HTTP_CACHE
00436     DDebug("cache_read_agg", "%p: key: %X non-http passed stage 1", this, first_key.slice32(1));
00437     key = write_vc->earliest_key;
00438 #ifdef HTTP_CACHE
00439   }
00440 #endif
00441   if (write_vc->fragment) {
00442     doc_len = write_vc->vio.nbytes;
00443     last_collision = NULL;
00444     DDebug("cache_read_agg",
00445           "%p: key: %X closed: %d, fragment: %d, len: %d starting first fragment",
00446           this, first_key.slice32(1), write_vc->closed, write_vc->fragment, (int)doc_len);
00447     MUTEX_RELEASE(writer_lock);
00448     // either a header + body update or a new document
00449     SET_HANDLER(&CacheVC::openReadStartEarliest);
00450     return openReadStartEarliest(event, e);
00451   }
00452   writer_buf = write_vc->blocks;
00453   writer_offset = write_vc->offset;
00454   length = write_vc->length;
00455   // copy the vector
00456   f.single_fragment = !write_vc->fragment;        // single fragment doc
00457   doc_pos = 0;
00458   earliest_key = write_vc->earliest_key;
00459   ink_assert(earliest_key == key);
00460   doc_len = write_vc->total_len;
00461   dir_clean(&first_dir);
00462   dir_clean(&earliest_dir);
00463   DDebug("cache_read_agg", "%p: key: %X %X: single fragment read", this, first_key.slice32(1), key.slice32(0));
00464   MUTEX_RELEASE(writer_lock);
00465   SET_HANDLER(&CacheVC::openReadFromWriterMain);
00466   CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
00467   return callcont(CACHE_EVENT_OPEN_READ);
00468 #endif //READ_WHILE_WRITER
00469 }
00470 
00471 int
00472 CacheVC::openReadFromWriterMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00473 {
00474   cancel_trigger();
00475   if (seek_to) {
00476     vio.ndone = seek_to;
00477     seek_to = 0;
00478   }
00479   IOBufferBlock *b = NULL;
00480   int64_t ntodo = vio.ntodo();
00481   if (ntodo <= 0)
00482     return EVENT_CONT;
00483   if (length < ((int64_t)doc_len) - vio.ndone) {
00484     DDebug("cache_read_agg", "truncation %X", first_key.slice32(1));
00485     if (is_action_tag_set("cache")) {
00486       ink_release_assert(false);
00487     }
00488     Warning("Document %X truncated at %d of %d, reading from writer", first_key.slice32(1), (int)vio.ndone, (int)doc_len);
00489     return calluser(VC_EVENT_ERROR);
00490   }
00491   /* its possible that the user did a do_io_close before
00492      openWriteWriteDone was called. */
00493   if (length > ((int64_t)doc_len) - vio.ndone) {
00494     int64_t skip_bytes = length - (doc_len - vio.ndone);
00495     iobufferblock_skip(writer_buf, &writer_offset, &length, skip_bytes);
00496   }
00497   int64_t bytes = length;
00498   if (bytes > vio.ntodo())
00499     bytes = vio.ntodo();
00500   if (vio.ndone >= (int64_t)doc_len) {
00501     ink_assert(bytes <= 0);
00502     // reached the end of the document and the user still wants more
00503     return calluser(VC_EVENT_EOS);
00504   }
00505   b = iobufferblock_clone(writer_buf, writer_offset, bytes);
00506   writer_buf = iobufferblock_skip(writer_buf, &writer_offset, &length, bytes);
00507   vio.buffer.writer()->append_block(b);
00508   vio.ndone += bytes;
00509   if (vio.ntodo() <= 0)
00510     return calluser(VC_EVENT_READ_COMPLETE);
00511   else
00512     return calluser(VC_EVENT_READ_READY);
00513 }
00514 
00515 int
00516 CacheVC::openReadClose(int event, Event * /* e ATS_UNUSED */)
00517 {
00518   cancel_trigger();
00519   if (is_io_in_progress()) {
00520     if (event != AIO_EVENT_DONE)
00521       return EVENT_CONT;
00522     set_io_not_in_progress();
00523   }
00524   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00525   if (!lock)
00526     VC_SCHED_LOCK_RETRY();
00527   if (f.hit_evacuate && dir_valid(vol, &first_dir) && closed > 0) {
00528     if (f.single_fragment)
00529       vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
00530     else if (dir_valid(vol, &earliest_dir)) {
00531       vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
00532       vol->force_evacuate_head(&earliest_dir, dir_pinned(&earliest_dir));
00533     }
00534   }
00535   vol->close_read(this);
00536   return free_CacheVC(this);
00537 }
00538 
00539 int
00540 CacheVC::openReadReadDone(int event, Event * e)
00541 {
00542   Doc *doc = NULL;
00543 
00544   cancel_trigger();
00545   if (event == EVENT_IMMEDIATE)
00546     return EVENT_CONT;
00547   set_io_not_in_progress();
00548   {
00549     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00550     if (!lock)
00551       VC_SCHED_LOCK_RETRY();
00552     if (event == AIO_EVENT_DONE && !io.ok()) {
00553       dir_delete(&earliest_key, vol, &earliest_dir);
00554       goto Lerror;
00555     }
00556     if (last_collision &&         // no missed lock
00557         dir_valid(vol, &dir))    // object still valid
00558     {
00559       doc = (Doc *) buf->data();
00560       if (doc->magic != DOC_MAGIC) {
00561         char tmpstring[100];
00562         if (doc->magic == DOC_CORRUPT)
00563           Warning("Middle: Doc checksum does not match for %s", key.toHexStr(tmpstring));
00564         else
00565           Warning("Middle: Doc magic does not match for %s", key.toHexStr(tmpstring));
00566 #if TS_USE_INTERIM_CACHE == 1
00567         if (dir_ininterim(&dir)) {
00568           dir_delete(&key, vol, &dir);
00569           goto Lread;
00570         }
00571 #endif
00572         goto Lerror;
00573       }
00574       if (doc->key == key)
00575         goto LreadMain;
00576 #if TS_USE_INTERIM_CACHE == 1
00577       else if (dir_ininterim(&dir)) {
00578           dir_delete(&key, vol, &dir);
00579           last_collision = NULL;
00580         }
00581 #endif
00582     }
00583 #if TS_USE_INTERIM_CACHE == 1
00584     if (last_collision && dir_get_offset(&dir) != dir_get_offset(last_collision))
00585       last_collision = 0;
00586 Lread:
00587 #else
00588     if (last_collision && dir_offset(&dir) != dir_offset(last_collision))
00589       last_collision = 0;       // object has been/is being overwritten
00590 #endif
00591     if (dir_probe(&key, vol, &dir, &last_collision)) {
00592       int ret = do_read_call(&key);
00593       if (ret == EVENT_RETURN)
00594         goto Lcallreturn;
00595       return EVENT_CONT;
00596     } else if (write_vc) {
00597       if (writer_done()) {
00598         last_collision = NULL;
00599         while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
00600 #if TS_USE_INTERIM_CACHE == 1
00601           if (dir_get_offset(&dir) == dir_get_offset(&earliest_dir)) {
00602 #else
00603           if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
00604 #endif
00605             DDebug("cache_read_agg", "%p: key: %X ReadRead complete: %d",
00606                   this, first_key.slice32(1), (int)vio.ndone);
00607             doc_len = vio.ndone;
00608             goto Ldone;
00609           }
00610         }
00611         DDebug("cache_read_agg", "%p: key: %X ReadRead writer aborted: %d",
00612               this, first_key.slice32(1), (int)vio.ndone);
00613         goto Lerror;
00614       }
00615       DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
00616       VC_SCHED_WRITER_RETRY(); // wait for writer
00617     }
00618     // fall through for truncated documents
00619   }
00620 Lerror:
00621   char tmpstring[100];
00622   Warning("Document %s truncated", earliest_key.toHexStr(tmpstring));
00623   return calluser(VC_EVENT_ERROR);
00624 Ldone:
00625   return calluser(VC_EVENT_EOS);
00626 Lcallreturn:
00627   return handleEvent(AIO_EVENT_DONE, 0);
00628 LreadMain:
00629   fragment++;
00630   doc_pos = doc->prefix_len();
00631   next_CacheKey(&key, &key);
00632   SET_HANDLER(&CacheVC::openReadMain);
00633   return openReadMain(event, e);
00634 }
00635 
00636 int
00637 CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00638 {
00639   cancel_trigger();
00640   Doc *doc = (Doc *) buf->data();
00641   int64_t ntodo = vio.ntodo();
00642   int64_t bytes = doc->len - doc_pos;
00643   IOBufferBlock *b = NULL;
00644   if (seek_to) { // handle do_io_pread
00645     if (seek_to >= doc_len) {
00646       vio.ndone = doc_len;
00647       return calluser(VC_EVENT_EOS);
00648     }
00649 #ifdef HTTP_CACHE
00650     HTTPInfo::FragOffset* frags = alternate.get_frag_table();
00651     if (is_debug_tag_set("cache_seek")) {
00652       char b[33], c[33];
00653       Debug("cache_seek", "Seek @ %" PRId64" in %s from #%d @ %" PRId64"/%d:%s",
00654             seek_to, first_key.toHexStr(b), fragment, doc_pos, doc->len, doc->key.toHexStr(c));
00655     }
00656     /* Because single fragment objects can migrate to hang off an alt vector
00657        they can appear to the VC as multi-fragment when they are not really.
00658        The essential difference is the existence of a fragment table.
00659     */
00660     if (frags) {
00661       int target = 0;
00662       HTTPInfo::FragOffset next_off = frags[target];
00663       int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
00664       ink_assert(lfi >= 0); // because it's not a single frag doc.
00665 
00666       /* Note: frag[i].offset is the offset of the first byte past the
00667          i'th fragment. So frag[0].offset is the offset of the first
00668          byte of fragment 1. In addition the # of fragments is one
00669          more than the fragment table length, the start of the last
00670          fragment being the last offset in the table.
00671       */
00672       if (fragment == 0 ||
00673           seek_to < frags[fragment-1] ||
00674           (fragment <= lfi && frags[fragment] <= seek_to)
00675         ) {
00676         // search from frag 0 on to find the proper frag
00677         while (seek_to >= next_off && target < lfi) {
00678           next_off = frags[++target];
00679         }
00680         if (target == lfi && seek_to >= next_off) ++target;
00681       } else { // shortcut if we are in the fragment already
00682         target = fragment;
00683       }
00684       if (target != fragment) {
00685         // Lread will read the next fragment always, so if that
00686         // is the one we want, we don't need to do anything
00687         int cfi = fragment;
00688         --target;
00689         while (target > fragment) {
00690           next_CacheKey(&key, &key);
00691           ++fragment;
00692         }
00693         while (target < fragment) {
00694           prev_CacheKey(&key, &key);
00695           --fragment;
00696         }
00697 
00698         if (is_debug_tag_set("cache_seek")) {
00699           char target_key_str[33];
00700           key.toHexStr(target_key_str);
00701           Debug("cache_seek", "Seek #%d @ %" PRId64" -> #%d @ %" PRId64":%s", cfi, doc_pos, target, seek_to, target_key_str);
00702         }
00703         goto Lread;
00704       }
00705     }
00706     doc_pos = doc->prefix_len() + seek_to;
00707     if (fragment) doc_pos -= static_cast<int64_t>(frags[fragment-1]);
00708     vio.ndone = 0;
00709     seek_to = 0;
00710     ntodo = vio.ntodo();
00711     bytes = doc->len - doc_pos;
00712     if (is_debug_tag_set("cache_seek")) {
00713       char target_key_str[33];
00714       key.toHexStr(target_key_str);
00715       Debug("cache_seek", "Read # %d @ %" PRId64"/%d for %" PRId64, fragment, doc_pos, doc->len, bytes);
00716     }
00717 #endif
00718   }
00719   if (ntodo <= 0)
00720     return EVENT_CONT;
00721   if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
00722     return EVENT_CONT;
00723   if ((bytes <= 0) && vio.ntodo() >= 0)
00724     goto Lread;
00725   if (bytes > vio.ntodo())
00726     bytes = vio.ntodo();
00727   b = new_IOBufferBlock(buf, bytes, doc_pos);
00728   b->_buf_end = b->_end;
00729   vio.buffer.writer()->append_block(b);
00730   vio.ndone += bytes;
00731   doc_pos += bytes;
00732   if (vio.ntodo() <= 0)
00733     return calluser(VC_EVENT_READ_COMPLETE);
00734   else {
00735     if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
00736       return EVENT_DONE;
00737     // we have to keep reading until we give the user all the
00738     // bytes it wanted or we hit the watermark.
00739     if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water())
00740       goto Lread;
00741     return EVENT_CONT;
00742   }
00743 Lread: {
00744     if (vio.ndone >= (int64_t)doc_len)
00745       // reached the end of the document and the user still wants more
00746       return calluser(VC_EVENT_EOS);
00747     last_collision = 0;
00748     writer_lock_retry = 0;
00749     // if the state machine calls reenable on the callback from the cache,
00750     // we set up a schedule_imm event. The openReadReadDone discards
00751     // EVENT_IMMEDIATE events. So, we have to cancel that trigger and set
00752     // a new EVENT_INTERVAL event.
00753     cancel_trigger();
00754     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00755     if (!lock) {
00756       SET_HANDLER(&CacheVC::openReadMain);
00757       VC_SCHED_LOCK_RETRY();
00758     }
00759     if (dir_probe(&key, vol, &dir, &last_collision)) {
00760       SET_HANDLER(&CacheVC::openReadReadDone);
00761       int ret = do_read_call(&key);
00762       if (ret == EVENT_RETURN)
00763         goto Lcallreturn;
00764       return EVENT_CONT;
00765     } else if (write_vc) {
00766       if (writer_done()) {
00767         last_collision = NULL;
00768         while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
00769           if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
00770             DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d",
00771                   this, first_key.slice32(1), (int)vio.ndone);
00772             doc_len = vio.ndone;
00773             goto Leos;
00774           }
00775         }
00776         DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d",
00777               this, first_key.slice32(1), (int)vio.ndone);
00778         goto Lerror;
00779       }
00780       DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
00781       SET_HANDLER(&CacheVC::openReadMain);
00782       VC_SCHED_WRITER_RETRY();
00783     }
00784     if (is_action_tag_set("cache"))
00785       ink_release_assert(false);
00786     Warning("Document %X truncated at %d of %d, missing fragment %X", first_key.slice32(1), (int)vio.ndone, (int)doc_len, key.slice32(1));
00787     // remove the directory entry
00788     dir_delete(&earliest_key, vol, &earliest_dir);
00789   }
00790 Lerror:
00791   return calluser(VC_EVENT_ERROR);
00792 Leos:
00793   return calluser(VC_EVENT_EOS);
00794 Lcallreturn:
00795   return handleEvent(AIO_EVENT_DONE, 0);
00796 }
00797 
00798 /*
00799   This code follows CacheVC::openReadStartHead closely,
00800   if you change this you might have to change that.
00801 */
00802 int
00803 CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00804 {
00805   int ret = 0;
00806   Doc *doc = NULL;
00807   cancel_trigger();
00808   set_io_not_in_progress();
00809   if (_action.cancelled)
00810     return free_CacheVC(this);
00811   {
00812     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00813     if (!lock)
00814       VC_SCHED_LOCK_RETRY();
00815     if (!buf)
00816       goto Lread;
00817     if (!io.ok())
00818       goto Ldone;
00819     // an object needs to be outside the aggregation window in order to be
00820     // be evacuated as it is read
00821     if (!dir_agg_valid(vol, &dir)) {
00822       // a directory entry which is nolonger valid may have been overwritten
00823       if (!dir_valid(vol, &dir))
00824         last_collision = NULL;
00825       goto Lread;
00826     }
00827     doc = (Doc *) buf->data();
00828     if (doc->magic != DOC_MAGIC) {
00829       char tmpstring[100];
00830       if (is_action_tag_set("cache")) {
00831         ink_release_assert(false);
00832       }
00833       if (doc->magic == DOC_CORRUPT)
00834         Warning("Earliest: Doc checksum does not match for %s", key.toHexStr(tmpstring));
00835       else
00836         Warning("Earliest : Doc magic does not match for %s", key.toHexStr(tmpstring));
00837       // remove the dir entry
00838       dir_delete(&key, vol, &dir);
00839       // try going through the directory entries again
00840       // in case the dir entry we deleted doesnt correspond
00841       // to the key we are looking for. This is possible
00842       // because of directory collisions
00843       last_collision = NULL;
00844       goto Lread;
00845     }
00846     if (!(doc->key == key)) // collisiion
00847       goto Lread;
00848     // success
00849     earliest_key = key;
00850     doc_pos = doc->prefix_len();
00851     next_CacheKey(&key, &doc->key);
00852     vol->begin_read(this);
00853     if (vol->within_hit_evacuate_window(&earliest_dir) &&
00854         (!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
00855 #if TS_USE_INTERIM_CACHE == 1
00856         && !dir_ininterim(&dir)
00857 #endif
00858         ) {
00859       DDebug("cache_hit_evac", "dir: %" PRId64", write: %" PRId64", phase: %d",
00860             dir_offset(&earliest_dir), offset_to_vol_offset(vol, vol->header->write_pos), vol->header->phase);
00861       f.hit_evacuate = 1;
00862     }
00863     goto Lsuccess;
00864 Lread:
00865     if (dir_probe(&key, vol, &earliest_dir, &last_collision) ||
00866         dir_lookaside_probe(&key, vol, &earliest_dir, NULL))
00867     {
00868       dir = earliest_dir;
00869 #if TS_USE_INTERIM_CACHE == 1
00870       if (dir_ininterim(&dir) && alternate.get_frag_offset_count() > 1) {
00871         dir_delete(&key, vol, &dir);
00872         last_collision = NULL;
00873         goto Lread;
00874       }
00875 #endif
00876       if ((ret = do_read_call(&key)) == EVENT_RETURN)
00877         goto Lcallreturn;
00878       return ret;
00879     }
00880     // read has detected that alternate does not exist in the cache.
00881     // rewrite the vector.
00882 #ifdef HTTP_CACHE
00883     if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
00884       // don't want any writers while we are evacuating the vector
00885       if (!vol->open_write(this, false, 1)) {
00886         Doc *doc1 = (Doc *) first_buf->data();
00887         uint32_t len = this->load_http_info(write_vector, doc1);
00888         ink_assert(len == doc1->hlen && write_vector->count() > 0);
00889         write_vector->remove(alternate_index, true);
00890         // if the vector had one alternate, delete it's directory entry
00891         if (len != doc1->hlen || !write_vector->count()) {
00892           // sometimes the delete fails when there is a race and another read
00893           // finds that the directory entry has been overwritten
00894           // (cannot assert on the return value)
00895           dir_delete(&first_key, vol, &first_dir);
00896         }
00897 #if TS_USE_INTERIM_CACHE == 1
00898         else if (dir_ininterim(&first_dir))
00899           dir_delete(&first_key, vol, &first_dir);
00900 #endif
00901         else {
00902           buf = NULL;
00903           last_collision = NULL;
00904           write_len = 0;
00905           header_len = write_vector->marshal_length();
00906           f.evac_vector = 1;
00907           f.use_first_key = 1;
00908           key = first_key;
00909           // always use od->first_dir to overwrite a directory.
00910           // If an evacuation happens while a vector is being updated
00911           // the evacuator changes the od->first_dir to the new directory
00912           // that it inserted
00913           od->first_dir = first_dir;
00914           od->writing_vec = 1;
00915           earliest_key = zero_key;
00916 
00917           // set up this VC as a alternate delete write_vc
00918           vio.op = VIO::WRITE;
00919           total_len = 0;
00920           f.update = 1;
00921           alternate_index = CACHE_ALT_REMOVED;
00922           /////////////////////////////////////////////////////////////////
00923           // change to create a directory entry for a resident alternate //
00924           // when another alternate does not exist.                      //
00925           /////////////////////////////////////////////////////////////////
00926           if (doc1->total_len > 0) {
00927             od->move_resident_alt = 1;
00928             od->single_doc_key = doc1->key;
00929             dir_assign(&od->single_doc_dir, &dir);
00930             dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
00931           }
00932           SET_HANDLER(&CacheVC::openReadVecWrite);
00933           if ((ret = do_write_call()) == EVENT_RETURN)
00934             goto Lcallreturn;
00935           return ret;
00936         }
00937       }
00938     }
00939 #endif
00940     // open write failure - another writer, so don't modify the vector
00941   Ldone:
00942     if (od)
00943       vol->close_write(this);
00944   }
00945   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00946   _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
00947   return free_CacheVC(this);
00948 Lcallreturn:
00949   return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
00950 Lsuccess:
00951   if (write_vc)
00952     CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
00953   SET_HANDLER(&CacheVC::openReadMain);
00954   return callcont(CACHE_EVENT_OPEN_READ);
00955 }
00956 
00957 // create the directory entry after the vector has been evacuated
00958 // the volume lock has been taken when this function is called
00959 #ifdef HTTP_CACHE
00960 int
00961 CacheVC::openReadVecWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00962 {
00963   cancel_trigger();
00964   set_io_not_in_progress();
00965   ink_assert(od);
00966   od->writing_vec = 0;
00967   if (_action.cancelled)
00968     return openWriteCloseDir(EVENT_IMMEDIATE, 0);
00969   {
00970     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00971     if (!lock)
00972       VC_SCHED_LOCK_RETRY();
00973     if (io.ok()) {
00974       ink_assert(f.evac_vector);
00975       ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
00976       ink_assert(!buf.m_ptr);
00977       f.evac_vector = false;
00978       last_collision = NULL;
00979       f.update = 0;
00980       alternate_index = CACHE_ALT_INDEX_DEFAULT;
00981       f.use_first_key = 0;
00982       vio.op = VIO::READ;
00983       dir_overwrite(&first_key, vol, &dir, &od->first_dir);
00984       if (od->move_resident_alt)
00985         dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
00986       int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
00987       vol->close_write(this);
00988       if (alt_ndx >= 0) {
00989         vector.clear();
00990         // we don't need to start all over again, since we already
00991         // have the vector in memory. But this is simpler and this
00992         // case is rare.
00993         goto Lrestart;
00994       }
00995     } else
00996       vol->close_write(this);
00997   }
00998 
00999   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
01000   _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_ALT_MISS);
01001   return free_CacheVC(this);
01002 Lrestart:
01003   SET_HANDLER(&CacheVC::openReadStartHead);
01004   return openReadStartHead(EVENT_IMMEDIATE, 0);
01005 }
01006 #endif
01007 
01008 /*
01009   This code follows CacheVC::openReadStartEarliest closely,
01010   if you change this you might have to change that.
01011 */
01012 int
01013 CacheVC::openReadStartHead(int event, Event * e)
01014 {
01015   intptr_t err = ECACHE_NO_DOC;
01016   Doc *doc = NULL;
01017   cancel_trigger();
01018   set_io_not_in_progress();
01019   if (_action.cancelled)
01020     return free_CacheVC(this);
01021   {
01022     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01023     if (!lock)
01024       VC_SCHED_LOCK_RETRY();
01025     if (!buf)
01026       goto Lread;
01027     if (!io.ok())
01028       goto Ldone;
01029     // an object needs to be outside the aggregation window in order to be
01030     // be evacuated as it is read
01031     if (!dir_agg_valid(vol, &dir)) {
01032       // a directory entry which is nolonger valid may have been overwritten
01033       if (!dir_valid(vol, &dir))
01034         last_collision = NULL;
01035 #if TS_USE_INTERIM_CACHE == 1
01036       if (dir_ininterim(&dir)) {
01037         dir_delete(&key, vol, &dir);
01038         last_collision = NULL;
01039       }
01040 #endif
01041       goto Lread;
01042     }
01043     doc = (Doc *) buf->data();
01044     if (doc->magic != DOC_MAGIC) {
01045       char tmpstring[100];
01046       if (is_action_tag_set("cache")) {
01047         ink_release_assert(false);
01048       }
01049       if (doc->magic == DOC_CORRUPT)
01050         Warning("Head: Doc checksum does not match for %s", key.toHexStr(tmpstring));
01051       else
01052         Warning("Head : Doc magic does not match for %s", key.toHexStr(tmpstring));
01053       // remove the dir entry
01054       dir_delete(&key, vol, &dir);
01055       // try going through the directory entries again
01056       // in case the dir entry we deleted doesnt correspond
01057       // to the key we are looking for. This is possible
01058       // because of directory collisions
01059       last_collision = NULL;
01060       goto Lread;
01061     }
01062     if (!(doc->first_key == key))
01063 #if TS_USE_INTERIM_CACHE == 1
01064     {
01065       if (dir_ininterim(&dir)) {
01066         dir_delete(&key, vol, &dir);
01067         last_collision = NULL;
01068       }
01069       goto Lread;
01070     }
01071 #else
01072       goto Lread;
01073 #endif
01074     if (f.lookup)
01075       goto Lookup;
01076     earliest_dir = dir;
01077 #ifdef HTTP_CACHE
01078     CacheHTTPInfo *alternate_tmp;
01079     if (frag_type == CACHE_FRAG_TYPE_HTTP) {
01080       uint32_t uml;
01081       ink_assert(doc->hlen);
01082       if (!doc->hlen)
01083         goto Ldone;
01084       if ((uml = this->load_http_info(&vector, doc)) != doc->hlen) {
01085         if (buf) {
01086           HTTPCacheAlt* alt = reinterpret_cast<HTTPCacheAlt*>(doc->hdr());
01087           int32_t alt_length = 0;
01088           // count should be reasonable, as vector is initialized and unlikly to be too corrupted
01089           // by bad disk data - count should be the number of successfully unmarshalled alts.
01090           for ( int32_t i = 0 ; i < vector.count() ; ++i ) {
01091             CacheHTTPInfo* info = vector.get(i);
01092             if (info && info->m_alt) alt_length += info->m_alt->m_unmarshal_len;
01093           }
01094           Note("OpenReadHead failed for cachekey %X : vector inconsistency - "
01095                "unmarshalled %d expecting %d in %d (base=%d, ver=%d:%d) "
01096                "- vector n=%d size=%d"
01097                "first alt=%d[%s]"
01098                , key.slice32(0)
01099                , uml, doc->hlen, doc->len, sizeofDoc
01100                , doc->v_major, doc->v_minor
01101                , vector.count(), alt_length
01102                , alt->m_magic
01103                , (CACHE_ALT_MAGIC_ALIVE == alt->m_magic ? "alive"
01104                   : CACHE_ALT_MAGIC_MARSHALED == alt->m_magic ? "serial"
01105                   : CACHE_ALT_MAGIC_DEAD == alt->m_magic ? "dead"
01106                   : "bogus")
01107             );
01108           dir_delete(&key, vol, &dir);
01109         }
01110         err = ECACHE_BAD_META_DATA;
01111         goto Ldone;
01112       }
01113       if (cache_config_select_alternate) {
01114         alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
01115         if (alternate_index < 0) {
01116           err = ECACHE_ALT_MISS;
01117           goto Ldone;
01118         }
01119       } else
01120         alternate_index = 0;
01121       alternate_tmp = vector.get(alternate_index);
01122       if (!alternate_tmp->valid()) {
01123         if (buf) {
01124           Note("OpenReadHead failed for cachekey %X : alternate inconsistency", key.slice32(0));
01125           dir_delete(&key, vol, &dir);
01126         }
01127         goto Ldone;
01128       }
01129 
01130       alternate.copy_shallow(alternate_tmp);
01131       alternate.object_key_get(&key);
01132       doc_len = alternate.object_size_get();
01133       if (key == doc->key) {      // is this my data?
01134         f.single_fragment = doc->single_fragment();
01135         ink_assert(f.single_fragment);     // otherwise need to read earliest
01136         ink_assert(doc->hlen);
01137         doc_pos = doc->prefix_len();
01138         next_CacheKey(&key, &doc->key);
01139       } else {
01140         f.single_fragment = false;
01141       }
01142     } else
01143 #endif
01144     {
01145       next_CacheKey(&key, &doc->key);
01146       f.single_fragment = doc->single_fragment();
01147       doc_pos = doc->prefix_len();
01148       doc_len = doc->total_len;
01149     }
01150 
01151     if (is_debug_tag_set("cache_read")) { // amc debug
01152       char xt[33],yt[33];
01153       Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64" bytes, %d fragments",
01154             doc->key.toHexStr(xt), key.toHexStr(yt),
01155             f.single_fragment ? "single" : "multi",
01156             doc->len, doc->total_len,
01157 #ifdef HTTP_CACHE
01158             alternate.get_frag_offset_count()
01159 #else
01160             0
01161 #endif
01162             );
01163     }
01164     // the first fragment might have been gc'ed. Make sure the first
01165     // fragment is there before returning CACHE_EVENT_OPEN_READ
01166     if (!f.single_fragment)
01167       goto Learliest;
01168 
01169     if (vol->within_hit_evacuate_window(&dir) &&
01170         (!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
01171 #if TS_USE_INTERIM_CACHE == 1
01172         && !f.read_from_interim
01173 #endif
01174         ) {
01175       DDebug("cache_hit_evac", "dir: %" PRId64", write: %" PRId64", phase: %d",
01176             dir_offset(&dir), offset_to_vol_offset(vol, vol->header->write_pos), vol->header->phase);
01177       f.hit_evacuate = 1;
01178     }
01179 
01180     first_buf = buf;
01181     vol->begin_read(this);
01182 
01183     goto Lsuccess;
01184 
01185   Lread:
01186     // check for collision
01187     // INKqa07684 - Cache::lookup returns CACHE_EVENT_OPEN_READ_FAILED.
01188     // don't want to go through this BS of reading from a writer if
01189     // its a lookup. In this case lookup will fail while the document is
01190     // being written to the cache.
01191     OpenDirEntry *cod = vol->open_read(&key);
01192     if (cod && !f.read_from_writer_called) {
01193       if (f.lookup) {
01194         err = ECACHE_DOC_BUSY;
01195         goto Ldone;
01196       }
01197       od = cod;
01198       MUTEX_RELEASE(lock);
01199       SET_HANDLER(&CacheVC::openReadFromWriter);
01200       return handleEvent(EVENT_IMMEDIATE, 0);
01201     }
01202     if (dir_probe(&key, vol, &dir, &last_collision)) {
01203       first_dir = dir;
01204       int ret = do_read_call(&key);
01205       if (ret == EVENT_RETURN)
01206         goto Lcallreturn;
01207       return ret;
01208     }
01209   }
01210 Ldone:
01211   if (!f.lookup) {
01212     CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
01213     _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -err);
01214   } else {
01215     CACHE_INCREMENT_DYN_STAT(cache_lookup_failure_stat);
01216     _action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *) -err);
01217   }
01218   return free_CacheVC(this);
01219 Lcallreturn:
01220   return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
01221 Lsuccess:
01222   SET_HANDLER(&CacheVC::openReadMain);
01223   return callcont(CACHE_EVENT_OPEN_READ);
01224 Lookup:
01225   CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
01226   _action.continuation->handleEvent(CACHE_EVENT_LOOKUP, 0);
01227   return free_CacheVC(this);
01228 Learliest:
01229   first_buf = buf;
01230   buf = NULL;
01231   earliest_key = key;
01232   last_collision = NULL;
01233   SET_HANDLER(&CacheVC::openReadStartEarliest);
01234   return openReadStartEarliest(event, e);
01235 }

Generated by  doxygen 1.7.1