• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

CacheWrite.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Cache write path: alternate vector updates, aggregation buffer writes and document evacuation.
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 #include "P_Cache.h"
00026 
// Small arithmetic helper macros. Each macro may expand an argument more
// than once, so only pass expressions without side effects.
//
// IS_POWER_2: non-zero when no more than one bit of _x is set. Note that
// this is also the case for _x == 0.
00027 #define IS_POWER_2(_x) (!((_x)&((_x)-1)))
// UINT_WRAP_*: ordering tests on counters that are allowed to wrap around.
// The difference of the two operands is compared against INT_MAX instead of
// comparing the operands directly (x <= y, x >= y and x < y respectively).
// NOTE(review): presumably both operands must be unsigned 32-bit values for
// the wrap-around arithmetic to be well defined - confirm at the call sites.
00028 #define UINT_WRAP_LTE(_x, _y) (((_y)-(_x)) < INT_MAX) // exploit overflow
00029 #define UINT_WRAP_GTE(_x, _y) (((_x)-(_y)) < INT_MAX) // exploit overflow
00030 #define UINT_WRAP_LT(_x, _y) (((_x)-(_y)) >= INT_MAX) // exploit overflow
00031 
00032 // Given a key, finds the index of the alternate which matches
00033 // used to get the alternate which is actually present in the document
00034 #ifdef HTTP_CACHE
00035 int
00036 get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
00037 {
00038   int alt_count = cache_vector->count();
00039   CacheHTTPInfo *obj;
00040   if (!alt_count)
00041     return -1;
00042   for (int i = 0; i < alt_count; i++) {
00043     obj = cache_vector->get(i);
00044     if (obj->compare_object_key(&key)) {
00045       // Debug("cache_key", "Resident alternate key  %X", key.slice32(0));
00046       return i;
00047     }
00048   }
00049   return -1;
00050 }
00051 
00052 // Adds/Deletes alternate to the od->vector (write_vector). If the vector
00053 // is empty, deletes the directory entry pointing to the vector. Each
00054 // CacheVC must write the vector down to disk after making changes. If we
00055 // wait till the last writer, that writer will have the responsibility
00056 // of writing the vector even if the http state machine aborts.  This
00057 // makes it easier to handle situations where writers abort.
//
// Returns the value of do_write_call(), or the result of dispatching
// AIO_EVENT_DONE when that call returned EVENT_RETURN. When the alternate to
// delete is not found, or the vector became empty, the result of
// openWriteCloseDir() is returned instead.
00058 int
00059 CacheVC::updateVector(int /* event ATS_UNUSED */, Event */* e ATS_UNUSED */)
00060 {
00061   cancel_trigger();
        // the vector is currently being read or written: go through
        // VC_SCHED_LOCK_RETRY() instead of touching it
00062   if (od->reading_vec || od->writing_vec)
00063     VC_SCHED_LOCK_RETRY();
00064   int ret = 0;
00065   {
00066     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
          // same when the volume lock was not obtained, or when a vector
          // write is flagged now that the lock attempt has been made
00067     if (!lock || od->writing_vec)
00068       VC_SCHED_LOCK_RETRY();
00069 
          // vec != 0: this VC carries a valid alternate to insert/replace.
          // vec == 0: the update is an alternate delete (see below).
00070     int vec = alternate.valid();
00071     if (f.update) {
00072       // all Update cases. Need to get the alternate index.
00073       alternate_index = get_alternate_index(write_vector, update_key);
00074       Debug("cache_update", "updating alternate index %d frags %d", alternate_index, alternate_index >=0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
00075       // if its an alternate delete
00076       if (!vec) {
00077         ink_assert(!total_len);
00078         if (alternate_index >= 0) {
00079           write_vector->remove(alternate_index, true);
00080           alternate_index = CACHE_ALT_REMOVED;
                // last alternate removed: drop the directory entry as well
00081           if (!write_vector->count())
00082             dir_delete(&first_key, vol, &od->first_dir);
00083         }
00084         // the alternate is not there any more. somebody might have
00085         // deleted it. Just close this writer
00086         if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
00087           SET_HANDLER(&CacheVC::openWriteCloseDir);
00088           return openWriteCloseDir(EVENT_IMMEDIATE, 0);
00089         }
00090       }
            // the resident alternate gets a new body or is being deleted, so
            // it must not be moved along with the vector
00091       if (update_key == od->single_doc_key && (total_len || !vec))
00092         od->move_resident_alt = 0;
00093     }
          // the vector already holds the configured maximum number of
          // alternates and this VC's alternate is not one of them: remove the
          // alternate at index 0 (and stop tracking the resident alternate if
          // that is the one being removed)
00094     if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
00095       if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
00096         od->move_resident_alt = 0;
00097       write_vector->remove(0, true);
00098     }
00099     if (vec) {
00100       /* preserve fragment offset data from old info. This method is
00101          called iff the update is a header only update so the fragment
00102          data should remain valid.
00103       */
00104       if (alternate_index >= 0)
00105         alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
00106       alternate_index = write_vector->insert(&alternate, alternate_index);
00107     }
00108 
00109     if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
00110       Doc *doc = (Doc *) first_buf->data();
00111       int small_doc = (int64_t)doc->data_len() < (int64_t)cache_config_alt_rewrite_max_size;
00112       int have_res_alt = doc->key == od->single_doc_key;
00113       // if the new alternate is not written with the vector
00114       // then move the old one with the vector
00115       // if its a header only update move the resident alternate
00116       // with the vector.
00117       // We are sure that the body of the resident alternate that we are
00118       // rewriting has not changed and the alternate is not being deleted,
00119       // since we set od->move_resident_alt  to 0 in that case
00120       // (in updateVector)
00121       if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
00122         // for multiple fragment document, we must have done
00123         // CacheVC:openWriteCloseDataDone
00124         ink_assert(!fragment || f.data_done);
00125         od->move_resident_alt = 0;
00126         f.rewrite_resident_alt = 1;
00127         write_len = doc->data_len();
00128         Debug("cache_update_alt",
00129               "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.slice32(0), first_key.slice32(0));
00130       }
00131     }
          // the marshalled vector is written as the document header; flag the
          // vector as being written before issuing the write
00132     header_len = write_vector->marshal_length();
00133     od->writing_vec = 1;
00134     f.use_first_key = 1;
00135     SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
00136     ret = do_write_call();
00137   }
        // EVENT_RETURN from do_write_call(): dispatch AIO_EVENT_DONE to the
        // handler ourselves, outside of the lock scope above
00138   if (ret == EVENT_RETURN)
00139     return handleEvent(AIO_EVENT_DONE, 0);
00140   return ret;
00141 }
00142 #endif
00143 /*
00144    The following fields of the CacheVC are used when writing down a fragment.
00145    Make sure that each of the fields is set to a valid value before calling
00146    this function
00147    - frag_type. Checked to see if a vector needs to be marshalled.
00148    - f.use_first_key. To decide if the vector should be marshalled and to set
00149      the doc->key to the appropriate key (first_key or earliest_key)
00150    - f.evac_vector. If set, the writer is pushed in the beginning of the
00151      agg queue. And if !f.evac_vector && !f.update the alternate->object_size
00152      is set to vc->total_len
00153    - f.readers.  If set, assumes that this is an evacuation, so the write
00154      is not aborted even if vol->agg_todo_size > agg_write_backlog
00155    - f.evacuator. If this is an evacuation.
00156    - f.rewrite_resident_alt. The resident alternate is rewritten.
00157    - f.update. Used only if the write_vector needs to be written to disk.
00158      Used to set the length of the alternate to total_len.
00159    - write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP &&
00160      (f.use_first_key || f.evac_vector) is set. Write_vector is written to disk
00161    - alternate_index. Used only if write_vector needs to be written to disk.
00162      Used to find out the VC's alternate in the write_vector and set its
00163      length to total_len.
00164    - write_len. The number of bytes for this fragment.
00165    - total_len. The total number of bytes for the document so far.
00166      Doc->total_len and alternate's total len is set to this value.
00167    - first_key. Doc's first_key is set to this value.
00168    - pin_in_cache. Doc's pinned value is set to this + ink_get_hrtime().
00169    - earliest_key. If f.use_first_key, Doc's key is set to this value.
00170    - key. If !f.use_first_key, Doc's key is set to this value.
00171    - blocks. Used only if write_len is set. Data to be written
00172    - offset. Used only if write_len is set. offset into the block to copy
00173      the data from.
00174    - buf. Used only if f.evacuator is set. Should point to the old document.
00175    The function sets the length, offset, pinned, head and phase of vc->dir.
00176    */
00177 
00178 int
00179 CacheVC::handleWrite(int event, Event */* e ATS_UNUSED */)
00180 {
00181   // plain write case
00182   ink_assert(!trigger);
00183   frag_len = 0;
00184 
00185   set_agg_write_in_progress();
00186   POP_HANDLER;
00187   agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeofDoc);
00188   vol->agg_todo_size += agg_len;
00189   bool agg_error =
00190     (agg_len > AGG_SIZE || header_len + sizeofDoc > MAX_FRAG_SIZE ||
00191      (!f.readers && (vol->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
00192 #ifdef CACHE_AGG_FAIL_RATE
00193   agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
00194                             (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
00195 #endif
00196   bool max_doc_error = (cache_config_max_doc_size &&
00197                         (cache_config_max_doc_size < vio.ndone ||
00198                          (vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));
00199 
00200   if (agg_error || max_doc_error) {
00201     CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
00202     CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
00203     vol->agg_todo_size -= agg_len;
00204     io.aio_result = AIO_SOFT_FAILURE;
00205     if (event == EVENT_CALL)
00206       return EVENT_RETURN;
00207     return handleEvent(AIO_EVENT_DONE, 0);
00208   }
00209   ink_assert(agg_len <= AGG_SIZE);
00210   if (f.evac_vector)
00211     vol->agg.push(this);
00212   else
00213     vol->agg.enqueue(this);
00214   if (!vol->is_io_in_progress())
00215     return vol->aggWrite(event, this);
00216   return EVENT_CONT;
00217 }
00218 
00219 static char *
00220 iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
00221 {
00222   IOBufferBlock *b = ab;
00223   while (b && len >= 0) {
00224     char *start = b->_start;
00225     char *end = b->_end;
00226     int max_bytes = end - start;
00227     max_bytes -= offset;
00228     if (max_bytes <= 0) {
00229       offset = -max_bytes;
00230       b = b->next;
00231       continue;
00232     }
00233     int bytes = len;
00234     if (bytes >= max_bytes)
00235       bytes = max_bytes;
00236     ::memcpy(p, start + offset, bytes);
00237     p += bytes;
00238     len -= bytes;
00239     b = b->next;
00240     offset = 0;
00241   }
00242   return p;
00243 }
00244 
00245 EvacuationBlock *
00246 Vol::force_evacuate_head(Dir *evac_dir, int pinned)
00247 {
00248   // build an evacuation block for the object
00249   EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
00250   // if we have already started evacuating this document, its too late
00251   // to evacuate the head...bad luck
00252   if (b && b->f.done)
00253     return b;
00254 
00255   if (!b) {
00256     b = new_EvacuationBlock(mutex->thread_holding);
00257     b->dir = *evac_dir;
00258     DDebug("cache_evac", "force: %d, %d", (int) dir_offset(evac_dir), (int) dir_phase(evac_dir));
00259     evacuate[dir_evac_bucket(evac_dir)].push(b);
00260   }
00261   b->f.pinned = pinned;
00262   b->f.evacuate_head = 1;
00263   b->evac_frags.key = zero_key;  // ensure that the block gets
00264   // evacuated no matter what
00265   b->readers = 0;             // ensure that the block does not disappear
00266   return b;
00267 }
00268 
00269 void
00270 Vol::scan_for_pinned_documents()
00271 {
00272   if (cache_config_permit_pinning) {
00273     // we can't evacuate anything between header->write_pos and
00274     // header->write_pos + AGG_SIZE.
00275     int ps = offset_to_vol_offset(this, header->write_pos + AGG_SIZE);
00276     int pe = offset_to_vol_offset(this, header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
00277     int vol_end_offset = offset_to_vol_offset(this, len + skip);
00278     int before_end_of_vol = pe < vol_end_offset;
00279     DDebug("cache_evac", "scan %d %d", ps, pe);
00280     for (int i = 0; i < vol_direntries(this); i++) {
00281       // is it a valid pinned object?
00282       if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
00283         // select objects only within this PIN_SCAN region
00284         int o = dir_offset(&dir[i]);
00285         if (dir_phase(&dir[i]) == header->phase) {
00286           if (before_end_of_vol || o >= (pe - vol_end_offset))
00287             continue;
00288         } else {
00289           if (o<ps || o>= pe)
00290             continue;
00291         }
00292         force_evacuate_head(&dir[i], 1);
00293         //      DDebug("cache_evac", "scan pinned at offset %d %d %d %d %d %d",
00294         //            (int)dir_offset(&b->dir), ps, o , pe, i, (int)b->f.done);
00295       }
00296     }
00297   }
00298 }
00299 
00300 /* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
00301    DON'T schedule any events on this thread using VC_SCHED_XXX or
00302    mutex->thread_holding->schedule_xxx_local(). ALWAYS use
00303    eventProcessor.schedule_xxx().
00304    */
// Completion handler for a write of the aggregation buffer.
// On success the write position is advanced and the aggregation buffer is
// reset; on failure the directory entries of the documents held in the
// buffer are deleted. Afterwards waiting sync CacheVCs are called back, a
// pending directory sync is triggered, and aggWrite() is called again if
// more work is queued.
// Returns EVENT_CONT, or the result of aggWrite().
00305 int
00306 Vol::aggWriteDone(int event, Event *e)
00307 {
00308   cancel_trigger();
00309 
00310   // ensure we have the cacheDirSync lock if we intend to call it later
00311   // retaking the current mutex recursively is a NOOP
00312   CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
00313   if (!lock) {
          // lock not available: try again after the configured retry delay
00314     eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00315     return EVENT_CONT;
00316   }
00317   if (io.ok()) {
          // advance the write position by the number of bytes just written
00318     header->last_write_pos = header->write_pos;
00319     header->write_pos += io.aiocb.aio_nbytes;
00320     ink_assert(header->write_pos >= start);
00321     DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
00322           hash_text.get(), header->write_pos, header->last_write_pos);
00323     ink_assert(header->write_pos == header->agg_pos);
00324     if (header->write_pos + EVACUATION_SIZE > scan_pos)
00325       periodic_scan();
00326     agg_buf_pos = 0;
00327     header->write_serial++;
00328   } else {
00329     // delete all the directory entries that we inserted
00330     // for fragments in this aggregation buffer
00331     Debug("cache_disk_error", "Write error on disk %s\n \
00332               write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
00333           hash_text.get(), (uint64_t)io.aiocb.aio_offset,
00334           (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
00335           (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
00336           (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
00337     Dir del_dir;
00338     dir_clear(&del_dir);
          // walk the documents in the aggregation buffer; each one occupies
          // round_to_approx_size(doc->len) bytes
00339     for (int done = 0; done < agg_buf_pos;) {
00340       Doc *doc = (Doc *) (agg_buffer + done);
00341       dir_set_offset(&del_dir, header->write_pos + done);
00342       dir_delete(&doc->key, this, &del_dir);
00343       done += round_to_approx_size(doc->len);
00344     }
00345     agg_buf_pos = 0;
00346   }
00347   set_io_not_in_progress();
00348   // callback ready sync CacheVCs
        // (those whose write_serial + 2 is not ahead of the header's
        // write_serial; the first one that is not ready goes back to the front
        // of the queue and ends the loop)
00349   CacheVC *c = 0;
00350   while ((c = sync.dequeue())) {
00351     if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
00352       c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
00353     else {
00354       sync.push(c); // put it back on the front
00355       break;
00356     }
00357   }
        // a directory sync was waiting for this write: run it now (its mutex
        // was taken by the lock above in that case)
00358   if (dir_sync_waiting) {
00359     dir_sync_waiting = 0;
00360     cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
00361   }
        // more writers or sync requests are queued: keep writing
00362   if (agg.head || sync.head)
00363     return aggWrite(event, e);
00364   return EVENT_CONT;
00365 }
00366 
00367 CacheVC *
00368 new_DocEvacuator(int nbytes, Vol *vol)
00369 {
00370   CacheVC *c = new_CacheVC(vol);
00371   ProxyMutex *mutex = vol->mutex;
00372   c->base_stat = cache_evacuate_active_stat;
00373   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
00374   c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
00375   c->vol = vol;
00376   c->f.evacuator = 1;
00377   c->earliest_key = zero_key;
00378   SET_CONTINUATION_HANDLER(c, &CacheVC::evacuateDocDone);
00379   return c;
00380 }
00381 
// Handler run when the read of the head document (issued by evacuateDocDone
// after evacuating an earliest fragment) completes. It determines the
// length of the alternate being evacuated (doc_len). When everything has
// already been evacuated (doc_len == total_len) the lookaside entry is fixed
// up and this CacheVC is freed; otherwise it returns EVENT_CONT and leaves
// the CacheVC alive. On a read failure or a document that does not match,
// the lookaside entry is removed and the CacheVC is freed.
00382 int
00383 CacheVC::evacuateReadHead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00384 {
00385   // The evacuator vc shares the lock with the volume mutex
00386   ink_assert(vol->mutex->thread_holding == this_ethread());
00387   cancel_trigger();
00388   Doc *doc = (Doc *) buf->data();
00389 #ifdef HTTP_CACHE
00390   CacheHTTPInfo *alternate_tmp = 0;
00391 #endif
00392   if (!io.ok())
00393     goto Ldone;
00394   // a directory entry which is no longer valid may have been overwritten
00395   if (!dir_valid(vol, &dir)) {
00396     last_collision = NULL;
00397     goto Lcollision;
00398   }
        // not the document we were looking for: probe the next candidate
00399   if (doc->magic != DOC_MAGIC || !(doc->first_key == first_key))
00400     goto Lcollision;
00401 #ifdef HTTP_CACHE
00402   alternate_tmp = 0;
00403   if (doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
00404     // its an http document
00405     if (this->load_http_info(&vector, doc) != doc->hlen) {
00406       Note("bad vector detected during evacuation");
00407       goto Ldone;
00408     }
          // find the alternate that starts at the evacuated earliest fragment
00409     alternate_index = get_alternate_index(&vector, earliest_key);
00410     if (alternate_index < 0)
00411       goto Ldone;
00412     alternate_tmp = vector.get(alternate_index);
00413     doc_len = alternate_tmp->object_size_get();
00414     Debug("cache_evac", "evacuateReadHead http earliest %X first: %X len: %" PRId64,
00415           first_key.slice32(0), earliest_key.slice32(0), doc_len);
00416   } else
00417 #endif
00418   {
00419     // non-http document
          // the key following the head's key must be the earliest key
00420     CacheKey next_key;
00421     next_CacheKey(&next_key, &doc->key);
00422     if (!(next_key == earliest_key))
00423       goto Ldone;
00424     doc_len = doc->total_len;
00425     DDebug("cache_evac",
00426           "evacuateReadHead non-http earliest %X first: %X len: %" PRId64, first_key.slice32(0), earliest_key.slice32(0), doc_len);
00427   }
00428   if (doc_len == total_len) {
00429     // the whole document has been evacuated. Insert the directory
00430     // entry in the directory.
00431     dir_lookaside_fixup(&earliest_key, vol);
00432     return free_CacheVC(this);
00433   }
        // fragments are still outstanding; evacuateDocDone adds their lengths
        // to total_len and frees this CacheVC once it reaches doc_len
00434   return EVENT_CONT;
00435 Lcollision:
        // try the next directory entry for first_key, if there is one
00436   if (dir_probe(&first_key, vol, &dir, &last_collision)) {
00437     int ret = do_read_call(&first_key);
00438     if (ret == EVENT_RETURN)
00439       return handleEvent(AIO_EVENT_DONE, 0);
00440     return ret;
00441   }
00442 Ldone:
        // give up: drop the lookaside entry and release this CacheVC
00443   dir_lookaside_remove(&earliest_key, vol);
00444   return free_CacheVC(this);
00445 }
00446 
// Handler installed by new_DocEvacuator(). It updates the directory for the
// document held in buf: overwrite_dir is the entry being replaced and dir
// the new one (see the debug message below).
// NOTE(review): presumably invoked once the evacuated document has been
// written to its new location - confirm against the aggregation write code.
// Returns the result of free_CacheVC(this), except when an earliest fragment
// was evacuated and the head has to be read first; then the result of that
// read call is returned and evacuateReadHead takes over.
00447 int
00448 CacheVC::evacuateDocDone(int /* event ATS_UNUSED */, Event */* e ATS_UNUSED */)
00449 {
00450   ink_assert(vol->mutex->thread_holding == this_ethread());
00451   Doc *doc = (Doc *) buf->data();
00452   DDebug("cache_evac", "evacuateDocDone %X o %d p %d new_o %d new_p %d",
00453         (int) key.slice32(0), (int) dir_offset(&overwrite_dir),
00454         (int) dir_phase(&overwrite_dir), (int) dir_offset(&dir), (int) dir_phase(&dir));
00455   int i = dir_evac_bucket(&overwrite_dir);
00456   // nasty beeping race condition, need to have the EvacuationBlock here
        // look up the evacuation block for the old location; if there is none
        // the loop body never runs and this CacheVC is simply freed
00457   EvacuationBlock *b = vol->evacuate[i].head;
00458   for (; b; b = b->link.next) {
00459     if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
00460 
00461       // If the document is single fragment (although not tied to the vector),
00462       // then we don't have to put the directory entry in the lookaside
00463       // buffer. But, we have no way of finding out if the document is
00464       // single fragment. doc->single_fragment() can be true for a multiple
00465       // fragment document since total_len and doc->len could be equal at
00466       // the time we write the fragment down. To be on the safe side, we
00467       // only overwrite the entry in the directory if its not a head.
00468       if (!dir_head(&overwrite_dir)) {
00469         // find the earliest key
00470         EvacuationKey *evac = &b->evac_frags;
00471         for (; evac && !(evac->key == doc->key); evac = evac->link.next);
00472         ink_assert(evac);
00473         if (!evac)
00474           break;
00475         if (evac->earliest_key.fold()) {
00476           DDebug("cache_evac", "evacdocdone: evacuating key %X earliest %X",
00477                 evac->key.slice32(0), evac->earliest_key.slice32(0));
00478           EvacuationBlock *eblock = 0;
00479           Dir dir_tmp;
00480           dir_lookaside_probe(&evac->earliest_key, vol, &dir_tmp, &eblock);
00481           if (eblock) {
                  // credit this fragment to the evacuator of the earliest
                  // fragment; when the whole document is accounted for, fix
                  // up the lookaside entry and free that evacuator
00482             CacheVC *earliest_evac = eblock->earliest_evacuator;
00483             earliest_evac->total_len += doc->data_len();
00484             if (earliest_evac->total_len == earliest_evac->doc_len) {
00485               dir_lookaside_fixup(&evac->earliest_key, vol);
00486               free_CacheVC(earliest_evac);
00487             }
00488           }
00489         }
00490         dir_overwrite(&doc->key, vol, &dir, &overwrite_dir);
00491       }
00492       // if the tag in the overwrite_dir matches the first_key in the
00493       // document, then it has to be the vector. We guarantee that
00494       // the first_key and the earliest_key will never collide (see
00495       // Cache::open_write). Once we know its the vector, we can
00496       // safely overwrite the first_key in the directory.
00497       if (dir_head(&overwrite_dir) && b->f.evacuate_head) {
00498         DDebug("cache_evac",
00499               "evacuateDocDone evacuate_head %X %X hlen %d offset %d",
00500               (int) key.slice32(0), (int) doc->key.slice32(0), doc->hlen, (int) dir_offset(&overwrite_dir));
00501 
00502         if (dir_compare_tag(&overwrite_dir, &doc->first_key)) {
00503           OpenDirEntry *cod;
00504           DDebug("cache_evac", "evacuating vector: %X %d",
00505                 (int) doc->first_key.slice32(0), (int) dir_offset(&overwrite_dir));
00506           if ((cod = vol->open_read(&doc->first_key))) {
00507             // writer  exists
00508             DDebug("cache_evac", "overwriting the open directory %X %d %d",
00509                   (int) doc->first_key.slice32(0), (int) dir_offset(&cod->first_dir), (int) dir_offset(&dir));
00510             cod->first_dir = dir;
00511 
00512           }
                // on a successful overwrite, tell the RAM cache about the move
                // (old and new offsets are passed as high/low 32-bit halves)
00513           if (dir_overwrite(&doc->first_key, vol, &dir, &overwrite_dir)) {
00514             int64_t o = dir_offset(&overwrite_dir), n = dir_offset(&dir);
00515             vol->ram_cache->fixup(&doc->first_key, (uint32_t)(o >> 32), (uint32_t)o, (uint32_t)(n >> 32), (uint32_t)n);
00516           }
00517         } else {
00518           DDebug("cache_evac", "evacuating earliest: %X %d", (int) doc->key.slice32(0), (int) dir_offset(&overwrite_dir));
00519           ink_assert(dir_compare_tag(&overwrite_dir, &doc->key));
00520           ink_assert(b->earliest_evacuator == this);
00521           total_len += doc->data_len();
00522           first_key = doc->first_key;
                // remember where the earliest fragment now lives; dir is reused
                // for probing the head below
00523           earliest_dir = dir;
00524           if (dir_probe(&first_key, vol, &dir, &last_collision) > 0) {
00525             dir_lookaside_insert(b, vol, &earliest_dir);
00526             // read the vector
00527             SET_HANDLER(&CacheVC::evacuateReadHead);
00528             int ret = do_read_call(&first_key);
00529             if (ret == EVENT_RETURN)
00530               return handleEvent(AIO_EVENT_DONE, 0);
00531             return ret;
00532           }
00533         }
00534       }
00535       break;
00536     }
00537   }
00538   return free_CacheVC(this);
00539 }
00540 
00541 static int
00542 evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Vol *vol)
00543 {
00544   Dir dir, *last_collision = 0;
00545   int i = 0;
00546   while (dir_probe(key, vol, &dir, &last_collision)) {
00547     // next fragment cannot be a head...if it is, it must have been a
00548     // directory collision.
00549     if (dir_head(&dir)
00550 #if TS_USE_INTERIM_CACHE == 1
00551         || dir_ininterim(&dir)
00552 #endif
00553         )
00554       continue;
00555     EvacuationBlock *b = evacuation_block_exists(&dir, vol);
00556     if (!b) {
00557       b = new_EvacuationBlock(vol->mutex->thread_holding);
00558       b->dir = dir;
00559       b->evac_frags.key = *key;
00560       b->evac_frags.earliest_key = *earliest_key;
00561       vol->evacuate[dir_evac_bucket(&dir)].push(b);
00562       i++;
00563     } else {
00564       ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
00565       ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
00566       EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
00567       evac_frag->key = *key;
00568       evac_frag->earliest_key = *earliest_key;
00569       evac_frag->link.next = b->evac_frags.link.next;
00570       b->evac_frags.link.next = evac_frag;
00571     }
00572     if (force)
00573       b->readers = 0;
00574     DDebug("cache_evac",
00575           "next fragment %X Earliest: %X offset %d phase %d force %d",
00576           (int) key->slice32(0), (int) earliest_key->slice32(0), (int) dir_offset(&dir), (int) dir_phase(&dir), force);
00577   }
00578   return i;
00579 }
00580 
00581 int
00582 Vol::evacuateWrite(CacheVC *evacuator, int event, Event *e)
00583 {
00584   // push to front of aggregation write list, so it is written first
00585 
00586   evacuator->agg_len = round_to_approx_size(((Doc *)evacuator->buf->data())->len);
00587   agg_todo_size += evacuator->agg_len;
00588   /* insert the evacuator after all the other evacuators */
00589   CacheVC *cur = (CacheVC *) agg.head;
00590   CacheVC *after = NULL;
00591   for (; cur && cur->f.evacuator; cur = (CacheVC *) cur->link.next)
00592     after = cur;
00593   ink_assert(evacuator->agg_len <= AGG_SIZE);
00594   agg.insert(evacuator, after);
00595   return aggWrite(event, e);
00596 }
00597 
// Handler installed by evac_range(): the read of a document that is to be
// evacuated has completed. It works out which key(s) the document is
// evacuated under, registers the document's next fragment for evacuation
// where applicable, and hands the evacuator to evacuateWrite().
// When the document is not valid, no evacuation block matches, or a pinned
// block without readers has expired, the evacuator is freed and aggWrite()
// is called instead.
// Returns EVENT_DONE for any event other than AIO_EVENT_DONE, otherwise the
// result of evacuateWrite() or aggWrite().
00598 int
00599 Vol::evacuateDocReadDone(int event, Event *e)
00600 {
00601   cancel_trigger();
00602   if (event != AIO_EVENT_DONE)
00603     return EVENT_DONE;
00604   ink_assert(is_io_in_progress());
00605   set_io_not_in_progress();
00606   ink_assert(mutex->thread_holding == this_ethread());
00607   Doc *doc = (Doc *) doc_evacuator->buf->data();
00608   CacheKey next_key;
00609   EvacuationBlock *b = NULL;
00610   if (doc->magic != DOC_MAGIC) {
00611     Debug("cache_evac", "DOC magic: %X %d",
00612           (int) dir_tag(&doc_evacuator->overwrite_dir), (int) dir_offset(&doc_evacuator->overwrite_dir));
00613     ink_assert(doc->magic == DOC_MAGIC);
00614     goto Ldone;
00615   }
00616   DDebug("cache_evac", "evacuateDocReadDone %X offset %d",
00617         (int) doc->key.slice32(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
00618 
        // find the evacuation block for the location that was read
00619   b = evacuate[dir_evac_bucket(&doc_evacuator->overwrite_dir)].head;
00620   while (b) {
00621     if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir))
00622       break;
00623     b = b->link.next;
00624   }
00625   if (!b)
00626     goto Ldone;
        // block is only kept for pinning, nobody reads it, and the pin time of
        // the document has passed: nothing to evacuate
00627   if ((b->f.pinned && !b->readers) && doc->pinned < (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND))
00628     goto Ldone;
00629 
        // NOTE(review): the constants stored in b->f.unused below (57, 67, 77,
        // 87) look like debugging markers recording which branch ran - confirm.
00630   if (dir_head(&b->dir) && b->f.evacuate_head) {
00631     ink_assert(!b->evac_frags.key.fold());
00632     // if its a head (vector), evacuation is real simple...we just
00633     // need to write this vector down and overwrite the directory entry.
00634     if (dir_compare_tag(&b->dir, &doc->first_key)) {
00635       doc_evacuator->key = doc->first_key;
00636       b->evac_frags.key = doc->first_key;
00637       DDebug("cache_evac", "evacuating vector %X offset %d",
00638             (int) doc->first_key.slice32(0), (int) dir_offset(&doc_evacuator->overwrite_dir));
00639       b->f.unused = 57;
00640     } else {
00641       // if its an earliest fragment (alternate) evacuation, things get
00642       // a little tricky. We have to propagate the earliest key to the next
00643       // fragments for this alternate. The last fragment to be evacuated
00644       // fixes up the lookaside buffer.
00645       doc_evacuator->key = doc->key;
00646       doc_evacuator->earliest_key = doc->key;
00647       b->evac_frags.key = doc->key;
00648       b->evac_frags.earliest_key = doc->key;
00649       b->earliest_evacuator = doc_evacuator;
00650       DDebug("cache_evac", "evacuating earliest %X %X evac: %p offset: %d",
00651             (int) b->evac_frags.key.slice32(0), (int) doc->key.slice32(0),
00652             doc_evacuator, (int) dir_offset(&doc_evacuator->overwrite_dir));
00653       b->f.unused = 67;
00654     }
00655   } else {
00656     // find which key matches the document
00657     EvacuationKey *ek = &b->evac_frags;
00658     for (; ek && !(ek->key == doc->key); ek = ek->link.next);
00659     if (!ek) {
00660       b->f.unused = 77;
00661       goto Ldone;
00662     }
00663     doc_evacuator->key = ek->key;
00664     doc_evacuator->earliest_key = ek->earliest_key;
00665     DDebug("cache_evac", "evacuateDocReadDone key: %X earliest: %X",
00666           (int) ek->key.slice32(0), (int) ek->earliest_key.slice32(0));
00667     b->f.unused = 87;
00668   }
00669   // if the entry is not a head, or its tag does not match the first_key in
00670   // the document, then it is not the vector. We guarantee that
00671   // the first_key and the earliest_key will never collide (see
00672   // Cache::open_write). In that case the next fragment is registered too.
00673   if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
00674     next_CacheKey(&next_key, &doc->key);
00675     evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
00676   }
00677   return evacuateWrite(doc_evacuator, event, e);
00678 Ldone:
        // nothing to write: release the evacuator and resume aggregation writes
00679   free_CacheVC(doc_evacuator);
00680   doc_evacuator = 0;
00681   return aggWrite(event, e);
00682 }
00683 
00684 int
00685 Vol::evac_range(off_t low, off_t high, int evac_phase)
00686 {
00687   off_t s = offset_to_vol_offset(this, low);
00688   off_t e = offset_to_vol_offset(this, high);
00689   int si = dir_offset_evac_bucket(s);
00690   int ei = dir_offset_evac_bucket(e);
00691 
00692   for (int i = si; i <= ei; i++) {
00693     EvacuationBlock *b = evacuate[i].head;
00694     EvacuationBlock *first = 0;
00695     int64_t first_offset = INT64_MAX;
00696     for (; b; b = b->link.next) {
00697       int64_t offset = dir_offset(&b->dir);
00698       int phase = dir_phase(&b->dir);
00699       if (offset >= s && offset < e && !b->f.done && phase == evac_phase)
00700         if (offset < first_offset) {
00701           first = b;
00702           first_offset = offset;
00703         }
00704     }
00705     if (first) {
00706       first->f.done = 1;
00707       io.aiocb.aio_fildes = fd;
00708       io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
00709       io.aiocb.aio_offset = vol_offset(this, &first->dir);
00710       if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(skip + len))
00711         io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
00712       doc_evacuator = new_DocEvacuator(io.aiocb.aio_nbytes, this);
00713       doc_evacuator->overwrite_dir = first->dir;
00714 
00715       io.aiocb.aio_buf = doc_evacuator->buf->data();
00716       io.action = this;
00717       io.thread = AIO_CALLBACK_THREAD_ANY;
00718       DDebug("cache_evac", "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
00719       SET_HANDLER(&Vol::evacuateDocReadDone);
00720       ink_assert(ink_aio_read(&io) >= 0);
00721       return -1;
00722     }
00723   }
00724   return 0;
00725 }
00726 
00727 
00728 static int
00729 agg_copy(char *p, CacheVC *vc)
00730 {
        // Serializes the pending write of `vc` into the aggregation buffer at `p`
        // and updates vc->dir to describe where the data will live on disk.
        //   - regular writer (!vc->f.evacuator): builds a Doc header at `p`,
        //     marshals/copies the header bytes and the payload, optionally
        //     checksums it, and returns vc->agg_len.
        //   - evacuator: copies the already-read Doc from vc->buf to `p`, refreshes
        //     its serials, repoints the directory entry, and returns the rounded
        //     document length.
        // The return value is the number of bytes the caller should advance by.
00731   Vol *vol = vc->vol;
        // position this document will occupy: current write position plus what is
        // already staged in the aggregation buffer
00732   off_t o = vol->header->write_pos + vol->agg_buf_pos;
00733 
00734   if (!vc->f.evacuator) {
00735     Doc *doc = (Doc *) p;
00736     IOBufferBlock *res_alt_blk = 0;
00737 
          // full length of the fragment: payload + header bytes + frag_len + Doc struct
00738     uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
00739     ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
00740     ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
00741     // update copy of directory entry for this document
00742     dir_set_approx_size(&vc->dir, vc->agg_len);
00743     dir_set_offset(&vc->dir, offset_to_vol_offset(vol, o));
00744     ink_assert(vol_offset(vol, &vc->dir) < (vol->skip + vol->len));
00745 #if TS_USE_INTERIM_CACHE == 1
00746     dir_set_indisk(&vc->dir);
00747 #endif
00748     dir_set_phase(&vc->dir, vol->header->phase);
00749 
00750     // fill in document header
00751     doc->magic = DOC_MAGIC;
00752     doc->len = len;
00753     doc->hlen = vc->header_len;
00754     doc->doc_type = vc->frag_type;
00755     doc->v_major = CACHE_DB_MAJOR_VERSION;
00756     doc->v_minor = CACHE_DB_MINOR_VERSION;
00757     doc->unused = 0; // force this for forward compatibility.
00758     doc->total_len = vc->total_len;
00759     doc->first_key = vc->first_key;
00760     doc->sync_serial = vol->header->sync_serial;
00761     vc->write_serial = doc->write_serial = vol->header->write_serial;
00762     doc->checksum = DOC_NO_CHECKSUM;
          // pin_in_cache is added to the current time in seconds to form doc->pinned
00763     if (vc->pin_in_cache) {
00764       dir_set_pinned(&vc->dir, 1);
00765       doc->pinned = (uint32_t) (ink_get_based_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
00766     } else {
00767       dir_set_pinned(&vc->dir, 0);
00768       doc->pinned = 0;
00769     }
00770 
          // choose the key stored in the Doc and whether the dir entry is a head:
          // writes under the first key are always heads; otherwise only fragment 0 is
00771     if (vc->f.use_first_key) {
00772       if (doc->data_len()
00773 #ifdef HTTP_CACHE
00774                   || vc->f.allow_empty_doc
00775 #endif
00776                   )
00777         doc->key = vc->earliest_key;
00778       else // the vector is being written by itself
00779         prev_CacheKey(&doc->key, &vc->earliest_key);
00780       dir_set_head(&vc->dir, true);
00781     } else {
00782       doc->key = vc->key;
00783       dir_set_head(&vc->dir, !vc->fragment);
00784     }
00785 
00786 #ifdef HTTP_CACHE
          // rewriting a resident alternate: the payload comes from the previously
          // read first_buf, so key and total_len are taken from that document
00787     if (vc->f.rewrite_resident_alt) {
00788       ink_assert(vc->f.use_first_key);
00789       Doc *res_doc = (Doc *) vc->first_buf->data();
00790       res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeofDoc + res_doc->hlen);
00791       doc->key = res_doc->key;
00792       doc->total_len = res_doc->data_len();
00793     }
00794 #endif
00795     // update the new_info object_key, and total_len and dirinfo
00796     if (vc->header_len) {
00797       ink_assert(vc->f.use_first_key);
00798 #ifdef HTTP_CACHE
00799       if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
00800         ink_assert(vc->write_vector->count() > 0);
00801         if (!vc->f.update && !vc->f.evac_vector) {
00802           ink_assert(!(vc->first_key == zero_key));
00803           CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
00804           http_info->object_size_set(vc->total_len);
00805         }
00806         // update + data_written =>  Update case (b)
00807         // need to change the old alternate's object length
00808         if (vc->f.update && vc->total_len) {
00809           CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
00810           http_info->object_size_set(vc->total_len);
00811         }
00812         ink_assert(!(((uintptr_t) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
              // NOTE(review): the marshal() call lives inside ink_assert, so it only
              // runs if ink_assert evaluates its argument in this build — confirm
00813         ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
00814       } else
00815 #endif
00816         memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
00817       // the single fragment flag is not used in the write call.
00818       // putting it in for completeness.
00819       vc->f.single_fragment = doc->single_fragment();
00820     }
00821     // move data
00822     if (vc->write_len) {
00823       {
              // `mutex` is declared only for the assert and the stat macro below
00824         ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex;
00825         ink_assert(mutex->thread_holding == this_ethread());
00826         CACHE_DEBUG_SUM_DYN_STAT(cache_write_bytes_stat, vc->write_len);
00827       }
00828 #ifdef HTTP_CACHE
00829       if (vc->f.rewrite_resident_alt)
00830         iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
00831       else
00832 #endif
00833         iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks, vc->offset);
00834 #ifdef VERIFY_JTEST_DATA
            // NOTE(review): this block names f, header_len and new_info without
            // going through vc, yet agg_copy is a free function — it looks like it
            // would not compile with VERIFY_JTEST_DATA defined; confirm before enabling
00835       if (f.use_first_key && header_len) {
00836         int ib = 0, xd = 0;
00837         char xx[500];
00838         new_info.request_get().url_get().print(xx, 500, &ib, &xd);
00839         char *x = xx;
00840         for (int q = 0; q < 3; q++)
00841           x = strchr(x + 1, '/');
00842         ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
00843       }
00844 #endif
00845 
00846     }
          // optional checksum: byte-wise sum of everything from hdr() to the end of the doc
00847     if (cache_config_enable_checksum) {
00848       doc->checksum = 0;
00849       for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
00850         doc->checksum += *b;
00851     }
00852     if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment)
00853       ink_assert(doc->hlen);
00854 
00855     if (res_alt_blk)
00856       res_alt_blk->free();
00857 
00858     return vc->agg_len;
00859   } else {
00860     // for evacuated documents, copy the data, and update directory
00861     Doc *doc = (Doc *) vc->buf->data();
00862     int l = vc->vol->round_to_approx_size(doc->len);
00863     {
00864       ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex;
00865       ink_assert(mutex->thread_holding == this_ethread());
00866       CACHE_DEBUG_INCREMENT_DYN_STAT(cache_gc_frags_evacuated_stat);
00867       CACHE_DEBUG_SUM_DYN_STAT(cache_gc_bytes_evacuated_stat, l);
00868     }
00869 
          // stamp the copy with the volume's current serials before it is rewritten
00870     doc->sync_serial = vc->vol->header->sync_serial;
00871     doc->write_serial = vc->vol->header->write_serial;
00872 
00873     memcpy(p, doc, doc->len);
00874 
          // start from the directory entry being replaced, then repoint it at the
          // new offset and phase
00875     vc->dir = vc->overwrite_dir;
00876     dir_set_offset(&vc->dir, offset_to_vol_offset(vc->vol, o));
00877     dir_set_phase(&vc->dir, vc->vol->header->phase);
00878 #if TS_USE_INTERIM_CACHE == 1
00879     dir_set_indisk(&vc->dir);
00880 #endif
00881     return l;
00882   }
00883 }
00884 
00885 inline void
00886 Vol::evacuate_cleanup_blocks(int i)
00887 {
00888   EvacuationBlock *b = evacuate[i].head;
00889   while (b) {
00890     if (b->f.done &&
00891         ((header->phase != dir_phase(&b->dir) &&
00892           header->write_pos > vol_offset(this, &b->dir)) ||
00893          (header->phase == dir_phase(&b->dir) && header->write_pos <= vol_offset(this, &b->dir)))) {
00894       EvacuationBlock *x = b;
00895       DDebug("cache_evac", "evacuate cleanup free %X offset %d",
00896             (int) b->evac_frags.key.slice32(0), (int) dir_offset(&b->dir));
00897       b = b->link.next;
00898       evacuate[i].remove(x);
00899       free_EvacuationBlock(x, mutex->thread_holding);
00900       continue;
00901     }
00902     b = b->link.next;
00903   }
00904 }
00905 
00906 void
00907 Vol::evacuate_cleanup()
00908 {
00909   int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
00910   int64_t e = dir_offset_evac_bucket(eo);
00911   int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
00912   int64_t s = sx;
00913   int i;
00914 
00915   if (e > evacuate_size)
00916     e = evacuate_size;
00917   if (sx < 0)
00918     s = 0;
00919   for (i = s; i < e; i++)
00920     evacuate_cleanup_blocks(i);
00921 
00922   // if we have wrapped, handle the end bit
00923   if (sx <= 0) {
00924     s = evacuate_size + sx - 2;
00925     if (s < 0)
00926       s = 0;
00927     for (i = s; i < evacuate_size; i++)
00928       evacuate_cleanup_blocks(i);
00929   }
00930 }
00931 
00932 void
00933 Vol::periodic_scan()
00934 {
00935   evacuate_cleanup();
00936   scan_for_pinned_documents();
00937   if (header->write_pos == start)
00938     scan_pos = start;
00939   scan_pos += len / PIN_SCAN_EVERY;
00940 }
00941 
00942 void
00943 Vol::agg_wrap()
00944 {
00945   header->write_pos = start;
00946   header->phase = !header->phase;
00947 
00948   header->cycle++;
00949   header->agg_pos = header->write_pos;
00950   dir_lookaside_cleanup(this);
00951   dir_clean_vol(this);
00952   periodic_scan();
00953 }
00954 
00955 /* NOTE: This state can be called by an AIO thread, so DON'T DON'T
00956    DON'T schedule any events on this thread using VC_SCHED_XXX or
00957    mutex->thread_holding->schedule_xxx_local(). ALWAYS use
00958    eventProcessor.schedule_xxx().
00959    Also, make sure that any functions called by this also use
00960    the eventProcessor to schedule events
00961 */
00962 int
00963 Vol::aggWrite(int event, void */* e ATS_UNUSED */)
00964 {
        // Aggregated write handler.  In order, it:
        //   1. copies queued writers from `agg` into agg_buffer via agg_copy()
        //      while they fit in AGG_SIZE and before the end of the volume;
        //   2. wraps to the start of the volume (agg_wrap) if nothing fit but
        //      writers are still queued;
        //   3. starts evacuation reads for the region about to be overwritten
        //      (evac_range) and bails out to Lwait if one was started;
        //   4. issues a single AIO write of agg_buffer, completed in aggWriteDone.
        // Writers copied in step 1 are notified at Lwait.  Returns EVENT_CONT, or
        // EVENT_RETURN when event == EVENT_CALL and a copied writer's mutex is held
        // by the same thread as the volume mutex (no event is scheduled for it).
00965   ink_assert(!is_io_in_progress());
00966 
        // writers copied during this call that still need an AIO_EVENT_DONE
00967   Que(CacheVC, link) tocall;
00968   CacheVC *c;
00969 
00970   cancel_trigger();
00971 
00972 Lagain:
00973   // calculate length of aggregated write
00974   for (c = (CacheVC *) agg.head; c;) {
00975     int writelen = c->agg_len;
00976     // [amc] this is checked in multiple places; only here was it strictly less.
00977     ink_assert(writelen <= AGG_SIZE);
          // stop when the next writer would overflow the buffer or run past the
          // end of the volume (skip + len)
00978     if (agg_buf_pos + writelen > AGG_SIZE ||
00979         header->write_pos + agg_buf_pos + writelen > (skip + len))
00980       break;
00981     DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d",
00982           agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.slice32(0));
00983     int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
00984     ink_assert(writelen == wrotelen);
00985     agg_todo_size -= writelen;
00986     agg_buf_pos += writelen;
          // grab the successor before c is dequeued and possibly re-linked elsewhere
00987     CacheVC *n = (CacheVC *)c->link.next;
00988     agg.dequeue();
00989     if (c->f.sync && c->f.use_first_key) {
            // sync writers go on the `sync` queue, kept ordered by write_serial
            // (wrap-aware comparison via UINT_WRAP_LT)
00990       CacheVC *last = sync.tail;
00991       while (last && UINT_WRAP_LT(c->write_serial, last->write_serial))
00992         last = (CacheVC*)last->link.prev;
00993       sync.insert(c, last);
00994     } else if (c->f.evacuator)
00995       c->handleEvent(AIO_EVENT_DONE, 0);
00996     else
00997       tocall.enqueue(c);
00998     c = n;
00999   }
01000 
01001   // if we got nothing...
01002   if (!agg_buf_pos) {
01003     if (!agg.head && !sync.head) // nothing to get
01004       return EVENT_CONT;
01005     if (header->write_pos == start) {
01006       // write aggregation too long, bad bad, punt on everything.
01007       Note("write aggregation exceeds vol size");
01008       ink_assert(!tocall.head);
01009       ink_assert(false);
            // drop every queued writer, signalling AIO_EVENT_DONE to each
01010       while ((c = agg.dequeue())) {
01011         agg_todo_size -= c->agg_len;
01012         if (c->initial_thread != NULL)
01013           c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
01014         else
01015           eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
01016       }
01017       return EVENT_CONT;
01018     }
01019     // start back
01020     if (agg.head) {
01021       agg_wrap();
01022       goto Lagain;
01023     }
01024   }
01025 
01026   // evacuate space
        // look EVACUATION_SIZE past the end of the pending write; blocks there
        // carry the opposite phase from the one currently being written
01027   off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
01028   if (evac_range(header->write_pos, end, !header->phase) < 0)
01029     goto Lwait;
        // the look-ahead window wrapped past the end of the volume: also check
        // the matching span at the start, which is in the current phase
01030   if (end > skip + len)
01031     if (evac_range(start, start + (end - (skip + len)), header->phase) < 0)
01032       goto Lwait;
01033 
01034   // if agg.head, then we are near the end of the disk, so
01035   // write down the aggregation in whatever size it is.
01036   if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting)
01037     goto Lwait;
01038 
01039   // write sync marker
        // nothing was copied but a sync writer is waiting: emit an otherwise
        // empty, zeroed Doc so there is something to write
01040   if (!agg_buf_pos) {
01041     ink_assert(sync.head);
01042     int l = round_to_approx_size(sizeof(Doc));
01043     agg_buf_pos = l;
01044     Doc *d = (Doc*)agg_buffer;
01045     memset(d, 0, sizeof(Doc));
01046     d->magic = DOC_MAGIC;
01047     d->len = l;
01048     d->sync_serial = header->sync_serial;
01049     d->write_serial = header->write_serial;
01050   }
01051 
01052   // set write limit
01053   header->agg_pos = header->write_pos + agg_buf_pos;
01054 
01055   io.aiocb.aio_fildes = fd;
01056   io.aiocb.aio_offset = header->write_pos;
01057   io.aiocb.aio_buf = agg_buffer;
01058   io.aiocb.aio_nbytes = agg_buf_pos;
01059   io.action = this;
01060   /*
01061     Callback on AIO thread so that we can issue a new write ASAP
01062     as all writes are serialized in the volume.  This is not necessary
01063     for reads proceed independently.
01064    */
01065   io.thread = AIO_CALLBACK_THREAD_AIO;
01066   SET_HANDLER(&Vol::aggWriteDone);
01067   ink_aio_write(&io);
01068 
01069 Lwait:
        // notify the writers copied above; a writer whose mutex is held by this
        // same thread during an EVENT_CALL is reported through the return value
        // instead of a scheduled event
01070   int ret = EVENT_CONT;
01071   while ((c = tocall.dequeue())) {
01072     if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
01073       ret = EVENT_RETURN;
01074     else if (c->initial_thread != NULL)
01075       c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
01076     else
01077       eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
01078   }
01079   return ret;
01080 }
01081 
01082 int
01083 CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event */* e ATS_UNUSED */)
01084 {
01085   cancel_trigger();
01086   {
01087     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01088     if (!lock) {
01089       SET_HANDLER(&CacheVC::openWriteCloseDir);
01090       ink_assert(!is_io_in_progress());
01091       VC_SCHED_LOCK_RETRY();
01092     }
01093     vol->close_write(this);
01094     if (closed < 0 && fragment)
01095       dir_delete(&earliest_key, vol, &earliest_dir);
01096   }
01097   if (is_debug_tag_set("cache_update")) {
01098     if (f.update && closed > 0) {
01099       if (!total_len && alternate_index != CACHE_ALT_REMOVED) {
01100         Debug("cache_update", "header only %d (%" PRIu64 ", %" PRIu64 ")\n",
01101               DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1]);
01102 
01103       } else if (total_len && alternate_index != CACHE_ALT_REMOVED) {
01104         Debug("cache_update", "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")\n",
01105               DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
01106       } else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
01107         Debug("cache_update", "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")\n",
01108               DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1]);
01109       }
01110     }
01111   }
01112   // update the appropriate stat variable
01113   // These variables may not give the current no of documents with
01114   // one, two and three or more fragments. This is because for
01115   // updates we dont decrement the variable corresponding the old
01116   // size of the document
01117   if ((closed == 1) && (total_len > 0
01118 #ifdef HTTP_CACHE
01119                   || f.allow_empty_doc
01120 #endif
01121                   )) {
01122     DDebug("cache_stats", "Fragment = %d", fragment);
01123     switch (fragment) {
01124       case 0: CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat); break;
01125       case 1: CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat); break;
01126       default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
01127     }
01128   }
01129   if (f.close_complete) {
01130     recursive++;
01131     ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
01132     vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *) &vio);
01133     recursive--;
01134   }
01135   return free_CacheVC(this);
01136 }
01137 
01138 int
01139 CacheVC::openWriteCloseHeadDone(int event, Event *e)
01140 {
01141   if (event == AIO_EVENT_DONE)
01142     set_io_not_in_progress();
01143   else if (is_io_in_progress())
01144     return EVENT_CONT;
01145   {
01146     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01147     if (!lock)
01148       VC_LOCK_RETRY_EVENT();
01149     od->writing_vec = 0;
01150     if (!io.ok())
01151       goto Lclose;
01152     ink_assert(f.use_first_key);
01153     if (!od->dont_update_directory) {
01154       if (dir_is_empty(&od->first_dir)) {
01155         dir_insert(&first_key, vol, &dir);
01156       } else {
01157         // multiple fragment vector write
01158         dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
01159         // insert moved resident alternate
01160         if (od->move_resident_alt) {
01161           if (dir_valid(vol, &od->single_doc_dir))
01162             dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
01163           od->move_resident_alt = 0;
01164         }
01165       }
01166       od->first_dir = dir;
01167       if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
01168         // fragment is tied to the vector
01169         od->move_resident_alt = 1;
01170         if (!f.rewrite_resident_alt) {
01171           od->single_doc_key = earliest_key;
01172         }
01173         dir_assign(&od->single_doc_dir, &dir);
01174         dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
01175       }
01176     }
01177   }
01178 Lclose:
01179   return openWriteCloseDir(event, e);
01180 }
01181 
01182 int
01183 CacheVC::openWriteCloseHead(int event, Event *e)
01184 {
01185   cancel_trigger();
01186   f.use_first_key = 1;
01187   if (io.ok())
01188     ink_assert(fragment || (length == (int64_t)total_len));
01189   else
01190     return openWriteCloseDir(event, e);
01191   if (f.data_done)
01192     write_len = 0;
01193   else
01194     write_len = length;
01195 #ifdef HTTP_CACHE
01196   if (frag_type == CACHE_FRAG_TYPE_HTTP) {
01197     SET_HANDLER(&CacheVC::updateVector);
01198     return updateVector(EVENT_IMMEDIATE, 0);
01199   } else {
01200 #endif
01201     header_len = header_to_write_len;
01202     SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
01203     return do_write_lock();
01204 #ifdef HTTP_CACHE
01205   }
01206 #endif
01207 }
01208 
01209 int
01210 CacheVC::openWriteCloseDataDone(int event, Event *e)
01211 {
01212   int ret = 0;
01213   cancel_trigger();
01214 
01215   if (event == AIO_EVENT_DONE)
01216     set_io_not_in_progress();
01217   else if (is_io_in_progress())
01218     return EVENT_CONT;
01219   if (!io.ok())
01220     return openWriteCloseDir(event, e);
01221   {
01222     CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
01223     if (!lock)
01224       VC_LOCK_RETRY_EVENT();
01225     if (!fragment) {
01226       ink_assert(key == earliest_key);
01227       earliest_dir = dir;
01228 #ifdef HTTP_CACHE
01229     } else {
01230       // Store the offset only if there is a table.
01231       // Currently there is no alt (and thence no table) for non-HTTP.
01232       if (alternate.valid())
01233         alternate.push_frag_offset(write_pos);
01234 #endif
01235     }
01236     fragment++;
01237     write_pos += write_len;
01238     dir_insert(&key, vol, &dir);
01239     blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
01240     next_CacheKey(&key, &key);
01241     if (length) {
01242       write_len = length;
01243       if (write_len > MAX_FRAG_SIZE)
01244         write_len = MAX_FRAG_SIZE;
01245       if ((ret = do_write_call()) == EVENT_RETURN)
01246         goto Lcallreturn;
01247       return ret;
01248     }
01249     f.data_done = 1;
01250     return openWriteCloseHead(event, e); // must be called under vol lock from here
01251   }
01252 Lcallreturn:
01253   return handleEvent(AIO_EVENT_DONE, 0);
01254 }
01255 
01256 int
01257 CacheVC::openWriteClose(int event, Event *e)
01258 {
01259   cancel_trigger();
01260   if (is_io_in_progress()) {
01261     if (event != AIO_EVENT_DONE)
01262       return EVENT_CONT;
01263     set_io_not_in_progress();
01264     if (!io.ok())
01265       return openWriteCloseDir(event, e);
01266   }
01267   if (closed > 0
01268 #ifdef HTTP_CACHE
01269                   || f.allow_empty_doc
01270 #endif
01271                   ) {
01272     if (total_len == 0) {
01273 #ifdef HTTP_CACHE
01274       if (f.update || f.allow_empty_doc) {
01275         return updateVector(event, e);
01276       } else {
01277         // If we've been CLOSE'd but nothing has been written then
01278         // this close is transformed into an abort.
01279         closed = -1;
01280         return openWriteCloseDir(event, e);
01281       }
01282 #else
01283       return openWriteCloseDir(event, e);
01284 #endif
01285     }
01286     if (length && (fragment || length > MAX_FRAG_SIZE)) {
01287       SET_HANDLER(&CacheVC::openWriteCloseDataDone);
01288       write_len = length;
01289       if (write_len > MAX_FRAG_SIZE)
01290         write_len = MAX_FRAG_SIZE;
01291       return do_write_lock_call();
01292     } else
01293       return openWriteCloseHead(event, e);
01294   } else
01295     return openWriteCloseDir(event, e);
01296 }
01297 
01298 int
01299 CacheVC::openWriteWriteDone(int event, Event *e)
01300 {
01301   cancel_trigger();
01302   if (event == AIO_EVENT_DONE)
01303     set_io_not_in_progress();
01304   else
01305     if (is_io_in_progress())
01306       return EVENT_CONT;
01307   // In the event of VC_EVENT_ERROR, the cont must do an io_close
01308   if (!io.ok()) {
01309     if (closed) {
01310       closed = -1;
01311       return die();
01312     }
01313     SET_HANDLER(&CacheVC::openWriteMain);
01314     return calluser(VC_EVENT_ERROR);
01315   }
01316   {
01317     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01318     if (!lock)
01319       VC_LOCK_RETRY_EVENT();
01320     // store the earliest directory. Need to remove the earliest dir
01321     // in case the writer aborts.
01322     if (!fragment) {
01323       ink_assert(key == earliest_key);
01324       earliest_dir = dir;
01325 #ifdef HTTP_CACHE
01326     } else {
01327       // Store the offset only if there is a table.
01328       // Currently there is no alt (and thence no table) for non-HTTP.
01329       if (alternate.valid())
01330         alternate.push_frag_offset(write_pos);
01331 #endif
01332     }
01333     ++fragment;
01334     write_pos += write_len;
01335     dir_insert(&key, vol, &dir);
01336     DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
01337     blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
01338     next_CacheKey(&key, &key);
01339   }
01340   if (closed)
01341     return die();
01342   SET_HANDLER(&CacheVC::openWriteMain);
01343   return openWriteMain(event, e);
01344 }
01345 
01346 static inline int target_fragment_size() {
01347   return cache_config_target_fragment_size - sizeofDoc;
01348 }
01349 
01350 int
01351 CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event */* e ATS_UNUSED */)
01352 {
        // Main state of an open write.  Pulls available bytes from the VIO's
        // buffer, accounts for them (vio.ndone, total_len, length) and decides
        // whether enough has accumulated to write a fragment.  It calls the user
        // back with WRITE_READY / WRITE_COMPLETE as needed, returns EVENT_CONT to
        // wait for more data, or starts a fragment write (handler
        // openWriteWriteDone) or the close sequence (openWriteClose).
01353   cancel_trigger();
        // set once the user has been called back in this invocation, so the
        // WRITE_READY callback below is not repeated
01354   int called_user = 0;
01355   ink_assert(!is_io_in_progress());
01356 Lagain:
        // no buffer attached yet: ask the user for one
01357   if (!vio.buffer.writer()) {
01358     if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
01359       return EVENT_DONE;
01360     if (!vio.buffer.writer())
01361       return EVENT_CONT;
01362   }
        // the VIO has nothing left to do: report completion; the user may
        // extend the VIO in the callback, otherwise wait
01363   if (vio.ntodo() <= 0) {
01364     called_user = 1;
01365     if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
01366       return EVENT_DONE;
01367     ink_assert(!f.close_complete || !"close expected after write COMPLETE");
01368     if (vio.ntodo() <= 0)
01369       return EVENT_CONT;
01370   }
        // ntodo:   bytes still owed by the VIO plus bytes already held in `length`
        // towrite: what could be written now (newly available + already held),
        //          capped at ntodo and at MAX_FRAG_SIZE; `avail` is trimmed by the
        //          same amounts so only the bytes actually taken are consumed
01371   int64_t ntodo = (int64_t)(vio.ntodo() + length);
01372   int64_t total_avail = vio.buffer.reader()->read_avail();
01373   int64_t avail = total_avail;
01374   int64_t towrite = avail + length;
01375   if (towrite > ntodo) {
01376     avail -= (towrite - ntodo);
01377     towrite = ntodo;
01378   }
01379   if (towrite > MAX_FRAG_SIZE) {
01380     avail -= (towrite - MAX_FRAG_SIZE);
01381     towrite = MAX_FRAG_SIZE;
01382   }
        // remember where the pending data starts before the reader is advanced
01383   if (!blocks && towrite) {
01384     blocks = vio.buffer.reader()->block;
01385     offset = vio.buffer.reader()->start_offset;
01386   }
01387   if (avail > 0) {
01388     vio.buffer.reader()->consume(avail);
01389     vio.ndone += avail;
01390     total_len += avail;
01391   }
01392   length = (uint64_t)towrite;
        // slightly more than one target fragment (within 25% over) is held:
        // write exactly one target-sized fragment; otherwise write it all
01393   if (length > target_fragment_size() && 
01394       (length < target_fragment_size() + target_fragment_size() / 4))
01395     write_len = target_fragment_size();
01396   else
01397     write_len = length;
        // hold off when this is not the final chunk and less than a target
        // fragment has accumulated
01398   bool not_writing = towrite != ntodo && towrite < target_fragment_size();
01399   if (!called_user) {
01400     if (not_writing) {
01401       called_user = 1;
01402       if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
01403         return EVENT_DONE;
01404       goto Lagain;
01405     } else if (vio.ntodo() <= 0)
01406       goto Lagain;
01407   }
01408   if (not_writing)
01409     return EVENT_CONT;
        // everything the VIO asked for is in hand and the user requested close
        // on completion: go straight into the close sequence
01410   if (towrite == ntodo && f.close_complete) {
01411     closed = 1;
01412     SET_HANDLER(&CacheVC::openWriteClose);
01413     return openWriteClose(EVENT_NONE, NULL);
01414   }
01415   SET_HANDLER(&CacheVC::openWriteWriteDone);
01416   return do_write_lock_call();
01417 }
01418 
01419 // begin overwrite
      // Looks up the existing document for first_key before an overwrite.
      //   - EVENT_IMMEDIATE: fresh start, resets last_collision and probes.
      //   - AIO_EVENT_DONE:  a candidate document was read; if its first_key
      //     matches, its dir and buffer are recorded (od->first_dir, first_buf),
      //     otherwise probing continues from last_collision.
      // When the read failed or the probe finds nothing more, the write is
      // opened anyway: the handler becomes openWriteMain and
      // CACHE_EVENT_OPEN_WRITE is delivered.
01420 int
01421 CacheVC::openWriteOverwrite(int event, Event *e)
01422 {
01423   cancel_trigger();
01424   if (event != AIO_EVENT_DONE) {
01425     if (event == EVENT_IMMEDIATE)
01426       last_collision = 0;
01427   } else {
01428     Doc *doc = NULL;
01429     set_io_not_in_progress();
01430     if (_action.cancelled)
01431       return openWriteCloseDir(event, e);
01432     if (!io.ok())
01433       goto Ldone;
01434     doc = (Doc *) buf->data();
          // the directory tag matched but the document belongs to another key
01435     if (!(doc->first_key == first_key))
01436       goto Lcollision;
01437     od->first_dir = dir;
01438     first_buf = buf;
01439     goto Ldone;
01440   }
01441 Lcollision:
01442   {
01443     CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
01444     if (!lock)
01445       VC_LOCK_RETRY_EVENT();
          // res > 0: another candidate entry exists, read it and come back here
01446     int res = dir_probe(&first_key, vol, &dir, &last_collision);
01447     if (res > 0) {
01448       if ((res = do_read_call(&first_key)) == EVENT_RETURN)
01449         goto Lcallreturn;
01450       return res;
01451     }
01452   }
01453 Ldone:
01454   SET_HANDLER(&CacheVC::openWriteMain);
01455   return callcont(CACHE_EVENT_OPEN_WRITE);
01456 Lcallreturn:
01457   return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
01458 }
01459 
01460 #ifdef HTTP_CACHE
01461 // openWriteStartDone handles vector read (addition of alternates)
01462 // and lock misses
      //
      // Flow, all under the volume lock:
      //   - AIO_EVENT_DONE: the vector document has been read.  It is validated
      //     (I/O ok, dir still valid, first_key match, DOC_MAGIC, header loads
      //     into write_vector); on success the dir/buffer are recorded and the
      //     write is opened (Lsuccess).
      //   - otherwise / on collision (Lcollision): registers the write with the
      //     volume if not done yet (vol->open_write), then probes the directory
      //     and reads the next candidate.  No candidate means a fresh document,
      //     unless this is an update, which then fails.
      //   - Lfailure reports CACHE_EVENT_OPEN_WRITE_FAILED with -err; Lcancel
      //     tears the VC down.
01463 int
01464 CacheVC::openWriteStartDone(int event, Event *e)
01465 {
01466   intptr_t err = ECACHE_NO_DOC;
01467   cancel_trigger();
01468   if (is_io_in_progress()) {
01469     if (event != AIO_EVENT_DONE)
01470       return EVENT_CONT;
01471     set_io_not_in_progress();
01472   }
01473   {
01474     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
01475     if (!lock)
01476       VC_LOCK_RETRY_EVENT();
01477 
          // a cancelled open is only abandoned when no other writers share the od
01478     if (_action.cancelled && (!od || !od->has_multiple_writers()))
01479       goto Lcancel;
01480 
01481     if (event == AIO_EVENT_DONE) {        // vector read done
01482       Doc *doc = (Doc *) buf->data();
01483       if (!io.ok()) {
01484         err = ECACHE_READ_FAIL;
01485         goto Lfailure;
01486       }
01487 
01488       /* INKqa07123.
01489          A directory entry which is no longer valid may have been overwritten.
01490          We need to start afresh from the beginning by setting last_collision
01491          to NULL.
01492        */
01493       if (!dir_valid(vol, &dir)) {
              // NOTE(review): the message below says "OpenReadStartDone" although
              // this is openWriteStartDone — presumably a copy/paste leftover
01494         DDebug("cache_write",
01495                "OpenReadStartDone: Dir not valid: Write Head: %" PRId64 ", Dir: %" PRId64,
01496                (int64_t)offset_to_vol_offset(vol, vol->header->write_pos), dir_offset(&dir));
01497         last_collision = NULL;
01498         goto Lcollision;
01499       }
01500       if (!(doc->first_key == first_key))
01501         goto Lcollision;
01502 
            // the document must carry a header that unmarshals to exactly hlen bytes
01503       if (doc->magic != DOC_MAGIC || !doc->hlen ||
01504           this->load_http_info(write_vector, doc, buf) != doc->hlen) {
01505         err = ECACHE_BAD_META_DATA;
01506 #if TS_USE_INTERIM_CACHE == 1
01507         if (dir_ininterim(&dir)) {
01508           dir_delete(&first_key, vol, &dir);
01509           last_collision = NULL;
01510           goto Lcollision;
01511         }
01512 #endif
01513         goto Lfailure;
01514       }
01515       ink_assert(write_vector->count() > 0);
01516 #if TS_USE_INTERIM_CACHE == 1
            // delete interim-cache entries for this key, re-probing until the
            // entry found is not in the interim cache or none is left
01517 Lagain:
01518       if (dir_ininterim(&dir)) {
01519         dir_delete(&first_key, vol, &dir);
01520         last_collision = NULL;
01521         if (dir_probe(&first_key, vol, &dir, &last_collision)) {
01522           goto Lagain;
01523         } else {
01524           if (f.update) {
01525             // fail update because vector has been GC'd
01526             goto Lfailure;
01527           }
01528         }
01529       }
01530 #endif
01531       od->first_dir = dir;
01532       first_dir = dir;
01533       if (doc->single_fragment()) {
01534         // fragment is tied to the vector
01535         od->move_resident_alt = 1;
01536         od->single_doc_key = doc->key;
01537         dir_assign(&od->single_doc_dir, &dir);
01538         dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
01539       }
01540       first_buf = buf;
01541       goto Lsuccess;
01542     }
01543 
01544 Lcollision:
          // `info` doubles as a flag here: the CACHE_ALLOW_MULTIPLE_WRITES
          // sentinel value requests multi-writer mode
01545     int if_writers = ((uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES);
01546     if (!od) {
01547       if ((err = vol->open_write(
01548                 this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
01549         goto Lfailure;
            // joined an already-open document: no vector read needed
01550       if (od->has_multiple_writers()) {
01551         MUTEX_RELEASE(lock);
01552         SET_HANDLER(&CacheVC::openWriteMain);
01553         return callcont(CACHE_EVENT_OPEN_WRITE);
01554       }
01555     }
01556     // check for collision
01557     if (dir_probe(&first_key, vol, &dir, &last_collision)) {
01558       od->reading_vec = 1;
01559       int ret = do_read_call(&first_key);
01560       if (ret == EVENT_RETURN)
01561         goto Lcallreturn;
01562       return ret;
01563     }
01564     if (f.update) {
01565       // fail update because vector has been GC'd
01566       goto Lfailure;
01567     }
01568   }
01569 Lsuccess:
01570   od->reading_vec = 0;
01571   if (_action.cancelled)
01572     goto Lcancel;
01573   SET_HANDLER(&CacheVC::openWriteMain);
01574   return callcont(CACHE_EVENT_OPEN_WRITE);
01575 
01576 Lfailure:
01577   CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
01578   _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
01579 Lcancel:
        // with an od the normal close path releases it; without one the VC can
        // be freed directly
01580   if (od) {
01581     od->reading_vec = 0;
01582     return openWriteCloseDir(event, e);
01583   } else
01584     return free_CacheVC(this);
01585 Lcallreturn:
01586   return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
01587 }
01588 #endif
01589 
01590 // handle lock failures from main Cache::open_write entry points below
01591 int
01592 CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event */* e ATS_UNUSED */)
01593 {
01594   intptr_t err;
01595   cancel_trigger();
01596   if (_action.cancelled)
01597     return free_CacheVC(this);
01598   if (((err = vol->open_write_lock(this, false, 1)) > 0)) {
01599     CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
01600     free_CacheVC(this);
01601     _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
01602     return EVENT_DONE;
01603   }
01604   if (err < 0)
01605     VC_SCHED_LOCK_RETRY();
01606   if (f.overwrite) {
01607     SET_HANDLER(&CacheVC::openWriteOverwrite);
01608     return openWriteOverwrite(EVENT_IMMEDIATE, 0);
01609   } else {
01610     // write by key
01611     SET_HANDLER(&CacheVC::openWriteMain);
01612     return callcont(CACHE_EVENT_OPEN_WRITE);
01613   }
01614 }
01615 
01616 // main entry point for writing of of non-http documents
01617 Action *
01618 Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
01619                   int options, time_t apin_in_cache, char *hostname, int host_len)
01620 {
01621 
01622   if (!CacheProcessor::IsCacheReady(frag_type)) {
01623     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
01624     return ACTION_RESULT_DONE;
01625   }
01626 
01627   ink_assert(caches[frag_type] == this);
01628 
01629   intptr_t res = 0;
01630   CacheVC *c = new_CacheVC(cont);
01631   ProxyMutex *mutex = cont->mutex;
01632   MUTEX_LOCK(lock, c->mutex, this_ethread());
01633   c->vio.op = VIO::WRITE;
01634   c->base_stat = cache_write_active_stat;
01635   c->vol = key_to_vol(key, hostname, host_len);
01636   Vol *vol = c->vol;
01637   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
01638   c->first_key = c->key = *key;
01639   c->frag_type = frag_type;
01640   /*
01641      The transition from single fragment document to a multi-fragment document
01642      would cause a problem if the key and the first_key collide. In case of
01643      a collision, old vector data could be served to HTTP. Need to avoid that.
01644      Also, when evacuating a fragment, we have to decide if its the first_key
01645      or the earliest_key based on the dir_tag.
01646    */
01647   do {
01648     rand_CacheKey(&c->key, cont->mutex);
01649   } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
01650   c->earliest_key = c->key;
01651 #ifdef HTTP_CACHE
01652   c->info = 0;
01653 #endif
01654   c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
01655   c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
01656   c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
01657   c->pin_in_cache = (uint32_t) apin_in_cache;
01658 
01659   if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
01660     // document currently being written, abort
01661     CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
01662     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -res);
01663     free_CacheVC(c);
01664     return ACTION_RESULT_DONE;
01665   }
01666   if (res < 0) {
01667     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
01668     c->trigger = CONT_SCHED_LOCK_RETRY(c);
01669     return &c->_action;
01670   }
01671   if (!c->f.overwrite) {
01672     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
01673     c->callcont(CACHE_EVENT_OPEN_WRITE);
01674     return ACTION_RESULT_DONE;
01675   } else {
01676     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
01677     if (c->openWriteOverwrite(EVENT_IMMEDIATE, 0) == EVENT_DONE)
01678       return ACTION_RESULT_DONE;
01679     else
01680       return &c->_action;
01681   }
01682 }
01683 
01684 #ifdef HTTP_CACHE
01685 // main entry point for writing of http documents
01686 Action *
01687 Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
01688                   CacheKey */* key1 ATS_UNUSED */, CacheFragType type, char *hostname, int host_len)
01689 {
01690   if (!CacheProcessor::IsCacheReady(type)) {
01691     cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -ECACHE_NOT_READY);
01692     return ACTION_RESULT_DONE;
01693   }
01694 
01695   ink_assert(caches[type] == this);
01696   intptr_t err = 0;
01697   int if_writers = (uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES;
01698   CacheVC *c = new_CacheVC(cont);
01699   ProxyMutex *mutex = cont->mutex;
01700   c->vio.op = VIO::WRITE;
01701   c->first_key = *key;
01702   /*
01703      The transition from single fragment document to a multi-fragment document
01704      would cause a problem if the key and the first_key collide. In case of
01705      a collision, old vector data could be served to HTTP. Need to avoid that.
01706      Also, when evacuating a fragment, we have to decide if its the first_key
01707      or the earliest_key based on the dir_tag.
01708    */
01709   do {
01710     rand_CacheKey(&c->key, cont->mutex);
01711   }
01712   while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
01713   c->earliest_key = c->key;
01714   c->frag_type = CACHE_FRAG_TYPE_HTTP;
01715   c->vol = key_to_vol(key, hostname, host_len);
01716   Vol *vol = c->vol;
01717   c->info = info;
01718   if (c->info && (uintptr_t) info != CACHE_ALLOW_MULTIPLE_WRITES) {
01719     /*
01720        Update has the following code paths :
01721        a) Update alternate header only :
01722        In this case the vector has to be rewritten. The content
01723        length(update_len) and the key for the document are set in the
01724        new_info in the set_http_info call.
01725        HTTP OPERATIONS
01726        open_write with info set
01727        set_http_info new_info
01728        (total_len == 0)
01729        close
01730        b) Update alternate and data
01731        In this case both the vector and the data needs to be rewritten.
01732        This case is similar to the standard write of a document case except
01733        that the new_info is inserted into the vector at the alternate_index
01734        (overwriting the old alternate) rather than the end of the vector.
01735        HTTP OPERATIONS
01736        open_write with info set
01737        set_http_info new_info
01738        do_io_write =>  (total_len > 0)
01739        close
01740        c) Delete an alternate
01741        The vector may need to be deleted (if there was only one alternate) or
01742        rewritten (if there were more than one alternate). The deletion of the
01743        vector is done in openWriteRemoveVector.
01744        HTTP OPERATIONS
01745        open_write with info set
01746        close
01747      */
01748     c->f.update = 1;
01749     c->base_stat = cache_update_active_stat;
01750     DDebug("cache_update", "Update called");
01751     info->object_key_get(&c->update_key);
01752     ink_assert(!(c->update_key == zero_key));
01753     c->update_len = info->object_size_get();
01754   } else
01755     c->base_stat = cache_write_active_stat;
01756   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
01757   c->pin_in_cache = (uint32_t) apin_in_cache;
01758 
01759   {
01760     CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
01761     if (lock) {
01762       if ((err = c->vol->open_write(c, if_writers,
01763                                      cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
01764         goto Lfailure;
01765       // If there are multiple writers, then this one cannot be an update.
01766       // Only the first writer can do an update. If that's the case, we can
01767       // return success to the state machine now.;
01768       if (c->od->has_multiple_writers())
01769         goto Lmiss;
01770       if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
01771         if (c->f.update) {
01772           // fail update because vector has been GC'd
01773           // This situation can also arise in openWriteStartDone
01774           err = ECACHE_NO_DOC;
01775           goto Lfailure;
01776         }
01777         // document doesn't exist, begin write
01778         goto Lmiss;
01779       } else {
01780         c->od->reading_vec = 1;
01781         // document exists, read vector
01782         SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
01783         switch (c->do_read_call(&c->first_key)) {
01784           case EVENT_DONE: return ACTION_RESULT_DONE;
01785           case EVENT_RETURN: goto Lcallreturn;
01786           default: return &c->_action;
01787         }
01788       }
01789     }
01790     // missed lock
01791     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
01792     CONT_SCHED_LOCK_RETRY(c);
01793     return &c->_action;
01794   }
01795 
01796 Lmiss:
01797   SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
01798   c->callcont(CACHE_EVENT_OPEN_WRITE);
01799   return ACTION_RESULT_DONE;
01800 
01801 Lfailure:
01802   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
01803   cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
01804   if (c->od) {
01805     c->openWriteCloseDir(EVENT_IMMEDIATE, 0);
01806     return ACTION_RESULT_DONE;
01807   }
01808   free_CacheVC(c);
01809   return ACTION_RESULT_DONE;
01810 
01811 Lcallreturn:
01812   if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
01813     return ACTION_RESULT_DONE;
01814   return &c->_action;
01815 }
01816 
01817 #if TS_USE_INTERIM_CACHE == 1
01818 int
01819 InterimCacheVol::aggWrite(int /* event ATS_UNUSED */, void * /* ATS_UNUSED e */)
01820 {
01821   ink_assert(!is_io_in_progress());
01822   MigrateToInterimCache *mts;
01823   Doc *doc;
01824   uint64_t old_off, new_off;
01825   ink_assert(this_ethread() == mutex.m_ptr->thread_holding
01826       && vol->mutex.m_ptr == mutex.m_ptr);
01827 Lagain:
01828 
01829   while ((mts = agg.head) != NULL) {
01830     doc = (Doc *) mts->buf->data();
01831     uint32_t agg_len = dir_approx_size(&mts->dir);
01832     ink_assert(agg_len == mts->agg_len);
01833     ink_assert(agg_len <= AGG_SIZE && agg_buf_pos <= AGG_SIZE);
01834 
01835     if (agg_buf_pos + agg_len > AGG_SIZE
01836         || header->agg_pos + agg_len > (skip + len))
01837       break;
01838     mts = agg.dequeue();
01839 
01840     if (!mts->notMigrate) {
01841       old_off = dir_get_offset(&mts->dir);
01842       Dir old_dir = mts->dir;
01843       doc->sync_serial = header->sync_serial;
01844       doc->write_serial = header->write_serial;
01845 
01846       memcpy(agg_buffer + agg_buf_pos, doc, doc->len);
01847       off_t o = header->write_pos + agg_buf_pos;
01848       dir_set_offset(&mts->dir, offset_to_vol_offset(this, o));
01849       ink_assert(this == mts->interim_vol);
01850       ink_assert(vol_offset(this, &mts->dir) < mts->interim_vol->skip + mts->interim_vol->len);
01851       dir_set_phase(&mts->dir, header->phase);
01852       dir_set_ininterim(&mts->dir);
01853       dir_set_index(&mts->dir, (this - vol->interim_vols));
01854 
01855       agg_buf_pos += agg_len;
01856       header->agg_pos = header->write_pos + agg_buf_pos;
01857       new_off = dir_get_offset(&mts->dir);
01858 
01859       if (mts->rewrite)
01860         dir_overwrite(&mts->key, vol, &mts->dir, &old_dir);
01861       else
01862         dir_insert(&mts->key, vol, &mts->dir);
01863       DDebug("cache_insert", "InterimCache: WriteDone: key: %X, first_key: %X, write_len: %d, write_offset: %" PRId64 ", dir_last_word: %X",
01864           doc->key.slice32(0), doc->first_key.slice32(0), mts->agg_len, o, mts->dir.w[4]);
01865 
01866       if (mts->copy) {
01867         mts->interim_vol->vol->ram_cache->fixup(&mts->key, (uint32_t)(old_off >> 32), (uint32_t)old_off,
01868             (uint32_t)(new_off >> 32), (uint32_t)new_off);
01869       } else {
01870         mts->vc->f.ram_fixup = 1;
01871         mts->vc->dir_off = new_off;
01872       }
01873       vol->set_migrate_done(mts);
01874     } else
01875       vol->set_migrate_failed(mts);
01876 
01877     mts->buf = NULL;
01878     migrateToInterimCacheAllocator.free(mts);
01879   }
01880 
01881   if (!agg_buf_pos) {
01882     if (header->write_pos + AGG_SIZE > (skip + len)) {
01883       header->write_pos = start;
01884       header->phase = !(header->phase);
01885 
01886       header->cycle++;
01887       header->agg_pos = header->write_pos;
01888       dir_clean_interimvol(this);
01889       goto Lagain;
01890     }
01891     return EVENT_CONT;
01892   }
01893 
01894   if (agg.head == NULL && agg_buf_pos < (AGG_SIZE / 2) && !sync
01895       && header->write_pos + AGG_SIZE <= (skip + len))
01896     return EVENT_CONT;
01897 
01898   for (mts = agg.head; mts != NULL; mts = mts->link.next) {
01899     if (!mts->copy) {
01900       Ptr<IOBufferData> buf = mts->buf;
01901       doc = (Doc *) buf->data();
01902       mts->buf = new_IOBufferData(iobuffer_size_to_index(mts->agg_len, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
01903       mts->copy = true;
01904       memcpy(mts->buf->data(), buf->data(), doc->len);
01905       buf = NULL;
01906     }
01907   }
01908   // set write limit
01909 
01910   io.aiocb.aio_fildes = fd;
01911   io.aiocb.aio_offset = header->write_pos;
01912   io.aiocb.aio_buf = agg_buffer;
01913   io.aiocb.aio_nbytes = agg_buf_pos;
01914   io.action = this;
01915   /*
01916     Callback on AIO thread so that we can issue a new write ASAP
01917     as all writes are serialized in the volume.  This is not necessary
01918     for reads proceed independently.
01919    */
01920   io.thread = AIO_CALLBACK_THREAD_AIO;
01921   SET_HANDLER(&InterimCacheVol::aggWriteDone);
01922   ink_aio_write(&io);
01923   return EVENT_CONT;
01924 }
01925 
01926 int
01927 InterimCacheVol::aggWriteDone(int event, void *e)
01928 {
01929   ink_release_assert(this_ethread() == mutex.m_ptr->thread_holding
01930         && vol->mutex.m_ptr == mutex.m_ptr);
01931   if (io.ok()) {
01932      header->last_write_pos = header->write_pos;
01933      header->write_pos += io.aiocb.aio_nbytes;
01934      ink_assert(header->write_pos >= start);
01935      DDebug("cache_agg", "Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
01936            header->write_pos, header->last_write_pos);
01937      ink_assert(header->write_pos == header->agg_pos);
01938      agg_buf_pos = 0;
01939      header->write_serial++;
01940    } else {
01941      // delete all the directory entries that we inserted
01942      // for fragments is this aggregation buffer
01943      Debug("cache_disk_error", "Write error on disk %s\n \
01944                write range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
01945            "InterimCache ID", (uint64_t)io.aiocb.aio_offset, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes),
01946            (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
01947            (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
01948      Dir del_dir;
01949      dir_clear(&del_dir);
01950      dir_set_ininterim(&del_dir);
01951      dir_set_index(&del_dir, (this - vol->interim_vols));
01952      for (int done = 0; done < agg_buf_pos;) {
01953        Doc *doc = (Doc *) (agg_buffer + done);
01954        dir_set_offset(&del_dir, header->write_pos + done);
01955        dir_delete(&doc->key, vol, &del_dir);
01956        done += this->round_to_approx_size(doc->len);
01957      }
01958      agg_buf_pos = 0;
01959    }
01960    set_io_not_in_progress();
01961    sync = false;
01962    if (agg.head)
01963      aggWrite(event, e);
01964    return EVENT_CONT;
01965 }
01966 #endif
01967 #endif

Generated by  doxygen 1.7.1