00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "P_Cache.h"
00025
00026 #ifdef HTTP_CACHE
00027 #include "HttpCacheSM.h"
00028 #endif
00029
00030 #define READ_WHILE_WRITER 1
00031 extern int cache_config_compatibility_4_2_0_fixup;
00032
00033 Action *
00034 Cache::open_read(Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
00035 {
00036 if (!CacheProcessor::IsCacheReady(type)) {
00037 cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
00038 return ACTION_RESULT_DONE;
00039 }
00040 ink_assert(caches[type] == this);
00041
00042 Vol *vol = key_to_vol(key, hostname, host_len);
00043 Dir result, *last_collision = NULL;
00044 ProxyMutex *mutex = cont->mutex;
00045 OpenDirEntry *od = NULL;
00046 CacheVC *c = NULL;
00047 {
00048 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00049 if (!lock || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
00050 c = new_CacheVC(cont);
00051 SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
00052 c->vio.op = VIO::READ;
00053 c->base_stat = cache_read_active_stat;
00054 CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
00055 c->first_key = c->key = c->earliest_key = *key;
00056 c->vol = vol;
00057 c->frag_type = type;
00058 c->od = od;
00059 }
00060 if (!c)
00061 goto Lmiss;
00062 if (!lock) {
00063 CONT_SCHED_LOCK_RETRY(c);
00064 return &c->_action;
00065 }
00066 if (c->od)
00067 goto Lwriter;
00068 c->dir = result;
00069 c->last_collision = last_collision;
00070 switch(c->do_read_call(&c->key)) {
00071 case EVENT_DONE: return ACTION_RESULT_DONE;
00072 case EVENT_RETURN: goto Lcallreturn;
00073 default: return &c->_action;
00074 }
00075 }
00076 Lmiss:
00077 CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00078 cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
00079 return ACTION_RESULT_DONE;
00080 Lwriter:
00081 SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
00082 if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
00083 return ACTION_RESULT_DONE;
00084 return &c->_action;
00085 Lcallreturn:
00086 if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
00087 return ACTION_RESULT_DONE;
00088 return &c->_action;
00089 }
00090
00091 #ifdef HTTP_CACHE
00092 Action *
00093 Cache::open_read(Continuation * cont, CacheKey * key, CacheHTTPHdr * request,
00094 CacheLookupHttpConfig * params, CacheFragType type, char *hostname, int host_len)
00095 {
00096
00097 if (!CacheProcessor::IsCacheReady(type)) {
00098 cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
00099 return ACTION_RESULT_DONE;
00100 }
00101 ink_assert(caches[type] == this);
00102
00103 Vol *vol = key_to_vol(key, hostname, host_len);
00104 Dir result, *last_collision = NULL;
00105 ProxyMutex *mutex = cont->mutex;
00106 OpenDirEntry *od = NULL;
00107 CacheVC *c = NULL;
00108
00109 {
00110 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00111 if (!lock || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
00112 c = new_CacheVC(cont);
00113 c->first_key = c->key = c->earliest_key = *key;
00114 c->vol = vol;
00115 c->vio.op = VIO::READ;
00116 c->base_stat = cache_read_active_stat;
00117 CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
00118 c->request.copy_shallow(request);
00119 c->frag_type = CACHE_FRAG_TYPE_HTTP;
00120 c->params = params;
00121 c->od = od;
00122 }
00123 if (!lock) {
00124 SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
00125 CONT_SCHED_LOCK_RETRY(c);
00126 return &c->_action;
00127 }
00128 if (!c)
00129 goto Lmiss;
00130 if (c->od)
00131 goto Lwriter;
00132
00133 c->dir = c->first_dir = result;
00134 c->last_collision = last_collision;
00135 SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
00136 switch(c->do_read_call(&c->key)) {
00137 case EVENT_DONE: return ACTION_RESULT_DONE;
00138 case EVENT_RETURN: goto Lcallreturn;
00139 default: return &c->_action;
00140 }
00141 }
00142 Lmiss:
00143 CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00144 cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
00145 return ACTION_RESULT_DONE;
00146 Lwriter:
00147
00148 ((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
00149 SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
00150 if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
00151 return ACTION_RESULT_DONE;
00152 return &c->_action;
00153 Lcallreturn:
00154 if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
00155 return ACTION_RESULT_DONE;
00156 return &c->_action;
00157 }
00158 #endif
00159
00160 uint32_t
00161 CacheVC::load_http_info(CacheHTTPInfoVector* info, Doc* doc, RefCountObj * block_ptr)
00162 {
00163 uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
00164 if (cache_config_compatibility_4_2_0_fixup &&
00165 vol->header->version.ink_major == 23 && vol->header->version.ink_minor == 0
00166 ) {
00167 for ( int i = info->xcount - 1 ; i >= 0 ; --i ) {
00168 info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
00169 info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
00170 }
00171 }
00172 return zret;
00173 }
00174
00175 int
00176 CacheVC::openReadFromWriterFailure(int event, Event * e)
00177 {
00178
00179 od = NULL;
00180 vector.clear(false);
00181 CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00182 CACHE_INCREMENT_DYN_STAT(cache_read_busy_failure_stat);
00183 _action.continuation->handleEvent(event, e);
00184 free_CacheVC(this);
00185 return EVENT_DONE;
00186 }
00187
00188 int
00189 CacheVC::openReadChooseWriter(int , Event * )
00190 {
00191 intptr_t err = ECACHE_DOC_BUSY;
00192 CacheVC *w = NULL;
00193
00194 ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == NULL);
00195
00196 if (!od)
00197 return EVENT_RETURN;
00198
00199 if (frag_type != CACHE_FRAG_TYPE_HTTP) {
00200 ink_assert(od->num_writers == 1);
00201 w = od->writers.head;
00202 if (w->start_time > start_time || w->closed < 0) {
00203 od = NULL;
00204 return EVENT_RETURN;
00205 }
00206 if (!w->closed)
00207 return -err;
00208 write_vc = w;
00209 }
00210 #ifdef HTTP_CACHE
00211 else {
00212 write_vector = &od->vector;
00213 int write_vec_cnt = write_vector->count();
00214 for (int c = 0; c < write_vec_cnt; c++)
00215 vector.insert(write_vector->get(c));
00216
00217
00218 for (w = (CacheVC *) od->writers.head; w; w = (CacheVC *) w->opendir_link.next) {
00219 if (w->start_time > start_time || w->closed < 0)
00220 continue;
00221 if (!w->closed && !cache_config_read_while_writer) {
00222 return -err;
00223 }
00224 if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT)
00225 continue;
00226
00227 if (!w->closed && !w->alternate.valid()) {
00228 od = NULL;
00229 ink_assert(!write_vc);
00230 vector.clear(false);
00231 return EVENT_CONT;
00232 }
00233
00234 int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
00235 if (w->f.update) {
00236
00237 alt_ndx = get_alternate_index(&vector, w->update_key);
00238
00239 if (!w->alternate.valid()) {
00240 if (alt_ndx >= 0)
00241 vector.remove(alt_ndx, false);
00242 continue;
00243 }
00244 }
00245 ink_assert(w->alternate.valid());
00246 if (w->alternate.valid())
00247 vector.insert(&w->alternate, alt_ndx);
00248 }
00249
00250 if (!vector.count()) {
00251 if (od->reading_vec) {
00252
00253
00254
00255 od = NULL;
00256 return EVENT_RETURN;
00257 }
00258 return -ECACHE_NO_DOC;
00259 }
00260 if (cache_config_select_alternate) {
00261 alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
00262 if (alternate_index < 0)
00263 return -ECACHE_ALT_MISS;
00264 } else
00265 alternate_index = 0;
00266 CacheHTTPInfo *obj = vector.get(alternate_index);
00267 for (w = (CacheVC *) od->writers.head; w; w = (CacheVC *) w->opendir_link.next) {
00268 if (obj->m_alt == w->alternate.m_alt) {
00269 write_vc = w;
00270 break;
00271 }
00272 }
00273 vector.clear(false);
00274 if (!write_vc) {
00275 DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
00276 od = NULL;
00277 return EVENT_RETURN;
00278 }
00279
00280 DDebug("cache_read_agg",
00281 "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p",
00282 this, first_key.slice32(1), write_vc->earliest_key.slice32(1),
00283 vector.count(), alternate_index, od->num_writers, write_vc);
00284 }
00285 #endif //HTTP_CACHE
00286 return EVENT_NONE;
00287 }
00288
00289 int
00290 CacheVC::openReadFromWriter(int event, Event * e)
00291 {
00292 if (!f.read_from_writer_called) {
00293
00294
00295 last_collision = NULL;
00296
00297
00298
00299
00300
00301 start_time = ink_get_hrtime();
00302 f.read_from_writer_called = 1;
00303 }
00304 cancel_trigger();
00305 intptr_t err = ECACHE_DOC_BUSY;
00306 DDebug("cache_read_agg", "%p: key: %X In openReadFromWriter", this, first_key.slice32(1));
00307 #ifndef READ_WHILE_WRITER
00308 return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) -err);
00309 #else
00310 if (_action.cancelled) {
00311 od = NULL;
00312 return free_CacheVC(this);
00313 }
00314 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00315 if (!lock)
00316 VC_SCHED_LOCK_RETRY();
00317 od = vol->open_read(&first_key);
00318 if (!od) {
00319 MUTEX_RELEASE(lock);
00320 write_vc = NULL;
00321 SET_HANDLER(&CacheVC::openReadStartHead);
00322 return openReadStartHead(event, e);
00323 } else
00324 ink_assert(od == vol->open_read(&first_key));
00325 if (!write_vc) {
00326 int ret = openReadChooseWriter(event, e);
00327 if (ret < 0) {
00328 MUTEX_RELEASE(lock);
00329 SET_HANDLER(&CacheVC::openReadFromWriterFailure);
00330 return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *> (ret));
00331 } else if (ret == EVENT_RETURN) {
00332 MUTEX_RELEASE(lock);
00333 SET_HANDLER(&CacheVC::openReadStartHead);
00334 return openReadStartHead(event, e);
00335 } else if (ret == EVENT_CONT) {
00336 ink_assert(!write_vc);
00337 VC_SCHED_WRITER_RETRY();
00338 } else
00339 ink_assert(write_vc);
00340 } else {
00341 if (writer_done()) {
00342 MUTEX_RELEASE(lock);
00343 DDebug("cache_read_agg",
00344 "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
00345 od = NULL;
00346 write_vc = NULL;
00347 SET_HANDLER(&CacheVC::openReadStartHead);
00348 return openReadStartHead(event, e);
00349 }
00350 }
00351 #ifdef HTTP_CACHE
00352 OpenDirEntry *cod = od;
00353 #endif
00354 od = NULL;
00355
00356 if (write_vc->closed < 0) {
00357 MUTEX_RELEASE(lock);
00358 write_vc = NULL;
00359
00360 SET_HANDLER(&CacheVC::openReadStartHead);
00361 return openReadStartHead(EVENT_IMMEDIATE, 0);
00362 }
00363
00364 ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
00365 if (!write_vc->closed && !write_vc->fragment) {
00366 if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP) {
00367 MUTEX_RELEASE(lock);
00368 return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
00369 }
00370 DDebug("cache_read_agg",
00371 "%p: key: %X writer: closed:%d, fragment:%d, retry: %d",
00372 this, first_key.slice32(1), write_vc->closed, write_vc->fragment, writer_lock_retry);
00373 VC_SCHED_WRITER_RETRY();
00374 }
00375
00376 CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
00377 if (!writer_lock) {
00378 DDebug("cache_read_agg", "%p: key: %X lock miss", this, first_key.slice32(1));
00379 VC_SCHED_LOCK_RETRY();
00380 }
00381 MUTEX_RELEASE(lock);
00382
00383 if (!write_vc->io.ok())
00384 return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
00385 #ifdef HTTP_CACHE
00386 if (frag_type == CACHE_FRAG_TYPE_HTTP) {
00387 DDebug("cache_read_agg",
00388 "%p: key: %X http passed stage 1, closed: %d, frag: %d",
00389 this, first_key.slice32(1), write_vc->closed, write_vc->fragment);
00390 if (!write_vc->alternate.valid())
00391 return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
00392 alternate.copy(&write_vc->alternate);
00393 vector.insert(&alternate);
00394 alternate.object_key_get(&key);
00395 write_vc->f.readers = 1;
00396 if (!(write_vc->f.update && write_vc->total_len == 0)) {
00397 key = write_vc->earliest_key;
00398 if (!write_vc->closed)
00399 alternate.object_size_set(write_vc->vio.nbytes);
00400 else
00401 alternate.object_size_set(write_vc->total_len);
00402 } else {
00403 key = write_vc->update_key;
00404 ink_assert(write_vc->closed);
00405 DDebug("cache_read_agg", "%p: key: %X writer header update", this, first_key.slice32(1));
00406
00407 doc_len = alternate.object_size_get();
00408 if (write_vc->update_key == cod->single_doc_key &&
00409 (cod->move_resident_alt || write_vc->f.rewrite_resident_alt) && write_vc->first_buf._ptr()) {
00410
00411
00412
00413 Doc *doc = (Doc *) write_vc->first_buf->data();
00414 writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
00415 MUTEX_RELEASE(writer_lock);
00416 ink_assert(doc_len == doc->data_len());
00417 length = doc_len;
00418 f.single_fragment = 1;
00419 doc_pos = 0;
00420 earliest_key = key;
00421 dir_clean(&first_dir);
00422 dir_clean(&earliest_dir);
00423 SET_HANDLER(&CacheVC::openReadFromWriterMain);
00424 CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
00425 return callcont(CACHE_EVENT_OPEN_READ);
00426 }
00427
00428
00429 last_collision = NULL;
00430 MUTEX_RELEASE(writer_lock);
00431 SET_HANDLER(&CacheVC::openReadStartEarliest);
00432 return openReadStartEarliest(event, e);
00433 }
00434 } else {
00435 #endif //HTTP_CACHE
00436 DDebug("cache_read_agg", "%p: key: %X non-http passed stage 1", this, first_key.slice32(1));
00437 key = write_vc->earliest_key;
00438 #ifdef HTTP_CACHE
00439 }
00440 #endif
00441 if (write_vc->fragment) {
00442 doc_len = write_vc->vio.nbytes;
00443 last_collision = NULL;
00444 DDebug("cache_read_agg",
00445 "%p: key: %X closed: %d, fragment: %d, len: %d starting first fragment",
00446 this, first_key.slice32(1), write_vc->closed, write_vc->fragment, (int)doc_len);
00447 MUTEX_RELEASE(writer_lock);
00448
00449 SET_HANDLER(&CacheVC::openReadStartEarliest);
00450 return openReadStartEarliest(event, e);
00451 }
00452 writer_buf = write_vc->blocks;
00453 writer_offset = write_vc->offset;
00454 length = write_vc->length;
00455
00456 f.single_fragment = !write_vc->fragment;
00457 doc_pos = 0;
00458 earliest_key = write_vc->earliest_key;
00459 ink_assert(earliest_key == key);
00460 doc_len = write_vc->total_len;
00461 dir_clean(&first_dir);
00462 dir_clean(&earliest_dir);
00463 DDebug("cache_read_agg", "%p: key: %X %X: single fragment read", this, first_key.slice32(1), key.slice32(0));
00464 MUTEX_RELEASE(writer_lock);
00465 SET_HANDLER(&CacheVC::openReadFromWriterMain);
00466 CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
00467 return callcont(CACHE_EVENT_OPEN_READ);
00468 #endif //READ_WHILE_WRITER
00469 }
00470
00471 int
00472 CacheVC::openReadFromWriterMain(int , Event * )
00473 {
00474 cancel_trigger();
00475 if (seek_to) {
00476 vio.ndone = seek_to;
00477 seek_to = 0;
00478 }
00479 IOBufferBlock *b = NULL;
00480 int64_t ntodo = vio.ntodo();
00481 if (ntodo <= 0)
00482 return EVENT_CONT;
00483 if (length < ((int64_t)doc_len) - vio.ndone) {
00484 DDebug("cache_read_agg", "truncation %X", first_key.slice32(1));
00485 if (is_action_tag_set("cache")) {
00486 ink_release_assert(false);
00487 }
00488 Warning("Document %X truncated at %d of %d, reading from writer", first_key.slice32(1), (int)vio.ndone, (int)doc_len);
00489 return calluser(VC_EVENT_ERROR);
00490 }
00491
00492
00493 if (length > ((int64_t)doc_len) - vio.ndone) {
00494 int64_t skip_bytes = length - (doc_len - vio.ndone);
00495 iobufferblock_skip(writer_buf, &writer_offset, &length, skip_bytes);
00496 }
00497 int64_t bytes = length;
00498 if (bytes > vio.ntodo())
00499 bytes = vio.ntodo();
00500 if (vio.ndone >= (int64_t)doc_len) {
00501 ink_assert(bytes <= 0);
00502
00503 return calluser(VC_EVENT_EOS);
00504 }
00505 b = iobufferblock_clone(writer_buf, writer_offset, bytes);
00506 writer_buf = iobufferblock_skip(writer_buf, &writer_offset, &length, bytes);
00507 vio.buffer.writer()->append_block(b);
00508 vio.ndone += bytes;
00509 if (vio.ntodo() <= 0)
00510 return calluser(VC_EVENT_READ_COMPLETE);
00511 else
00512 return calluser(VC_EVENT_READ_READY);
00513 }
00514
00515 int
00516 CacheVC::openReadClose(int event, Event * )
00517 {
00518 cancel_trigger();
00519 if (is_io_in_progress()) {
00520 if (event != AIO_EVENT_DONE)
00521 return EVENT_CONT;
00522 set_io_not_in_progress();
00523 }
00524 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00525 if (!lock)
00526 VC_SCHED_LOCK_RETRY();
00527 if (f.hit_evacuate && dir_valid(vol, &first_dir) && closed > 0) {
00528 if (f.single_fragment)
00529 vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
00530 else if (dir_valid(vol, &earliest_dir)) {
00531 vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
00532 vol->force_evacuate_head(&earliest_dir, dir_pinned(&earliest_dir));
00533 }
00534 }
00535 vol->close_read(this);
00536 return free_CacheVC(this);
00537 }
00538
00539 int
00540 CacheVC::openReadReadDone(int event, Event * e)
00541 {
00542 Doc *doc = NULL;
00543
00544 cancel_trigger();
00545 if (event == EVENT_IMMEDIATE)
00546 return EVENT_CONT;
00547 set_io_not_in_progress();
00548 {
00549 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00550 if (!lock)
00551 VC_SCHED_LOCK_RETRY();
00552 if (event == AIO_EVENT_DONE && !io.ok()) {
00553 dir_delete(&earliest_key, vol, &earliest_dir);
00554 goto Lerror;
00555 }
00556 if (last_collision &&
00557 dir_valid(vol, &dir))
00558 {
00559 doc = (Doc *) buf->data();
00560 if (doc->magic != DOC_MAGIC) {
00561 char tmpstring[100];
00562 if (doc->magic == DOC_CORRUPT)
00563 Warning("Middle: Doc checksum does not match for %s", key.toHexStr(tmpstring));
00564 else
00565 Warning("Middle: Doc magic does not match for %s", key.toHexStr(tmpstring));
00566 #if TS_USE_INTERIM_CACHE == 1
00567 if (dir_ininterim(&dir)) {
00568 dir_delete(&key, vol, &dir);
00569 goto Lread;
00570 }
00571 #endif
00572 goto Lerror;
00573 }
00574 if (doc->key == key)
00575 goto LreadMain;
00576 #if TS_USE_INTERIM_CACHE == 1
00577 else if (dir_ininterim(&dir)) {
00578 dir_delete(&key, vol, &dir);
00579 last_collision = NULL;
00580 }
00581 #endif
00582 }
00583 #if TS_USE_INTERIM_CACHE == 1
00584 if (last_collision && dir_get_offset(&dir) != dir_get_offset(last_collision))
00585 last_collision = 0;
00586 Lread:
00587 #else
00588 if (last_collision && dir_offset(&dir) != dir_offset(last_collision))
00589 last_collision = 0;
00590 #endif
00591 if (dir_probe(&key, vol, &dir, &last_collision)) {
00592 int ret = do_read_call(&key);
00593 if (ret == EVENT_RETURN)
00594 goto Lcallreturn;
00595 return EVENT_CONT;
00596 } else if (write_vc) {
00597 if (writer_done()) {
00598 last_collision = NULL;
00599 while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
00600 #if TS_USE_INTERIM_CACHE == 1
00601 if (dir_get_offset(&dir) == dir_get_offset(&earliest_dir)) {
00602 #else
00603 if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
00604 #endif
00605 DDebug("cache_read_agg", "%p: key: %X ReadRead complete: %d",
00606 this, first_key.slice32(1), (int)vio.ndone);
00607 doc_len = vio.ndone;
00608 goto Ldone;
00609 }
00610 }
00611 DDebug("cache_read_agg", "%p: key: %X ReadRead writer aborted: %d",
00612 this, first_key.slice32(1), (int)vio.ndone);
00613 goto Lerror;
00614 }
00615 DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
00616 VC_SCHED_WRITER_RETRY();
00617 }
00618
00619 }
00620 Lerror:
00621 char tmpstring[100];
00622 Warning("Document %s truncated", earliest_key.toHexStr(tmpstring));
00623 return calluser(VC_EVENT_ERROR);
00624 Ldone:
00625 return calluser(VC_EVENT_EOS);
00626 Lcallreturn:
00627 return handleEvent(AIO_EVENT_DONE, 0);
00628 LreadMain:
00629 fragment++;
00630 doc_pos = doc->prefix_len();
00631 next_CacheKey(&key, &key);
00632 SET_HANDLER(&CacheVC::openReadMain);
00633 return openReadMain(event, e);
00634 }
00635
00636 int
00637 CacheVC::openReadMain(int , Event * )
00638 {
00639 cancel_trigger();
00640 Doc *doc = (Doc *) buf->data();
00641 int64_t ntodo = vio.ntodo();
00642 int64_t bytes = doc->len - doc_pos;
00643 IOBufferBlock *b = NULL;
00644 if (seek_to) {
00645 if (seek_to >= doc_len) {
00646 vio.ndone = doc_len;
00647 return calluser(VC_EVENT_EOS);
00648 }
00649 #ifdef HTTP_CACHE
00650 HTTPInfo::FragOffset* frags = alternate.get_frag_table();
00651 if (is_debug_tag_set("cache_seek")) {
00652 char b[33], c[33];
00653 Debug("cache_seek", "Seek @ %" PRId64" in %s from #%d @ %" PRId64"/%d:%s",
00654 seek_to, first_key.toHexStr(b), fragment, doc_pos, doc->len, doc->key.toHexStr(c));
00655 }
00656
00657
00658
00659
00660 if (frags) {
00661 int target = 0;
00662 HTTPInfo::FragOffset next_off = frags[target];
00663 int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
00664 ink_assert(lfi >= 0);
00665
00666
00667
00668
00669
00670
00671
00672 if (fragment == 0 ||
00673 seek_to < frags[fragment-1] ||
00674 (fragment <= lfi && frags[fragment] <= seek_to)
00675 ) {
00676
00677 while (seek_to >= next_off && target < lfi) {
00678 next_off = frags[++target];
00679 }
00680 if (target == lfi && seek_to >= next_off) ++target;
00681 } else {
00682 target = fragment;
00683 }
00684 if (target != fragment) {
00685
00686
00687 int cfi = fragment;
00688 --target;
00689 while (target > fragment) {
00690 next_CacheKey(&key, &key);
00691 ++fragment;
00692 }
00693 while (target < fragment) {
00694 prev_CacheKey(&key, &key);
00695 --fragment;
00696 }
00697
00698 if (is_debug_tag_set("cache_seek")) {
00699 char target_key_str[33];
00700 key.toHexStr(target_key_str);
00701 Debug("cache_seek", "Seek #%d @ %" PRId64" -> #%d @ %" PRId64":%s", cfi, doc_pos, target, seek_to, target_key_str);
00702 }
00703 goto Lread;
00704 }
00705 }
00706 doc_pos = doc->prefix_len() + seek_to;
00707 if (fragment) doc_pos -= static_cast<int64_t>(frags[fragment-1]);
00708 vio.ndone = 0;
00709 seek_to = 0;
00710 ntodo = vio.ntodo();
00711 bytes = doc->len - doc_pos;
00712 if (is_debug_tag_set("cache_seek")) {
00713 char target_key_str[33];
00714 key.toHexStr(target_key_str);
00715 Debug("cache_seek", "Read # %d @ %" PRId64"/%d for %" PRId64, fragment, doc_pos, doc->len, bytes);
00716 }
00717 #endif
00718 }
00719 if (ntodo <= 0)
00720 return EVENT_CONT;
00721 if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone)
00722 return EVENT_CONT;
00723 if ((bytes <= 0) && vio.ntodo() >= 0)
00724 goto Lread;
00725 if (bytes > vio.ntodo())
00726 bytes = vio.ntodo();
00727 b = new_IOBufferBlock(buf, bytes, doc_pos);
00728 b->_buf_end = b->_end;
00729 vio.buffer.writer()->append_block(b);
00730 vio.ndone += bytes;
00731 doc_pos += bytes;
00732 if (vio.ntodo() <= 0)
00733 return calluser(VC_EVENT_READ_COMPLETE);
00734 else {
00735 if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
00736 return EVENT_DONE;
00737
00738
00739 if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water())
00740 goto Lread;
00741 return EVENT_CONT;
00742 }
00743 Lread: {
00744 if (vio.ndone >= (int64_t)doc_len)
00745
00746 return calluser(VC_EVENT_EOS);
00747 last_collision = 0;
00748 writer_lock_retry = 0;
00749
00750
00751
00752
00753 cancel_trigger();
00754 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00755 if (!lock) {
00756 SET_HANDLER(&CacheVC::openReadMain);
00757 VC_SCHED_LOCK_RETRY();
00758 }
00759 if (dir_probe(&key, vol, &dir, &last_collision)) {
00760 SET_HANDLER(&CacheVC::openReadReadDone);
00761 int ret = do_read_call(&key);
00762 if (ret == EVENT_RETURN)
00763 goto Lcallreturn;
00764 return EVENT_CONT;
00765 } else if (write_vc) {
00766 if (writer_done()) {
00767 last_collision = NULL;
00768 while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
00769 if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
00770 DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d",
00771 this, first_key.slice32(1), (int)vio.ndone);
00772 doc_len = vio.ndone;
00773 goto Leos;
00774 }
00775 }
00776 DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d",
00777 this, first_key.slice32(1), (int)vio.ndone);
00778 goto Lerror;
00779 }
00780 DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
00781 SET_HANDLER(&CacheVC::openReadMain);
00782 VC_SCHED_WRITER_RETRY();
00783 }
00784 if (is_action_tag_set("cache"))
00785 ink_release_assert(false);
00786 Warning("Document %X truncated at %d of %d, missing fragment %X", first_key.slice32(1), (int)vio.ndone, (int)doc_len, key.slice32(1));
00787
00788 dir_delete(&earliest_key, vol, &earliest_dir);
00789 }
00790 Lerror:
00791 return calluser(VC_EVENT_ERROR);
00792 Leos:
00793 return calluser(VC_EVENT_EOS);
00794 Lcallreturn:
00795 return handleEvent(AIO_EVENT_DONE, 0);
00796 }
00797
00798
00799
00800
00801
00802 int
00803 CacheVC::openReadStartEarliest(int , Event * )
00804 {
00805 int ret = 0;
00806 Doc *doc = NULL;
00807 cancel_trigger();
00808 set_io_not_in_progress();
00809 if (_action.cancelled)
00810 return free_CacheVC(this);
00811 {
00812 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00813 if (!lock)
00814 VC_SCHED_LOCK_RETRY();
00815 if (!buf)
00816 goto Lread;
00817 if (!io.ok())
00818 goto Ldone;
00819
00820
00821 if (!dir_agg_valid(vol, &dir)) {
00822
00823 if (!dir_valid(vol, &dir))
00824 last_collision = NULL;
00825 goto Lread;
00826 }
00827 doc = (Doc *) buf->data();
00828 if (doc->magic != DOC_MAGIC) {
00829 char tmpstring[100];
00830 if (is_action_tag_set("cache")) {
00831 ink_release_assert(false);
00832 }
00833 if (doc->magic == DOC_CORRUPT)
00834 Warning("Earliest: Doc checksum does not match for %s", key.toHexStr(tmpstring));
00835 else
00836 Warning("Earliest : Doc magic does not match for %s", key.toHexStr(tmpstring));
00837
00838 dir_delete(&key, vol, &dir);
00839
00840
00841
00842
00843 last_collision = NULL;
00844 goto Lread;
00845 }
00846 if (!(doc->key == key))
00847 goto Lread;
00848
00849 earliest_key = key;
00850 doc_pos = doc->prefix_len();
00851 next_CacheKey(&key, &doc->key);
00852 vol->begin_read(this);
00853 if (vol->within_hit_evacuate_window(&earliest_dir) &&
00854 (!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
00855 #if TS_USE_INTERIM_CACHE == 1
00856 && !dir_ininterim(&dir)
00857 #endif
00858 ) {
00859 DDebug("cache_hit_evac", "dir: %" PRId64", write: %" PRId64", phase: %d",
00860 dir_offset(&earliest_dir), offset_to_vol_offset(vol, vol->header->write_pos), vol->header->phase);
00861 f.hit_evacuate = 1;
00862 }
00863 goto Lsuccess;
00864 Lread:
00865 if (dir_probe(&key, vol, &earliest_dir, &last_collision) ||
00866 dir_lookaside_probe(&key, vol, &earliest_dir, NULL))
00867 {
00868 dir = earliest_dir;
00869 #if TS_USE_INTERIM_CACHE == 1
00870 if (dir_ininterim(&dir) && alternate.get_frag_offset_count() > 1) {
00871 dir_delete(&key, vol, &dir);
00872 last_collision = NULL;
00873 goto Lread;
00874 }
00875 #endif
00876 if ((ret = do_read_call(&key)) == EVENT_RETURN)
00877 goto Lcallreturn;
00878 return ret;
00879 }
00880
00881
00882 #ifdef HTTP_CACHE
00883 if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
00884
00885 if (!vol->open_write(this, false, 1)) {
00886 Doc *doc1 = (Doc *) first_buf->data();
00887 uint32_t len = this->load_http_info(write_vector, doc1);
00888 ink_assert(len == doc1->hlen && write_vector->count() > 0);
00889 write_vector->remove(alternate_index, true);
00890
00891 if (len != doc1->hlen || !write_vector->count()) {
00892
00893
00894
00895 dir_delete(&first_key, vol, &first_dir);
00896 }
00897 #if TS_USE_INTERIM_CACHE == 1
00898 else if (dir_ininterim(&first_dir))
00899 dir_delete(&first_key, vol, &first_dir);
00900 #endif
00901 else {
00902 buf = NULL;
00903 last_collision = NULL;
00904 write_len = 0;
00905 header_len = write_vector->marshal_length();
00906 f.evac_vector = 1;
00907 f.use_first_key = 1;
00908 key = first_key;
00909
00910
00911
00912
00913 od->first_dir = first_dir;
00914 od->writing_vec = 1;
00915 earliest_key = zero_key;
00916
00917
00918 vio.op = VIO::WRITE;
00919 total_len = 0;
00920 f.update = 1;
00921 alternate_index = CACHE_ALT_REMOVED;
00922
00923
00924
00925
00926 if (doc1->total_len > 0) {
00927 od->move_resident_alt = 1;
00928 od->single_doc_key = doc1->key;
00929 dir_assign(&od->single_doc_dir, &dir);
00930 dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
00931 }
00932 SET_HANDLER(&CacheVC::openReadVecWrite);
00933 if ((ret = do_write_call()) == EVENT_RETURN)
00934 goto Lcallreturn;
00935 return ret;
00936 }
00937 }
00938 }
00939 #endif
00940
00941 Ldone:
00942 if (od)
00943 vol->close_write(this);
00944 }
00945 CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
00946 _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
00947 return free_CacheVC(this);
00948 Lcallreturn:
00949 return handleEvent(AIO_EVENT_DONE, 0);
00950 Lsuccess:
00951 if (write_vc)
00952 CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
00953 SET_HANDLER(&CacheVC::openReadMain);
00954 return callcont(CACHE_EVENT_OPEN_READ);
00955 }
00956
00957
00958
00959 #ifdef HTTP_CACHE
00960 int
00961 CacheVC::openReadVecWrite(int , Event * )
00962 {
00963 cancel_trigger();
00964 set_io_not_in_progress();
00965 ink_assert(od);
00966 od->writing_vec = 0;
00967 if (_action.cancelled)
00968 return openWriteCloseDir(EVENT_IMMEDIATE, 0);
00969 {
00970 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00971 if (!lock)
00972 VC_SCHED_LOCK_RETRY();
00973 if (io.ok()) {
00974 ink_assert(f.evac_vector);
00975 ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
00976 ink_assert(!buf.m_ptr);
00977 f.evac_vector = false;
00978 last_collision = NULL;
00979 f.update = 0;
00980 alternate_index = CACHE_ALT_INDEX_DEFAULT;
00981 f.use_first_key = 0;
00982 vio.op = VIO::READ;
00983 dir_overwrite(&first_key, vol, &dir, &od->first_dir);
00984 if (od->move_resident_alt)
00985 dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
00986 int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
00987 vol->close_write(this);
00988 if (alt_ndx >= 0) {
00989 vector.clear();
00990
00991
00992
00993 goto Lrestart;
00994 }
00995 } else
00996 vol->close_write(this);
00997 }
00998
00999 CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
01000 _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_ALT_MISS);
01001 return free_CacheVC(this);
01002 Lrestart:
01003 SET_HANDLER(&CacheVC::openReadStartHead);
01004 return openReadStartHead(EVENT_IMMEDIATE, 0);
01005 }
01006 #endif
01007
01008
01009
01010
01011
01012 int
01013 CacheVC::openReadStartHead(int event, Event * e)
01014 {
01015 intptr_t err = ECACHE_NO_DOC;
01016 Doc *doc = NULL;
01017 cancel_trigger();
01018 set_io_not_in_progress();
01019 if (_action.cancelled)
01020 return free_CacheVC(this);
01021 {
01022 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01023 if (!lock)
01024 VC_SCHED_LOCK_RETRY();
01025 if (!buf)
01026 goto Lread;
01027 if (!io.ok())
01028 goto Ldone;
01029
01030
01031 if (!dir_agg_valid(vol, &dir)) {
01032
01033 if (!dir_valid(vol, &dir))
01034 last_collision = NULL;
01035 #if TS_USE_INTERIM_CACHE == 1
01036 if (dir_ininterim(&dir)) {
01037 dir_delete(&key, vol, &dir);
01038 last_collision = NULL;
01039 }
01040 #endif
01041 goto Lread;
01042 }
01043 doc = (Doc *) buf->data();
01044 if (doc->magic != DOC_MAGIC) {
01045 char tmpstring[100];
01046 if (is_action_tag_set("cache")) {
01047 ink_release_assert(false);
01048 }
01049 if (doc->magic == DOC_CORRUPT)
01050 Warning("Head: Doc checksum does not match for %s", key.toHexStr(tmpstring));
01051 else
01052 Warning("Head : Doc magic does not match for %s", key.toHexStr(tmpstring));
01053
01054 dir_delete(&key, vol, &dir);
01055
01056
01057
01058
01059 last_collision = NULL;
01060 goto Lread;
01061 }
01062 if (!(doc->first_key == key))
01063 #if TS_USE_INTERIM_CACHE == 1
01064 {
01065 if (dir_ininterim(&dir)) {
01066 dir_delete(&key, vol, &dir);
01067 last_collision = NULL;
01068 }
01069 goto Lread;
01070 }
01071 #else
01072 goto Lread;
01073 #endif
01074 if (f.lookup)
01075 goto Lookup;
01076 earliest_dir = dir;
01077 #ifdef HTTP_CACHE
01078 CacheHTTPInfo *alternate_tmp;
01079 if (frag_type == CACHE_FRAG_TYPE_HTTP) {
01080 uint32_t uml;
01081 ink_assert(doc->hlen);
01082 if (!doc->hlen)
01083 goto Ldone;
01084 if ((uml = this->load_http_info(&vector, doc)) != doc->hlen) {
01085 if (buf) {
01086 HTTPCacheAlt* alt = reinterpret_cast<HTTPCacheAlt*>(doc->hdr());
01087 int32_t alt_length = 0;
01088
01089
01090 for ( int32_t i = 0 ; i < vector.count() ; ++i ) {
01091 CacheHTTPInfo* info = vector.get(i);
01092 if (info && info->m_alt) alt_length += info->m_alt->m_unmarshal_len;
01093 }
01094 Note("OpenReadHead failed for cachekey %X : vector inconsistency - "
01095 "unmarshalled %d expecting %d in %d (base=%d, ver=%d:%d) "
01096 "- vector n=%d size=%d"
01097 "first alt=%d[%s]"
01098 , key.slice32(0)
01099 , uml, doc->hlen, doc->len, sizeofDoc
01100 , doc->v_major, doc->v_minor
01101 , vector.count(), alt_length
01102 , alt->m_magic
01103 , (CACHE_ALT_MAGIC_ALIVE == alt->m_magic ? "alive"
01104 : CACHE_ALT_MAGIC_MARSHALED == alt->m_magic ? "serial"
01105 : CACHE_ALT_MAGIC_DEAD == alt->m_magic ? "dead"
01106 : "bogus")
01107 );
01108 dir_delete(&key, vol, &dir);
01109 }
01110 err = ECACHE_BAD_META_DATA;
01111 goto Ldone;
01112 }
01113 if (cache_config_select_alternate) {
01114 alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
01115 if (alternate_index < 0) {
01116 err = ECACHE_ALT_MISS;
01117 goto Ldone;
01118 }
01119 } else
01120 alternate_index = 0;
01121 alternate_tmp = vector.get(alternate_index);
01122 if (!alternate_tmp->valid()) {
01123 if (buf) {
01124 Note("OpenReadHead failed for cachekey %X : alternate inconsistency", key.slice32(0));
01125 dir_delete(&key, vol, &dir);
01126 }
01127 goto Ldone;
01128 }
01129
01130 alternate.copy_shallow(alternate_tmp);
01131 alternate.object_key_get(&key);
01132 doc_len = alternate.object_size_get();
01133 if (key == doc->key) {
01134 f.single_fragment = doc->single_fragment();
01135 ink_assert(f.single_fragment);
01136 ink_assert(doc->hlen);
01137 doc_pos = doc->prefix_len();
01138 next_CacheKey(&key, &doc->key);
01139 } else {
01140 f.single_fragment = false;
01141 }
01142 } else
01143 #endif
01144 {
01145 next_CacheKey(&key, &doc->key);
01146 f.single_fragment = doc->single_fragment();
01147 doc_pos = doc->prefix_len();
01148 doc_len = doc->total_len;
01149 }
01150
01151 if (is_debug_tag_set("cache_read")) {
01152 char xt[33],yt[33];
01153 Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64" bytes, %d fragments",
01154 doc->key.toHexStr(xt), key.toHexStr(yt),
01155 f.single_fragment ? "single" : "multi",
01156 doc->len, doc->total_len,
01157 #ifdef HTTP_CACHE
01158 alternate.get_frag_offset_count()
01159 #else
01160 0
01161 #endif
01162 );
01163 }
01164
01165
01166 if (!f.single_fragment)
01167 goto Learliest;
01168
01169 if (vol->within_hit_evacuate_window(&dir) &&
01170 (!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
01171 #if TS_USE_INTERIM_CACHE == 1
01172 && !f.read_from_interim
01173 #endif
01174 ) {
01175 DDebug("cache_hit_evac", "dir: %" PRId64", write: %" PRId64", phase: %d",
01176 dir_offset(&dir), offset_to_vol_offset(vol, vol->header->write_pos), vol->header->phase);
01177 f.hit_evacuate = 1;
01178 }
01179
01180 first_buf = buf;
01181 vol->begin_read(this);
01182
01183 goto Lsuccess;
01184
01185 Lread:
01186
01187
01188
01189
01190
01191 OpenDirEntry *cod = vol->open_read(&key);
01192 if (cod && !f.read_from_writer_called) {
01193 if (f.lookup) {
01194 err = ECACHE_DOC_BUSY;
01195 goto Ldone;
01196 }
01197 od = cod;
01198 MUTEX_RELEASE(lock);
01199 SET_HANDLER(&CacheVC::openReadFromWriter);
01200 return handleEvent(EVENT_IMMEDIATE, 0);
01201 }
01202 if (dir_probe(&key, vol, &dir, &last_collision)) {
01203 first_dir = dir;
01204 int ret = do_read_call(&key);
01205 if (ret == EVENT_RETURN)
01206 goto Lcallreturn;
01207 return ret;
01208 }
01209 }
01210 Ldone:
01211 if (!f.lookup) {
01212 CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
01213 _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -err);
01214 } else {
01215 CACHE_INCREMENT_DYN_STAT(cache_lookup_failure_stat);
01216 _action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *) -err);
01217 }
01218 return free_CacheVC(this);
01219 Lcallreturn:
01220 return handleEvent(AIO_EVENT_DONE, 0);
01221 Lsuccess:
01222 SET_HANDLER(&CacheVC::openReadMain);
01223 return callcont(CACHE_EVENT_OPEN_READ);
01224 Lookup:
01225 CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
01226 _action.continuation->handleEvent(CACHE_EVENT_LOOKUP, 0);
01227 return free_CacheVC(this);
01228 Learliest:
01229 first_buf = buf;
01230 buf = NULL;
01231 earliest_key = key;
01232 last_collision = NULL;
01233 SET_HANDLER(&CacheVC::openReadStartEarliest);
01234 return openReadStartEarliest(event, e);
01235 }