#include "P_Cache.h"

#define IS_POWER_2(_x) (!((_x) & ((_x)-1)))
#define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX) // exploit overflow
#define UINT_WRAP_GTE(_x, _y) (((_x) - (_y)) < INT_MAX) // exploit overflow
#define UINT_WRAP_LT(_x, _y) (((_x) - (_y)) >= INT_MAX) // exploit overflow

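// Returns the index of the alternate in 'cache_vector' whose object key
// matches 'key', or -1 if the vector is empty or no alternate matches.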
#ifdef HTTP_CACHE
int
get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
{
  int alt_count = cache_vector->count();
  CacheHTTPInfo *obj;
  if (!alt_count)
    return -1;
  for (int i = 0; i < alt_count; i++) {
    obj = cache_vector->get(i);
    if (obj->compare_object_key(&key)) {
      return i;
    }
  }
  return -1;
}

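// Rewrites the vector of alternates (the object's metadata) for an update or
// an alternate delete, then starts the write of the new vector. If another
// CacheVC is currently reading or writing the vector, the operation is
// rescheduled until the OpenDirEntry is free.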
int
CacheVC::updateVector(int /* event */, Event * /* e */)
{
  cancel_trigger();
  if (od->reading_vec || od->writing_vec)
    VC_SCHED_LOCK_RETRY();
  int ret = 0;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock || od->writing_vec)
      VC_SCHED_LOCK_RETRY();

    int vec = alternate.valid();
    if (f.update) {
      alternate_index = get_alternate_index(write_vector, update_key);
      Debug("cache_update", "updating alternate index %d frags %d", alternate_index,
            alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
      // An invalid alternate on an update is an alternate delete: remove it
      // from the vector and, if it was the last one, delete the directory
      // entry for the document as well.
      if (!vec) {
        ink_assert(!total_len);
        if (alternate_index >= 0) {
          write_vector->remove(alternate_index, true);
          alternate_index = CACHE_ALT_REMOVED;
          if (!write_vector->count())
            dir_delete(&first_key, vol, &od->first_dir);
        }
        // Nothing was removed, or the vector is now empty: just close this
        // writer instead of rewriting the vector.
        if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
          SET_HANDLER(&CacheVC::openWriteCloseDir);
          return openWriteCloseDir(EVENT_IMMEDIATE, 0);
        }
      }
      if (update_key == od->single_doc_key && (total_len || !vec))
        od->move_resident_alt = 0;
    }
    if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
      if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
        od->move_resident_alt = 0;
      write_vector->remove(0, true);
    }
    if (vec) {
      // Preserve the fragment offset table of the alternate being replaced;
      // a header-only update leaves the data fragments untouched.
      if (alternate_index >= 0)
        alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
      alternate_index = write_vector->insert(&alternate, alternate_index);
    }

    if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
      Doc *doc = (Doc *) first_buf->data();
      int small_doc = (int64_t) doc->data_len() < (int64_t) cache_config_alt_rewrite_max_size;
      int have_res_alt = doc->key == od->single_doc_key;
      // A resident alternate can only be rewritten if the document is small
      // enough and its data is the fragment held in first_buf.
      if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
        ink_assert(!fragment || f.data_done);
        od->move_resident_alt = 0;
        f.rewrite_resident_alt = 1;
        write_len = doc->data_len();
        Debug("cache_update_alt", "rewriting resident alt size: %d key: %X, first_key: %X",
              write_len, doc->key.slice32(0), first_key.slice32(0));
      }
    }
    header_len = write_vector->marshal_length();
    od->writing_vec = 1;
    f.use_first_key = 1;
    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
    ret = do_write_call();
  }
  if (ret == EVENT_RETURN)
    return handleEvent(AIO_EVENT_DONE, 0);
  return ret;
}
#endif

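// Entry point for writing a fragment (or a vector) into the aggregation
// buffer. Computes the padded aggregation length, rejects writes that exceed
// the aggregation buffer, fragment size, or backlog limits with a soft
// failure, and otherwise queues this CacheVC on the volume's agg queue.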
int
CacheVC::handleWrite(int event, Event * /* e */)
{
  ink_assert(!trigger);
  frag_len = 0;

  set_agg_write_in_progress();
  POP_HANDLER;
  agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeofDoc);
  vol->agg_todo_size += agg_len;
  bool agg_error =
    (agg_len > AGG_SIZE || header_len + sizeofDoc > MAX_FRAG_SIZE ||
     (!f.readers && (vol->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
#ifdef CACHE_AGG_FAIL_RATE
  agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
                            (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif
  bool max_doc_error = (cache_config_max_doc_size &&
                        (cache_config_max_doc_size < vio.ndone ||
                         (vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));

  if (agg_error || max_doc_error) {
    CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
    CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
    vol->agg_todo_size -= agg_len;
    io.aio_result = AIO_SOFT_FAILURE;
    if (event == EVENT_CALL)
      return EVENT_RETURN;
    return handleEvent(AIO_EVENT_DONE, 0);
  }
  ink_assert(agg_len <= AGG_SIZE);
  if (f.evac_vector)
    vol->agg.push(this);
  else
    vol->agg.enqueue(this);
  if (!vol->is_io_in_progress())
    return vol->aggWrite(event, this);
  return EVENT_CONT;
}

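// Copies 'len' bytes from the IOBufferBlock chain 'ab', starting 'offset'
// bytes into the chain, to the flat buffer 'p'. Returns the advanced
// destination pointer.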
static char *
iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
{
  IOBufferBlock *b = ab;
  while (b && len >= 0) {
    char *start = b->_start;
    char *end = b->_end;
    int max_bytes = end - start;
    max_bytes -= offset;
    if (max_bytes <= 0) {
      offset = -max_bytes;
      b = b->next;
      continue;
    }
    int bytes = len;
    if (bytes >= max_bytes)
      bytes = max_bytes;
    ::memcpy(p, start + offset, bytes);
    p += bytes;
    len -= bytes;
    b = b->next;
    offset = 0;
  }
  return p;
}

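// Marks the directory entry 'evac_dir' for head evacuation, creating an
// EvacuationBlock for it if one does not already exist. Clearing 'readers'
// appears to make the evacuation unconditional (see evacuate_fragments,
// which uses readers == 0 as the "forced" state).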
EvacuationBlock *
Vol::force_evacuate_head(Dir *evac_dir, int pinned)
{
  // If an EvacuationBlock for this directory entry already exists and is
  // done, nothing more is needed.
  EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
  if (b && b->f.done)
    return b;

  if (!b) {
    b = new_EvacuationBlock(mutex->thread_holding);
    b->dir = *evac_dir;
    DDebug("cache_evac", "force: %d, %d", (int) dir_offset(evac_dir), (int) dir_phase(evac_dir));
    evacuate[dir_evac_bucket(evac_dir)].push(b);
  }
  b->f.pinned = pinned;
  b->f.evacuate_head = 1;
  b->evac_frags.key = zero_key;
  b->readers = 0;
  return b;
}

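// Scans the directory range that the write cursor is about to overwrite and
// schedules forced head evacuation for any pinned documents found there.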
void
Vol::scan_for_pinned_documents()
{
  if (cache_config_permit_pinning) {
    // Compute the window of volume offsets that will be overwritten soon.
    int ps = offset_to_vol_offset(this, header->write_pos + AGG_SIZE);
    int pe = offset_to_vol_offset(this, header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
    int vol_end_offset = offset_to_vol_offset(this, len + skip);
    int before_end_of_vol = pe < vol_end_offset;
    DDebug("cache_evac", "scan %d %d", ps, pe);
    for (int i = 0; i < vol_direntries(this); i++) {
      // Only pinned document heads are of interest.
      if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
        int o = dir_offset(&dir[i]);
        if (dir_phase(&dir[i]) == header->phase) {
          if (before_end_of_vol || o >= (pe - vol_end_offset))
            continue;
        } else {
          if (o < ps || o >= pe)
            continue;
        }
        force_evacuate_head(&dir[i], 1);
      }
    }
  }
}

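// Completion handler for an aggregation buffer write. On success the write
// cursor advances; on failure every document that was in the buffer is
// deleted from the directory. Note: the write completes on an AIO callback
// thread (io.thread is AIO_CALLBACK_THREAD_AIO in aggWrite), so waiting
// CacheVCs are signalled via schedule_imm_signal rather than called back
// directly.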
int
Vol::aggWriteDone(int event, Event *e)
{
  cancel_trigger();

  // Take the lock of the directory-sync continuation if a sync is waiting,
  // otherwise the volume lock.
  CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
  if (!lock) {
    eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
    return EVENT_CONT;
  }
  if (io.ok()) {
    header->last_write_pos = header->write_pos;
    header->write_pos += io.aiocb.aio_nbytes;
    ink_assert(header->write_pos >= start);
    DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
           hash_text.get(), header->write_pos, header->last_write_pos);
    ink_assert(header->write_pos == header->agg_pos);
    if (header->write_pos + EVACUATION_SIZE > scan_pos)
      periodic_scan();
    agg_buf_pos = 0;
    header->write_serial++;
  } else {
    // The write failed: remove the directory entries for everything that was
    // in the aggregation buffer.
    Debug("cache_disk_error", "Write error on disk %s\n \
          write range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
          hash_text.get(), (uint64_t) io.aiocb.aio_offset,
          (uint64_t) io.aiocb.aio_offset + io.aiocb.aio_nbytes,
          (uint64_t) io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
          (uint64_t) (io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
    Dir del_dir;
    dir_clear(&del_dir);
    for (int done = 0; done < agg_buf_pos;) {
      Doc *doc = (Doc *) (agg_buffer + done);
      dir_set_offset(&del_dir, header->write_pos + done);
      dir_delete(&doc->key, this, &del_dir);
      done += round_to_approx_size(doc->len);
    }
    agg_buf_pos = 0;
  }
  set_io_not_in_progress();

  // Wake up CacheVCs whose documents are now safely on disk.
  CacheVC *c = 0;
  while ((c = sync.dequeue())) {
    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
      c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
    else {
      sync.push(c);
      break;
    }
  }
  if (dir_sync_waiting) {
    dir_sync_waiting = 0;
    cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
  }
  if (agg.head || sync.head)
    return aggWrite(event, e);
  return EVENT_CONT;
}

CacheVC *
new_DocEvacuator(int nbytes, Vol *vol)
{
  CacheVC *c = new_CacheVC(vol);
  ProxyMutex *mutex = vol->mutex;
  c->base_stat = cache_evacuate_active_stat;
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
  c->vol = vol;
  c->f.evacuator = 1;
  c->earliest_key = zero_key;
  SET_CONTINUATION_HANDLER(c, &CacheVC::evacuateDocDone);
  return c;
}

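// Reads the head fragment of a document whose earliest fragment was
// evacuated, so that the head's directory entry (held in the lookaside
// table) can be fixed up once the whole document has been accounted for.
// Frees the evacuator when done or when the head can no longer be found.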
int
CacheVC::evacuateReadHead(int /* event */, Event * /* e */)
{
  ink_assert(vol->mutex->thread_holding == this_ethread());
  cancel_trigger();
  Doc *doc = (Doc *) buf->data();
#ifdef HTTP_CACHE
  CacheHTTPInfo *alternate_tmp = 0;
#endif
  if (!io.ok())
    goto Ldone;
  // The directory entry may have been overwritten while the read was in
  // flight; re-probe if it is no longer valid.
  if (!dir_valid(vol, &dir)) {
    last_collision = NULL;
    goto Lcollision;
  }
  if (doc->magic != DOC_MAGIC || !(doc->first_key == first_key))
    goto Lcollision;
#ifdef HTTP_CACHE
  alternate_tmp = 0;
  if (doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
    // HTTP document: find the matching alternate to get the object size.
    if (this->load_http_info(&vector, doc) != doc->hlen) {
      Note("bad vector detected during evacuation");
      goto Ldone;
    }
    alternate_index = get_alternate_index(&vector, earliest_key);
    if (alternate_index < 0)
      goto Ldone;
    alternate_tmp = vector.get(alternate_index);
    doc_len = alternate_tmp->object_size_get();
    Debug("cache_evac", "evacuateReadHead http earliest %X first: %X len: %" PRId64,
          first_key.slice32(0), earliest_key.slice32(0), doc_len);
  } else
#endif
  {
    // Non-HTTP document: the earliest fragment is the one after the head.
    CacheKey next_key;
    next_CacheKey(&next_key, &doc->key);
    if (!(next_key == earliest_key))
      goto Ldone;
    doc_len = doc->total_len;
    DDebug("cache_evac", "evacuateReadHead non-http earliest %X first: %X len: %" PRId64,
           first_key.slice32(0), earliest_key.slice32(0), doc_len);
  }
  if (doc_len == total_len) {
    // The whole document has been evacuated: move the directory entry out of
    // the lookaside list and free this evacuator.
    dir_lookaside_fixup(&earliest_key, vol);
    return free_CacheVC(this);
  }
  return EVENT_CONT;
Lcollision:
  if (dir_probe(&first_key, vol, &dir, &last_collision)) {
    int ret = do_read_call(&first_key);
    if (ret == EVENT_RETURN)
      return handleEvent(AIO_EVENT_DONE, 0);
    return ret;
  }
Ldone:
  dir_lookaside_remove(&earliest_key, vol);
  return free_CacheVC(this);
}

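// Called when a document fragment has been rewritten ahead of the write
// cursor. Updates the directory so readers find the new copy, and for head
// evacuations either fixes up the open directory entry directly or chains a
// read of the document head via evacuateReadHead.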
int
CacheVC::evacuateDocDone(int /* event */, Event * /* e */)
{
  ink_assert(vol->mutex->thread_holding == this_ethread());
  Doc *doc = (Doc *) buf->data();
  DDebug("cache_evac", "evacuateDocDone %X o %d p %d new_o %d new_p %d",
         (int) key.slice32(0), (int) dir_offset(&overwrite_dir),
         (int) dir_phase(&overwrite_dir), (int) dir_offset(&dir), (int) dir_phase(&dir));
  int i = dir_evac_bucket(&overwrite_dir);
  // Find the EvacuationBlock for the directory entry that was rewritten.
  EvacuationBlock *b = vol->evacuate[i].head;
  for (; b; b = b->link.next) {
    if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
      // Non-head fragments: locate the fragment key in the block's list,
      // credit the earliest-fragment evacuator if one is pending, and
      // overwrite the directory entry with the new location.
      if (!dir_head(&overwrite_dir)) {
        EvacuationKey *evac = &b->evac_frags;
        for (; evac && !(evac->key == doc->key); evac = evac->link.next)
          ;
        ink_assert(evac);
        if (!evac)
          break;
        if (evac->earliest_key.fold()) {
          DDebug("cache_evac", "evacdocdone: evacuating key %X earliest %X",
                 evac->key.slice32(0), evac->earliest_key.slice32(0));
          EvacuationBlock *eblock = 0;
          Dir dir_tmp;
          dir_lookaside_probe(&evac->earliest_key, vol, &dir_tmp, &eblock);
          if (eblock) {
            CacheVC *earliest_evac = eblock->earliest_evacuator;
            earliest_evac->total_len += doc->data_len();
            if (earliest_evac->total_len == earliest_evac->doc_len) {
              dir_lookaside_fixup(&evac->earliest_key, vol);
              free_CacheVC(earliest_evac);
            }
          }
        }
        dir_overwrite(&doc->key, vol, &dir, &overwrite_dir);
      }
      // Head evacuation: readers locate the document through the head's
      // directory entry, so the open directory entry and the RAM cache must
      // be kept consistent with the new location.
      if (dir_head(&overwrite_dir) && b->f.evacuate_head) {
        DDebug("cache_evac", "evacuateDocDone evacuate_head %X %X hlen %d offset %d",
               (int) key.slice32(0), (int) doc->key.slice32(0), doc->hlen, (int) dir_offset(&overwrite_dir));
        if (dir_compare_tag(&overwrite_dir, &doc->first_key)) {
          // The evacuated fragment is the head itself.
          OpenDirEntry *cod;
          DDebug("cache_evac", "evacuating vector: %X %d",
                 (int) doc->first_key.slice32(0), (int) dir_offset(&overwrite_dir));
          if ((cod = vol->open_read(&doc->first_key))) {
            // The document is currently open for reading: fix up first_dir.
            DDebug("cache_evac", "overwriting the open directory %X %d %d",
                   (int) doc->first_key.slice32(0), (int) dir_offset(&cod->first_dir), (int) dir_offset(&dir));
            cod->first_dir = dir;
          }
          if (dir_overwrite(&doc->first_key, vol, &dir, &overwrite_dir)) {
            int64_t o = dir_offset(&overwrite_dir), n = dir_offset(&dir);
            vol->ram_cache->fixup(&doc->first_key, (uint32_t)(o >> 32), (uint32_t) o, (uint32_t)(n >> 32), (uint32_t) n);
          }
        } else {
          // The evacuated fragment is the earliest fragment: read the head
          // so its directory entry can be fixed up afterwards.
          DDebug("cache_evac", "evacuating earliest: %X %d", (int) doc->key.slice32(0), (int) dir_offset(&overwrite_dir));
          ink_assert(dir_compare_tag(&overwrite_dir, &doc->key));
          ink_assert(b->earliest_evacuator == this);
          total_len += doc->data_len();
          first_key = doc->first_key;
          earliest_dir = dir;
          if (dir_probe(&first_key, vol, &dir, &last_collision) > 0) {
            dir_lookaside_insert(b, vol, &earliest_dir);
            SET_HANDLER(&CacheVC::evacuateReadHead);
            int ret = do_read_call(&first_key);
            if (ret == EVENT_RETURN)
              return handleEvent(AIO_EVENT_DONE, 0);
            return ret;
          }
        }
      }
      break;
    }
  }
  return free_CacheVC(this);
}

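// Schedules evacuation for every directory entry matching 'key' except
// document heads. Returns the number of new EvacuationBlocks created; for
// entries already scheduled, the key is appended to the block's fragment
// list instead.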
static int
evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Vol *vol)
{
  Dir dir, *last_collision = 0;
  int i = 0;
  while (dir_probe(key, vol, &dir, &last_collision)) {
    // Document heads are evacuated separately and are skipped here.
    if (dir_head(&dir)
#if TS_USE_INTERIM_CACHE == 1
        || dir_ininterim(&dir)
#endif
        )
      continue;
    EvacuationBlock *b = evacuation_block_exists(&dir, vol);
    if (!b) {
      b = new_EvacuationBlock(vol->mutex->thread_holding);
      b->dir = dir;
      b->evac_frags.key = *key;
      b->evac_frags.earliest_key = *earliest_key;
      vol->evacuate[dir_evac_bucket(&dir)].push(b);
      i++;
    } else {
      ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
      ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
      EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
      evac_frag->key = *key;
      evac_frag->earliest_key = *earliest_key;
      evac_frag->link.next = b->evac_frags.link.next;
      b->evac_frags.link.next = evac_frag;
    }
    if (force)
      b->readers = 0;
    DDebug("cache_evac",
           "next fragment %X Earliest: %X offset %d phase %d force %d",
           (int) key->slice32(0), (int) earliest_key->slice32(0), (int) dir_offset(&dir), (int) dir_phase(&dir), force);
  }
  return i;
}

int
Vol::evacuateWrite(CacheVC *evacuator, int event, Event *e)
{
  // Queue the evacuator behind any other evacuators already in the
  // aggregation queue, but ahead of the regular writers.
  evacuator->agg_len = round_to_approx_size(((Doc *) evacuator->buf->data())->len);
  agg_todo_size += evacuator->agg_len;
  CacheVC *cur = (CacheVC *) agg.head;
  CacheVC *after = NULL;
  for (; cur && cur->f.evacuator; cur = (CacheVC *) cur->link.next)
    after = cur;
  ink_assert(evacuator->agg_len <= AGG_SIZE);
  agg.insert(evacuator, after);
  return aggWrite(event, e);
}

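// Completion handler for the read of a document about to be overwritten by
// the write cursor. Decides, based on the matching EvacuationBlock, whether
// and under which key the document must be rewritten, then hands it to
// evacuateWrite; otherwise the evacuator is freed and aggregation resumes.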
int
Vol::evacuateDocReadDone(int event, Event *e)
{
  cancel_trigger();
  if (event != AIO_EVENT_DONE)
    return EVENT_DONE;
  ink_assert(is_io_in_progress());
  set_io_not_in_progress();
  ink_assert(mutex->thread_holding == this_ethread());
  Doc *doc = (Doc *) doc_evacuator->buf->data();
  CacheKey next_key;
  EvacuationBlock *b = NULL;
  if (doc->magic != DOC_MAGIC) {
    Debug("cache_evac", "DOC magic: %X %d",
          (int) dir_tag(&doc_evacuator->overwrite_dir), (int) dir_offset(&doc_evacuator->overwrite_dir));
    ink_assert(doc->magic == DOC_MAGIC);
    goto Ldone;
  }
  DDebug("cache_evac", "evacuateDocReadDone %X offset %d",
         (int) doc->key.slice32(0), (int) dir_offset(&doc_evacuator->overwrite_dir));

  b = evacuate[dir_evac_bucket(&doc_evacuator->overwrite_dir)].head;
  while (b) {
    if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir))
      break;
    b = b->link.next;
  }
  if (!b)
    goto Ldone;
  // A pinned document whose pin has expired need not be preserved.
  if ((b->f.pinned && !b->readers) && doc->pinned < (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND))
    goto Ldone;

  if (dir_head(&b->dir) && b->f.evacuate_head) {
    // Head evacuation: the block's fragment key is still zero here (it is
    // set by force_evacuate_head).
    ink_assert(!b->evac_frags.key.fold());
    if (dir_compare_tag(&b->dir, &doc->first_key)) {
      // The fragment read is the document head itself.
      doc_evacuator->key = doc->first_key;
      b->evac_frags.key = doc->first_key;
      DDebug("cache_evac", "evacuating vector %X offset %d",
             (int) doc->first_key.slice32(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
      b->f.unused = 57;
    } else {
      // The fragment read is the earliest fragment; remember this evacuator
      // so the head's directory entry can be fixed up once it is located
      // (see evacuateDocDone/evacuateReadHead).
      doc_evacuator->key = doc->key;
      doc_evacuator->earliest_key = doc->key;
      b->evac_frags.key = doc->key;
      b->evac_frags.earliest_key = doc->key;
      b->earliest_evacuator = doc_evacuator;
      DDebug("cache_evac", "evacuating earliest %X %X evac: %p offset: %d",
             (int) b->evac_frags.key.slice32(0), (int) doc->key.slice32(0),
             doc_evacuator, (int) dir_offset(&doc_evacuator->overwrite_dir));
      b->f.unused = 67;
    }
  } else {
    // Find the matching fragment key in the block's fragment list.
    EvacuationKey *ek = &b->evac_frags;
    for (; ek && !(ek->key == doc->key); ek = ek->link.next)
      ;
    if (!ek) {
      b->f.unused = 77;
      goto Ldone;
    }
    doc_evacuator->key = ek->key;
    doc_evacuator->earliest_key = ek->earliest_key;
    DDebug("cache_evac", "evacuateDocReadDone key: %X earliest: %X",
           (int) ek->key.slice32(0), (int) ek->earliest_key.slice32(0));
    b->f.unused = 87;
  }
  // Continue scheduling evacuation of the following fragments of this
  // document, unless the fragment read is the head with a matching tag.
  if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
    next_CacheKey(&next_key, &doc->key);
    evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
  }
  return evacuateWrite(doc_evacuator, event, e);
Ldone:
  free_CacheVC(doc_evacuator);
  doc_evacuator = 0;
  return aggWrite(event, e);
}

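// Looks for the lowest-offset unprocessed EvacuationBlock in the
// volume-offset range [low, high) with the given phase and starts an
// asynchronous read of that document. Returns -1 if a read was started,
// 0 otherwise.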
int
Vol::evac_range(off_t low, off_t high, int evac_phase)
{
  off_t s = offset_to_vol_offset(this, low);
  off_t e = offset_to_vol_offset(this, high);
  int si = dir_offset_evac_bucket(s);
  int ei = dir_offset_evac_bucket(e);

  for (int i = si; i <= ei; i++) {
    EvacuationBlock *b = evacuate[i].head;
    EvacuationBlock *first = 0;
    int64_t first_offset = INT64_MAX;
    // Find the lowest-offset block in this bucket that still needs work.
    for (; b; b = b->link.next) {
      int64_t offset = dir_offset(&b->dir);
      int phase = dir_phase(&b->dir);
      if (offset >= s && offset < e && !b->f.done && phase == evac_phase)
        if (offset < first_offset) {
          first = b;
          first_offset = offset;
        }
    }
    if (first) {
      first->f.done = 1;
      io.aiocb.aio_fildes = fd;
      io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
      io.aiocb.aio_offset = vol_offset(this, &first->dir);
      if ((off_t) (io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t) (skip + len))
        io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
      doc_evacuator = new_DocEvacuator(io.aiocb.aio_nbytes, this);
      doc_evacuator->overwrite_dir = first->dir;
      io.aiocb.aio_buf = doc_evacuator->buf->data();
      io.action = this;
      io.thread = AIO_CALLBACK_THREAD_ANY;
      DDebug("cache_evac", "evac_range evacuating %X %d", (int) dir_tag(&first->dir), (int) dir_offset(&first->dir));
      SET_HANDLER(&Vol::evacuateDocReadDone);
      ink_assert(ink_aio_read(&io) >= 0);
      return -1;
    }
  }
  return 0;
}

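// Copies one queued write (either a regular CacheVC write or an evacuated
// document) into the aggregation buffer at 'p', building the Doc header and
// the directory entry for it. Returns the number of aggregation-buffer bytes
// consumed: vc->agg_len for a writer, the rounded Doc length for an
// evacuator.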
static int
agg_copy(char *p, CacheVC *vc)
{
  Vol *vol = vc->vol;
  off_t o = vol->header->write_pos + vol->agg_buf_pos;

  if (!vc->f.evacuator) {
    Doc *doc = (Doc *) p;
    IOBufferBlock *res_alt_blk = 0;

    uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
    ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
    ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
    // Set up the directory entry for this fragment.
    dir_set_approx_size(&vc->dir, vc->agg_len);
    dir_set_offset(&vc->dir, offset_to_vol_offset(vol, o));
    ink_assert(vol_offset(vol, &vc->dir) < (vol->skip + vol->len));
#if TS_USE_INTERIM_CACHE == 1
    dir_set_indisk(&vc->dir);
#endif
    dir_set_phase(&vc->dir, vol->header->phase);

    // Fill in the Doc header.
    doc->magic = DOC_MAGIC;
    doc->len = len;
    doc->hlen = vc->header_len;
    doc->doc_type = vc->frag_type;
    doc->v_major = CACHE_DB_MAJOR_VERSION;
    doc->v_minor = CACHE_DB_MINOR_VERSION;
    doc->unused = 0;
    doc->total_len = vc->total_len;
    doc->first_key = vc->first_key;
    doc->sync_serial = vol->header->sync_serial;
    vc->write_serial = doc->write_serial = vol->header->write_serial;
    doc->checksum = DOC_NO_CHECKSUM;
    if (vc->pin_in_cache) {
      dir_set_pinned(&vc->dir, 1);
      doc->pinned = (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
    } else {
      dir_set_pinned(&vc->dir, 0);
      doc->pinned = 0;
    }

    if (vc->f.use_first_key) {
      if (doc->data_len()
#ifdef HTTP_CACHE
          || vc->f.allow_empty_doc
#endif
          )
        doc->key = vc->earliest_key;
      else
        // No data fragment: give the head the key preceding the earliest key.
        prev_CacheKey(&doc->key, &vc->earliest_key);
      dir_set_head(&vc->dir, true);
    } else {
      doc->key = vc->key;
      dir_set_head(&vc->dir, !vc->fragment);
    }

#ifdef HTTP_CACHE
    if (vc->f.rewrite_resident_alt) {
      ink_assert(vc->f.use_first_key);
      Doc *res_doc = (Doc *) vc->first_buf->data();
      res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeofDoc + res_doc->hlen);
      doc->key = res_doc->key;
      doc->total_len = res_doc->data_len();
    }
#endif
    // Marshal the header (the alternate vector for HTTP documents).
    if (vc->header_len) {
      ink_assert(vc->f.use_first_key);
#ifdef HTTP_CACHE
      if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
        ink_assert(vc->write_vector->count() > 0);
        if (!vc->f.update && !vc->f.evac_vector) {
          ink_assert(!(vc->first_key == zero_key));
          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
          http_info->object_size_set(vc->total_len);
        }
        // On an update that also wrote data, record the new object size in
        // the alternate.
        if (vc->f.update && vc->total_len) {
          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
          http_info->object_size_set(vc->total_len);
        }
        ink_assert(!(((uintptr_t) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
        ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
      } else
#endif
        memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
      // Remember whether the document fits in a single fragment; used when
      // updating the directory on close.
      vc->f.single_fragment = doc->single_fragment();
    }
    // Copy the data.
    if (vc->write_len) {
      {
        ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex;
        ink_assert(mutex->thread_holding == this_ethread());
        CACHE_DEBUG_SUM_DYN_STAT(cache_write_bytes_stat, vc->write_len);
      }
#ifdef HTTP_CACHE
      if (vc->f.rewrite_resident_alt)
        iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
      else
#endif
        iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks, vc->offset);
#ifdef VERIFY_JTEST_DATA
      if (f.use_first_key && header_len) {
        int ib = 0, xd = 0;
        char xx[500];
        new_info.request_get().url_get().print(xx, 500, &ib, &xd);
        char *x = xx;
        for (int q = 0; q < 3; q++)
          x = strchr(x + 1, '/');
        ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
      }
#endif
    }
    if (cache_config_enable_checksum) {
      doc->checksum = 0;
      for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
        doc->checksum += *b;
    }
    if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment)
      ink_assert(doc->hlen);

    if (res_alt_blk)
      res_alt_blk->free();

    return vc->agg_len;
  } else {
    // Evacuated documents are already in Doc form; refresh the serials and
    // copy the document verbatim.
    Doc *doc = (Doc *) vc->buf->data();
    int l = vc->vol->round_to_approx_size(doc->len);
    {
      ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex;
      ink_assert(mutex->thread_holding == this_ethread());
      CACHE_DEBUG_INCREMENT_DYN_STAT(cache_gc_frags_evacuated_stat);
      CACHE_DEBUG_SUM_DYN_STAT(cache_gc_bytes_evacuated_stat, l);
    }

    doc->sync_serial = vc->vol->header->sync_serial;
    doc->write_serial = vc->vol->header->write_serial;

    memcpy(p, doc, doc->len);

    vc->dir = vc->overwrite_dir;
    dir_set_offset(&vc->dir, offset_to_vol_offset(vc->vol, o));
    dir_set_phase(&vc->dir, vc->vol->header->phase);
#if TS_USE_INTERIM_CACHE == 1
    dir_set_indisk(&vc->dir);
#endif
    return l;
  }
}

inline void
Vol::evacuate_cleanup_blocks(int i)
{
  EvacuationBlock *b = evacuate[i].head;
  while (b) {
    if (b->f.done &&
        ((header->phase != dir_phase(&b->dir) &&
          header->write_pos > vol_offset(this, &b->dir)) ||
         (header->phase == dir_phase(&b->dir) && header->write_pos <= vol_offset(this, &b->dir)))) {
      EvacuationBlock *x = b;
      DDebug("cache_evac", "evacuate cleanup free %X offset %d",
             (int) b->evac_frags.key.slice32(0), (int) dir_offset(&b->dir));
      b = b->link.next;
      evacuate[i].remove(x);
      free_EvacuationBlock(x, mutex->thread_holding);
      continue;
    }
    b = b->link.next;
  }
}

void
Vol::evacuate_cleanup()
{
  int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
  int64_t e = dir_offset_evac_bucket(eo);
  int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
  int64_t s = sx;
  int i;

  if (e > evacuate_size)
    e = evacuate_size;
  if (sx < 0)
    s = 0;
  for (i = s; i < e; i++)
    evacuate_cleanup_blocks(i);

  // If the write cursor is near the beginning of the volume, the cleanup
  // window wraps around to the end of the bucket array.
  if (sx <= 0) {
    s = evacuate_size + sx - 2;
    if (s < 0)
      s = 0;
    for (i = s; i < evacuate_size; i++)
      evacuate_cleanup_blocks(i);
  }
}

// Called from aggWriteDone when the write cursor approaches scan_pos:
// releases finished evacuation blocks, rescans for pinned documents, and
// advances the scan window.
void
Vol::periodic_scan()
{
  evacuate_cleanup();
  scan_for_pinned_documents();
  if (header->write_pos == start)
    scan_pos = start;
  scan_pos += len / PIN_SCAN_EVERY;
}

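// Wraps the write cursor back to the start of the volume, flipping the phase
// and cleaning up directory state for the new cycle.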
void
Vol::agg_wrap()
{
  header->write_pos = start;
  header->phase = !header->phase;

  header->cycle++;
  header->agg_pos = header->write_pos;
  dir_lookaside_cleanup(this);
  dir_clean_vol(this);
  periodic_scan();
}

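// The aggregation write state machine: copies as many queued writes as fit
// into the aggregation buffer, triggers evacuation of the region about to be
// overwritten, and issues the buffer write. It can be re-entered from an AIO
// completion (via aggWriteDone), so finished CacheVCs are signalled through
// the event system unless the caller is on this thread (event == EVENT_CALL).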
int
Vol::aggWrite(int event, void * /* e */)
{
  ink_assert(!is_io_in_progress());

  Que(CacheVC, link) tocall;
  CacheVC *c;

  cancel_trigger();

Lagain:
  // Copy as much as possible from the aggregation queue into the buffer.
  for (c = (CacheVC *) agg.head; c;) {
    int writelen = c->agg_len;
    ink_assert(writelen <= AGG_SIZE);
    if (agg_buf_pos + writelen > AGG_SIZE ||
        header->write_pos + agg_buf_pos + writelen > (skip + len))
      break;
    DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d",
           agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.slice32(0));
    int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
    ink_assert(writelen == wrotelen);
    agg_todo_size -= writelen;
    agg_buf_pos += writelen;
    CacheVC *n = (CacheVC *) c->link.next;
    agg.dequeue();
    if (c->f.sync && c->f.use_first_key) {
      // Sync writers wait on the sync queue, ordered by write serial, until
      // their data is on disk (see aggWriteDone).
      CacheVC *last = sync.tail;
      while (last && UINT_WRAP_LT(c->write_serial, last->write_serial))
        last = (CacheVC *) last->link.prev;
      sync.insert(c, last);
    } else if (c->f.evacuator)
      c->handleEvent(AIO_EVENT_DONE, 0);
    else
      tocall.enqueue(c);
    c = n;
  }

  // Nothing could be copied into the buffer.
  if (!agg_buf_pos) {
    if (!agg.head && !sync.head) // nothing to do
      return EVENT_CONT;
    if (header->write_pos == start) {
      // write aggregation exceeds vol size
      Note("write aggregation exceeds vol size");
      ink_assert(!tocall.head);
      ink_assert(false);
      while ((c = agg.dequeue())) {
        agg_todo_size -= c->agg_len;
        if (c->initial_thread != NULL)
          c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
        else
          eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
      }
      return EVENT_CONT;
    }
    // Wrap to the start of the volume and try again.
    if (agg.head) {
      agg_wrap();
      goto Lagain;
    }
  }

  // Evacuate what is about to be overwritten.
  off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
  if (evac_range(header->write_pos, end, !header->phase) < 0)
    goto Lwait;
  if (end > skip + len)
    if (evac_range(start, start + (end - (skip + len)), header->phase) < 0)
      goto Lwait;

  // Wait for a fuller buffer if nothing is pending and no sync is waiting.
  if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting)
    goto Lwait;

  // A directory sync is waiting but no documents were copied: write an empty
  // Doc carrying the current sync/write serials.
  if (!agg_buf_pos) {
    ink_assert(sync.head);
    int l = round_to_approx_size(sizeof(Doc));
    agg_buf_pos = l;
    Doc *d = (Doc *) agg_buffer;
    memset(d, 0, sizeof(Doc));
    d->magic = DOC_MAGIC;
    d->len = l;
    d->sync_serial = header->sync_serial;
    d->write_serial = header->write_serial;
  }

  // Set the write limit and issue the aggregation write.
  header->agg_pos = header->write_pos + agg_buf_pos;

  io.aiocb.aio_fildes = fd;
  io.aiocb.aio_offset = header->write_pos;
  io.aiocb.aio_buf = agg_buffer;
  io.aiocb.aio_nbytes = agg_buf_pos;
  io.action = this;
  // Complete on an AIO thread so the next aggregation write can be issued
  // immediately; writes to a volume are serialized.
  io.thread = AIO_CALLBACK_THREAD_AIO;
  SET_HANDLER(&Vol::aggWriteDone);
  ink_aio_write(&io);

Lwait:
  int ret = EVENT_CONT;
  while ((c = tocall.dequeue())) {
    if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
      ret = EVENT_RETURN;
    else if (c->initial_thread != NULL)
      c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
    else
      eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
  }
  return ret;
}

// Final close step for a writer: releases the OpenDirEntry, deletes a
// partial document on abort, and updates the fragment-count statistics.
int
CacheVC::openWriteCloseDir(int /* event */, Event * /* e */)
{
  cancel_trigger();
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock) {
      SET_HANDLER(&CacheVC::openWriteCloseDir);
      ink_assert(!is_io_in_progress());
      VC_SCHED_LOCK_RETRY();
    }
    vol->close_write(this);
    if (closed < 0 && fragment)
      dir_delete(&earliest_key, vol, &earliest_dir);
  }
  if (is_debug_tag_set("cache_update")) {
    if (f.update && closed > 0) {
      if (!total_len && alternate_index != CACHE_ALT_REMOVED) {
        Debug("cache_update", "header only %d (%" PRIu64 ", %" PRIu64 ")\n",
              DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1]);
      } else if (total_len && alternate_index != CACHE_ALT_REMOVED) {
        Debug("cache_update", "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")\n",
              DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
      } else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
        Debug("cache_update", "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")\n",
              DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1]);
      }
    }
  }
  // Account the document in the per-fragment-count statistics. Note that an
  // aborted client can leave these counts approximate.
  if ((closed == 1) && (total_len > 0
#ifdef HTTP_CACHE
                        || f.allow_empty_doc
#endif
                        )) {
    DDebug("cache_stats", "Fragment = %d", fragment);
    switch (fragment) {
      case 0: CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat); break;
      case 1: CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat); break;
      default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
    }
  }
  if (f.close_complete) {
    recursive++;
    ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
    vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *) &vio);
    recursive--;
  }
  return free_CacheVC(this);
}

int
CacheVC::openWriteCloseHeadDone(int event, Event *e)
{
  if (event == AIO_EVENT_DONE)
    set_io_not_in_progress();
  else if (is_io_in_progress())
    return EVENT_CONT;
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock)
      VC_LOCK_RETRY_EVENT();
    od->writing_vec = 0;
    if (!io.ok())
      goto Lclose;
    ink_assert(f.use_first_key);
    if (!od->dont_update_directory) {
      if (dir_is_empty(&od->first_dir)) {
        dir_insert(&first_key, vol, &dir);
      } else {
        // Overwrite the directory entry for the head.
        dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
        // If a resident alternate is being moved out of the old head,
        // insert its own directory entry now that the new head is in place.
        if (od->move_resident_alt) {
          if (dir_valid(vol, &od->single_doc_dir))
            dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
          od->move_resident_alt = 0;
        }
      }
      od->first_dir = dir;
      if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
        // Single-fragment document: remember it as a candidate resident
        // alternate for future updates.
        od->move_resident_alt = 1;
        if (!f.rewrite_resident_alt) {
          od->single_doc_key = earliest_key;
        }
        dir_assign(&od->single_doc_dir, &dir);
        dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
      }
    }
  }
Lclose:
  return openWriteCloseDir(event, e);
}

int
CacheVC::openWriteCloseHead(int event, Event *e)
{
  cancel_trigger();
  f.use_first_key = 1;
  if (io.ok())
    ink_assert(fragment || (length == (int64_t) total_len));
  else
    return openWriteCloseDir(event, e);
  if (f.data_done)
    write_len = 0;
  else
    write_len = length;
#ifdef HTTP_CACHE
  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
    SET_HANDLER(&CacheVC::updateVector);
    return updateVector(EVENT_IMMEDIATE, 0);
  } else {
#endif
    header_len = header_to_write_len;
    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
    return do_write_lock();
#ifdef HTTP_CACHE
  }
#endif
}

int
CacheVC::openWriteCloseDataDone(int event, Event *e)
{
  int ret = 0;
  cancel_trigger();

  if (event == AIO_EVENT_DONE)
    set_io_not_in_progress();
  else if (is_io_in_progress())
    return EVENT_CONT;
  if (!io.ok())
    return openWriteCloseDir(event, e);
  {
    CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
    if (!lock)
      VC_LOCK_RETRY_EVENT();
    if (!fragment) {
      ink_assert(key == earliest_key);
      earliest_dir = dir;
#ifdef HTTP_CACHE
    } else {
      // Store the fragment offset only if there is a table to store it in.
      if (alternate.valid())
        alternate.push_frag_offset(write_pos);
#endif
    }
    fragment++;
    write_pos += write_len;
    dir_insert(&key, vol, &dir);
    blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
    next_CacheKey(&key, &key);
    if (length) {
      write_len = length;
      if (write_len > MAX_FRAG_SIZE)
        write_len = MAX_FRAG_SIZE;
      if ((ret = do_write_call()) == EVENT_RETURN)
        goto Lcallreturn;
      return ret;
    }
    f.data_done = 1;
    return openWriteCloseHead(event, e); // called while still holding the vol lock
  }
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, 0);
}

int
CacheVC::openWriteClose(int event, Event *e)
{
  cancel_trigger();
  if (is_io_in_progress()) {
    if (event != AIO_EVENT_DONE)
      return EVENT_CONT;
    set_io_not_in_progress();
    if (!io.ok())
      return openWriteCloseDir(event, e);
  }
  if (closed > 0
#ifdef HTTP_CACHE
      || f.allow_empty_doc
#endif
      ) {
    if (total_len == 0) {
#ifdef HTTP_CACHE
      if (f.update || f.allow_empty_doc) {
        return updateVector(event, e);
      } else {
        // A close with no data written is treated as an abort.
        closed = -1;
        return openWriteCloseDir(event, e);
      }
#else
      return openWriteCloseDir(event, e);
#endif
    }
    if (length && (fragment || length > MAX_FRAG_SIZE)) {
      SET_HANDLER(&CacheVC::openWriteCloseDataDone);
      write_len = length;
      if (write_len > MAX_FRAG_SIZE)
        write_len = MAX_FRAG_SIZE;
      return do_write_lock_call();
    } else
      return openWriteCloseHead(event, e);
  } else
    return openWriteCloseDir(event, e);
}

int
CacheVC::openWriteWriteDone(int event, Event *e)
{
  cancel_trigger();
  if (event == AIO_EVENT_DONE)
    set_io_not_in_progress();
  else if (is_io_in_progress())
    return EVENT_CONT;
  // On an I/O error the continuation must do an io_close.
  if (!io.ok()) {
    if (closed) {
      closed = -1;
      return die();
    }
    SET_HANDLER(&CacheVC::openWriteMain);
    return calluser(VC_EVENT_ERROR);
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock)
      VC_LOCK_RETRY_EVENT();
    // Remember the earliest directory entry; it must be deleted if the
    // writer later aborts.
    if (!fragment) {
      ink_assert(key == earliest_key);
      earliest_dir = dir;
#ifdef HTTP_CACHE
    } else {
      // Store the fragment offset only if there is a table to store it in.
      if (alternate.valid())
        alternate.push_frag_offset(write_pos);
#endif
    }
    ++fragment;
    write_pos += write_len;
    dir_insert(&key, vol, &dir);
    DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
    blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
    next_CacheKey(&key, &key);
  }
  if (closed)
    return die();
  SET_HANDLER(&CacheVC::openWriteMain);
  return openWriteMain(event, e);
}

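// Payload bytes available in a fragment of the configured target size, net
// of the Doc header stored with each fragment on disk.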
static inline int
target_fragment_size()
{
  return cache_config_target_fragment_size - sizeofDoc;
}

int
CacheVC::openWriteMain(int /* event */, Event * /* e */)
{
  cancel_trigger();
  int called_user = 0;
  ink_assert(!is_io_in_progress());
Lagain:
  if (!vio.buffer.writer()) {
    if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
      return EVENT_DONE;
    if (!vio.buffer.writer())
      return EVENT_CONT;
  }
  if (vio.ntodo() <= 0) {
    called_user = 1;
    if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
      return EVENT_DONE;
    ink_assert(!f.close_complete || !"close expected after write COMPLETE");
    if (vio.ntodo() <= 0)
      return EVENT_CONT;
  }
  int64_t ntodo = (int64_t) (vio.ntodo() + length);
  int64_t total_avail = vio.buffer.reader()->read_avail();
  int64_t avail = total_avail;
  int64_t towrite = avail + length;
  if (towrite > ntodo) {
    avail -= (towrite - ntodo);
    towrite = ntodo;
  }
  if (towrite > MAX_FRAG_SIZE) {
    avail -= (towrite - MAX_FRAG_SIZE);
    towrite = MAX_FRAG_SIZE;
  }
  if (!blocks && towrite) {
    blocks = vio.buffer.reader()->block;
    offset = vio.buffer.reader()->start_offset;
  }
  if (avail > 0) {
    vio.buffer.reader()->consume(avail);
    vio.ndone += avail;
    total_len += avail;
  }
  length = (uint64_t) towrite;
  // Write a standard target-sized fragment when the buffered data is close
  // to that size; otherwise write everything available.
  if (length > target_fragment_size() &&
      (length < target_fragment_size() + target_fragment_size() / 4))
    write_len = target_fragment_size();
  else
    write_len = length;
  bool not_writing = towrite != ntodo && towrite < target_fragment_size();
  if (!called_user) {
    if (not_writing) {
      called_user = 1;
      if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
        return EVENT_DONE;
      goto Lagain;
    } else if (vio.ntodo() <= 0)
      goto Lagain;
  }
  if (not_writing)
    return EVENT_CONT;
  if (towrite == ntodo && f.close_complete) {
    closed = 1;
    SET_HANDLER(&CacheVC::openWriteClose);
    return openWriteClose(EVENT_NONE, NULL);
  }
  SET_HANDLER(&CacheVC::openWriteWriteDone);
  return do_write_lock_call();
}

int
CacheVC::openWriteOverwrite(int event, Event *e)
{
  cancel_trigger();
  if (event != AIO_EVENT_DONE) {
    if (event == EVENT_IMMEDIATE)
      last_collision = 0;
  } else {
    Doc *doc = NULL;
    set_io_not_in_progress();
    if (_action.cancelled)
      return openWriteCloseDir(event, e);
    if (!io.ok())
      goto Ldone;
    doc = (Doc *) buf->data();
    if (!(doc->first_key == first_key))
      goto Lcollision;
    od->first_dir = dir;
    first_buf = buf;
    goto Ldone;
  }
Lcollision:
  {
    CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
    if (!lock)
      VC_LOCK_RETRY_EVENT();
    int res = dir_probe(&first_key, vol, &dir, &last_collision);
    if (res > 0) {
      if ((res = do_read_call(&first_key)) == EVENT_RETURN)
        goto Lcallreturn;
      return res;
    }
  }
Ldone:
  SET_HANDLER(&CacheVC::openWriteMain);
  return callcont(CACHE_EVENT_OPEN_WRITE);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, 0);
}

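// openWriteStartDone handles the read of the existing alternate vector for
// an HTTP write (needed to add or update an alternate). On success the
// vector is loaded and the writer proceeds; on a collision the directory is
// re-probed and the read retried.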
#ifdef HTTP_CACHE
int
CacheVC::openWriteStartDone(int event, Event *e)
{
  intptr_t err = ECACHE_NO_DOC;
  cancel_trigger();
  if (is_io_in_progress()) {
    if (event != AIO_EVENT_DONE)
      return EVENT_CONT;
    set_io_not_in_progress();
  }
  {
    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock)
      VC_LOCK_RETRY_EVENT();

    if (_action.cancelled && (!od || !od->has_multiple_writers()))
      goto Lcancel;

    if (event == AIO_EVENT_DONE) { // vector read done
      Doc *doc = (Doc *) buf->data();
      if (!io.ok()) {
        err = ECACHE_READ_FAIL;
        goto Lfailure;
      }
      // The directory entry may have become invalid (e.g. overwritten by the
      // write cursor) while the read was in flight.
      if (!dir_valid(vol, &dir)) {
        DDebug("cache_write",
               "OpenReadStartDone: Dir not valid: Write Head: %" PRId64 ", Dir: %" PRId64,
               (int64_t) offset_to_vol_offset(vol, vol->header->write_pos), dir_offset(&dir));
        last_collision = NULL;
        goto Lcollision;
      }
      if (!(doc->first_key == first_key))
        goto Lcollision;

      if (doc->magic != DOC_MAGIC || !doc->hlen ||
          this->load_http_info(write_vector, doc, buf) != doc->hlen) {
        err = ECACHE_BAD_META_DATA;
#if TS_USE_INTERIM_CACHE == 1
        if (dir_ininterim(&dir)) {
          dir_delete(&first_key, vol, &dir);
          last_collision = NULL;
          goto Lcollision;
        }
#endif
        goto Lfailure;
      }
      ink_assert(write_vector->count() > 0);
#if TS_USE_INTERIM_CACHE == 1
Lagain:
      if (dir_ininterim(&dir)) {
        dir_delete(&first_key, vol, &dir);
        last_collision = NULL;
        if (dir_probe(&first_key, vol, &dir, &last_collision)) {
          goto Lagain;
        } else {
          if (f.update) {
            // fail update because the document is missing
            goto Lfailure;
          }
        }
      }
#endif
      od->first_dir = dir;
      first_dir = dir;
      if (doc->single_fragment()) {
        // Single-fragment document: mark it as a movable resident alternate.
        od->move_resident_alt = 1;
        od->single_doc_key = doc->key;
        dir_assign(&od->single_doc_dir, &dir);
        dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
      }
      first_buf = buf;
      goto Lsuccess;
    }

Lcollision:
    int if_writers = ((uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES);
    if (!od) {
      if ((err = vol->open_write(
             this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
        goto Lfailure;
      if (od->has_multiple_writers()) {
        MUTEX_RELEASE(lock);
        SET_HANDLER(&CacheVC::openWriteMain);
        return callcont(CACHE_EVENT_OPEN_WRITE);
      }
    }
    // Check for a collision with another document.
    if (dir_probe(&first_key, vol, &dir, &last_collision)) {
      od->reading_vec = 1;
      int ret = do_read_call(&first_key);
      if (ret == EVENT_RETURN)
        goto Lcallreturn;
      return ret;
    }
    if (f.update) {
      // fail update because the document is missing
      goto Lfailure;
    }
  }
Lsuccess:
  od->reading_vec = 0;
  if (_action.cancelled)
    goto Lcancel;
  SET_HANDLER(&CacheVC::openWriteMain);
  return callcont(CACHE_EVENT_OPEN_WRITE);

Lfailure:
  CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
  _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
Lcancel:
  if (od) {
    od->reading_vec = 0;
    return openWriteCloseDir(event, e);
  } else
    return free_CacheVC(this);
Lcallreturn:
  return handleEvent(AIO_EVENT_DONE, 0);
}
#endif

// Retry path used when the volume's open_write lock could not be taken
// immediately in Cache::open_write below.
int
CacheVC::openWriteStartBegin(int /* event */, Event * /* e */)
{
  intptr_t err;
  cancel_trigger();
  if (_action.cancelled)
    return free_CacheVC(this);
  if ((err = vol->open_write_lock(this, false, 1)) > 0) {
    CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
    free_CacheVC(this);
    _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
    return EVENT_DONE;
  }
  if (err < 0)
    VC_SCHED_LOCK_RETRY();
  if (f.overwrite) {
    SET_HANDLER(&CacheVC::openWriteOverwrite);
    return openWriteOverwrite(EVENT_IMMEDIATE, 0);
  } else {
    SET_HANDLER(&CacheVC::openWriteMain);
    return callcont(CACHE_EVENT_OPEN_WRITE);
  }
}

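// Non-HTTP open_write entry point: sets up a CacheVC for writing 'key' and
// either completes immediately, schedules a lock retry through
// openWriteStartBegin, or continues in openWriteOverwrite when
// CACHE_WRITE_OPT_OVERWRITE is set.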
Action *
Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
                  int options, time_t apin_in_cache, char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(frag_type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }

  ink_assert(caches[frag_type] == this);

  intptr_t res = 0;
  CacheVC *c = new_CacheVC(cont);
  ProxyMutex *mutex = cont->mutex;
  MUTEX_LOCK(lock, c->mutex, this_ethread());
  c->vio.op = VIO::WRITE;
  c->base_stat = cache_write_active_stat;
  c->vol = key_to_vol(key, hostname, host_len);
  Vol *vol = c->vol;
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->first_key = c->key = *key;
  c->frag_type = frag_type;
  // Choose a random earliest key whose directory tag does not collide with
  // the first_key's tag, so head and earliest fragment cannot alias each
  // other in the directory.
  do {
    rand_CacheKey(&c->key, cont->mutex);
  } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
  c->earliest_key = c->key;
#ifdef HTTP_CACHE
  c->info = 0;
#endif
  c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
  c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
  c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
  c->pin_in_cache = (uint32_t) apin_in_cache;

  if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
    // The document is currently being written by someone else: abort.
    CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -res);
    free_CacheVC(c);
    return ACTION_RESULT_DONE;
  }
  if (res < 0) {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
    c->trigger = CONT_SCHED_LOCK_RETRY(c);
    return &c->_action;
  }
  if (!c->f.overwrite) {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
    c->callcont(CACHE_EVENT_OPEN_WRITE);
    return ACTION_RESULT_DONE;
  } else {
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
    if (c->openWriteOverwrite(EVENT_IMMEDIATE, 0) == EVENT_DONE)
      return ACTION_RESULT_DONE;
    else
      return &c->_action;
  }
}

#ifdef HTTP_CACHE
// Main entry point for writing HTTP documents.
Action *
Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
                  CacheKey * /* key1 */, CacheFragType type, char *hostname, int host_len)
{
  if (!CacheProcessor::IsCacheReady(type)) {
    cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
    return ACTION_RESULT_DONE;
  }

  ink_assert(caches[type] == this);
  intptr_t err = 0;
  int if_writers = (uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES;
  CacheVC *c = new_CacheVC(cont);
  ProxyMutex *mutex = cont->mutex;
  c->vio.op = VIO::WRITE;
  c->first_key = *key;
  // Choose a random earliest key whose directory tag does not collide with
  // the first_key's tag (see the non-HTTP open_write above).
  do {
    rand_CacheKey(&c->key, cont->mutex);
  } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
  c->earliest_key = c->key;
  c->frag_type = CACHE_FRAG_TYPE_HTTP;
  c->vol = key_to_vol(key, hostname, host_len);
  Vol *vol = c->vol;
  c->info = info;
  if (c->info && (uintptr_t) info != CACHE_ALLOW_MULTIPLE_WRITES) {
    // Update case: the passed-in info identifies the alternate being
    // updated; its object key and size seed the update state.
    c->f.update = 1;
    c->base_stat = cache_update_active_stat;
    DDebug("cache_update", "Update called");
    info->object_key_get(&c->update_key);
    ink_assert(!(c->update_key == zero_key));
    c->update_len = info->object_size_get();
  } else
    c->base_stat = cache_write_active_stat;
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
  c->pin_in_cache = (uint32_t) apin_in_cache;

  {
    CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
    if (lock) {
      if ((err = c->vol->open_write(c, if_writers,
                                    cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
        goto Lfailure;

      // With multiple simultaneous writers the vector is not read here;
      // proceed directly as for a miss.
      if (c->od->has_multiple_writers())
        goto Lmiss;
      if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
        if (c->f.update) {
          // fail update because the document is missing
          err = ECACHE_NO_DOC;
          goto Lfailure;
        }
        // The document does not exist: begin the write.
        goto Lmiss;
      } else {
        // The document exists: read its alternate vector first.
        c->od->reading_vec = 1;
        SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
        switch (c->do_read_call(&c->first_key)) {
          case EVENT_DONE: return ACTION_RESULT_DONE;
          case EVENT_RETURN: goto Lcallreturn;
          default: return &c->_action;
        }
      }
    }
    // Missed the lock: retry from openWriteStartDone.
    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
    CONT_SCHED_LOCK_RETRY(c);
    return &c->_action;
  }

Lmiss:
  SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
  c->callcont(CACHE_EVENT_OPEN_WRITE);
  return ACTION_RESULT_DONE;

Lfailure:
  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
  cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
  if (c->od) {
    c->openWriteCloseDir(EVENT_IMMEDIATE, 0);
    return ACTION_RESULT_DONE;
  }
  free_CacheVC(c);
  return ACTION_RESULT_DONE;

Lcallreturn:
  if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
    return ACTION_RESULT_DONE;
  return &c->_action;
}

#if TS_USE_INTERIM_CACHE == 1
int
InterimCacheVol::aggWrite(int /* event */, void * /* e */)
{
  ink_assert(!is_io_in_progress());
  MigrateToInterimCache *mts;
  Doc *doc;
  uint64_t old_off, new_off;
  ink_assert(this_ethread() == mutex.m_ptr->thread_holding
             && vol->mutex.m_ptr == mutex.m_ptr);
Lagain:
  // Copy migrated documents into the aggregation buffer.
  while ((mts = agg.head) != NULL) {
    doc = (Doc *) mts->buf->data();
    uint32_t agg_len = dir_approx_size(&mts->dir);
    ink_assert(agg_len == mts->agg_len);
    ink_assert(agg_len <= AGG_SIZE && agg_buf_pos <= AGG_SIZE);

    if (agg_buf_pos + agg_len > AGG_SIZE
        || header->agg_pos + agg_len > (skip + len))
      break;
    mts = agg.dequeue();

    if (!mts->notMigrate) {
      old_off = dir_get_offset(&mts->dir);
      Dir old_dir = mts->dir;
      doc->sync_serial = header->sync_serial;
      doc->write_serial = header->write_serial;

      memcpy(agg_buffer + agg_buf_pos, doc, doc->len);
      off_t o = header->write_pos + agg_buf_pos;
      dir_set_offset(&mts->dir, offset_to_vol_offset(this, o));
      ink_assert(this == mts->interim_vol);
      ink_assert(vol_offset(this, &mts->dir) < mts->interim_vol->skip + mts->interim_vol->len);
      dir_set_phase(&mts->dir, header->phase);
      dir_set_ininterim(&mts->dir);
      dir_set_index(&mts->dir, (this - vol->interim_vols));

      agg_buf_pos += agg_len;
      header->agg_pos = header->write_pos + agg_buf_pos;
      new_off = dir_get_offset(&mts->dir);

      if (mts->rewrite)
        dir_overwrite(&mts->key, vol, &mts->dir, &old_dir);
      else
        dir_insert(&mts->key, vol, &mts->dir);
      DDebug("cache_insert",
             "InterimCache: WriteDone: key: %X, first_key: %X, write_len: %d, write_offset: %" PRId64 ", dir_last_word: %X",
             doc->key.slice32(0), doc->first_key.slice32(0), mts->agg_len, o, mts->dir.w[4]);

      if (mts->copy) {
        mts->interim_vol->vol->ram_cache->fixup(&mts->key, (uint32_t)(old_off >> 32), (uint32_t) old_off,
                                                (uint32_t)(new_off >> 32), (uint32_t) new_off);
      } else {
        mts->vc->f.ram_fixup = 1;
        mts->vc->dir_off = new_off;
      }
      vol->set_migrate_done(mts);
    } else
      vol->set_migrate_failed(mts);

    mts->buf = NULL;
    migrateToInterimCacheAllocator.free(mts);
  }

  if (!agg_buf_pos) {
    if (header->write_pos + AGG_SIZE > (skip + len)) {
      // Wrap the interim volume's write cursor.
      header->write_pos = start;
      header->phase = !(header->phase);

      header->cycle++;
      header->agg_pos = header->write_pos;
      dir_clean_interimvol(this);
      goto Lagain;
    }
    return EVENT_CONT;
  }

  if (agg.head == NULL && agg_buf_pos < (AGG_SIZE / 2) && !sync
      && header->write_pos + AGG_SIZE <= (skip + len))
    return EVENT_CONT;

  // Documents whose buffers are still owned by the reader must be copied
  // before the asynchronous write is issued.
  for (mts = agg.head; mts != NULL; mts = mts->link.next) {
    if (!mts->copy) {
      Ptr<IOBufferData> buf = mts->buf;
      doc = (Doc *) buf->data();
      mts->buf = new_IOBufferData(iobuffer_size_to_index(mts->agg_len, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
      mts->copy = true;
      memcpy(mts->buf->data(), buf->data(), doc->len);
      buf = NULL;
    }
  }

  io.aiocb.aio_fildes = fd;
  io.aiocb.aio_offset = header->write_pos;
  io.aiocb.aio_buf = agg_buffer;
  io.aiocb.aio_nbytes = agg_buf_pos;
  io.action = this;
  // Complete on an AIO thread so the next aggregation write can be issued
  // immediately; writes to this interim volume are serialized.
  io.thread = AIO_CALLBACK_THREAD_AIO;
  SET_HANDLER(&InterimCacheVol::aggWriteDone);
  ink_aio_write(&io);
  return EVENT_CONT;
}

int
InterimCacheVol::aggWriteDone(int event, void *e)
{
  ink_release_assert(this_ethread() == mutex.m_ptr->thread_holding
                     && vol->mutex.m_ptr == mutex.m_ptr);
  if (io.ok()) {
    header->last_write_pos = header->write_pos;
    header->write_pos += io.aiocb.aio_nbytes;
    ink_assert(header->write_pos >= start);
    DDebug("cache_agg", "Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
           header->write_pos, header->last_write_pos);
    ink_assert(header->write_pos == header->agg_pos);
    agg_buf_pos = 0;
    header->write_serial++;
  } else {
    // The write failed: remove the directory entries for everything that
    // was in the aggregation buffer.
    Debug("cache_disk_error", "Write error on disk %s\n \
          write range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
          "InterimCache ID", (uint64_t) io.aiocb.aio_offset, (uint64_t) (io.aiocb.aio_offset + io.aiocb.aio_nbytes),
          (uint64_t) io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
          (uint64_t) (io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
    Dir del_dir;
    dir_clear(&del_dir);
    dir_set_ininterim(&del_dir);
    dir_set_index(&del_dir, (this - vol->interim_vols));
    for (int done = 0; done < agg_buf_pos;) {
      Doc *doc = (Doc *) (agg_buffer + done);
      dir_set_offset(&del_dir, header->write_pos + done);
      dir_delete(&doc->key, vol, &del_dir);
      done += this->round_to_approx_size(doc->len);
    }
    agg_buf_pos = 0;
  }
  set_io_not_in_progress();
  sync = false;
  if (agg.head)
    aggWrite(event, e);
  return EVENT_CONT;
}
#endif // TS_USE_INTERIM_CACHE
#endif // HTTP_CACHE