00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 #include "P_Cache.h"
00026 
// True when at most one bit of _v is set (note: this also holds for 0).
#define IS_POWER_2(_v) (!((_v) & ((_v) - 1)))
// Wrap-tolerant ordering of unsigned counters: two values are compared by
// looking at their (modular) difference rather than at the values themselves.
#define UINT_WRAP_LTE(_a, _b) (((_b) - (_a)) < INT_MAX) // exploit overflow
#define UINT_WRAP_GTE(_a, _b) (((_a) - (_b)) < INT_MAX) // exploit overflow
#define UINT_WRAP_LT(_a, _b) (((_a) - (_b)) >= INT_MAX) // exploit overflow
00031 
00032 
00033 
00034 #ifdef HTTP_CACHE
00035 int
00036 get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
00037 {
00038   int alt_count = cache_vector->count();
00039   CacheHTTPInfo *obj;
00040   if (!alt_count)
00041     return -1;
00042   for (int i = 0; i < alt_count; i++) {
00043     obj = cache_vector->get(i);
00044     if (obj->compare_object_key(&key)) {
00045       
00046       return i;
00047     }
00048   }
00049   return -1;
00050 }
00051 
00052 
00053 
00054 
00055 
00056 
00057 
// Event handler that applies this writer's alternate change to write_vector
// and then starts the write of the resulting vector.
//
// Steps, in order:
//   1. back off (VC_SCHED_LOCK_RETRY) while the vector is being read or
//      written, or when the volume lock cannot be taken;
//   2. for an update (f.update): look up the alternate for update_key and,
//      when the new alternate is not valid, remove it from the vector;
//   3. when the vector is at cache_config_http_max_alts, drop entry 0;
//   4. insert the new alternate when it is valid;
//   5. decide whether the resident alternate's body is rewritten together
//      with the vector (f.rewrite_resident_alt);
//   6. mark od->writing_vec and issue the write via do_write_call().
//
// Returns the result of openWriteCloseDir() when no vector write is needed,
// otherwise the result of do_write_call(), or of handleEvent(AIO_EVENT_DONE)
// when do_write_call() reported EVENT_RETURN.
00058 int
00059 CacheVC::updateVector(int , Event *)
00060 {
00061   cancel_trigger();
00062   if (od->reading_vec || od->writing_vec)
00063     VC_SCHED_LOCK_RETRY();
00064   int ret = 0;
00065   {
00066     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00067     if (!lock || od->writing_vec)
00068       VC_SCHED_LOCK_RETRY();
00069 
00070     int vec = alternate.valid();
00071     if (f.update) {
00072       // locate the alternate being updated inside the vector to be written
00073       alternate_index = get_alternate_index(write_vector, update_key);
00074       Debug("cache_update", "updating alternate index %d frags %d", alternate_index, alternate_index >=0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
00075       // no valid new alternate: this update removes the existing one
00076       if (!vec) {
00077         ink_assert(!total_len);
00078         if (alternate_index >= 0) {
00079           write_vector->remove(alternate_index, true);
00080           alternate_index = CACHE_ALT_REMOVED;
00081           if (!write_vector->count())
00082             dir_delete(&first_key, vol, &od->first_dir);
00083         }
00084         // nothing was removed, or the vector is now empty: skip the vector
00085         // write and go straight to openWriteCloseDir
00086         if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
00087           SET_HANDLER(&CacheVC::openWriteCloseDir);
00088           return openWriteCloseDir(EVENT_IMMEDIATE, 0);
00089         }
00090       }
00091       if (update_key == od->single_doc_key && (total_len || !vec))
00092         od->move_resident_alt = 0;
00093     }
    // vector is full and a new alternate would be added: evict entry 0,
    // clearing move_resident_alt when that entry is the resident alternate
00094     if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
00095       if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
00096         od->move_resident_alt = 0;
00097       write_vector->remove(0, true);
00098     }
00099     if (vec) {
00100       // when replacing an existing alternate, carry its fragment offsets
00101       // over to the new alternate before inserting it at the same index
00102 
00103 
00104       if (alternate_index >= 0)
00105         alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
00106       alternate_index = write_vector->insert(&alternate, alternate_index);
00107     }
00108 
00109     if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
00110       Doc *doc = (Doc *) first_buf->data();
00111       int small_doc = (int64_t)doc->data_len() < (int64_t)cache_config_alt_rewrite_max_size;
00112       int have_res_alt = doc->key == od->single_doc_key;
00113       // The body held in first_buf is rewritten along with the vector when
00114       // it is small enough, it belongs to the resident alternate
00115       // (single_doc_key), and either this writer produced fragments or this
00116       // is an update that wrote no body (total_len == 0).
00117       // move_resident_alt was already cleared above for the cases where the
00118       // resident alternate itself is being replaced or deleted.
00119       
00120       
00121       if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
00122         // a writer that produced fragments must already have written all of
00123         // its data by the time we get here
00124         ink_assert(!fragment || f.data_done);
00125         od->move_resident_alt = 0;
00126         f.rewrite_resident_alt = 1;
00127         write_len = doc->data_len();
00128         Debug("cache_update_alt",
00129               "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.slice32(0), first_key.slice32(0));
00130       }
00131     }
    // set up and issue the vector write, keyed by first_key
00132     header_len = write_vector->marshal_length();
00133     od->writing_vec = 1;
00134     f.use_first_key = 1;
00135     SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
00136     ret = do_write_call();
00137   }
00138   if (ret == EVENT_RETURN)
00139     return handleEvent(AIO_EVENT_DONE, 0);
00140   return ret;
00141 }
00142 #endif
00143 
00144 
00145 
00146 
00147 
00148 
00149 
00150 
00151 
00152 
00153 
00154 
00155 
00156 
00157 
00158 
00159 
00160 
00161 
00162 
00163 
00164 
00165 
00166 
00167 
00168 
00169 
00170 
00171 
00172 
00173 
00174 
00175 
00176 
00177 
00178 int
00179 CacheVC::handleWrite(int event, Event *)
00180 {
00181   
00182   ink_assert(!trigger);
00183   frag_len = 0;
00184 
00185   set_agg_write_in_progress();
00186   POP_HANDLER;
00187   agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeofDoc);
00188   vol->agg_todo_size += agg_len;
00189   bool agg_error =
00190     (agg_len > AGG_SIZE || header_len + sizeofDoc > MAX_FRAG_SIZE ||
00191      (!f.readers && (vol->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
00192 #ifdef CACHE_AGG_FAIL_RATE
00193   agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
00194                             (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
00195 #endif
00196   bool max_doc_error = (cache_config_max_doc_size &&
00197                         (cache_config_max_doc_size < vio.ndone ||
00198                          (vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));
00199 
00200   if (agg_error || max_doc_error) {
00201     CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
00202     CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
00203     vol->agg_todo_size -= agg_len;
00204     io.aio_result = AIO_SOFT_FAILURE;
00205     if (event == EVENT_CALL)
00206       return EVENT_RETURN;
00207     return handleEvent(AIO_EVENT_DONE, 0);
00208   }
00209   ink_assert(agg_len <= AGG_SIZE);
00210   if (f.evac_vector)
00211     vol->agg.push(this);
00212   else
00213     vol->agg.enqueue(this);
00214   if (!vol->is_io_in_progress())
00215     return vol->aggWrite(event, this);
00216   return EVENT_CONT;
00217 }
00218 
00219 static char *
00220 iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
00221 {
00222   IOBufferBlock *b = ab;
00223   while (b && len >= 0) {
00224     char *start = b->_start;
00225     char *end = b->_end;
00226     int max_bytes = end - start;
00227     max_bytes -= offset;
00228     if (max_bytes <= 0) {
00229       offset = -max_bytes;
00230       b = b->next;
00231       continue;
00232     }
00233     int bytes = len;
00234     if (bytes >= max_bytes)
00235       bytes = max_bytes;
00236     ::memcpy(p, start + offset, bytes);
00237     p += bytes;
00238     len -= bytes;
00239     b = b->next;
00240     offset = 0;
00241   }
00242   return p;
00243 }
00244 
00245 EvacuationBlock *
00246 Vol::force_evacuate_head(Dir *evac_dir, int pinned)
00247 {
00248   
00249   EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
00250   
00251   
00252   if (b && b->f.done)
00253     return b;
00254 
00255   if (!b) {
00256     b = new_EvacuationBlock(mutex->thread_holding);
00257     b->dir = *evac_dir;
00258     DDebug("cache_evac", "force: %d, %d", (int) dir_offset(evac_dir), (int) dir_phase(evac_dir));
00259     evacuate[dir_evac_bucket(evac_dir)].push(b);
00260   }
00261   b->f.pinned = pinned;
00262   b->f.evacuate_head = 1;
00263   b->evac_frags.key = zero_key;  
00264   
00265   b->readers = 0;             
00266   return b;
00267 }
00268 
00269 void
00270 Vol::scan_for_pinned_documents()
00271 {
00272   if (cache_config_permit_pinning) {
00273     
00274     
00275     int ps = offset_to_vol_offset(this, header->write_pos + AGG_SIZE);
00276     int pe = offset_to_vol_offset(this, header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
00277     int vol_end_offset = offset_to_vol_offset(this, len + skip);
00278     int before_end_of_vol = pe < vol_end_offset;
00279     DDebug("cache_evac", "scan %d %d", ps, pe);
00280     for (int i = 0; i < vol_direntries(this); i++) {
00281       
00282       if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
00283         
00284         int o = dir_offset(&dir[i]);
00285         if (dir_phase(&dir[i]) == header->phase) {
00286           if (before_end_of_vol || o >= (pe - vol_end_offset))
00287             continue;
00288         } else {
00289           if (o<ps || o>= pe)
00290             continue;
00291         }
00292         force_evacuate_head(&dir[i], 1);
00293         
00294         
00295       }
00296     }
00297   }
00298 }
00299 
00300 
00301 
00302 
00303 
00304 
// Completion handler for the write of the aggregation buffer.
//
// On success the header's write position and write_serial are advanced and
// the aggregation buffer is reset. On failure the directory entries of every
// document in the failed buffer are deleted. In both cases queued sync
// CacheVCs whose serial is old enough are called back, a pending directory
// sync is kicked, and aggWrite() is re-entered when work remains.
//
// Returns EVENT_CONT, or the result of aggWrite().
00305 int
00306 Vol::aggWriteDone(int event, Event *e)
00307 {
00308   cancel_trigger();
00309 
00310   // cacheDirSync->handleEvent() is called further down when dir_sync_waiting
00311   // is set, so in that case it is cacheDirSync's mutex that gets locked here
00312   CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
00313   if (!lock) {
00314     eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00315     return EVENT_CONT;
00316   }
00317   if (io.ok()) {
00318     header->last_write_pos = header->write_pos;
00319     header->write_pos += io.aiocb.aio_nbytes;
00320     ink_assert(header->write_pos >= start);
00321     DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
00322           hash_text.get(), header->write_pos, header->last_write_pos);
00323     ink_assert(header->write_pos == header->agg_pos);
00324     if (header->write_pos + EVACUATION_SIZE > scan_pos)
00325       periodic_scan();
00326     agg_buf_pos = 0;
00327     header->write_serial++;
00328   } else {
00329     // the write failed: walk the documents packed in the aggregation buffer
00330     // and delete the directory entry of each one
00331     Debug("cache_disk_error", "Write error on disk %s\n \
00332               write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
00333           hash_text.get(), (uint64_t)io.aiocb.aio_offset,
00334           (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
00335           (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
00336           (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
00337     Dir del_dir;
00338     dir_clear(&del_dir);
00339     for (int done = 0; done < agg_buf_pos;) {
00340       Doc *doc = (Doc *) (agg_buffer + done);
00341       dir_set_offset(&del_dir, header->write_pos + done);
00342       dir_delete(&doc->key, this, &del_dir);
00343       done += round_to_approx_size(doc->len);
00344     }
00345     agg_buf_pos = 0;
00346   }
00347   set_io_not_in_progress();
00348   // call back the queued sync CacheVCs whose write_serial + 2 has been reached
00349   CacheVC *c = 0;
00350   while ((c = sync.dequeue())) {
00351     if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
00352       c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
00353     else {
00354       sync.push(c); // not ready yet: put it back at the front and stop
00355       break;
00356     }
00357   }
00358   if (dir_sync_waiting) {
00359     dir_sync_waiting = 0;
00360     cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
00361   }
00362   if (agg.head || sync.head)
00363     return aggWrite(event, e);
00364   return EVENT_CONT;
00365 }
00366 
00367 CacheVC *
00368 new_DocEvacuator(int nbytes, Vol *vol)
00369 {
00370   CacheVC *c = new_CacheVC(vol);
00371   ProxyMutex *mutex = vol->mutex;
00372   c->base_stat = cache_evacuate_active_stat;
00373   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
00374   c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
00375   c->vol = vol;
00376   c->f.evacuator = 1;
00377   c->earliest_key = zero_key;
00378   SET_CONTINUATION_HANDLER(c, &CacheVC::evacuateDocDone);
00379   return c;
00380 }
00381 
// Handler run when the read of the head (first_key) document completes while
// evacuating an earliest fragment. It determines the total length of the
// object the earliest fragment belongs to (doc_len) and compares it with the
// number of bytes evacuated so far (total_len).
//
// Outcomes:
//   - the read document is not the expected head: probe the directory for
//     the next collision and read again (Lcollision);
//   - the head cannot be matched to earliest_key, the read failed, or no
//     further candidate exists: remove the lookaside entry and free this
//     CacheVC (Ldone);
//   - doc_len == total_len: fix up the lookaside entry and free this CacheVC;
//   - otherwise return EVENT_CONT; evacuateDocDone() of later fragments adds
//     to total_len.
00382 int
00383 CacheVC::evacuateReadHead(int , Event * )
00384 {
00385   // must be called with the volume mutex held by the current thread
00386   ink_assert(vol->mutex->thread_holding == this_ethread());
00387   cancel_trigger();
00388   Doc *doc = (Doc *) buf->data();
00389 #ifdef HTTP_CACHE
00390   CacheHTTPInfo *alternate_tmp = 0;
00391 #endif
00392   if (!io.ok())
00393     goto Ldone;
00394   // the directory entry is no longer valid: restart the probe from scratch
00395   if (!dir_valid(vol, &dir)) {
00396     last_collision = NULL;
00397     goto Lcollision;
00398   }
00399   if (doc->magic != DOC_MAGIC || !(doc->first_key == first_key))
00400     goto Lcollision;
00401 #ifdef HTTP_CACHE
00402   alternate_tmp = 0;
00403   if (doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
00404     // HTTP document: find the alternate whose object key is earliest_key
00405     if (this->load_http_info(&vector, doc) != doc->hlen) {
00406       Note("bad vector detected during evacuation");
00407       goto Ldone;
00408     }
00409     alternate_index = get_alternate_index(&vector, earliest_key);
00410     if (alternate_index < 0)
00411       goto Ldone;
00412     alternate_tmp = vector.get(alternate_index);
00413     doc_len = alternate_tmp->object_size_get();
00414     Debug("cache_evac", "evacuateReadHead http earliest %X first: %X len: %" PRId64,
00415           first_key.slice32(0), earliest_key.slice32(0), doc_len);
00416   } else
00417 #endif
00418   {
00419     // non-HTTP document: the key following the head's key must be earliest_key
00420     CacheKey next_key;
00421     next_CacheKey(&next_key, &doc->key);
00422     if (!(next_key == earliest_key))
00423       goto Ldone;
00424     doc_len = doc->total_len;
00425     DDebug("cache_evac",
00426           "evacuateReadHead non-http earliest %X first: %X len: %" PRId64, first_key.slice32(0), earliest_key.slice32(0), doc_len);
00427   }
00428   if (doc_len == total_len) {
00429     // every byte of the object has been evacuated already: finish the
00430     // lookaside entry for earliest_key and release this evacuator
00431     dir_lookaside_fixup(&earliest_key, vol);
00432     return free_CacheVC(this);
00433   }
00434   return EVENT_CONT;
00435 Lcollision:
00436   if (dir_probe(&first_key, vol, &dir, &last_collision)) {
00437     int ret = do_read_call(&first_key);
00438     if (ret == EVENT_RETURN)
00439       return handleEvent(AIO_EVENT_DONE, 0);
00440     return ret;
00441   }
00442 Ldone:
00443   dir_lookaside_remove(&earliest_key, vol);
00444   return free_CacheVC(this);
00445 }
00446 
// Handler run once an evacuated document has been written at its new
// location (`dir`); `overwrite_dir` is the directory entry it was read from.
//
// The matching EvacuationBlock is looked up by offset and then:
//   - non-head fragment: the evacuated bytes are credited to the evacuator
//     of the earliest fragment (if one is registered in the lookaside), and
//     the directory entry is overwritten with the new location;
//   - head whose tag matches doc->first_key: the directory entry (and any
//     open directory entry / RAM cache entry) is switched to the new
//     location;
//   - head whose tag matches doc->key instead: the new location is recorded
//     in earliest_dir and the head document is read via evacuateReadHead to
//     learn the object's length.
//
// Returns the result of free_CacheVC(this), except in the last case when a
// head read is started and its event code is returned instead.
00447 int
00448 CacheVC::evacuateDocDone(int , Event *)
00449 {
00450   ink_assert(vol->mutex->thread_holding == this_ethread());
00451   Doc *doc = (Doc *) buf->data();
00452   DDebug("cache_evac", "evacuateDocDone %X o %d p %d new_o %d new_p %d",
00453         (int) key.slice32(0), (int) dir_offset(&overwrite_dir),
00454         (int) dir_phase(&overwrite_dir), (int) dir_offset(&dir), (int) dir_phase(&dir));
00455   int i = dir_evac_bucket(&overwrite_dir);
00456   // find the EvacuationBlock of the directory entry that was evacuated
00457   EvacuationBlock *b = vol->evacuate[i].head;
00458   for (; b; b = b->link.next) {
00459     if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
00460 
00461       // Non-head fragment. Look up the key record for this document in the
00462       // block; when it names an earliest key, account the evacuated bytes
00463       // with that earliest fragment's evacuator.
00464       
00465       
00466       
00467       
00468       if (!dir_head(&overwrite_dir)) {
00469         // find the key record matching this document
00470         EvacuationKey *evac = &b->evac_frags;
00471         for (; evac && !(evac->key == doc->key); evac = evac->link.next);
00472         ink_assert(evac);
00473         if (!evac)
00474           break;
00475         if (evac->earliest_key.fold()) {
00476           DDebug("cache_evac", "evacdocdone: evacuating key %X earliest %X",
00477                 evac->key.slice32(0), evac->earliest_key.slice32(0));
00478           EvacuationBlock *eblock = 0;
00479           Dir dir_tmp;
00480           dir_lookaside_probe(&evac->earliest_key, vol, &dir_tmp, &eblock);
00481           if (eblock) {
00482             CacheVC *earliest_evac = eblock->earliest_evacuator;
00483             earliest_evac->total_len += doc->data_len();
            // all bytes of the object evacuated: finish the lookaside entry
            // and release the earliest fragment's evacuator
00484             if (earliest_evac->total_len == earliest_evac->doc_len) {
00485               dir_lookaside_fixup(&evac->earliest_key, vol);
00486               free_CacheVC(earliest_evac);
00487             }
00488           }
00489         }
00490         dir_overwrite(&doc->key, vol, &dir, &overwrite_dir);
00491       }
00492       // Head entry. A tag matching doc->first_key is handled as the
00493       // vector/head document and its directory entry is overwritten
00494       // directly; otherwise the tag is expected to match doc->key and the
00495       // document is handled as an earliest fragment.
00496       
00497       if (dir_head(&overwrite_dir) && b->f.evacuate_head) {
00498         DDebug("cache_evac",
00499               "evacuateDocDone evacuate_head %X %X hlen %d offset %d",
00500               (int) key.slice32(0), (int) doc->key.slice32(0), doc->hlen, (int) dir_offset(&overwrite_dir));
00501 
00502         if (dir_compare_tag(&overwrite_dir, &doc->first_key)) {
00503           OpenDirEntry *cod;
00504           DDebug("cache_evac", "evacuating vector: %X %d",
00505                 (int) doc->first_key.slice32(0), (int) dir_offset(&overwrite_dir));
00506           if ((cod = vol->open_read(&doc->first_key))) {
00507             // the object is open: point its open directory entry at the new location
00508             DDebug("cache_evac", "overwriting the open directory %X %d %d",
00509                   (int) doc->first_key.slice32(0), (int) dir_offset(&cod->first_dir), (int) dir_offset(&dir));
00510             cod->first_dir = dir;
00511 
00512           }
00513           if (dir_overwrite(&doc->first_key, vol, &dir, &overwrite_dir)) {
            // pass the old and new offsets to the RAM cache, each split into
            // high and low 32-bit halves
00514             int64_t o = dir_offset(&overwrite_dir), n = dir_offset(&dir);
00515             vol->ram_cache->fixup(&doc->first_key, (uint32_t)(o >> 32), (uint32_t)o, (uint32_t)(n >> 32), (uint32_t)n);
00516           }
00517         } else {
00518           DDebug("cache_evac", "evacuating earliest: %X %d", (int) doc->key.slice32(0), (int) dir_offset(&overwrite_dir));
00519           ink_assert(dir_compare_tag(&overwrite_dir, &doc->key));
00520           ink_assert(b->earliest_evacuator == this);
00521           total_len += doc->data_len();
00522           first_key = doc->first_key;
00523           earliest_dir = dir;
00524           if (dir_probe(&first_key, vol, &dir, &last_collision) > 0) {
00525             dir_lookaside_insert(b, vol, &earliest_dir);
00526             // read the head document; evacuateReadHead continues from there
00527             SET_HANDLER(&CacheVC::evacuateReadHead);
00528             int ret = do_read_call(&first_key);
00529             if (ret == EVENT_RETURN)
00530               return handleEvent(AIO_EVENT_DONE, 0);
00531             return ret;
00532           }
00533         }
00534       }
00535       break;
00536     }
00537   }
00538   return free_CacheVC(this);
00539 }
00540 
00541 static int
00542 evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Vol *vol)
00543 {
00544   Dir dir, *last_collision = 0;
00545   int i = 0;
00546   while (dir_probe(key, vol, &dir, &last_collision)) {
00547     
00548     
00549     if (dir_head(&dir)
00550 #if TS_USE_INTERIM_CACHE == 1
00551         || dir_ininterim(&dir)
00552 #endif
00553         )
00554       continue;
00555     EvacuationBlock *b = evacuation_block_exists(&dir, vol);
00556     if (!b) {
00557       b = new_EvacuationBlock(vol->mutex->thread_holding);
00558       b->dir = dir;
00559       b->evac_frags.key = *key;
00560       b->evac_frags.earliest_key = *earliest_key;
00561       vol->evacuate[dir_evac_bucket(&dir)].push(b);
00562       i++;
00563     } else {
00564       ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
00565       ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
00566       EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
00567       evac_frag->key = *key;
00568       evac_frag->earliest_key = *earliest_key;
00569       evac_frag->link.next = b->evac_frags.link.next;
00570       b->evac_frags.link.next = evac_frag;
00571     }
00572     if (force)
00573       b->readers = 0;
00574     DDebug("cache_evac",
00575           "next fragment %X Earliest: %X offset %d phase %d force %d",
00576           (int) key->slice32(0), (int) earliest_key->slice32(0), (int) dir_offset(&dir), (int) dir_phase(&dir), force);
00577   }
00578   return i;
00579 }
00580 
00581 int
00582 Vol::evacuateWrite(CacheVC *evacuator, int event, Event *e)
00583 {
00584   
00585 
00586   evacuator->agg_len = round_to_approx_size(((Doc *)evacuator->buf->data())->len);
00587   agg_todo_size += evacuator->agg_len;
00588   
00589   CacheVC *cur = (CacheVC *) agg.head;
00590   CacheVC *after = NULL;
00591   for (; cur && cur->f.evacuator; cur = (CacheVC *) cur->link.next)
00592     after = cur;
00593   ink_assert(evacuator->agg_len <= AGG_SIZE);
00594   agg.insert(evacuator, after);
00595   return aggWrite(event, e);
00596 }
00597 
// Handler run when the read issued by evac_range() for doc_evacuator
// completes. It validates the document, finds its EvacuationBlock, records
// which key(s) the evacuator is moving, schedules the following fragments
// for evacuation where applicable, and queues the evacuator for writing.
//
// Returns EVENT_DONE for any event other than AIO_EVENT_DONE; otherwise the
// result of evacuateWrite(), or of aggWrite() when the evacuation is
// abandoned (bad magic, no matching block/key, or an expired pin with no
// readers) and the evacuator is freed.
00598 int
00599 Vol::evacuateDocReadDone(int event, Event *e)
00600 {
00601   cancel_trigger();
00602   if (event != AIO_EVENT_DONE)
00603     return EVENT_DONE;
00604   ink_assert(is_io_in_progress());
00605   set_io_not_in_progress();
00606   ink_assert(mutex->thread_holding == this_ethread());
00607   Doc *doc = (Doc *) doc_evacuator->buf->data();
00608   CacheKey next_key;
00609   EvacuationBlock *b = NULL;
00610   if (doc->magic != DOC_MAGIC) {
00611     Debug("cache_evac", "DOC magic: %X %d",
00612           (int) dir_tag(&doc_evacuator->overwrite_dir), (int) dir_offset(&doc_evacuator->overwrite_dir));
00613     ink_assert(doc->magic == DOC_MAGIC);
00614     goto Ldone;
00615   }
00616   DDebug("cache_evac", "evacuateDocReadDone %X offset %d",
00617         (int) doc->key.slice32(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
00618 
  // locate the evacuation block registered for the offset that was read
00619   b = evacuate[dir_evac_bucket(&doc_evacuator->overwrite_dir)].head;
00620   while (b) {
00621     if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir))
00622       break;
00623     b = b->link.next;
00624   }
00625   if (!b)
00626     goto Ldone;
  // pinned with no readers, and doc->pinned (seconds) already lies in the
  // past: not worth evacuating
00627   if ((b->f.pinned && !b->readers) && doc->pinned < (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND))
00628     goto Ldone;
00629 
00630   if (dir_head(&b->dir) && b->f.evacuate_head) {
00631     ink_assert(!b->evac_frags.key.fold());
00632     // Head whose tag matches first_key: only this document has to be
00633     // rewritten, under its first_key.
00634     if (dir_compare_tag(&b->dir, &doc->first_key)) {
00635       doc_evacuator->key = doc->first_key;
00636       b->evac_frags.key = doc->first_key;
00637       DDebug("cache_evac", "evacuating vector %X offset %d",
00638             (int) doc->first_key.slice32(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
      // NOTE(review): the values stored in f.unused (57/67/77/87) look like
      // debugging markers for the path taken -- confirm
00639       b->f.unused = 57;
00640     } else {
00641       // Head whose tag does not match first_key: handled as an earliest
00642       // fragment. doc->key becomes the earliest key, and this evacuator is
00643       // registered on the block as earliest_evacuator so that
00644       // evacuateDocDone() of later fragments can find it.
00645       doc_evacuator->key = doc->key;
00646       doc_evacuator->earliest_key = doc->key;
00647       b->evac_frags.key = doc->key;
00648       b->evac_frags.earliest_key = doc->key;
00649       b->earliest_evacuator = doc_evacuator;
00650       DDebug("cache_evac", "evacuating earliest %X %X evac: %p offset: %d",
00651             (int) b->evac_frags.key.slice32(0), (int) doc->key.slice32(0),
00652             doc_evacuator, (int) dir_offset(&doc_evacuator->overwrite_dir));
00653       b->f.unused = 67;
00654     }
00655   } else {
00656     // plain fragment: find which registered key matches the document
00657     EvacuationKey *ek = &b->evac_frags;
00658     for (; ek && !(ek->key == doc->key); ek = ek->link.next);
00659     if (!ek) {
00660       b->f.unused = 77;
00661       goto Ldone;
00662     }
00663     doc_evacuator->key = ek->key;
00664     doc_evacuator->earliest_key = ek->earliest_key;
00665     DDebug("cache_evac", "evacuateDocReadDone key: %X earliest: %X",
00666           (int) ek->key.slice32(0), (int) ek->earliest_key.slice32(0));
00667     b->f.unused = 87;
00668   }
00669   // Unless this is a head whose tag matches first_key, the document is
00670   // part of a fragment chain: register the fragment with the next key for
00671   // evacuation too, forcing it when the block has no readers.
00672   
00673   if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
00674     next_CacheKey(&next_key, &doc->key);
00675     evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
00676   }
00677   return evacuateWrite(doc_evacuator, event, e);
00678 Ldone:
00679   free_CacheVC(doc_evacuator);
00680   doc_evacuator = 0;
00681   return aggWrite(event, e);
00682 }
00683 
00684 int
00685 Vol::evac_range(off_t low, off_t high, int evac_phase)
00686 {
00687   off_t s = offset_to_vol_offset(this, low);
00688   off_t e = offset_to_vol_offset(this, high);
00689   int si = dir_offset_evac_bucket(s);
00690   int ei = dir_offset_evac_bucket(e);
00691 
00692   for (int i = si; i <= ei; i++) {
00693     EvacuationBlock *b = evacuate[i].head;
00694     EvacuationBlock *first = 0;
00695     int64_t first_offset = INT64_MAX;
00696     for (; b; b = b->link.next) {
00697       int64_t offset = dir_offset(&b->dir);
00698       int phase = dir_phase(&b->dir);
00699       if (offset >= s && offset < e && !b->f.done && phase == evac_phase)
00700         if (offset < first_offset) {
00701           first = b;
00702           first_offset = offset;
00703         }
00704     }
00705     if (first) {
00706       first->f.done = 1;
00707       io.aiocb.aio_fildes = fd;
00708       io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
00709       io.aiocb.aio_offset = vol_offset(this, &first->dir);
00710       if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(skip + len))
00711         io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
00712       doc_evacuator = new_DocEvacuator(io.aiocb.aio_nbytes, this);
00713       doc_evacuator->overwrite_dir = first->dir;
00714 
00715       io.aiocb.aio_buf = doc_evacuator->buf->data();
00716       io.action = this;
00717       io.thread = AIO_CALLBACK_THREAD_ANY;
00718       DDebug("cache_evac", "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
00719       SET_HANDLER(&Vol::evacuateDocReadDone);
00720       ink_assert(ink_aio_read(&io) >= 0);
00721       return -1;
00722     }
00723   }
00724   return 0;
00725 }
00726 
00727 
// Serializes the pending write of `vc` into the aggregation buffer at `p`
// and fills in vc->dir with the location the data will have on disk
// (header->write_pos + agg_buf_pos).
//
// Regular writer (vc->f.evacuator == 0): builds a Doc header at `p`, then
// copies the marshalled header (alternate vector or raw header) and the
// data blocks behind it, optionally computing a checksum.
// Evacuator: copies the already complete document from vc->buf, stamping it
// with the current sync/write serials.
//
// Returns the number of bytes consumed in the aggregation buffer
// (vc->agg_len for writers, the rounded document length for evacuators).
00728 static int
00729 agg_copy(char *p, CacheVC *vc)
00730 {
00731   Vol *vol = vc->vol;
00732   off_t o = vol->header->write_pos + vol->agg_buf_pos;
00733 
00734   if (!vc->f.evacuator) {
00735     Doc *doc = (Doc *) p;
00736     IOBufferBlock *res_alt_blk = 0;
00737 
00738     uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
00739     ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
00740     ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
00741     // describe the on-disk location in the directory entry
00742     dir_set_approx_size(&vc->dir, vc->agg_len);
00743     dir_set_offset(&vc->dir, offset_to_vol_offset(vol, o));
00744     ink_assert(vol_offset(vol, &vc->dir) < (vol->skip + vol->len));
00745 #if TS_USE_INTERIM_CACHE == 1
00746     dir_set_indisk(&vc->dir);
00747 #endif
00748     dir_set_phase(&vc->dir, vol->header->phase);
00749 
00750     // fill in the document header
00751     doc->magic = DOC_MAGIC;
00752     doc->len = len;
00753     doc->hlen = vc->header_len;
00754     doc->doc_type = vc->frag_type;
00755     doc->v_major = CACHE_DB_MAJOR_VERSION;
00756     doc->v_minor = CACHE_DB_MINOR_VERSION;
00757     doc->unused = 0; 
00758     doc->total_len = vc->total_len;
00759     doc->first_key = vc->first_key;
00760     doc->sync_serial = vol->header->sync_serial;
00761     vc->write_serial = doc->write_serial = vol->header->write_serial;
00762     doc->checksum = DOC_NO_CHECKSUM;
    // pin expiry is stored as an absolute time in seconds
00763     if (vc->pin_in_cache) {
00764       dir_set_pinned(&vc->dir, 1);
00765       doc->pinned = (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
00766     } else {
00767       dir_set_pinned(&vc->dir, 0);
00768       doc->pinned = 0;
00769     }
00770 
    // choose the document key and whether the directory entry is a head
00771     if (vc->f.use_first_key) {
00772       if (doc->data_len()
00773 #ifdef HTTP_CACHE
00774                   || vc->f.allow_empty_doc
00775 #endif
00776                   )
00777         doc->key = vc->earliest_key;
00778       else // no data in this doc: the key is the one preceding earliest_key
00779         prev_CacheKey(&doc->key, &vc->earliest_key);
00780       dir_set_head(&vc->dir, true);
00781     } else {
00782       doc->key = vc->key;
00783       dir_set_head(&vc->dir, !vc->fragment);
00784     }
00785 
00786 #ifdef HTTP_CACHE
    // rewriting the resident alternate: its body is taken from first_buf and
    // the document takes over the resident document's key and length
00787     if (vc->f.rewrite_resident_alt) {
00788       ink_assert(vc->f.use_first_key);
00789       Doc *res_doc = (Doc *) vc->first_buf->data();
00790       res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeofDoc + res_doc->hlen);
00791       doc->key = res_doc->key;
00792       doc->total_len = res_doc->data_len();
00793     }
00794 #endif
00795     // copy the header (alternate vector for HTTP, raw bytes otherwise)
00796     if (vc->header_len) {
00797       ink_assert(vc->f.use_first_key);
00798 #ifdef HTTP_CACHE
00799       if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
00800         ink_assert(vc->write_vector->count() > 0);
00801         if (!vc->f.update && !vc->f.evac_vector) {
00802           ink_assert(!(vc->first_key == zero_key));
00803           CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
00804           http_info->object_size_set(vc->total_len);
00805         }
00806         // an update that wrote a body also records the new object size in
00807         // the alternate
00808         if (vc->f.update && vc->total_len) {
00809           CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
00810           http_info->object_size_set(vc->total_len);
00811         }
00812         ink_assert(!(((uintptr_t) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
        // NOTE(review): marshal() is the call that actually writes the
        // vector into the buffer, yet it sits inside ink_assert -- this is
        // only correct if ink_assert always evaluates its argument; confirm
00813         ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
00814       } else
00815 #endif
00816         memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
00817       // the single fragment flag is not used in the write call.
00818       // putting it in for completeness.
00819       vc->f.single_fragment = doc->single_fragment();
00820     }
00821     // copy the data
00822     if (vc->write_len) {
00823       {
00824         ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex;
00825         ink_assert(mutex->thread_holding == this_ethread());
00826         CACHE_DEBUG_SUM_DYN_STAT(cache_write_bytes_stat, vc->write_len);
00827       }
00828 #ifdef HTTP_CACHE
00829       if (vc->f.rewrite_resident_alt)
00830         iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
00831       else
00832 #endif
00833         iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks, vc->offset);
      // NOTE(review): the block below refers to `f`, `header_len` and
      // `new_info` without `vc->`; this is a static free function, so it
      // does not look like it can compile when VERIFY_JTEST_DATA is defined
00834 #ifdef VERIFY_JTEST_DATA
00835       if (f.use_first_key && header_len) {
00836         int ib = 0, xd = 0;
00837         char xx[500];
00838         new_info.request_get().url_get().print(xx, 500, &ib, &xd);
00839         char *x = xx;
00840         for (int q = 0; q < 3; q++)
00841           x = strchr(x + 1, '/');
00842         ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
00843       }
00844 #endif
00845 
00846     }
    // optional checksum: byte-wise sum over everything from the header to
    // the end of the document
00847     if (cache_config_enable_checksum) {
00848       doc->checksum = 0;
00849       for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
00850         doc->checksum += *b;
00851     }
00852     if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment)
00853       ink_assert(doc->hlen);
00854 
00855     if (res_alt_blk)
00856       res_alt_blk->free();
00857 
00858     return vc->agg_len;
00859   } else {
00860     // evacuator: the document in vc->buf is copied as is
00861     Doc *doc = (Doc *) vc->buf->data();
00862     int l = vc->vol->round_to_approx_size(doc->len);
00863     {
00864       ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex;
00865       ink_assert(mutex->thread_holding == this_ethread());
00866       CACHE_DEBUG_INCREMENT_DYN_STAT(cache_gc_frags_evacuated_stat);
00867       CACHE_DEBUG_SUM_DYN_STAT(cache_gc_bytes_evacuated_stat, l);
00868     }
00869 
00870     doc->sync_serial = vc->vol->header->sync_serial;
00871     doc->write_serial = vc->vol->header->write_serial;
00872 
00873     memcpy(p, doc, doc->len);
00874 
    // the new directory entry is the old one with offset and phase updated
00875     vc->dir = vc->overwrite_dir;
00876     dir_set_offset(&vc->dir, offset_to_vol_offset(vc->vol, o));
00877     dir_set_phase(&vc->dir, vc->vol->header->phase);
00878 #if TS_USE_INTERIM_CACHE == 1
00879     dir_set_indisk(&vc->dir);
00880 #endif
00881     return l;
00882   }
00883 }
00884 
00885 inline void
00886 Vol::evacuate_cleanup_blocks(int i)
00887 {
00888   EvacuationBlock *b = evacuate[i].head;
00889   while (b) {
00890     if (b->f.done &&
00891         ((header->phase != dir_phase(&b->dir) &&
00892           header->write_pos > vol_offset(this, &b->dir)) ||
00893          (header->phase == dir_phase(&b->dir) && header->write_pos <= vol_offset(this, &b->dir)))) {
00894       EvacuationBlock *x = b;
00895       DDebug("cache_evac", "evacuate cleanup free %X offset %d",
00896             (int) b->evac_frags.key.slice32(0), (int) dir_offset(&b->dir));
00897       b = b->link.next;
00898       evacuate[i].remove(x);
00899       free_EvacuationBlock(x, mutex->thread_holding);
00900       continue;
00901     }
00902     b = b->link.next;
00903   }
00904 }
00905 
00906 void
00907 Vol::evacuate_cleanup()
00908 {
00909   int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
00910   int64_t e = dir_offset_evac_bucket(eo);
00911   int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
00912   int64_t s = sx;
00913   int i;
00914 
00915   if (e > evacuate_size)
00916     e = evacuate_size;
00917   if (sx < 0)
00918     s = 0;
00919   for (i = s; i < e; i++)
00920     evacuate_cleanup_blocks(i);
00921 
00922   
00923   if (sx <= 0) {
00924     s = evacuate_size + sx - 2;
00925     if (s < 0)
00926       s = 0;
00927     for (i = s; i < evacuate_size; i++)
00928       evacuate_cleanup_blocks(i);
00929   }
00930 }
00931 
00932 void
00933 Vol::periodic_scan()
00934 {
00935   evacuate_cleanup();
00936   scan_for_pinned_documents();
00937   if (header->write_pos == start)
00938     scan_pos = start;
00939   scan_pos += len / PIN_SCAN_EVERY;
00940 }
00941 
00942 void
00943 Vol::agg_wrap()
00944 {
00945   header->write_pos = start;
00946   header->phase = !header->phase;
00947 
00948   header->cycle++;
00949   header->agg_pos = header->write_pos;
00950   dir_lookaside_cleanup(this);
00951   dir_clean_vol(this);
00952   periodic_scan();
00953 }
00954 
00955 
00956 
00957 
00958 
00959 
00960 
00961 
00962 int
00963 Vol::aggWrite(int event, void *)
00964 {
00965   ink_assert(!is_io_in_progress());
00966 
00967   Que(CacheVC, link) tocall;
00968   CacheVC *c;
00969 
00970   cancel_trigger();
00971 
00972 Lagain:
00973   
00974   for (c = (CacheVC *) agg.head; c;) {
00975     int writelen = c->agg_len;
00976     
00977     ink_assert(writelen <= AGG_SIZE);
00978     if (agg_buf_pos + writelen > AGG_SIZE ||
00979         header->write_pos + agg_buf_pos + writelen > (skip + len))
00980       break;
00981     DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d",
00982           agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.slice32(0));
00983     int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
00984     ink_assert(writelen == wrotelen);
00985     agg_todo_size -= writelen;
00986     agg_buf_pos += writelen;
00987     CacheVC *n = (CacheVC *)c->link.next;
00988     agg.dequeue();
00989     if (c->f.sync && c->f.use_first_key) {
00990       CacheVC *last = sync.tail;
00991       while (last && UINT_WRAP_LT(c->write_serial, last->write_serial))
00992         last = (CacheVC*)last->link.prev;
00993       sync.insert(c, last);
00994     } else if (c->f.evacuator)
00995       c->handleEvent(AIO_EVENT_DONE, 0);
00996     else
00997       tocall.enqueue(c);
00998     c = n;
00999   }
01000 
01001   
01002   if (!agg_buf_pos) {
01003     if (!agg.head && !sync.head) 
01004       return EVENT_CONT;
01005     if (header->write_pos == start) {
01006       
01007       Note("write aggregation exceeds vol size");
01008       ink_assert(!tocall.head);
01009       ink_assert(false);
01010       while ((c = agg.dequeue())) {
01011         agg_todo_size -= c->agg_len;
01012         if (c->initial_thread != NULL)
01013           c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
01014         else
01015           eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
01016       }
01017       return EVENT_CONT;
01018     }
01019     
01020     if (agg.head) {
01021       agg_wrap();
01022       goto Lagain;
01023     }
01024   }
01025 
01026   
01027   off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
01028   if (evac_range(header->write_pos, end, !header->phase) < 0)
01029     goto Lwait;
01030   if (end > skip + len)
01031     if (evac_range(start, start + (end - (skip + len)), header->phase) < 0)
01032       goto Lwait;
01033 
01034   
01035   
01036   if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting)
01037     goto Lwait;
01038 
01039   
01040   if (!agg_buf_pos) {
01041     ink_assert(sync.head);
01042     int l = round_to_approx_size(sizeof(Doc));
01043     agg_buf_pos = l;
01044     Doc *d = (Doc*)agg_buffer;
01045     memset(d, 0, sizeof(Doc));
01046     d->magic = DOC_MAGIC;
01047     d->len = l;
01048     d->sync_serial = header->sync_serial;
01049     d->write_serial = header->write_serial;
01050   }
01051 
01052   
01053   header->agg_pos = header->write_pos + agg_buf_pos;
01054 
01055   io.aiocb.aio_fildes = fd;
01056   io.aiocb.aio_offset = header->write_pos;
01057   io.aiocb.aio_buf = agg_buffer;
01058   io.aiocb.aio_nbytes = agg_buf_pos;
01059   io.action = this;
01060   
01061 
01062 
01063 
01064 
01065   io.thread = AIO_CALLBACK_THREAD_AIO;
01066   SET_HANDLER(&Vol::aggWriteDone);
01067   ink_aio_write(&io);
01068 
01069 Lwait:
01070   int ret = EVENT_CONT;
01071   while ((c = tocall.dequeue())) {
01072     if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
01073       ret = EVENT_RETURN;
01074     else if (c->initial_thread != NULL)
01075       c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
01076     else
01077       eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
01078   }
01079   return ret;
01080 }
01081 
01082 int
01083 CacheVC::openWriteCloseDir(int , Event *)
01084 {
01085   cancel_trigger();
01086   {
01087     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01088     if (!lock) {
01089       SET_HANDLER(&CacheVC::openWriteCloseDir);
01090       ink_assert(!is_io_in_progress());
01091       VC_SCHED_LOCK_RETRY();
01092     }
01093     vol->close_write(this);
01094     if (closed < 0 && fragment)
01095       dir_delete(&earliest_key, vol, &earliest_dir);
01096   }
01097   if (is_debug_tag_set("cache_update")) {
01098     if (f.update && closed > 0) {
01099       if (!total_len && alternate_index != CACHE_ALT_REMOVED) {
01100         Debug("cache_update", "header only %d (%" PRIu64 ", %" PRIu64 ")\n",
01101               DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1]);
01102 
01103       } else if (total_len && alternate_index != CACHE_ALT_REMOVED) {
01104         Debug("cache_update", "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")\n",
01105               DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
01106       } else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
01107         Debug("cache_update", "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")\n",
01108               DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1]);
01109       }
01110     }
01111   }
01112   
01113   
01114   
01115   
01116   
01117   if ((closed == 1) && (total_len > 0
01118 #ifdef HTTP_CACHE
01119                   || f.allow_empty_doc
01120 #endif
01121                   )) {
01122     DDebug("cache_stats", "Fragment = %d", fragment);
01123     switch (fragment) {
01124       case 0: CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat); break;
01125       case 1: CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat); break;
01126       default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
01127     }
01128   }
01129   if (f.close_complete) {
01130     recursive++;
01131     ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
01132     vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *) &vio);
01133     recursive--;
01134   }
01135   return free_CacheVC(this);
01136 }
01137 
01138 int
01139 CacheVC::openWriteCloseHeadDone(int event, Event *e)
01140 {
01141   if (event == AIO_EVENT_DONE)
01142     set_io_not_in_progress();
01143   else if (is_io_in_progress())
01144     return EVENT_CONT;
01145   {
01146     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01147     if (!lock)
01148       VC_LOCK_RETRY_EVENT();
01149     od->writing_vec = 0;
01150     if (!io.ok())
01151       goto Lclose;
01152     ink_assert(f.use_first_key);
01153     if (!od->dont_update_directory) {
01154       if (dir_is_empty(&od->first_dir)) {
01155         dir_insert(&first_key, vol, &dir);
01156       } else {
01157         
01158         dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
01159         
01160         if (od->move_resident_alt) {
01161           if (dir_valid(vol, &od->single_doc_dir))
01162             dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
01163           od->move_resident_alt = 0;
01164         }
01165       }
01166       od->first_dir = dir;
01167       if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
01168         
01169         od->move_resident_alt = 1;
01170         if (!f.rewrite_resident_alt) {
01171           od->single_doc_key = earliest_key;
01172         }
01173         dir_assign(&od->single_doc_dir, &dir);
01174         dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
01175       }
01176     }
01177   }
01178 Lclose:
01179   return openWriteCloseDir(event, e);
01180 }
01181 
01182 int
01183 CacheVC::openWriteCloseHead(int event, Event *e)
01184 {
01185   cancel_trigger();
01186   f.use_first_key = 1;
01187   if (io.ok())
01188     ink_assert(fragment || (length == (int64_t)total_len));
01189   else
01190     return openWriteCloseDir(event, e);
01191   if (f.data_done)
01192     write_len = 0;
01193   else
01194     write_len = length;
01195 #ifdef HTTP_CACHE
01196   if (frag_type == CACHE_FRAG_TYPE_HTTP) {
01197     SET_HANDLER(&CacheVC::updateVector);
01198     return updateVector(EVENT_IMMEDIATE, 0);
01199   } else {
01200 #endif
01201     header_len = header_to_write_len;
01202     SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
01203     return do_write_lock();
01204 #ifdef HTTP_CACHE
01205   }
01206 #endif
01207 }
01208 
01209 int
01210 CacheVC::openWriteCloseDataDone(int event, Event *e)
01211 {
01212   int ret = 0;
01213   cancel_trigger();
01214 
01215   if (event == AIO_EVENT_DONE)
01216     set_io_not_in_progress();
01217   else if (is_io_in_progress())
01218     return EVENT_CONT;
01219   if (!io.ok())
01220     return openWriteCloseDir(event, e);
01221   {
01222     CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
01223     if (!lock)
01224       VC_LOCK_RETRY_EVENT();
01225     if (!fragment) {
01226       ink_assert(key == earliest_key);
01227       earliest_dir = dir;
01228 #ifdef HTTP_CACHE
01229     } else {
01230       
01231       
01232       if (alternate.valid())
01233         alternate.push_frag_offset(write_pos);
01234 #endif
01235     }
01236     fragment++;
01237     write_pos += write_len;
01238     dir_insert(&key, vol, &dir);
01239     blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
01240     next_CacheKey(&key, &key);
01241     if (length) {
01242       write_len = length;
01243       if (write_len > MAX_FRAG_SIZE)
01244         write_len = MAX_FRAG_SIZE;
01245       if ((ret = do_write_call()) == EVENT_RETURN)
01246         goto Lcallreturn;
01247       return ret;
01248     }
01249     f.data_done = 1;
01250     return openWriteCloseHead(event, e); 
01251   }
01252 Lcallreturn:
01253   return handleEvent(AIO_EVENT_DONE, 0);
01254 }
01255 
01256 int
01257 CacheVC::openWriteClose(int event, Event *e)
01258 {
01259   cancel_trigger();
01260   if (is_io_in_progress()) {
01261     if (event != AIO_EVENT_DONE)
01262       return EVENT_CONT;
01263     set_io_not_in_progress();
01264     if (!io.ok())
01265       return openWriteCloseDir(event, e);
01266   }
01267   if (closed > 0
01268 #ifdef HTTP_CACHE
01269                   || f.allow_empty_doc
01270 #endif
01271                   ) {
01272     if (total_len == 0) {
01273 #ifdef HTTP_CACHE
01274       if (f.update || f.allow_empty_doc) {
01275         return updateVector(event, e);
01276       } else {
01277         
01278         
01279         closed = -1;
01280         return openWriteCloseDir(event, e);
01281       }
01282 #else
01283       return openWriteCloseDir(event, e);
01284 #endif
01285     }
01286     if (length && (fragment || length > MAX_FRAG_SIZE)) {
01287       SET_HANDLER(&CacheVC::openWriteCloseDataDone);
01288       write_len = length;
01289       if (write_len > MAX_FRAG_SIZE)
01290         write_len = MAX_FRAG_SIZE;
01291       return do_write_lock_call();
01292     } else
01293       return openWriteCloseHead(event, e);
01294   } else
01295     return openWriteCloseDir(event, e);
01296 }
01297 
01298 int
01299 CacheVC::openWriteWriteDone(int event, Event *e)
01300 {
01301   cancel_trigger();
01302   if (event == AIO_EVENT_DONE)
01303     set_io_not_in_progress();
01304   else
01305     if (is_io_in_progress())
01306       return EVENT_CONT;
01307   
01308   if (!io.ok()) {
01309     if (closed) {
01310       closed = -1;
01311       return die();
01312     }
01313     SET_HANDLER(&CacheVC::openWriteMain);
01314     return calluser(VC_EVENT_ERROR);
01315   }
01316   {
01317     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01318     if (!lock)
01319       VC_LOCK_RETRY_EVENT();
01320     
01321     
01322     if (!fragment) {
01323       ink_assert(key == earliest_key);
01324       earliest_dir = dir;
01325 #ifdef HTTP_CACHE
01326     } else {
01327       
01328       
01329       if (alternate.valid())
01330         alternate.push_frag_offset(write_pos);
01331 #endif
01332     }
01333     ++fragment;
01334     write_pos += write_len;
01335     dir_insert(&key, vol, &dir);
01336     DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
01337     blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
01338     next_CacheKey(&key, &key);
01339   }
01340   if (closed)
01341     return die();
01342   SET_HANDLER(&CacheVC::openWriteMain);
01343   return openWriteMain(event, e);
01344 }
01345 
01346 static inline int target_fragment_size() {
01347   return cache_config_target_fragment_size - sizeofDoc;
01348 }
01349 
01350 int
01351 CacheVC::openWriteMain(int , Event *)
01352 {
01353   cancel_trigger();
01354   int called_user = 0;
01355   ink_assert(!is_io_in_progress());
01356 Lagain:
01357   if (!vio.buffer.writer()) {
01358     if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
01359       return EVENT_DONE;
01360     if (!vio.buffer.writer())
01361       return EVENT_CONT;
01362   }
01363   if (vio.ntodo() <= 0) {
01364     called_user = 1;
01365     if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
01366       return EVENT_DONE;
01367     ink_assert(!f.close_complete || !"close expected after write COMPLETE");
01368     if (vio.ntodo() <= 0)
01369       return EVENT_CONT;
01370   }
01371   int64_t ntodo = (int64_t)(vio.ntodo() + length);
01372   int64_t total_avail = vio.buffer.reader()->read_avail();
01373   int64_t avail = total_avail;
01374   int64_t towrite = avail + length;
01375   if (towrite > ntodo) {
01376     avail -= (towrite - ntodo);
01377     towrite = ntodo;
01378   }
01379   if (towrite > MAX_FRAG_SIZE) {
01380     avail -= (towrite - MAX_FRAG_SIZE);
01381     towrite = MAX_FRAG_SIZE;
01382   }
01383   if (!blocks && towrite) {
01384     blocks = vio.buffer.reader()->block;
01385     offset = vio.buffer.reader()->start_offset;
01386   }
01387   if (avail > 0) {
01388     vio.buffer.reader()->consume(avail);
01389     vio.ndone += avail;
01390     total_len += avail;
01391   }
01392   length = (uint64_t)towrite;
01393   if (length > target_fragment_size() && 
01394       (length < target_fragment_size() + target_fragment_size() / 4))
01395     write_len = target_fragment_size();
01396   else
01397     write_len = length;
01398   bool not_writing = towrite != ntodo && towrite < target_fragment_size();
01399   if (!called_user) {
01400     if (not_writing) {
01401       called_user = 1;
01402       if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
01403         return EVENT_DONE;
01404       goto Lagain;
01405     } else if (vio.ntodo() <= 0)
01406       goto Lagain;
01407   }
01408   if (not_writing)
01409     return EVENT_CONT;
01410   if (towrite == ntodo && f.close_complete) {
01411     closed = 1;
01412     SET_HANDLER(&CacheVC::openWriteClose);
01413     return openWriteClose(EVENT_NONE, NULL);
01414   }
01415   SET_HANDLER(&CacheVC::openWriteWriteDone);
01416   return do_write_lock_call();
01417 }
01418 
01419 
01420 int
01421 CacheVC::openWriteOverwrite(int event, Event *e)
01422 {
01423   cancel_trigger();
01424   if (event != AIO_EVENT_DONE) {
01425     if (event == EVENT_IMMEDIATE)
01426       last_collision = 0;
01427   } else {
01428     Doc *doc = NULL;
01429     set_io_not_in_progress();
01430     if (_action.cancelled)
01431       return openWriteCloseDir(event, e);
01432     if (!io.ok())
01433       goto Ldone;
01434     doc = (Doc *) buf->data();
01435     if (!(doc->first_key == first_key))
01436       goto Lcollision;
01437     od->first_dir = dir;
01438     first_buf = buf;
01439     goto Ldone;
01440   }
01441 Lcollision:
01442   {
01443     CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
01444     if (!lock)
01445       VC_LOCK_RETRY_EVENT();
01446     int res = dir_probe(&first_key, vol, &dir, &last_collision);
01447     if (res > 0) {
01448       if ((res = do_read_call(&first_key)) == EVENT_RETURN)
01449         goto Lcallreturn;
01450       return res;
01451     }
01452   }
01453 Ldone:
01454   SET_HANDLER(&CacheVC::openWriteMain);
01455   return callcont(CACHE_EVENT_OPEN_WRITE);
01456 Lcallreturn:
01457   return handleEvent(AIO_EVENT_DONE, 0); 
01458 }
01459 
01460 #ifdef HTTP_CACHE
01461 
01462 
01463 int
01464 CacheVC::openWriteStartDone(int event, Event *e)
01465 {
01466   intptr_t err = ECACHE_NO_DOC;
01467   cancel_trigger();
01468   if (is_io_in_progress()) {
01469     if (event != AIO_EVENT_DONE)
01470       return EVENT_CONT;
01471     set_io_not_in_progress();
01472   }
01473   {
01474     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01475     if (!lock)
01476       VC_LOCK_RETRY_EVENT();
01477 
01478     if (_action.cancelled && (!od || !od->has_multiple_writers()))
01479       goto Lcancel;
01480 
01481     if (event == AIO_EVENT_DONE) {        
01482       Doc *doc = (Doc *) buf->data();
01483       if (!io.ok()) {
01484         err = ECACHE_READ_FAIL;
01485         goto Lfailure;
01486       }
01487 
01488       
01489 
01490 
01491 
01492 
01493       if (!dir_valid(vol, &dir)) {
01494         DDebug("cache_write",
01495                "OpenReadStartDone: Dir not valid: Write Head: %" PRId64 ", Dir: %" PRId64,
01496                (int64_t)offset_to_vol_offset(vol, vol->header->write_pos), dir_offset(&dir));
01497         last_collision = NULL;
01498         goto Lcollision;
01499       }
01500       if (!(doc->first_key == first_key))
01501         goto Lcollision;
01502 
01503       if (doc->magic != DOC_MAGIC || !doc->hlen ||
01504           this->load_http_info(write_vector, doc, buf) != doc->hlen) {
01505         err = ECACHE_BAD_META_DATA;
01506 #if TS_USE_INTERIM_CACHE == 1
01507         if (dir_ininterim(&dir)) {
01508           dir_delete(&first_key, vol, &dir);
01509           last_collision = NULL;
01510           goto Lcollision;
01511         }
01512 #endif
01513         goto Lfailure;
01514       }
01515       ink_assert(write_vector->count() > 0);
01516 #if TS_USE_INTERIM_CACHE == 1
01517 Lagain:
01518       if (dir_ininterim(&dir)) {
01519         dir_delete(&first_key, vol, &dir);
01520         last_collision = NULL;
01521         if (dir_probe(&first_key, vol, &dir, &last_collision)) {
01522           goto Lagain;
01523         } else {
01524           if (f.update) {
01525             
01526             goto Lfailure;
01527           }
01528         }
01529       }
01530 #endif
01531       od->first_dir = dir;
01532       first_dir = dir;
01533       if (doc->single_fragment()) {
01534         
01535         od->move_resident_alt = 1;
01536         od->single_doc_key = doc->key;
01537         dir_assign(&od->single_doc_dir, &dir);
01538         dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
01539       }
01540       first_buf = buf;
01541       goto Lsuccess;
01542     }
01543 
01544 Lcollision:
01545     int if_writers = ((uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES);
01546     if (!od) {
01547       if ((err = vol->open_write(
01548                 this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
01549         goto Lfailure;
01550       if (od->has_multiple_writers()) {
01551         MUTEX_RELEASE(lock);
01552         SET_HANDLER(&CacheVC::openWriteMain);
01553         return callcont(CACHE_EVENT_OPEN_WRITE);
01554       }
01555     }
01556     
01557     if (dir_probe(&first_key, vol, &dir, &last_collision)) {
01558       od->reading_vec = 1;
01559       int ret = do_read_call(&first_key);
01560       if (ret == EVENT_RETURN)
01561         goto Lcallreturn;
01562       return ret;
01563     }
01564     if (f.update) {
01565       
01566       goto Lfailure;
01567     }
01568   }
01569 Lsuccess:
01570   od->reading_vec = 0;
01571   if (_action.cancelled)
01572     goto Lcancel;
01573   SET_HANDLER(&CacheVC::openWriteMain);
01574   return callcont(CACHE_EVENT_OPEN_WRITE);
01575 
01576 Lfailure:
01577   CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
01578   _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
01579 Lcancel:
01580   if (od) {
01581     od->reading_vec = 0;
01582     return openWriteCloseDir(event, e);
01583   } else
01584     return free_CacheVC(this);
01585 Lcallreturn:
01586   return handleEvent(AIO_EVENT_DONE, 0); 
01587 }
01588 #endif
01589 
01590 
01591 int
01592 CacheVC::openWriteStartBegin(int , Event *)
01593 {
01594   intptr_t err;
01595   cancel_trigger();
01596   if (_action.cancelled)
01597     return free_CacheVC(this);
01598   if (((err = vol->open_write_lock(this, false, 1)) > 0)) {
01599     CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
01600     free_CacheVC(this);
01601     _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
01602     return EVENT_DONE;
01603   }
01604   if (err < 0)
01605     VC_SCHED_LOCK_RETRY();
01606   if (f.overwrite) {
01607     SET_HANDLER(&CacheVC::openWriteOverwrite);
01608     return openWriteOverwrite(EVENT_IMMEDIATE, 0);
01609   } else {
01610     
01611     SET_HANDLER(&CacheVC::openWriteMain);
01612     return callcont(CACHE_EVENT_OPEN_WRITE);
01613   }
01614 }
01615 
01616 
01617 Action *
01618 Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
01619                   int options, time_t apin_in_cache, char *hostname, int host_len)
01620 {
01621 
01622   if (!CacheProcessor::IsCacheReady(frag_type)) {
01623     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
01624     return ACTION_RESULT_DONE;
01625   }
01626 
01627   ink_assert(caches[frag_type] == this);
01628 
01629   intptr_t res = 0;
01630   CacheVC *c = new_CacheVC(cont);
01631   ProxyMutex *mutex = cont->mutex;
01632   MUTEX_LOCK(lock, c->mutex, this_ethread());
01633   c->vio.op = VIO::WRITE;
01634   c->base_stat = cache_write_active_stat;
01635   c->vol = key_to_vol(key, hostname, host_len);
01636   Vol *vol = c->vol;
01637   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
01638   c->first_key = c->key = *key;
01639   c->frag_type = frag_type;
01640   
01641 
01642 
01643 
01644 
01645 
01646 
01647   do {
01648     rand_CacheKey(&c->key, cont->mutex);
01649   } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
01650   c->earliest_key = c->key;
01651 #ifdef HTTP_CACHE
01652   c->info = 0;
01653 #endif
01654   c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
01655   c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
01656   c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
01657   c->pin_in_cache = (uint32_t) apin_in_cache;
01658 
01659   if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
01660     
01661     CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
01662     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -res);
01663     free_CacheVC(c);
01664     return ACTION_RESULT_DONE;
01665   }
01666   if (res < 0) {
01667     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
01668     c->trigger = CONT_SCHED_LOCK_RETRY(c);
01669     return &c->_action;
01670   }
01671   if (!c->f.overwrite) {
01672     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
01673     c->callcont(CACHE_EVENT_OPEN_WRITE);
01674     return ACTION_RESULT_DONE;
01675   } else {
01676     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
01677     if (c->openWriteOverwrite(EVENT_IMMEDIATE, 0) == EVENT_DONE)
01678       return ACTION_RESULT_DONE;
01679     else
01680       return &c->_action;
01681   }
01682 }
01683 
01684 #ifdef HTTP_CACHE
01685 
01686 Action *
01687 Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
01688                   CacheKey *, CacheFragType type, char *hostname, int host_len)
01689 {
01690   if (!CacheProcessor::IsCacheReady(type)) {
01691     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
01692     return ACTION_RESULT_DONE;
01693   }
01694 
01695   ink_assert(caches[type] == this);
01696   intptr_t err = 0;
01697   int if_writers = (uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES;
01698   CacheVC *c = new_CacheVC(cont);
01699   ProxyMutex *mutex = cont->mutex;
01700   c->vio.op = VIO::WRITE;
01701   c->first_key = *key;
01702   
01703 
01704 
01705 
01706 
01707 
01708 
01709   do {
01710     rand_CacheKey(&c->key, cont->mutex);
01711   }
01712   while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
01713   c->earliest_key = c->key;
01714   c->frag_type = CACHE_FRAG_TYPE_HTTP;
01715   c->vol = key_to_vol(key, hostname, host_len);
01716   Vol *vol = c->vol;
01717   c->info = info;
01718   if (c->info && (uintptr_t) info != CACHE_ALLOW_MULTIPLE_WRITES) {
01719     
01720 
01721 
01722 
01723 
01724 
01725 
01726 
01727 
01728 
01729 
01730 
01731 
01732 
01733 
01734 
01735 
01736 
01737 
01738 
01739 
01740 
01741 
01742 
01743 
01744 
01745 
01746 
01747 
01748     c->f.update = 1;
01749     c->base_stat = cache_update_active_stat;
01750     DDebug("cache_update", "Update called");
01751     info->object_key_get(&c->update_key);
01752     ink_assert(!(c->update_key == zero_key));
01753     c->update_len = info->object_size_get();
01754   } else
01755     c->base_stat = cache_write_active_stat;
01756   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
01757   c->pin_in_cache = (uint32_t) apin_in_cache;
01758 
01759   {
01760     CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
01761     if (lock) {
01762       if ((err = c->vol->open_write(c, if_writers,
01763                                      cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
01764         goto Lfailure;
01765       
01766       
01767       
01768       if (c->od->has_multiple_writers())
01769         goto Lmiss;
01770       if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
01771         if (c->f.update) {
01772           
01773           
01774           err = ECACHE_NO_DOC;
01775           goto Lfailure;
01776         }
01777         
01778         goto Lmiss;
01779       } else {
01780         c->od->reading_vec = 1;
01781         
01782         SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
01783         switch (c->do_read_call(&c->first_key)) {
01784           case EVENT_DONE: return ACTION_RESULT_DONE;
01785           case EVENT_RETURN: goto Lcallreturn;
01786           default: return &c->_action;
01787         }
01788       }
01789     }
01790     
01791     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
01792     CONT_SCHED_LOCK_RETRY(c);
01793     return &c->_action;
01794   }
01795 
01796 Lmiss:
01797   SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
01798   c->callcont(CACHE_EVENT_OPEN_WRITE);
01799   return ACTION_RESULT_DONE;
01800 
01801 Lfailure:
01802   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
01803   cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
01804   if (c->od) {
01805     c->openWriteCloseDir(EVENT_IMMEDIATE, 0);
01806     return ACTION_RESULT_DONE;
01807   }
01808   free_CacheVC(c);
01809   return ACTION_RESULT_DONE;
01810 
01811 Lcallreturn:
01812   if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
01813     return ACTION_RESULT_DONE;
01814   return &c->_action;
01815 }
01816 
01817 #if TS_USE_INTERIM_CACHE == 1
01818 int
01819 InterimCacheVol::aggWrite(int , void * )
01820 {
01821   ink_assert(!is_io_in_progress());
01822   MigrateToInterimCache *mts;
01823   Doc *doc;
01824   uint64_t old_off, new_off;
01825   ink_assert(this_ethread() == mutex.m_ptr->thread_holding
01826       && vol->mutex.m_ptr == mutex.m_ptr);
01827 Lagain:
01828 
01829   while ((mts = agg.head) != NULL) {
01830     doc = (Doc *) mts->buf->data();
01831     uint32_t agg_len = dir_approx_size(&mts->dir);
01832     ink_assert(agg_len == mts->agg_len);
01833     ink_assert(agg_len <= AGG_SIZE && agg_buf_pos <= AGG_SIZE);
01834 
01835     if (agg_buf_pos + agg_len > AGG_SIZE
01836         || header->agg_pos + agg_len > (skip + len))
01837       break;
01838     mts = agg.dequeue();
01839 
01840     if (!mts->notMigrate) {
01841       old_off = dir_get_offset(&mts->dir);
01842       Dir old_dir = mts->dir;
01843       doc->sync_serial = header->sync_serial;
01844       doc->write_serial = header->write_serial;
01845 
01846       memcpy(agg_buffer + agg_buf_pos, doc, doc->len);
01847       off_t o = header->write_pos + agg_buf_pos;
01848       dir_set_offset(&mts->dir, offset_to_vol_offset(this, o));
01849       ink_assert(this == mts->interim_vol);
01850       ink_assert(vol_offset(this, &mts->dir) < mts->interim_vol->skip + mts->interim_vol->len);
01851       dir_set_phase(&mts->dir, header->phase);
01852       dir_set_ininterim(&mts->dir);
01853       dir_set_index(&mts->dir, (this - vol->interim_vols));
01854 
01855       agg_buf_pos += agg_len;
01856       header->agg_pos = header->write_pos + agg_buf_pos;
01857       new_off = dir_get_offset(&mts->dir);
01858 
01859       if (mts->rewrite)
01860         dir_overwrite(&mts->key, vol, &mts->dir, &old_dir);
01861       else
01862         dir_insert(&mts->key, vol, &mts->dir);
01863       DDebug("cache_insert", "InterimCache: WriteDone: key: %X, first_key: %X, write_len: %d, write_offset: %" PRId64 ", dir_last_word: %X",
01864           doc->key.slice32(0), doc->first_key.slice32(0), mts->agg_len, o, mts->dir.w[4]);
01865 
01866       if (mts->copy) {
01867         mts->interim_vol->vol->ram_cache->fixup(&mts->key, (uint32_t)(old_off >> 32), (uint32_t)old_off,
01868             (uint32_t)(new_off >> 32), (uint32_t)new_off);
01869       } else {
01870         mts->vc->f.ram_fixup = 1;
01871         mts->vc->dir_off = new_off;
01872       }
01873       vol->set_migrate_done(mts);
01874     } else
01875       vol->set_migrate_failed(mts);
01876 
01877     mts->buf = NULL;
01878     migrateToInterimCacheAllocator.free(mts);
01879   }
01880 
01881   if (!agg_buf_pos) {
01882     if (header->write_pos + AGG_SIZE > (skip + len)) {
01883       header->write_pos = start;
01884       header->phase = !(header->phase);
01885 
01886       header->cycle++;
01887       header->agg_pos = header->write_pos;
01888       dir_clean_interimvol(this);
01889       goto Lagain;
01890     }
01891     return EVENT_CONT;
01892   }
01893 
01894   if (agg.head == NULL && agg_buf_pos < (AGG_SIZE / 2) && !sync
01895       && header->write_pos + AGG_SIZE <= (skip + len))
01896     return EVENT_CONT;
01897 
01898   for (mts = agg.head; mts != NULL; mts = mts->link.next) {
01899     if (!mts->copy) {
01900       Ptr<IOBufferData> buf = mts->buf;
01901       doc = (Doc *) buf->data();
01902       mts->buf = new_IOBufferData(iobuffer_size_to_index(mts->agg_len, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
01903       mts->copy = true;
01904       memcpy(mts->buf->data(), buf->data(), doc->len);
01905       buf = NULL;
01906     }
01907   }
01908   
01909 
01910   io.aiocb.aio_fildes = fd;
01911   io.aiocb.aio_offset = header->write_pos;
01912   io.aiocb.aio_buf = agg_buffer;
01913   io.aiocb.aio_nbytes = agg_buf_pos;
01914   io.action = this;
01915   
01916 
01917 
01918 
01919 
01920   io.thread = AIO_CALLBACK_THREAD_AIO;
01921   SET_HANDLER(&InterimCacheVol::aggWriteDone);
01922   ink_aio_write(&io);
01923   return EVENT_CONT;
01924 }
01925 
// Completion handler for the aggregate-buffer write of an interim cache volume.
// It is installed with SET_HANDLER(&InterimCacheVol::aggWriteDone) right before
// ink_aio_write(&io) is issued, and inspects the outcome through io.ok().
//
// On success the header write positions are advanced by the number of bytes
// written; on failure dir_delete() is called for every Doc that was staged in
// agg_buffer and header->write_pos is left unchanged. In both cases the
// aggregation buffer is reset, the in-progress flag and `sync` are cleared, and
// aggWrite() is invoked again when agg still has queued entries.
//
// @param event  not examined here; only forwarded to aggWrite().
// @param e      not examined here; only forwarded to aggWrite().
// @return       always EVENT_CONT.
01926 int
01927 InterimCacheVol::aggWriteDone(int event, void *e)
01928 {
        // Hard requirement: the calling thread holds this object's mutex, and
        // vol->mutex refers to that very same mutex object.
01929   ink_release_assert(this_ethread() == mutex.m_ptr->thread_holding
01930         && vol->mutex.m_ptr == mutex.m_ptr);
01931   if (io.ok()) {
           // Remember where this write started, then move write_pos past the
           // bytes that were just written.
01932      header->last_write_pos = header->write_pos;
01933      header->write_pos += io.aiocb.aio_nbytes;
01934      ink_assert(header->write_pos >= start);
01935      DDebug("cache_agg", "Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
01936            header->write_pos, header->last_write_pos);
           // After a successful flush the write position is expected to equal agg_pos.
01937      ink_assert(header->write_pos == header->agg_pos);
           // The aggregation buffer has been flushed; start filling it from the beginning.
01938      agg_buf_pos = 0;
01939      header->write_serial++;
01940    } else {
01941      
01942      
           // The write failed: report the affected range both in bytes and in
           // CACHE_BLOCK_SIZE units.
01943      Debug("cache_disk_error", "Write error on disk %s\n \
01944                write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
01945            "InterimCache ID", (uint64_t)io.aiocb.aio_offset, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes),
01946            (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
01947            (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
           // Build a scratch Dir used as the dir_delete() argument for each staged Doc:
           // cleared, tagged via dir_set_ininterim(), and indexed with the pointer
           // difference (this - vol->interim_vols) -- presumably this volume's slot in
           // that array; TODO confirm.
01948      Dir del_dir;
01949      dir_clear(&del_dir);
01950      dir_set_ininterim(&del_dir);
01951      dir_set_index(&del_dir, (this - vol->interim_vols));
           // Walk agg_buffer Doc by Doc. `done` is the byte offset of the current Doc
           // within the buffer; its would-be on-disk offset is write_pos + done
           // (write_pos was not advanced because the write failed).
           // NOTE(review): the loop only advances if round_to_approx_size(doc->len)
           // is non-zero for every Doc in the buffer -- confirm that always holds.
01952      for (int done = 0; done < agg_buf_pos;) {
01953        Doc *doc = (Doc *) (agg_buffer + done);
01954        dir_set_offset(&del_dir, header->write_pos + done);
             // Presumably drops the directory entry so nothing keeps pointing at data
             // that never reached the disk -- verify against dir_delete().
01955        dir_delete(&doc->key, vol, &del_dir);
01956        done += this->round_to_approx_size(doc->len);
01957      }
           // Discard the buffered data that could not be written.
01958      agg_buf_pos = 0;
01959    }
         // Common epilogue for both outcomes: the AIO is no longer outstanding and
         // the `sync` flag is cleared.
01960    set_io_not_in_progress();
01961    sync = false;
         // Entries are still queued on agg: run another aggregation pass right away.
01962    if (agg.head)
01963      aggWrite(event, e);
01964    return EVENT_CONT;
01965 }
01966 #endif
01967 #endif