• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

Cache.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 #include "P_Cache.h"
00026 
00027 // Cache Inspector and State Pages
00028 #include "P_CacheTest.h"
00029 #include "StatPages.h"
00030 
00031 #include "I_Layout.h"
00032 
00033 #ifdef HTTP_CACHE
00034 #include "HttpTransactCache.h"
00035 #include "HttpSM.h"
00036 #include "HttpCacheSM.h"
00037 #include "InkAPIInternal.h"
00038 #include "P_CacheBC.h"
00039 #endif
00040 
00041 // Compilation Options
00042 #define USELESS_REENABLES       // allow them for now
00043 // #define VERIFY_JTEST_DATA
00044 
00045 static size_t DEFAULT_RAM_CACHE_MULTIPLIER = 10; // I.e. 10x 1MB per 1GB of disk.
00046 
00047 // This is the oldest version number that is still usable.
00048 static short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;
00049 
00050 #define DOCACHE_CLEAR_DYN_STAT(x) \
00051 do { \
00052         RecSetRawStatSum(rsb, x, 0); \
00053         RecSetRawStatCount(rsb, x, 0); \
00054 } while (0);
00055 
00056 
00057 // Configuration
00058 
00059 int64_t cache_config_ram_cache_size = AUTO_SIZE_RAM_CACHE;
00060 int cache_config_ram_cache_algorithm = 0;
00061 int cache_config_ram_cache_compress = 0;
00062 int cache_config_ram_cache_compress_percent = 90;
00063 int cache_config_ram_cache_use_seen_filter = 0;
00064 int cache_config_http_max_alts = 3;
00065 int cache_config_dir_sync_frequency = 60;
00066 int cache_config_permit_pinning = 0;
00067 int cache_config_vary_on_user_agent = 0;
00068 int cache_config_select_alternate = 1;
00069 int cache_config_max_doc_size = 0;
00070 int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
00071 int64_t cache_config_ram_cache_cutoff = AGG_SIZE;
00072 int cache_config_max_disk_errors = 5;
00073 int cache_config_hit_evacuate_percent = 10;
00074 int cache_config_hit_evacuate_size_limit = 0;
00075 int cache_config_force_sector_size = 0;
00076 int cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE;
00077 int cache_config_agg_write_backlog = AGG_SIZE * 2;
00078 int cache_config_enable_checksum = 0;
00079 int cache_config_alt_rewrite_max_size = 4096;
00080 int cache_config_read_while_writer = 0;
00081 int cache_config_mutex_retry_delay = 2;
00082 #ifdef HTTP_CACHE
00083 static int enable_cache_empty_http_doc = 0;
00084 /// Fix up a specific known problem with the 4.2.0 release.
00085 /// Not used for stripes with a cache version later than 4.2.0.
00086 int cache_config_compatibility_4_2_0_fixup = 1;
00087 #endif
00088 
00089 #if TS_USE_INTERIM_CACHE == 1
00090 int migrate_threshold = 2;
00091 #endif
00092 
00093 // Globals
00094 
00095 RecRawStatBlock *cache_rsb = NULL;
00096 Cache *theStreamCache = 0;
00097 Cache *theCache = 0;
00098 CacheDisk **gdisks = NULL;
00099 int gndisks = 0;
00100 static volatile int initialize_disk = 0;
00101 Cache *caches[NUM_CACHE_FRAG_TYPES] = { 0 };
00102 CacheSync *cacheDirSync = 0;
00103 Store theCacheStore;
00104 volatile int CacheProcessor::initialized = CACHE_INITIALIZING;
00105 volatile uint32_t CacheProcessor::cache_ready = 0;
00106 volatile int CacheProcessor::start_done = 0;
00107 int CacheProcessor::clear = 0;
00108 int CacheProcessor::fix = 0;
00109 int CacheProcessor::start_internal_flags = 0;
00110 int CacheProcessor::auto_clear_flag = 0;
00111 CacheProcessor cacheProcessor;
00112 Vol **gvol = NULL;
00113 volatile int gnvol = 0;
00114 #if TS_USE_INTERIM_CACHE == 1
00115 CacheDisk **g_interim_disks = NULL;
00116 int gn_interim_disks = 0;
00117 int good_interim_disks = 0;
00118 uint64_t total_cache_size = 0;
00119 #endif
00120 ClassAllocator<CacheVC> cacheVConnectionAllocator("cacheVConnection");
00121 ClassAllocator<EvacuationBlock> evacuationBlockAllocator("evacuationBlock");
00122 ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator("cacheRemoveCont");
00123 ClassAllocator<EvacuationKey> evacuationKeyAllocator("evacuationKey");
00124 int CacheVC::size_to_init = -1;
00125 CacheKey zero_key;
00126 #if TS_USE_INTERIM_CACHE == 1
00127 ClassAllocator<MigrateToInterimCache> migrateToInterimCacheAllocator("migrateToInterimCache");
00128 #endif
00129 
00130 struct VolInitInfo
00131 {
00132   off_t recover_pos;
00133   AIOCallbackInternal vol_aio[4];
00134   char *vol_h_f;
00135 
00136   VolInitInfo()
00137   {
00138     recover_pos = 0;
00139     vol_h_f = (char *)ats_memalign(ats_pagesize(), 4 * STORE_BLOCK_SIZE);
00140     memset(vol_h_f, 0, 4 * STORE_BLOCK_SIZE);
00141   }
00142 
00143   ~VolInitInfo()
00144   {
00145     for (int i = 0; i < 4; i++) {
00146       vol_aio[i].action = NULL;
00147       vol_aio[i].mutex.clear();
00148     }
00149     free(vol_h_f);
00150   }
00151 };
00152 
00153 #if AIO_MODE == AIO_MODE_NATIVE
00154 struct VolInit : public Continuation
00155 {
00156   Vol *vol;
00157   char *path;
00158   off_t blocks;
00159   int64_t offset;
00160   bool vol_clear;
00161 
00162   int mainEvent(int /* event ATS_UNUSED */, Event */* e ATS_UNUSED */) {
00163     vol->init(path, blocks, offset, vol_clear);
00164     mutex.clear();
00165     delete this;
00166     return EVENT_DONE;
00167   }
00168 
00169   VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) : Continuation(v->mutex),
00170     vol(v), path(p), blocks(b), offset(o), vol_clear(c) {
00171     SET_HANDLER(&VolInit::mainEvent);
00172   }
00173 };
00174 
00175 struct DiskInit : public Continuation
00176 {
00177   CacheDisk *disk;
00178   char *s;
00179   off_t blocks;
00180   off_t askip;
00181   int ahw_sector_size;
00182   int fildes;
00183   bool clear;
00184 
00185   int mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) {
00186     disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
00187     ats_free(s);
00188     mutex.clear();
00189     delete this;
00190     return EVENT_DONE;
00191   }
00192 
00193   DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector, int f, bool c) : Continuation(d->mutex),
00194       disk(d), s(ats_strdup(str)), blocks(b), askip(skip), ahw_sector_size(sector), fildes(f), clear(c) {
00195     SET_HANDLER(&DiskInit::mainEvent);
00196   }
00197 };
00198 #endif
00199 void cplist_init();
00200 static void cplist_update();
00201 int cplist_reconfigure();
00202 static int create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp);
00203 static void rebuild_host_table(Cache *cache);
00204 void register_cache_stats(RecRawStatBlock *rsb, const char *prefix);
00205 
00206 Queue<CacheVol> cp_list;
00207 int cp_list_len = 0;
00208 ConfigVolumes config_volumes;
00209 
00210 #if TS_HAS_TESTS
00211 void force_link_CacheTestCaller() {
00212   force_link_CacheTest();
00213 }
00214 #endif
00215 
00216 int64_t
00217 cache_bytes_used(int volume)
00218 {
00219   uint64_t used = 0;
00220 
00221   for (int i = 0; i < gnvol; i++) {
00222     if (!DISK_BAD(gvol[i]->disk) && (volume == -1 || gvol[i]->cache_vol->vol_number == volume)) {
00223       if (!gvol[i]->header->cycle)
00224           used += gvol[i]->header->write_pos - gvol[i]->start;
00225       else
00226           used += gvol[i]->len - vol_dirlen(gvol[i]) - EVACUATION_SIZE;
00227     }
00228   }
00229 
00230   return used;
00231 }
00232 
00233 int
00234 cache_stats_bytes_used_cb(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00235 {
00236   int volume = -1;
00237   char *p;
00238 
00239   // Well, there's no way to pass along the volume ID, so extracting it from the stat name.
00240   p = strstr((char *) name, "volume_");
00241   if (p != NULL) {
00242     // I'm counting on the compiler to optimize out strlen("volume_").
00243     volume = strtol(p + strlen("volume_"), NULL, 10);
00244   }
00245 
00246   if (cacheProcessor.initialized == CACHE_INITIALIZED) {
00247     int64_t used, total =0;
00248     float percent_full;
00249 
00250     used =  cache_bytes_used(volume);
00251     RecSetGlobalRawStatSum(rsb, id, used);
00252     RecRawStatSyncSum(name, data_type, data, rsb, id);
00253     RecGetGlobalRawStatSum(rsb, (int) cache_bytes_total_stat, &total);
00254     percent_full = (float)used / (float)total * 100;
00255     // The perent_full float below gets rounded down
00256     RecSetGlobalRawStatSum(rsb, (int) cache_percent_full_stat, (int64_t) percent_full);
00257   }
00258 
00259   return 1;
00260 }
00261 
00262 static int
00263 validate_rww(int new_value)
00264 {
00265   if (new_value) {
00266     float http_bg_fill;
00267 
00268     REC_ReadConfigFloat(http_bg_fill, "proxy.config.http.background_fill_completed_threshold");
00269     if (http_bg_fill > 0.0) {
00270       Note("to enable reading while writing a document, %s should be 0.0: read while writing disabled",
00271            "proxy.config.http.background_fill_completed_threshold");
00272       return 0;
00273     }
00274     if (cache_config_max_doc_size > 0) {
00275       Note("to enable reading while writing a document, %s should be 0: read while writing disabled",
00276            "proxy.config.cache.max_doc_size");
00277       return 0;
00278     }
00279     return new_value;
00280   }
00281   return 0;
00282 }
00283 
00284 static int
00285 update_cache_config(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data,
00286                     void * /* cookie ATS_UNUSED */)
00287 {
00288   volatile int new_value = validate_rww(data.rec_int);
00289   cache_config_read_while_writer = new_value;
00290 
00291   return 0;
00292 }
00293 
00294 CacheVC::CacheVC():alternate_index(CACHE_ALT_INDEX_DEFAULT)
00295 {
00296   size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *) 0)->vio;
00297   // coverity[overrun-buffer-arg]
00298   memset((void *) &vio, 0, size_to_init);
00299   // the constructor does a memset() on the members that need to be initialized
00300   //coverity[uninit_member]
00301 }
00302 
00303 #ifdef HTTP_CACHE
00304 HTTPInfo::FragOffset*
00305 CacheVC::get_frag_table()
00306 {
00307   ink_assert(alternate.valid());
00308   return alternate.valid() ? alternate.get_frag_table() : 0;
00309 }
00310 #endif
00311 
00312 VIO *
00313 CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
00314 {
00315   ink_assert(vio.op == VIO::READ);
00316   vio.buffer.writer_for(abuf);
00317   vio.set_continuation(c);
00318   vio.ndone = 0;
00319   vio.nbytes = nbytes;
00320   vio.vc_server = this;
00321 #ifdef DEBUG
00322   ink_assert(c->mutex->thread_holding);
00323 #endif
00324   if (!trigger && !recursive)
00325     trigger = c->mutex->thread_holding->schedule_imm_local(this);
00326   return &vio;
00327 }
00328 
00329 VIO *
00330 CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
00331 {
00332   ink_assert(vio.op == VIO::READ);
00333   vio.buffer.writer_for(abuf);
00334   vio.set_continuation(c);
00335   vio.ndone = 0;
00336   vio.nbytes = nbytes;
00337   vio.vc_server = this;
00338   seek_to = offset;
00339 #ifdef DEBUG
00340   ink_assert(c->mutex->thread_holding);
00341 #endif
00342   if (!trigger && !recursive)
00343     trigger = c->mutex->thread_holding->schedule_imm_local(this);
00344   return &vio;
00345 }
00346 
00347 VIO *
00348 CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
00349 {
00350   ink_assert(vio.op == VIO::WRITE);
00351   ink_assert(!owner);
00352   vio.buffer.reader_for(abuf);
00353   vio.set_continuation(c);
00354   vio.ndone = 0;
00355   vio.nbytes = nbytes;
00356   vio.vc_server = this;
00357 #ifdef DEBUG
00358   ink_assert(c->mutex->thread_holding);
00359 #endif
00360   if (!trigger && !recursive)
00361     trigger = c->mutex->thread_holding->schedule_imm_local(this);
00362   return &vio;
00363 }
00364 
00365 void
00366 CacheVC::do_io_close(int alerrno)
00367 {
00368   ink_assert(mutex->thread_holding == this_ethread());
00369   int previous_closed = closed;
00370   closed = (alerrno == -1) ? 1 : -1;    // Stupid default arguments
00371   DDebug("cache_close", "do_io_close %p %d %d", this, alerrno, closed);
00372   if (!previous_closed && !recursive)
00373     die();
00374 }
00375 
00376 void
00377 CacheVC::reenable(VIO *avio)
00378 {
00379   DDebug("cache_reenable", "reenable %p", this);
00380   (void) avio;
00381 #ifdef DEBUG
00382   ink_assert(avio->mutex->thread_holding);
00383 #endif
00384   if (!trigger) {
00385 #ifndef USELESS_REENABLES
00386     if (vio.op == VIO::READ) {
00387       if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark)
00388         ink_assert(!"useless reenable of cache read");
00389     } else if (!vio.buffer.reader()->read_avail())
00390       ink_assert(!"useless reenable of cache write");
00391 #endif
00392     trigger = avio->mutex->thread_holding->schedule_imm_local(this);
00393   }
00394 }
00395 
00396 void
00397 CacheVC::reenable_re(VIO *avio)
00398 {
00399   DDebug("cache_reenable", "reenable_re %p", this);
00400   (void) avio;
00401 #ifdef DEBUG
00402   ink_assert(avio->mutex->thread_holding);
00403 #endif
00404   if (!trigger) {
00405     if (!is_io_in_progress() && !recursive) {
00406       handleEvent(EVENT_NONE, (void *) 0);
00407     } else
00408       trigger = avio->mutex->thread_holding->schedule_imm_local(this);
00409   }
00410 }
00411 
00412 bool
00413 CacheVC::get_data(int i, void *data)
00414 {
00415   switch (i) {
00416 #ifdef HTTP_CACHE
00417   case CACHE_DATA_HTTP_INFO:
00418     *((CacheHTTPInfo **) data) = &alternate;
00419     return true;
00420 #endif
00421   case CACHE_DATA_RAM_CACHE_HIT_FLAG:
00422     *((int *) data) = !f.not_from_ram_cache;
00423     return true;
00424   default:
00425     break;
00426   }
00427   return false;
00428 }
00429 
00430 int64_t
00431 CacheVC::get_object_size()
00432 {
00433   return ((CacheVC *) this)->doc_len;
00434 }
00435 
00436 bool CacheVC::set_data(int /* i ATS_UNUSED */ , void * /* data */ )
00437 {
00438   ink_assert(!"CacheVC::set_data should not be called!");
00439   return true;
00440 }
00441 
00442 #ifdef HTTP_CACHE
00443 void
00444 CacheVC::get_http_info(CacheHTTPInfo ** ainfo)
00445 {
00446   *ainfo = &((CacheVC *) this)->alternate;
00447 }
00448 
00449 // set_http_info must be called before do_io_write
00450 // cluster vc does an optimization where it calls do_io_write() before
00451 // calling set_http_info(), but it guarantees that the info will
00452 // be set before transferring any bytes
00453 void
00454 CacheVC::set_http_info(CacheHTTPInfo *ainfo)
00455 {
00456   ink_assert(!total_len);
00457   if (f.update) {
00458     ainfo->object_key_set(update_key);
00459     ainfo->object_size_set(update_len);
00460   } else {
00461     ainfo->object_key_set(earliest_key);
00462     // don't know the total len yet
00463   }
00464   if (enable_cache_empty_http_doc) {
00465     MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
00466     if (field && !field->value_get_int64()) 
00467       f.allow_empty_doc = 1;
00468     else
00469       f.allow_empty_doc = 0;
00470   } else 
00471     f.allow_empty_doc = 0;
00472   alternate.copy_shallow(ainfo);
00473   ainfo->clear();
00474 }
00475 #endif
00476 
00477 bool CacheVC::set_pin_in_cache(time_t time_pin)
00478 {
00479   if (total_len) {
00480     ink_assert(!"should Pin the document before writing");
00481     return false;
00482   }
00483   if (vio.op != VIO::WRITE) {
00484     ink_assert(!"Pinning only allowed while writing objects to the cache");
00485     return false;
00486   }
00487   pin_in_cache = time_pin;
00488   return true;
00489 }
00490 
00491 bool CacheVC::set_disk_io_priority(int priority)
00492 {
00493 
00494   ink_assert(priority >= AIO_LOWEST_PRIORITY);
00495   io.aiocb.aio_reqprio = priority;
00496   return true;
00497 }
00498 
00499 time_t CacheVC::get_pin_in_cache()
00500 {
00501   return pin_in_cache;
00502 }
00503 
00504 int
00505 CacheVC::get_disk_io_priority()
00506 {
00507   return io.aiocb.aio_reqprio;
00508 }
00509 
00510 int
00511 Vol::begin_read(CacheVC *cont)
00512 {
00513   ink_assert(cont->mutex->thread_holding == this_ethread());
00514   ink_assert(mutex->thread_holding == this_ethread());
00515 #ifdef CACHE_STAT_PAGES
00516   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00517   stat_cache_vcs.enqueue(cont, cont->stat_link);
00518 #endif
00519   // no need for evacuation as the entire document is already in memory
00520   if (cont->f.single_fragment)
00521     return 0;
00522 #if TS_USE_INTERIM_CACHE == 1
00523   if (dir_ininterim(&cont->earliest_dir))
00524     return 0;
00525 #endif
00526   int i = dir_evac_bucket(&cont->earliest_dir);
00527   EvacuationBlock *b;
00528   for (b = evacuate[i].head; b; b = b->link.next) {
00529     if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir))
00530       continue;
00531     if (b->readers)
00532       b->readers = b->readers + 1;
00533     return 0;
00534   }
00535   // we don't actually need to preserve this block as it is already in
00536   // memory, but this is easier, and evacuations are rare
00537   EThread *t = cont->mutex->thread_holding;
00538   b = new_EvacuationBlock(t);
00539   b->readers = 1;
00540   b->dir = cont->earliest_dir;
00541   b->evac_frags.key = cont->earliest_key;
00542   evacuate[i].push(b);
00543   return 1;
00544 }
00545 
00546 int
00547 Vol::close_read(CacheVC *cont)
00548 {
00549   EThread *t = cont->mutex->thread_holding;
00550   ink_assert(t == this_ethread());
00551   ink_assert(t == mutex->thread_holding);
00552   if (dir_is_empty(&cont->earliest_dir))
00553     return 1;
00554   int i = dir_evac_bucket(&cont->earliest_dir);
00555   EvacuationBlock *b;
00556   for (b = evacuate[i].head; b;) {
00557     EvacuationBlock *next = b->link.next;
00558     if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
00559       b = next;
00560       continue;
00561     }
00562     if (b->readers && !--b->readers) {
00563       evacuate[i].remove(b);
00564       free_EvacuationBlock(b, t);
00565       break;
00566     }
00567     b = next;
00568   }
00569 #ifdef CACHE_STAT_PAGES
00570   stat_cache_vcs.remove(cont, cont->stat_link);
00571   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00572 #endif
00573   return 1;
00574 }
00575 
00576 // Cache Processor
00577 
00578 int
00579 CacheProcessor::start(int, size_t)
00580 {
00581   return start_internal(0);
00582 }
00583 
00584 static const int DEFAULT_CACHE_OPTIONS = (O_RDWR | _O_ATTRIB_OVERLAPPED);
00585 
00586 int
00587 CacheProcessor::start_internal(int flags)
00588 {
00589 
00590   ink_assert((int)TS_EVENT_CACHE_OPEN_READ == (int)CACHE_EVENT_OPEN_READ);
00591   ink_assert((int)TS_EVENT_CACHE_OPEN_READ_FAILED == (int)CACHE_EVENT_OPEN_READ_FAILED);
00592   ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE == (int)CACHE_EVENT_OPEN_WRITE);
00593   ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE_FAILED == (int)CACHE_EVENT_OPEN_WRITE_FAILED);
00594   ink_assert((int)TS_EVENT_CACHE_REMOVE == (int)CACHE_EVENT_REMOVE);
00595   ink_assert((int)TS_EVENT_CACHE_REMOVE_FAILED == (int)CACHE_EVENT_REMOVE_FAILED);
00596   ink_assert((int)TS_EVENT_CACHE_SCAN == (int)CACHE_EVENT_SCAN);
00597   ink_assert((int)TS_EVENT_CACHE_SCAN_FAILED == (int)CACHE_EVENT_SCAN_FAILED);
00598   ink_assert((int)TS_EVENT_CACHE_SCAN_OBJECT == (int)CACHE_EVENT_SCAN_OBJECT);
00599   ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_BLOCKED == (int)CACHE_EVENT_SCAN_OPERATION_BLOCKED);
00600   ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_FAILED == (int)CACHE_EVENT_SCAN_OPERATION_FAILED);
00601   ink_assert((int)TS_EVENT_CACHE_SCAN_DONE == (int)CACHE_EVENT_SCAN_DONE);
00602 
00603 #if AIO_MODE == AIO_MODE_NATIVE
00604   int etype = ET_NET;
00605   int n_netthreads = eventProcessor.n_threads_for_type[etype];
00606   EThread **netthreads = eventProcessor.eventthread[etype];
00607   for (int i = 0; i < n_netthreads; ++i) {
00608     netthreads[i]->diskHandler = new DiskHandler();
00609     netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
00610   }
00611 #endif
00612 
00613   start_internal_flags = flags;
00614   clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
00615   fix = !!(flags & PROCESSOR_FIX);
00616   start_done = 0;
00617   int diskok = 1;
00618   Span *sd;
00619 #if TS_USE_INTERIM_CACHE == 1
00620   gn_interim_disks = theCacheStore.n_interim_disks;
00621   g_interim_disks = (CacheDisk **) ats_malloc(gn_interim_disks * sizeof(CacheDisk *));
00622 
00623   gn_interim_disks = 0;
00624 
00625   for (int i = 0; i < theCacheStore.n_interim_disks; i++) {
00626     sd = theCacheStore.interim_disk[i];
00627     char path[PATH_MAX];
00628     int opts = O_RDWR;
00629     ink_strlcpy(path, sd->pathname, sizeof(path));
00630     if (!sd->file_pathname) {
00631 #if !defined(_WIN32)
00632       if (config_volumes.num_http_volumes && config_volumes.num_stream_volumes) {
00633         Warning(
00634             "It is suggested that you use raw disks if streaming and http are in the same cache");
00635       }
00636 #endif
00637       ink_strlcat(path, "/cache.db", sizeof(path));
00638       opts |= O_CREAT;
00639     }
00640     opts |= _O_ATTRIB_OVERLAPPED;
00641 #ifdef O_DIRECT
00642     opts |= O_DIRECT;
00643 #endif
00644 #ifdef O_DSYNC
00645     opts |= O_DSYNC;
00646 #endif
00647 
00648     int fd = open(path, opts, 0644);
00649     int blocks = sd->blocks;
00650     if (fd > 0) {
00651       if (!sd->file_pathname) {
00652         if (ftruncate(fd, ((uint64_t) blocks) * STORE_BLOCK_SIZE) < 0) {
00653           Warning("unable to truncate cache file '%s' to %d blocks", path, blocks);
00654           diskok = 0;
00655         }
00656       }
00657       if (diskok) {
00658         CacheDisk *disk = new CacheDisk();
00659         Debug("cache_hosting", "interim Disk: %d, blocks: %d", gn_interim_disks, blocks);
00660         int sector_size = sd->hw_sector_size;
00661         if (sector_size < cache_config_force_sector_size)
00662           sector_size = cache_config_force_sector_size;
00663         if (sd->hw_sector_size <= 0 || sector_size > STORE_BLOCK_SIZE) {
00664           Warning("bad hardware sector size %d, resetting to %d", sector_size, STORE_BLOCK_SIZE);
00665           sector_size = STORE_BLOCK_SIZE;
00666         }
00667         off_t skip = ROUND_TO_STORE_BLOCK((sd->offset * STORE_BLOCK_SIZE < START_POS ? START_POS + sd->alignment :
00668                                            sd->offset * STORE_BLOCK_SIZE));
00669         blocks = blocks - (skip >> STORE_BLOCK_SHIFT);
00670         disk->path = ats_strdup(path);
00671         disk->hw_sector_size = sector_size;
00672         disk->fd = fd;
00673         disk->skip = skip;
00674         disk->start = skip;
00675         /* we can't use fractions of store blocks. */
00676         disk->len = blocks;
00677         disk->io.aiocb.aio_fildes = fd;
00678         disk->io.aiocb.aio_reqprio = 0;
00679         disk->io.action = disk;
00680         disk->io.thread = AIO_CALLBACK_THREAD_ANY;
00681         g_interim_disks[gn_interim_disks++] = disk;
00682       }
00683     } else
00684       Warning("cache unable to open '%s': %s", path, strerror(errno));
00685   }
00686 
00687   if (gn_interim_disks == 0) {
00688     Warning("unable to open cache disk(s): InterimCache Cache Disabled\n");
00689   }
00690   good_interim_disks = gn_interim_disks;
00691   diskok = 1;
00692 #endif
00693   /* read the config file and create the data structures corresponding
00694      to the file */
00695   gndisks = theCacheStore.n_disks;
00696   gdisks = (CacheDisk **)ats_malloc(gndisks * sizeof(CacheDisk *));
00697 
00698   gndisks = 0;
00699   ink_aio_set_callback(new AIO_Callback_handler());
00700 
00701   config_volumes.read_config_file();
00702 #if TS_USE_INTERIM_CACHE == 1
00703   total_cache_size = 0;
00704   for (unsigned i = 0; i < theCacheStore.n_disks; i++)
00705     total_cache_size += theCacheStore.disk[i]->blocks;
00706 #endif
00707   for (unsigned i = 0; i < theCacheStore.n_disks; i++) {
00708     sd = theCacheStore.disk[i];
00709     char path[PATH_NAME_MAX];
00710     int opts = DEFAULT_CACHE_OPTIONS;
00711 
00712     ink_strlcpy(path, sd->pathname, sizeof(path));
00713     if (!sd->file_pathname) {
00714       if (config_volumes.num_http_volumes && config_volumes.num_stream_volumes) {
00715         Warning("It is suggested that you use raw disks if streaming and http are in the same cache");
00716       }
00717       ink_strlcat(path, "/cache.db", sizeof(path));
00718       opts |= O_CREAT;
00719     }
00720 
00721 #ifdef O_DIRECT
00722     opts |= O_DIRECT;
00723 #endif
00724 #ifdef O_DSYNC
00725     opts |= O_DSYNC;
00726 #endif
00727 
00728     int fd = open(path, opts, 0644);
00729     int blocks = sd->blocks;
00730 
00731     if (fd < 0 && (opts & O_CREAT))  // Try without O_DIRECT if this is a file on filesystem, e.g. tmpfs.
00732       fd = open(path, DEFAULT_CACHE_OPTIONS | O_CREAT, 0644);
00733 
00734     if (fd > 0) {
00735       if (!sd->file_pathname) {
00736         if (ftruncate(fd, ((uint64_t) blocks) * STORE_BLOCK_SIZE) < 0) {
00737           Warning("unable to truncate cache file '%s' to %d blocks", path, blocks);
00738           diskok = 0;
00739         }
00740       }
00741       if (diskok) {
00742         int sector_size = sd->hw_sector_size;
00743 
00744         gdisks[gndisks] = new CacheDisk();
00745         gdisks[gndisks]->forced_volume_num = sd->forced_volume_num;
00746         if (sd->hash_base_string)
00747           gdisks[gndisks]->hash_base_string = ats_strdup(sd->hash_base_string);
00748 
00749         Debug("cache_hosting", "Disk: %d, blocks: %d", gndisks, blocks);
00750 
00751         if (sector_size < cache_config_force_sector_size)
00752           sector_size = cache_config_force_sector_size;
00753         if (sd->hw_sector_size <= 0 || sector_size > STORE_BLOCK_SIZE) {
00754           Warning("bad hardware sector size %d, resetting to %d", sector_size, STORE_BLOCK_SIZE);
00755           sector_size = STORE_BLOCK_SIZE;
00756         }
00757         off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset));
00758         blocks = blocks - (skip >> STORE_BLOCK_SHIFT);
00759 #if AIO_MODE == AIO_MODE_NATIVE
00760         eventProcessor.schedule_imm(new DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd, clear));
00761 #else
00762         gdisks[gndisks]->open(path, blocks, skip, sector_size, fd, clear);
00763 #endif
00764         fd = 0;
00765         gndisks++;
00766       }
00767     } else {
00768       if (errno == EINVAL)
00769         Warning("cache unable to open '%s': It must be placed on a file system that supports direct I/O.", path);
00770       else
00771         Warning("cache unable to open '%s': %s", path, strerror(errno));
00772     }
00773     if(fd > 0) {
00774         close(fd);
00775     }
00776   }
00777 
00778   if (gndisks == 0) {
00779     Warning("unable to open cache disk(s): Cache Disabled\n");
00780     return -1;
00781   }
00782   start_done = 1;
00783 
00784   return 0;
00785 }
00786 
00787 void
00788 CacheProcessor::diskInitialized()
00789 {
00790   int n_init = ink_atomic_increment(&initialize_disk, 1);
00791   int bad_disks = 0;
00792   int res = 0;
00793   if (n_init == gndisks - 1) {
00794 
00795     int i;
00796     for (i = 0; i < gndisks; i++) {
00797       if (DISK_BAD(gdisks[i]))
00798         bad_disks++;
00799     }
00800 
00801     if (bad_disks != 0) {
00802       // create a new array
00803       CacheDisk **p_good_disks;
00804       if ((gndisks - bad_disks) > 0)
00805         p_good_disks = (CacheDisk **)ats_malloc((gndisks - bad_disks) * sizeof(CacheDisk *));
00806       else
00807         p_good_disks = 0;
00808 
00809       int insert_at = 0;
00810       for (i = 0; i < gndisks; i++) {
00811         if (DISK_BAD(gdisks[i])) {
00812           delete gdisks[i];
00813           continue;
00814         }
00815         if (p_good_disks != NULL) {
00816           p_good_disks[insert_at++] = gdisks[i];
00817         }
00818       }
00819       ats_free(gdisks);
00820       gdisks = p_good_disks;
00821       gndisks = gndisks - bad_disks;
00822     }
00823 
00824     /* create the cachevol list only if num volumes are greater
00825        than 0. */
00826     if (config_volumes.num_volumes == 0) {
00827       res = cplist_reconfigure();
00828       /* if no volumes, default to just an http cache */
00829     } else {
00830       // else
00831       /* create the cachevol list. */
00832       cplist_init();
00833       /* now change the cachevol list based on the config file */
00834       res = cplist_reconfigure();
00835     }
00836 
00837     if (res == -1) {
00838       /* problems initializing the volume.config. Punt */
00839       gnvol = 0;
00840       cacheInitialized();
00841       return;
00842     } else {
00843       CacheVol *cp = cp_list.head;
00844       for (; cp; cp = cp->link.next) {
00845         cp->vol_rsb = RecAllocateRawStatBlock((int) cache_stat_count);
00846         char vol_stat_str_prefix[256];
00847         snprintf(vol_stat_str_prefix, sizeof(vol_stat_str_prefix), "proxy.process.cache.volume_%d", cp->vol_number);
00848         register_cache_stats(cp->vol_rsb, vol_stat_str_prefix);
00849       }
00850     }
00851 
00852     gvol = (Vol **)ats_malloc(gnvol * sizeof(Vol *));
00853     memset(gvol, 0, gnvol * sizeof(Vol *));
00854     gnvol = 0;
00855     for (i = 0; i < gndisks; i++) {
00856       CacheDisk *d = gdisks[i];
00857       if (is_debug_tag_set("cache_hosting")) {
00858         int j;
00859         Debug("cache_hosting", "Disk: %d: Vol Blocks: %u: Free space: %" PRIu64,
00860               i, d->header->num_diskvol_blks, d->free_space);
00861         for (j = 0; j < (int) d->header->num_volumes; j++) {
00862           Debug("cache_hosting", "\tVol: %d Size: %" PRIu64, d->disk_vols[j]->vol_number, d->disk_vols[j]->size);
00863         }
00864         for (j = 0; j < (int) d->header->num_diskvol_blks; j++) {
00865           Debug("cache_hosting", "\tBlock No: %d Size: %" PRIu64" Free: %u",
00866                 d->header->vol_info[j].number, d->header->vol_info[j].len, d->header->vol_info[j].free);
00867         }
00868       }
00869       d->sync();
00870     }
00871     if (config_volumes.num_volumes == 0) {
00872       theCache = new Cache();
00873       theCache->scheme = CACHE_HTTP_TYPE;
00874       theCache->open(clear, fix);
00875       return;
00876     }
00877     if (config_volumes.num_http_volumes != 0) {
00878       theCache = new Cache();
00879       theCache->scheme = CACHE_HTTP_TYPE;
00880       theCache->open(clear, fix);
00881     }
00882 
00883     if (config_volumes.num_stream_volumes != 0) {
00884       theStreamCache = new Cache();
00885       theStreamCache->scheme = CACHE_RTSP_TYPE;
00886       theStreamCache->open(clear, fix);
00887     }
00888 
00889   }
00890 }
00891 
00892 void
00893 CacheProcessor::cacheInitialized()
00894 {
00895   int i;
00896 
00897   if ((theCache && (theCache->ready == CACHE_INITIALIZING)) ||
00898       (theStreamCache && (theStreamCache->ready == CACHE_INITIALIZING)))
00899     return;
00900   int caches_ready = 0;
00901   int cache_init_ok = 0;
00902   /* allocate ram size in proportion to the disk space the
00903      volume accupies */
00904   int64_t total_size = 0;               // count in HTTP & MIXT
00905   uint64_t total_cache_bytes = 0;       // bytes that can used in total_size
00906   uint64_t total_direntries = 0;        // all the direntries in the cache
00907   uint64_t used_direntries = 0;         //   and used
00908   uint64_t vol_total_cache_bytes = 0;
00909   uint64_t vol_total_direntries = 0;
00910   uint64_t vol_used_direntries = 0;
00911   Vol *vol;
00912 
00913   ProxyMutex *mutex = this_ethread()->mutex;
00914 
00915   if (theCache) {
00916     total_size += theCache->cache_size;
00917     Debug("cache_init", "CacheProcessor::cacheInitialized - theCache, total_size = %" PRId64 " = %" PRId64 " MB",
00918           total_size, total_size / ((1024 * 1024) / STORE_BLOCK_SIZE));
00919   }
00920   if (theStreamCache) {
00921     total_size += theStreamCache->cache_size;
00922     Debug("cache_init", "CacheProcessor::cacheInitialized - theStreamCache, total_size = %" PRId64 " = %" PRId64 " MB",
00923           total_size, total_size / ((1024 * 1024) / STORE_BLOCK_SIZE));
00924   }
00925 
00926   if (theCache) {
00927     if (theCache->ready == CACHE_INIT_FAILED) {
00928       Debug("cache_init", "CacheProcessor::cacheInitialized - failed to initialize the cache for http: cache disabled");
00929       Warning("failed to initialize the cache for http: cache disabled\n");
00930     } else {
00931       caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
00932       caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
00933       caches[CACHE_FRAG_TYPE_HTTP] = theCache;
00934       caches[CACHE_FRAG_TYPE_NONE] = theCache;
00935     }
00936   }
00937   if (theStreamCache) {
00938     if (theStreamCache->ready == CACHE_INIT_FAILED) {
00939       Debug("cache_init",
00940             "CacheProcessor::cacheInitialized - failed to initialize the cache for streaming: cache disabled");
00941       Warning("failed to initialize the cache for streaming: cache disabled\n");
00942     } else {
00943       caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_RTSP);
00944       caches[CACHE_FRAG_TYPE_RTSP] = theStreamCache;
00945     }
00946   }
00947 
00948   // Update stripe version data.
00949   if (gnvol) // start with whatever the first stripe is.
00950     cacheProcessor.min_stripe_version = cacheProcessor.max_stripe_version = gvol[0]->header->version;
00951   // scan the rest of the stripes.
00952   for (i = 1; i < gnvol; i++) {
00953     Vol* v = gvol[i];
00954     if (v->header->version < cacheProcessor.min_stripe_version)
00955       cacheProcessor.min_stripe_version = v->header->version;
00956     if (cacheProcessor.max_stripe_version < v->header->version)
00957       cacheProcessor.max_stripe_version = v->header->version;
00958   }
00959 
00960 
00961   if (caches_ready) {
00962     Debug("cache_init", "CacheProcessor::cacheInitialized - caches_ready=0x%0X, gnvol=%d", (unsigned int) caches_ready,
00963           gnvol);
00964 
00965     int64_t ram_cache_bytes = 0;
00966 
00967     if (gnvol) {
00968       // new ram_caches, with algorithm from the config
00969       for (i = 0; i < gnvol; i++) {
00970         switch (cache_config_ram_cache_algorithm) {
00971           default:
00972           case RAM_CACHE_ALGORITHM_CLFUS:
00973             gvol[i]->ram_cache = new_RamCacheCLFUS();
00974             break;
00975           case RAM_CACHE_ALGORITHM_LRU:
00976             gvol[i]->ram_cache = new_RamCacheLRU();
00977             break;
00978         }
00979       }
00980       // let us calculate the Size
00981       if (cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE) {
00982         Debug("cache_init", "CacheProcessor::cacheInitialized - cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE");
00983         for (i = 0; i < gnvol; i++) {
00984           vol = gvol[i];
00985           gvol[i]->ram_cache->init(vol_dirlen(vol) * DEFAULT_RAM_CACHE_MULTIPLIER, vol);
00986 #if TS_USE_INTERIM_CACHE == 1
00987           gvol[i]->history.init(1<<20, 2097143);
00988 #endif
00989           ram_cache_bytes += vol_dirlen(gvol[i]);
00990           Debug("cache_init", "CacheProcessor::cacheInitialized - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
00991                 ram_cache_bytes, ram_cache_bytes / (1024 * 1024));
00992           CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t) vol_dirlen(gvol[i]));
00993 
00994           vol_total_cache_bytes = gvol[i]->len - vol_dirlen(gvol[i]);
00995           total_cache_bytes += vol_total_cache_bytes;
00996           Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
00997                 total_cache_bytes, total_cache_bytes / (1024 * 1024));
00998 
00999           CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
01000 
01001 
01002           vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
01003           total_direntries += vol_total_direntries;
01004           CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);
01005 
01006 
01007           vol_used_direntries = dir_entries_used(gvol[i]);
01008           CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
01009           used_direntries += vol_used_direntries;
01010         }
01011 
01012       } else {
01013         // we got configured memory size
01014         // TODO, should we check the available system memories, or you will
01015         //   OOM or swapout, that is not a good situation for the server
01016         Debug("cache_init", "CacheProcessor::cacheInitialized - %" PRId64 " != AUTO_SIZE_RAM_CACHE",
01017               cache_config_ram_cache_size);
01018         int64_t http_ram_cache_size =
01019           (theCache) ? (int64_t) (((double) theCache->cache_size / total_size) * cache_config_ram_cache_size) : 0;
01020         Debug("cache_init", "CacheProcessor::cacheInitialized - http_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
01021               http_ram_cache_size, http_ram_cache_size / (1024 * 1024));
01022         int64_t stream_ram_cache_size = cache_config_ram_cache_size - http_ram_cache_size;
01023         Debug("cache_init", "CacheProcessor::cacheInitialized - stream_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
01024               stream_ram_cache_size, stream_ram_cache_size / (1024 * 1024));
01025 
01026         // Dump some ram_cache size information in debug mode.
01027         Debug("ram_cache", "config: size = %" PRId64 ", cutoff = %" PRId64 "",
01028               cache_config_ram_cache_size, cache_config_ram_cache_cutoff);
01029 
01030         for (i = 0; i < gnvol; i++) {
01031           vol = gvol[i];
01032           double factor;
01033           if (gvol[i]->cache == theCache) {
01034             factor = (double) (int64_t) (gvol[i]->len >> STORE_BLOCK_SHIFT) / (int64_t) theCache->cache_size;
01035             Debug("cache_init", "CacheProcessor::cacheInitialized - factor = %f", factor);
01036             gvol[i]->ram_cache->init((int64_t) (http_ram_cache_size * factor), vol);
01037             ram_cache_bytes += (int64_t) (http_ram_cache_size * factor);
01038             CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t) (http_ram_cache_size * factor));
01039           } else {
01040             factor = (double) (int64_t) (gvol[i]->len >> STORE_BLOCK_SHIFT) / (int64_t) theStreamCache->cache_size;
01041             Debug("cache_init", "CacheProcessor::cacheInitialized - factor = %f", factor);
01042             gvol[i]->ram_cache->init((int64_t) (stream_ram_cache_size * factor), vol);
01043             ram_cache_bytes += (int64_t) (stream_ram_cache_size * factor);
01044             CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t) (stream_ram_cache_size * factor));
01045           }
01046           Debug("cache_init", "CacheProcessor::cacheInitialized[%d] - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
01047                 i, ram_cache_bytes, ram_cache_bytes / (1024 * 1024));
01048 #if TS_USE_INTERIM_CACHE == 1
01049           gvol[i]->history.init(1<<20, 2097143);
01050 #endif
01051           vol_total_cache_bytes = gvol[i]->len - vol_dirlen(gvol[i]);
01052           total_cache_bytes += vol_total_cache_bytes;
01053           CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
01054           Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
01055                 total_cache_bytes, total_cache_bytes / (1024 * 1024));
01056 
01057           vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
01058           total_direntries += vol_total_direntries;
01059           CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);
01060 
01061 
01062           vol_used_direntries = dir_entries_used(gvol[i]);
01063           CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
01064           used_direntries += vol_used_direntries;
01065 
01066         }
01067       }
01068       switch (cache_config_ram_cache_compress) {
01069         default:
01070           Fatal("unknown RAM cache compression type: %d", cache_config_ram_cache_compress);
01071         case CACHE_COMPRESSION_NONE:
01072         case CACHE_COMPRESSION_FASTLZ:
01073           break;
01074         case CACHE_COMPRESSION_LIBZ:
01075 #if ! TS_HAS_LIBZ
01076           Fatal("libz not available for RAM cache compression");
01077 #endif
01078           break;
01079         case CACHE_COMPRESSION_LIBLZMA:
01080 #if ! TS_HAS_LZMA
01081           Fatal("lzma not available for RAM cache compression");
01082 #endif
01083           break;
01084       }
01085 
01086       GLOBAL_CACHE_SET_DYN_STAT(cache_ram_cache_bytes_total_stat, ram_cache_bytes);
01087       GLOBAL_CACHE_SET_DYN_STAT(cache_bytes_total_stat, total_cache_bytes);
01088       GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_total_stat, total_direntries);
01089       GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_used_stat, used_direntries);
01090       dir_sync_init();
01091       cache_init_ok = 1;
01092     } else
01093       Warning("cache unable to open any vols, disabled");
01094   }
01095   if (cache_init_ok) {
01096     // Initialize virtual cache
01097     CacheProcessor::initialized = CACHE_INITIALIZED;
01098     CacheProcessor::cache_ready = caches_ready;
01099     Note("cache enabled");
01100 #ifdef CLUSTER_CACHE
01101     if (!(start_internal_flags & PROCESSOR_RECONFIGURE)) {
01102       CacheContinuation::init();
01103       clusterProcessor.start();
01104     }
01105 #endif
01106   } else {
01107     CacheProcessor::initialized = CACHE_INIT_FAILED;
01108     Note("cache disabled");
01109   }
01110   // Fire callback to signal initialization finished.
01111   if (cb_after_init)
01112     cb_after_init();
01113 }
01114 
01115 void
01116 CacheProcessor::stop()
01117 {
01118 }
01119 
01120 int
01121 CacheProcessor::dir_check(bool afix)
01122 {
01123   for (int i = 0; i < gnvol; i++)
01124     gvol[i]->dir_check(afix);
01125   return 0;
01126 }
01127 
01128 int
01129 CacheProcessor::db_check(bool afix)
01130 {
01131   for (int i = 0; i < gnvol; i++)
01132     gvol[i]->db_check(afix);
01133   return 0;
01134 }
01135 
01136 int
01137 Vol::db_check(bool /* fix ATS_UNUSED */ )
01138 {
01139   char tt[256];
01140   printf("    Data for [%s]\n", hash_text.get());
01141   printf("        Length:          %" PRIu64 "\n", (uint64_t)len);
01142   printf("        Write Position:  %" PRIu64 "\n", (uint64_t) (header->write_pos - skip));
01143   printf("        Phase:           %d\n", (int)!!header->phase);
01144   ink_ctime_r(&header->create_time, tt);
01145   tt[strlen(tt) - 1] = 0;
01146   printf("        Create Time:     %s\n", tt);
01147   printf("        Sync Serial:     %u\n", (unsigned int)header->sync_serial);
01148   printf("        Write Serial:    %u\n", (unsigned int)header->write_serial);
01149   printf("\n");
01150 
01151   return 0;
01152 }
01153 
01154 static void
01155 vol_init_data_internal(Vol *d)
01156 {
01157   d->buckets = ((d->len - (d->start - d->skip)) / cache_config_min_average_object_size) / DIR_DEPTH;
01158   d->segments = (d->buckets + (((1<<16)-1)/DIR_DEPTH)) / ((1<<16)/DIR_DEPTH);
01159   d->buckets = (d->buckets + d->segments - 1) / d->segments;
01160   d->start = d->skip + 2 *vol_dirlen(d);
01161 }
01162 
01163 static void
01164 vol_init_data(Vol *d) {
01165   // iteratively calculate start + buckets
01166   vol_init_data_internal(d);
01167   vol_init_data_internal(d);
01168   vol_init_data_internal(d);
01169 }
01170 
01171 void
01172 vol_init_dir(Vol *d)
01173 {
01174   int b, s, l;
01175 
01176   for (s = 0; s < d->segments; s++) {
01177     d->header->freelist[s] = 0;
01178     Dir *seg = dir_segment(s, d);
01179     for (l = 1; l < DIR_DEPTH; l++) {
01180       for (b = 0; b < d->buckets; b++) {
01181         Dir *bucket = dir_bucket(b, seg);
01182         dir_free_entry(dir_bucket_row(bucket, l), s, d);
01183       }
01184     }
01185   }
01186 }
01187 
01188 #if TS_USE_INTERIM_CACHE == 1
01189 void
01190 interimvol_clear_init(InterimCacheVol *d)
01191 {
01192   memset(d->header, 0, sizeof(InterimVolHeaderFooter));
01193   d->header->magic = VOL_MAGIC;
01194   d->header->version.ink_major = CACHE_DB_MAJOR_VERSION;
01195   d->header->version.ink_minor = CACHE_DB_MINOR_VERSION;
01196   d->header->agg_pos = d->header->write_pos = d->start;
01197   d->header->last_write_pos = d->header->write_pos;
01198   d->header->phase = 0;
01199   d->header->cycle = 0;
01200   d->header->create_time = time(NULL);
01201   d->header->dirty = 0;
01202   d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
01203 }
01204 #endif
01205 
01206 void
01207 vol_clear_init(Vol *d)
01208 {
01209   size_t dir_len = vol_dirlen(d);
01210   memset(d->raw_dir, 0, dir_len);
01211   vol_init_dir(d);
01212   d->header->magic = VOL_MAGIC;
01213   d->header->version.ink_major = CACHE_DB_MAJOR_VERSION;
01214   d->header->version.ink_minor = CACHE_DB_MINOR_VERSION;
01215   d->scan_pos = d->header->agg_pos = d->header->write_pos = d->start;
01216   d->header->last_write_pos = d->header->write_pos;
01217   d->header->phase = 0;
01218   d->header->cycle = 0;
01219   d->header->create_time = time(NULL);
01220   d->header->dirty = 0;
01221   d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
01222   *d->footer = *d->header;
01223 
01224 #if TS_USE_INTERIM_CACHE == 1
01225   for (int i = 0; i < d->num_interim_vols; i++) {
01226     interimvol_clear_init(&(d->interim_vols[i]));
01227   }
01228 #endif
01229 }
01230 
01231 int
01232 vol_dir_clear(Vol *d)
01233 {
01234   size_t dir_len = vol_dirlen(d);
01235   vol_clear_init(d);
01236 
01237   if (pwrite(d->fd, d->raw_dir, dir_len, d->skip) < 0) {
01238     Warning("unable to clear cache directory '%s'", d->hash_text.get());
01239     return -1;
01240   }
01241   return 0;
01242 }
01243 
01244 int
01245 Vol::clear_dir()
01246 {
01247   size_t dir_len = vol_dirlen(this);
01248   vol_clear_init(this);
01249 
01250   SET_HANDLER(&Vol::handle_dir_clear);
01251 
01252   io.aiocb.aio_fildes = fd;
01253   io.aiocb.aio_buf = raw_dir;
01254   io.aiocb.aio_nbytes = dir_len;
01255   io.aiocb.aio_offset = skip;
01256   io.action = this;
01257   io.thread = AIO_CALLBACK_THREAD_ANY;
01258   io.then = 0;
01259   ink_assert(ink_aio_write(&io));
01260   return 0;
01261 }
01262 
01263 int
01264 Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
01265 {
01266   char* seed_str = disk->hash_base_string ? disk->hash_base_string : s;
01267   const size_t hash_seed_size = strlen(seed_str);
01268   const size_t hash_text_size = hash_seed_size + 32;
01269 
01270   hash_text = static_cast<char *>(ats_malloc(hash_text_size));
01271   ink_strlcpy(hash_text, seed_str, hash_text_size);
01272   snprintf(hash_text + hash_seed_size, (hash_text_size - hash_seed_size), " %" PRIu64 ":%" PRIu64 "",
01273            (uint64_t)dir_skip, (uint64_t)blocks);
01274   MD5Context().hash_immediate(hash_id, hash_text, strlen(hash_text));
01275 
01276   dir_skip = ROUND_TO_STORE_BLOCK((dir_skip < START_POS ? START_POS : dir_skip));
01277   path = ats_strdup(s);
01278   len = blocks * STORE_BLOCK_SIZE;
01279   ink_assert(len <= MAX_VOL_SIZE);
01280   skip = dir_skip;
01281   prev_recover_pos = 0;
01282 
01283   // successive approximation, directory/meta data eats up some storage
01284   start = dir_skip;
01285   vol_init_data(this);
01286   data_blocks = (len - (start - skip)) / STORE_BLOCK_SIZE;
01287   hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 100;
01288 
01289   evacuate_size = (int) (len / EVACUATION_BUCKET_SIZE) + 2;
01290   int evac_len = (int) evacuate_size * sizeof(DLL<EvacuationBlock>);
01291   evacuate = (DLL<EvacuationBlock> *)ats_malloc(evac_len);
01292   memset(evacuate, 0, evac_len);
01293 
01294   Debug("cache_init", "allocating %zu directory bytes for a %lld byte volume (%lf%%)",
01295     vol_dirlen(this), (long long)this->len, (double)vol_dirlen(this) / (double)this->len * 100.0);
01296   raw_dir = (char *)ats_memalign(ats_pagesize(), vol_dirlen(this));
01297   dir = (Dir *) (raw_dir + vol_headerlen(this));
01298   header = (VolHeaderFooter *) raw_dir;
01299   footer = (VolHeaderFooter *) (raw_dir + vol_dirlen(this) - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)));
01300 
01301 #if TS_USE_INTERIM_CACHE == 1
01302   num_interim_vols = good_interim_disks;
01303   ink_assert(num_interim_vols >= 0 && num_interim_vols <= 8);
01304   for (int i = 0; i < num_interim_vols; i++) {
01305     double r = (double) blocks / total_cache_size;
01306     off_t vlen = off_t (r * g_interim_disks[i]->len * STORE_BLOCK_SIZE);
01307     vlen = (vlen / STORE_BLOCK_SIZE) * STORE_BLOCK_SIZE;
01308     off_t start = ink_atomic_increment(&g_interim_disks[i]->skip, vlen);
01309     interim_vols[i].init(start, vlen, g_interim_disks[i], this, &(this->header->interim_header[i]));
01310     ink_assert(interim_vols[i].start + interim_vols[i].len <= g_interim_disks[i]->len * STORE_BLOCK_SIZE);
01311   }
01312 #endif
01313 
01314   if (clear) {
01315     Note("clearing cache directory '%s'", hash_text.get());
01316     return clear_dir();
01317   }
01318 
01319   init_info = new VolInitInfo();
01320   int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
01321   off_t footer_offset = vol_dirlen(this) - footerlen;
01322   // try A
01323   off_t as = skip;
01324   if (is_debug_tag_set("cache_init"))
01325     Note("reading directory '%s'", hash_text.get());
01326   SET_HANDLER(&Vol::handle_header_read);
01327   init_info->vol_aio[0].aiocb.aio_offset = as;
01328   init_info->vol_aio[1].aiocb.aio_offset = as + footer_offset;
01329   off_t bs = skip + vol_dirlen(this);
01330   init_info->vol_aio[2].aiocb.aio_offset = bs;
01331   init_info->vol_aio[3].aiocb.aio_offset = bs + footer_offset;
01332 
01333   for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
01334     AIOCallback *aio = &(init_info->vol_aio[i]);
01335     aio->aiocb.aio_fildes = fd;
01336     aio->aiocb.aio_buf = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
01337     aio->aiocb.aio_nbytes = footerlen;
01338     aio->action = this;
01339     aio->thread = AIO_CALLBACK_THREAD_ANY;
01340     aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : 0;
01341   }
01342 #if AIO_MODE == AIO_MODE_NATIVE
01343   ink_assert(ink_aio_readv(init_info->vol_aio));
01344 #else
01345   ink_assert(ink_aio_read(init_info->vol_aio));
01346 #endif
01347   return 0;
01348 }
01349 
01350 int
01351 Vol::handle_dir_clear(int event, void *data)
01352 {
01353   size_t dir_len = vol_dirlen(this);
01354   AIOCallback *op;
01355 
01356   if (event == AIO_EVENT_DONE) {
01357     op = (AIOCallback *) data;
01358     if ((size_t) op->aio_result != (size_t) op->aiocb.aio_nbytes) {
01359       Warning("unable to clear cache directory '%s'", hash_text.get());
01360       fd = -1;
01361     }
01362 
01363     if (op->aiocb.aio_nbytes == dir_len) {
01364       /* clear the header for directory B. We don't need to clear the
01365          whole of directory B. The header for directory B starts at
01366          skip + len */
01367       op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
01368       op->aiocb.aio_offset = skip + dir_len;
01369       ink_assert(ink_aio_write(op));
01370       return EVENT_DONE;
01371     }
01372     set_io_not_in_progress();
01373     SET_HANDLER(&Vol::dir_init_done);
01374     dir_init_done(EVENT_IMMEDIATE, 0);
01375     /* mark the volume as bad */
01376   }
01377   return EVENT_DONE;
01378 }
01379 
01380 int
01381 Vol::handle_dir_read(int event, void *data)
01382 {
01383   AIOCallback *op = (AIOCallback *) data;
01384 
01385   if (event == AIO_EVENT_DONE) {
01386     if ((size_t) op->aio_result != (size_t) op->aiocb.aio_nbytes) {
01387       clear_dir();
01388       return EVENT_DONE;
01389     }
01390   }
01391 
01392   if (!(header->magic == VOL_MAGIC &&  footer->magic == VOL_MAGIC &&
01393         CACHE_DB_MAJOR_VERSION_COMPATIBLE <= header->version.ink_major &&  header->version.ink_major <= CACHE_DB_MAJOR_VERSION
01394     )) {
01395     Warning("bad footer in cache directory for '%s', clearing", hash_text.get());
01396     Note("clearing cache directory '%s'", hash_text.get());
01397     clear_dir();
01398     return EVENT_DONE;
01399   }
01400   CHECK_DIR(this);
01401 
01402   sector_size = header->sector_size;
01403 
01404 #if TS_USE_INTERIM_CACHE == 1
01405   if (num_interim_vols > 0) {
01406     interim_done = 0;
01407     for (int i = 0; i < num_interim_vols; i++) {
01408       interim_vols[i].recover_data();
01409     }
01410   } else {
01411 #endif
01412 
01413   return this->recover_data();
01414 
01415 #if TS_USE_INTERIM_CACHE == 1
01416   }
01417 #endif
01418 
01419   return EVENT_CONT;
01420 }
01421 
01422 int
01423 Vol::recover_data()
01424 {
01425   SET_HANDLER(&Vol::handle_recover_from_data);
01426   return handle_recover_from_data(EVENT_IMMEDIATE, 0);
01427 }
01428 
01429 /*
01430    Philosophy:  The idea is to find the region of disk that could be
01431    inconsistent and remove all directory entries pointing to that potentially
01432    inconsistent region.
01433    Start from a consistent position (the write_pos of the last directory
01434    synced to disk) and scan forward. Two invariants for docs that were
01435    written to the disk after the directory was synced:
01436 
01437    1. doc->magic == DOC_MAGIC
01438 
01439    The following two cases happen only when the previous generation
01440    documents are aligned with the current ones.
01441 
01442    2. All the docs written to the disk
01443    after the directory was synced will have their sync_serial <=
01444    header->sync_serial + 1,  because the write aggregation can take
01445    indeterminate amount of time to sync. The doc->sync_serial can be
01446    equal to header->sync_serial + 1, because we increment the sync_serial
01447    before we sync the directory to disk.
01448 
01449    3. The doc->sync_serial will always increase. If doc->sync_serial
01450    decreases, the document was written in the previous phase
01451 
01452    If either of these conditions fail and we are not too close to the end
01453    (see the next comment ) then we're done
01454 
01455    We actually start from header->last_write_pos instead of header->write_pos
01456    to make sure that we haven't wrapped around the whole disk without
01457    syncing the directory.  Since the sync serial is 60 seconds, it is
01458    entirely possible to write through the whole cache without
01459    once syncing the directory. In this case, we need to clear the
01460    cache.The documents written right before we synced the
01461    directory to disk should have the write_serial <= header->sync_serial.
01462 
01463       */
01464 
01465 int
01466 Vol::handle_recover_from_data(int event, void * /* data ATS_UNUSED */ )
01467 {
01468   uint32_t got_len = 0;
01469   uint32_t max_sync_serial = header->sync_serial;
01470   char *s, *e;
01471   if (event == EVENT_IMMEDIATE) {
01472     if (header->sync_serial == 0) {
01473       io.aiocb.aio_buf = NULL;
01474       SET_HANDLER(&Vol::handle_recover_write_dir);
01475       return handle_recover_write_dir(EVENT_IMMEDIATE, 0);
01476     }
01477     // initialize
01478     recover_wrapped = 0;
01479     last_sync_serial = 0;
01480     last_write_serial = 0;
01481     recover_pos = header->last_write_pos;
01482     if (recover_pos >= skip + len) {
01483       recover_wrapped = 1;
01484       recover_pos = start;
01485     }
01486     io.aiocb.aio_buf = (char *)ats_memalign(ats_pagesize(), RECOVERY_SIZE);
01487     io.aiocb.aio_nbytes = RECOVERY_SIZE;
01488     if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
01489       io.aiocb.aio_nbytes = (skip + len) - recover_pos;
01490   } else if (event == AIO_EVENT_DONE) {
01491     if ((size_t) io.aiocb.aio_nbytes != (size_t) io.aio_result) {
01492       Warning("disk read error on recover '%s', clearing", hash_text.get());
01493       goto Lclear;
01494     }
01495     if (io.aiocb.aio_offset == header->last_write_pos) {
01496 
01497       /* check that we haven't wrapped around without syncing
01498          the directory. Start from last_write_serial (write pos the documents
01499          were written to just before syncing the directory) and make sure
01500          that all documents have write_serial <= header->write_serial.
01501        */
01502       uint32_t to_check = header->write_pos - header->last_write_pos;
01503       ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
01504       uint32_t done = 0;
01505       s = (char *) io.aiocb.aio_buf;
01506       while (done < to_check) {
01507         Doc *doc = (Doc *) (s + done);
01508         if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
01509           Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
01510           goto Lclear;
01511         }
01512         done += round_to_approx_size(doc->len);
01513         if (doc->sync_serial > last_write_serial)
01514           last_sync_serial = doc->sync_serial;
01515       }
01516       ink_assert(done == to_check);
01517 
01518       got_len = io.aiocb.aio_nbytes - done;
01519       recover_pos += io.aiocb.aio_nbytes;
01520       s = (char *) io.aiocb.aio_buf + done;
01521       e = s + got_len;
01522     } else {
01523       got_len = io.aiocb.aio_nbytes;
01524       recover_pos += io.aiocb.aio_nbytes;
01525       s = (char *) io.aiocb.aio_buf;
01526       e = s + got_len;
01527     }
01528   }
01529   // examine what we got
01530   if (got_len) {
01531 
01532     Doc *doc = NULL;
01533 
01534     if (recover_wrapped && start == io.aiocb.aio_offset) {
01535       doc = (Doc *) s;
01536       if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
01537         recover_pos = skip + len - EVACUATION_SIZE;
01538         goto Ldone;
01539       }
01540     }
01541 
01542     while (s < e) {
01543       doc = (Doc *) s;
01544 
01545       if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
01546 
01547         if (doc->magic == DOC_MAGIC) {
01548           if (doc->sync_serial > header->sync_serial)
01549             max_sync_serial = doc->sync_serial;
01550 
01551           /*
01552              doc->magic == DOC_MAGIC, but doc->sync_serial != last_sync_serial
01553              This might happen in the following situations
01554              1. We are starting off recovery. In this case the
01555              last_sync_serial == header->sync_serial, but the doc->sync_serial
01556              can be anywhere in the range (0, header->sync_serial + 1]
01557              If this is the case, update last_sync_serial and continue;
01558 
01559              2. A dir sync started between writing documents to the
01560              aggregation buffer and hence the doc->sync_serial went up.
01561              If the doc->sync_serial is greater than the last
01562              sync serial and less than (header->sync_serial + 2) then
01563              continue;
01564 
01565              3. If the position we are recovering from is within AGG_SIZE
01566              from the disk end, then we can't trust this document. The
01567              aggregation buffer might have been larger than the remaining space
01568              at the end and we decided to wrap around instead of writing
01569              anything at that point. In this case, wrap around and start
01570              from the beginning.
01571 
01572              If neither of these 3 cases happen, then we are indeed done.
01573 
01574            */
01575 
01576           // case 1
01577           // case 2
01578           if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
01579             last_sync_serial = doc->sync_serial;
01580             s += round_to_approx_size(doc->len);
01581             continue;
01582           }
01583           // case 3 - we have already recoverd some data and
01584           // (doc->sync_serial < last_sync_serial) ||
01585           // (doc->sync_serial > header->sync_serial + 1).
01586           // if we are too close to the end, wrap around
01587           else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
01588             recover_wrapped = 1;
01589             recover_pos = start;
01590             io.aiocb.aio_nbytes = RECOVERY_SIZE;
01591 
01592             break;
01593           }
01594           // we are done. This doc was written in the earlier phase
01595           recover_pos -= e - s;
01596           goto Ldone;
01597         } else {
01598           // doc->magic != DOC_MAGIC
01599           // If we are in the danger zone - recover_pos is within AGG_SIZE
01600           // from the end, then wrap around
01601           recover_pos -= e - s;
01602           if (recover_pos > (skip + len) - AGG_SIZE) {
01603             recover_wrapped = 1;
01604             recover_pos = start;
01605             io.aiocb.aio_nbytes = RECOVERY_SIZE;
01606 
01607             break;
01608           }
01609           // we ar not in the danger zone
01610           goto Ldone;
01611         }
01612       }
01613       // doc->magic == DOC_MAGIC && doc->sync_serial == last_sync_serial
01614       last_write_serial = doc->write_serial;
01615       s += round_to_approx_size(doc->len);
01616     }
01617 
01618     /* if (s > e) then we gone through RECOVERY_SIZE; we need to
01619        read more data off disk and continue recovering */
01620     if (s >= e) {
01621       /* In the last iteration, we increment s by doc->len...need to undo
01622          that change */
01623       if (s > e)
01624         s -= round_to_approx_size(doc->len);
01625       recover_pos -= e - s;
01626       if (recover_pos >= skip + len)
01627         recover_pos = start;
01628       io.aiocb.aio_nbytes = RECOVERY_SIZE;
01629       if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
01630         io.aiocb.aio_nbytes = (skip + len) - recover_pos;
01631     }
01632   }
01633   if (recover_pos == prev_recover_pos) // this should never happen, but if it does break the loop
01634     goto Lclear;
01635   prev_recover_pos = recover_pos;
01636   io.aiocb.aio_offset = recover_pos;
01637   ink_assert(ink_aio_read(&io));
01638   return EVENT_CONT;
01639 
01640 Ldone:{
01641     /* if we come back to the starting position, then we don't have to recover anything */
01642     if (recover_pos == header->write_pos && recover_wrapped) {
01643       SET_HANDLER(&Vol::handle_recover_write_dir);
01644       if (is_debug_tag_set("cache_init"))
01645         Note("recovery wrapped around. nothing to clear\n");
01646       return handle_recover_write_dir(EVENT_IMMEDIATE, 0);
01647     }
01648 
01649     recover_pos += EVACUATION_SIZE;   // safely cover the max write size
01650     if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
01651       Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
01652       Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
01653       goto Lclear;
01654     }
01655 
01656     if (recover_pos > skip + len)
01657       recover_pos -= skip + len;
01658     // bump sync number so it is different from that in the Doc structs
01659     uint32_t next_sync_serial = max_sync_serial + 1;
01660     // make that the next sync does not overwrite our good copy!
01661     if (!(header->sync_serial & 1) == !(next_sync_serial & 1))
01662       next_sync_serial++;
01663     // clear effected portion of the cache
01664     off_t clear_start = offset_to_vol_offset(this, header->write_pos);
01665     off_t clear_end = offset_to_vol_offset(this, recover_pos);
01666     if (clear_start <= clear_end)
01667       dir_clear_range(clear_start, clear_end, this);
01668     else {
01669       dir_clear_range(clear_end, DIR_OFFSET_MAX, this);
01670       dir_clear_range(1, clear_start, this);
01671     }
01672     if (is_debug_tag_set("cache_init"))
01673       Note("recovery clearing offsets [%" PRIu64 ", %" PRIu64 "] sync_serial %d next %d\n",
01674            header->write_pos, recover_pos, header->sync_serial, next_sync_serial);
01675     footer->sync_serial = header->sync_serial = next_sync_serial;
01676 
01677     for (int i = 0; i < 3; i++) {
01678       AIOCallback *aio = &(init_info->vol_aio[i]);
01679       aio->aiocb.aio_fildes = fd;
01680       aio->action = this;
01681       aio->thread = AIO_CALLBACK_THREAD_ANY;
01682       aio->then = (i < 2) ? &(init_info->vol_aio[i + 1]) : 0;
01683     }
01684     int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
01685     size_t dirlen = vol_dirlen(this);
01686     int B = header->sync_serial & 1;
01687     off_t ss = skip + (B ? dirlen : 0);
01688 
01689     init_info->vol_aio[0].aiocb.aio_buf = raw_dir;
01690     init_info->vol_aio[0].aiocb.aio_nbytes = footerlen;
01691     init_info->vol_aio[0].aiocb.aio_offset = ss;
01692     init_info->vol_aio[1].aiocb.aio_buf = raw_dir + footerlen;
01693     init_info->vol_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
01694     init_info->vol_aio[1].aiocb.aio_offset = ss + footerlen;
01695     init_info->vol_aio[2].aiocb.aio_buf = raw_dir + dirlen - footerlen;
01696     init_info->vol_aio[2].aiocb.aio_nbytes = footerlen;
01697     init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
01698 
01699     SET_HANDLER(&Vol::handle_recover_write_dir);
01700 #if AIO_MODE == AIO_MODE_NATIVE
01701     ink_assert(ink_aio_writev(init_info->vol_aio));
01702 #else
01703     ink_assert(ink_aio_write(init_info->vol_aio));
01704 #endif
01705     return EVENT_CONT;
01706   }
01707 
01708 Lclear:
01709   free((char *) io.aiocb.aio_buf);
01710   delete init_info;
01711   init_info = 0;
01712   clear_dir();
01713   return EVENT_CONT;
01714 }
01715 
01716 int
01717 Vol::handle_recover_write_dir(int /* event ATS_UNUSED */ , void * /* data ATS_UNUSED */ )
01718 {
01719   if (io.aiocb.aio_buf)
01720     free((char *) io.aiocb.aio_buf);
01721   delete init_info;
01722   init_info = 0;
01723   set_io_not_in_progress();
01724   scan_pos = header->write_pos;
01725   periodic_scan();
01726   SET_HANDLER(&Vol::dir_init_done);
01727   return dir_init_done(EVENT_IMMEDIATE, 0);
01728 }
01729 
01730 int
01731 Vol::handle_header_read(int event, void *data)
01732 {
01733   AIOCallback *op;
01734   VolHeaderFooter *hf[4];
01735   switch (event) {
01736   case AIO_EVENT_DONE:
01737     op = (AIOCallback *) data;
01738     for (int i = 0; i < 4; i++) {
01739       ink_assert(op != 0);
01740       hf[i] = (VolHeaderFooter *) (op->aiocb.aio_buf);
01741       if ((size_t) op->aio_result != (size_t) op->aiocb.aio_nbytes) {
01742         clear_dir();
01743         return EVENT_DONE;
01744       }
01745       op = op->then;
01746     }
01747 
01748     io.aiocb.aio_fildes = fd;
01749     io.aiocb.aio_nbytes = vol_dirlen(this);
01750     io.aiocb.aio_buf = raw_dir;
01751     io.action = this;
01752     io.thread = AIO_CALLBACK_THREAD_ANY;
01753     io.then = 0;
01754 
01755     if (hf[0]->sync_serial == hf[1]->sync_serial &&
01756         (hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != hf[3]->sync_serial)) {
01757       SET_HANDLER(&Vol::handle_dir_read);
01758       if (is_debug_tag_set("cache_init"))
01759         Note("using directory A for '%s'", hash_text.get());
01760       io.aiocb.aio_offset = skip;
01761       ink_assert(ink_aio_read(&io));
01762     }
01763     // try B
01764     else if (hf[2]->sync_serial == hf[3]->sync_serial) {
01765 
01766       SET_HANDLER(&Vol::handle_dir_read);
01767       if (is_debug_tag_set("cache_init"))
01768         Note("using directory B for '%s'", hash_text.get());
01769       io.aiocb.aio_offset = skip + vol_dirlen(this);
01770       ink_assert(ink_aio_read(&io));
01771     } else {
01772       Note("no good directory, clearing '%s'", hash_text.get());
01773       clear_dir();
01774       delete init_info;
01775       init_info = 0;
01776     }
01777     return EVENT_DONE;
01778   default:
01779     ink_assert(!"not reach here");
01780   }
01781   return EVENT_DONE;
01782 }
01783 
01784 int
01785 Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */ )
01786 {
01787   if (!cache->cache_read_done) {
01788     eventProcessor.schedule_in(this, HRTIME_MSECONDS(5), ET_CALL);
01789     return EVENT_CONT;
01790   } else {
01791     int vol_no = ink_atomic_increment(&gnvol, 1);
01792     ink_assert(!gvol[vol_no]);
01793     gvol[vol_no] = this;
01794     SET_HANDLER(&Vol::aggWrite);
01795     if (fd == -1)
01796       cache->vol_initialized(0);
01797     else
01798       cache->vol_initialized(1);
01799     return EVENT_DONE;
01800   }
01801 }
01802 
01803 #if TS_USE_INTERIM_CACHE == 1
01804 int
01805 InterimCacheVol::recover_data()
01806 {
01807   io.aiocb.aio_fildes = fd;
01808   io.action = this;
01809   io.thread = AIO_CALLBACK_THREAD_ANY;
01810   io.then = 0;
01811 
01812   SET_HANDLER(&InterimCacheVol::handle_recover_from_data);
01813   return handle_recover_from_data(EVENT_IMMEDIATE, 0);
01814 }
01815 
01816 int
01817 InterimCacheVol::handle_recover_from_data(int event, void *data)
01818 {
01819   (void)data;
01820   uint32_t got_len = 0;
01821   uint32_t max_sync_serial = header->sync_serial;
01822   char *s, *e;
01823   int ndone, offset;
01824 
01825   if (event == EVENT_IMMEDIATE) {
01826     if (header->magic != VOL_MAGIC || header->version.ink_major != CACHE_DB_MAJOR_VERSION) {
01827       Warning("bad header in cache directory for '%s', clearing", hash_text.get());
01828       goto Lclear;
01829     } else if (header->sync_serial == 0) {
01830       io.aiocb.aio_buf = NULL;
01831       goto Lfinish;
01832     }
01833 
01834     // initialize
01835     recover_wrapped = 0;
01836     last_sync_serial = 0;
01837     last_write_serial = 0;
01838     recover_pos = header->last_write_pos;
01839     if (recover_pos >= skip + len) {
01840       recover_wrapped = 1;
01841       recover_pos = start;
01842     }
01843 
01844     io.aiocb.aio_buf = (char *)ats_memalign(sysconf(_SC_PAGESIZE), RECOVERY_SIZE);
01845     io.aiocb.aio_nbytes = RECOVERY_SIZE;
01846     if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
01847       io.aiocb.aio_nbytes = (skip + len) - recover_pos;
01848 
01849   } else if (event == AIO_EVENT_DONE) {
01850     if ((size_t) io.aiocb.aio_nbytes != (size_t) io.aio_result) {
01851       Warning("disk read error on recover '%s', clearing", hash_text.get());
01852       goto Lclear;
01853     }
01854 
01855     if (io.aiocb.aio_offset == header->last_write_pos) {
01856       uint32_t to_check = header->write_pos - header->last_write_pos;
01857       ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
01858       uint32_t done = 0;
01859       s = (char *) io.aiocb.aio_buf;
01860       while (done < to_check) {
01861         Doc *doc = (Doc *) (s + done);
01862         if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
01863           Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
01864           goto Lclear;
01865         }
01866         done += round_to_approx_size(doc->len);
01867         if (doc->sync_serial > last_write_serial)
01868           last_sync_serial = doc->sync_serial;
01869       }
01870       ink_assert(done == to_check);
01871 
01872       got_len = io.aiocb.aio_nbytes - done;
01873       recover_pos += io.aiocb.aio_nbytes;
01874       s = (char *) io.aiocb.aio_buf + done;
01875       e = s + got_len;
01876     } else {
01877       got_len = io.aiocb.aio_nbytes;
01878       recover_pos += io.aiocb.aio_nbytes;
01879       s = (char *) io.aiocb.aio_buf;
01880       e = s + got_len;
01881     }
01882   }
01883 
01884   // examine what we got
01885   if (got_len) {
01886 
01887     Doc *doc = NULL;
01888 
01889     if (recover_wrapped && start == io.aiocb.aio_offset) {
01890       doc = (Doc *) s;
01891       if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
01892         recover_pos = skip + len - EVACUATION_SIZE;
01893         goto Ldone;
01894       }
01895     }
01896 
01897     while (s < e) {
01898       doc = (Doc *) s;
01899 
01900       if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
01901 
01902         if (doc->magic == DOC_MAGIC) {
01903           if (doc->sync_serial > header->sync_serial)
01904             max_sync_serial = doc->sync_serial;
01905 
01906           if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
01907             last_sync_serial = doc->sync_serial;
01908             s += round_to_approx_size(doc->len);
01909             continue;
01910 
01911           } else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
01912             recover_wrapped = 1;
01913             recover_pos = start;
01914             io.aiocb.aio_nbytes = RECOVERY_SIZE;
01915             break;
01916           }
01917 
01918           recover_pos -= e - s;
01919           goto Ldone;
01920 
01921         } else {
01922           recover_pos -= e - s;
01923           if (recover_pos > (skip + len) - AGG_SIZE) {
01924             recover_wrapped = 1;
01925             recover_pos = start;
01926             io.aiocb.aio_nbytes = RECOVERY_SIZE;
01927             break;
01928           }
01929 
01930           goto Ldone;
01931         }
01932       }
01933 
01934       last_write_serial = doc->write_serial;
01935       s += round_to_approx_size(doc->len);
01936     }
01937 
01938     if (s >= e) {
01939 
01940       if (s > e)
01941         s -= round_to_approx_size(doc->len);
01942 
01943       recover_pos -= e - s;
01944       if (recover_pos >= skip + len)
01945         recover_pos = start;
01946 
01947       io.aiocb.aio_nbytes = RECOVERY_SIZE;
01948       if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
01949         io.aiocb.aio_nbytes = (skip + len) - recover_pos;
01950     }
01951   }
01952 
01953   if (recover_pos == prev_recover_pos)
01954     goto Lclear;
01955 
01956   prev_recover_pos = recover_pos;
01957   io.aiocb.aio_offset = recover_pos;
01958   ink_assert(ink_aio_read(&io));
01959   return EVENT_CONT;
01960 
01961 Ldone: {
01962 
01963     if (recover_pos == header->write_pos && recover_wrapped) {
01964       goto Lfinish;
01965     }
01966 
01967     recover_pos += EVACUATION_SIZE;
01968     if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
01969       Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
01970       Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
01971       goto Lclear;
01972     }
01973 
01974     if (recover_pos > skip + len)
01975       recover_pos -= skip + len;
01976 
01977     uint32_t next_sync_serial = max_sync_serial + 1;
01978     if (!(header->sync_serial & 1) == !(next_sync_serial & 1))
01979       next_sync_serial++;
01980 
01981     off_t clear_start = offset_to_vol_offset(this, header->write_pos);
01982     off_t clear_end = offset_to_vol_offset(this, recover_pos);
01983 
01984     if (clear_start <= clear_end)
01985       dir_clean_range_interimvol(clear_start, clear_end, this);
01986     else {
01987       dir_clean_range_interimvol(clear_end, DIR_OFFSET_MAX, this);
01988       dir_clean_range_interimvol(1, clear_start, this);
01989     }
01990 
01991     header->sync_serial = next_sync_serial;
01992 
01993     goto Lfinish;
01994   }
01995 
01996 Lclear:
01997 
01998   interimvol_clear_init(this);
01999   offset = this - vol->interim_vols;
02000   clear_interimvol_dir(vol, offset);          // remove this interimvol dir
02001 
02002 Lfinish:
02003 
02004   free((char*)io.aiocb.aio_buf);
02005   io.aiocb.aio_buf = NULL;
02006 
02007   set_io_not_in_progress();
02008 
02009   ndone = ink_atomic_increment(&vol->interim_done, 1);
02010   if (ndone == vol->num_interim_vols - 1) {         // all interim finished
02011     return vol->recover_data();
02012   }
02013 
02014   return EVENT_CONT;
02015 }
02016 #endif
02017 
02018 // explicit pair for random table in build_vol_hash_table
02019 struct rtable_pair {
02020   unsigned int rval; ///< relative value, used to sort.
02021   unsigned int idx; ///< volume mapping table index.
02022 };
02023 
02024 // comparison operator for random table in build_vol_hash_table
02025 // sorts based on the randomly assigned rval
02026 static int
02027 cmprtable(const void *aa, const void *bb) {
02028   rtable_pair *a = (rtable_pair*)aa;
02029   rtable_pair *b = (rtable_pair*)bb;
02030   if (a->rval < b->rval) return -1;
02031   if (a->rval > b->rval) return 1;
02032   return 0;
02033 }
02034 
02035 void
02036 build_vol_hash_table(CacheHostRecord *cp)
02037 {
02038   int num_vols = cp->num_vols;
02039   unsigned int *mapping = (unsigned int *)ats_malloc(sizeof(unsigned int) * num_vols);
02040   Vol **p = (Vol **)ats_malloc(sizeof(Vol *) * num_vols);
02041 
02042   memset(mapping, 0, num_vols * sizeof(unsigned int));
02043   memset(p, 0, num_vols * sizeof(Vol *));
02044   uint64_t total = 0;
02045   int bad_vols = 0;
02046   int map = 0;
02047   uint64_t used = 0;
02048   // initialize number of elements per vol
02049   for (int i = 0; i < num_vols; i++) {
02050     if (DISK_BAD(cp->vols[i]->disk)) {
02051       bad_vols++;
02052       continue;
02053     }
02054     mapping[map] = i;
02055     p[map++] = cp->vols[i];
02056     total += (cp->vols[i]->len >> STORE_BLOCK_SHIFT);
02057   }
02058 
02059   num_vols -= bad_vols;
02060 
02061   if (!num_vols) {
02062     // all the disks are corrupt,
02063     if (cp->vol_hash_table) {
02064       new_Freer(cp->vol_hash_table, CACHE_MEM_FREE_TIMEOUT);
02065     }
02066     cp->vol_hash_table = NULL;
02067     ats_free(mapping);
02068     ats_free(p);
02069     return;
02070   }
02071 
02072   unsigned int *forvol = (unsigned int *) ats_malloc(sizeof(unsigned int) * num_vols);
02073   unsigned int *gotvol = (unsigned int *) ats_malloc(sizeof(unsigned int) * num_vols);
02074   unsigned int *rnd = (unsigned int *) ats_malloc(sizeof(unsigned int) * num_vols);
02075   unsigned short *ttable = (unsigned short *)ats_malloc(sizeof(unsigned short) * VOL_HASH_TABLE_SIZE);
02076   unsigned short *old_table;
02077   unsigned int *rtable_entries = (unsigned int *) ats_malloc(sizeof(unsigned int) * num_vols);
02078   unsigned int rtable_size = 0;
02079 
02080   // estimate allocation
02081   for (int i = 0; i < num_vols; i++) {
02082     forvol[i] = (VOL_HASH_TABLE_SIZE * (p[i]->len >> STORE_BLOCK_SHIFT)) / total;
02083     used += forvol[i];
02084     rtable_entries[i] = p[i]->len / VOL_HASH_ALLOC_SIZE;
02085     rtable_size += rtable_entries[i];
02086     gotvol[i] = 0;
02087   }
02088   // spread around the excess
02089   int extra = VOL_HASH_TABLE_SIZE - used;
02090   for (int i = 0; i < extra; i++)
02091     forvol[i % num_vols]++;
02092   // seed random number generator
02093   for (int i = 0; i < num_vols; i++) {
02094     uint64_t x = p[i]->hash_id.fold();
02095     rnd[i] = (unsigned int) x;
02096   }
02097   // initialize table to "empty"
02098   for (int i = 0; i < VOL_HASH_TABLE_SIZE; i++)
02099     ttable[i] = VOL_HASH_EMPTY;
02100   // generate random numbers proportaion to allocation
02101   rtable_pair *rtable = (rtable_pair *)ats_malloc(sizeof(rtable_pair) * rtable_size);
02102   int rindex = 0;
02103   for (int i = 0; i < num_vols; i++)
02104     for (int j = 0; j < (int)rtable_entries[i]; j++) {
02105       rtable[rindex].rval = next_rand(&rnd[i]);
02106       rtable[rindex].idx = i;
02107       rindex++;
02108     }
02109   ink_assert(rindex == (int)rtable_size);
02110   // sort (rand #, vol $ pairs)
02111   qsort(rtable, rtable_size, sizeof(rtable_pair), cmprtable);
02112   unsigned int width = (1LL << 32) / VOL_HASH_TABLE_SIZE;
02113   unsigned int pos;  // target position to allocate
02114   // select vol with closest random number for each bucket
02115   int i = 0;  // index moving through the random numbers
02116   for (int j = 0; j < VOL_HASH_TABLE_SIZE; j++) {
02117     pos = width / 2 + j * width;  // position to select closest to
02118     while (pos > rtable[i].rval && i < (int)rtable_size - 1) i++;
02119     ttable[j] = mapping[rtable[i].idx];
02120     gotvol[rtable[i].idx]++;
02121   }
02122   for (int i = 0; i < num_vols; i++) {
02123     Debug("cache_init", "build_vol_hash_table index %d mapped to %d requested %d got %d", i, mapping[i], forvol[i], gotvol[i]);
02124   }
02125   // install new table
02126   if (0 != (old_table = ink_atomic_swap(&(cp->vol_hash_table), ttable)))
02127     new_Freer(old_table, CACHE_MEM_FREE_TIMEOUT);
02128   ats_free(mapping);
02129   ats_free(p);
02130   ats_free(forvol);
02131   ats_free(gotvol);
02132   ats_free(rnd);
02133   ats_free(rtable_entries);
02134   ats_free(rtable);
02135 }
02136 
02137 void
02138 Cache::vol_initialized(bool result) {
02139   if (result)
02140     ink_atomic_increment(&total_good_nvol, 1);
02141   if (total_nvol == ink_atomic_increment(&total_initialized_vol, 1) + 1)
02142     open_done();
02143 }
02144 
02145 /** Set the state of a disk programmatically.
02146 */
02147 bool
02148 CacheProcessor::mark_storage_offline( CacheDisk* d ///< Target disk
02149   ) {
02150   bool zret; // indicates whether there's any online storage left.
02151   int p;
02152   uint64_t total_bytes_delete = 0;
02153   uint64_t total_dir_delete = 0;
02154   uint64_t used_dir_delete = 0;
02155 
02156   if (!DISK_BAD(d)) SET_DISK_BAD(d);
02157 
02158   for (p = 0; p < gnvol; p++) {
02159     if (d->fd == gvol[p]->fd) {
02160       total_dir_delete += gvol[p]->buckets * gvol[p]->segments * DIR_DEPTH;
02161       used_dir_delete += dir_entries_used(gvol[p]);
02162       total_bytes_delete += gvol[p]->len - vol_dirlen(gvol[p]);
02163     }
02164   }
02165 
02166   RecIncrGlobalRawStat(cache_rsb, cache_bytes_total_stat, -total_bytes_delete);
02167   RecIncrGlobalRawStat(cache_rsb, cache_direntries_total_stat, -total_dir_delete);
02168   RecIncrGlobalRawStat(cache_rsb, cache_direntries_used_stat, -used_dir_delete);
02169 
02170   if (theCache) {
02171     rebuild_host_table(theCache);
02172   }
02173   if (theStreamCache) {
02174     rebuild_host_table(theStreamCache);
02175   }
02176 
02177   zret = this->has_online_storage();
02178   if (!zret) {
02179     Warning("All storage devices offline, cache disabled");
02180     CacheProcessor::cache_ready = 0;
02181   } else { // check cache types specifically
02182     if (theCache && !theCache->hosttable->gen_host_rec.vol_hash_table) {
02183       unsigned int caches_ready = 0;
02184       caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
02185       caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
02186       caches_ready = ~caches_ready;
02187       CacheProcessor::cache_ready &= caches_ready;
02188       Warning("all volumes for http cache are corrupt, http cache disabled");
02189     }
02190     if (theStreamCache && !theStreamCache->hosttable->gen_host_rec.vol_hash_table) {
02191       unsigned int caches_ready = 0;
02192       caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_RTSP);
02193       caches_ready = ~caches_ready;
02194       CacheProcessor::cache_ready &= caches_ready;
02195       Warning("all volumes for mixt cache are corrupt, mixt cache disabled");
02196     }
02197   }
02198 
02199   return zret;
02200 }
02201 
02202 bool
02203 CacheProcessor::has_online_storage() const {
02204   CacheDisk** dptr = gdisks;
02205   for (int disk_no = 0 ; disk_no < gndisks ; ++disk_no, ++dptr) {
02206     if (!DISK_BAD(*dptr)) return true;
02207   }
02208   return false;
02209 }
02210 
02211 int
02212 AIO_Callback_handler::handle_disk_failure(int /* event ATS_UNUSED */, void *data) {
02213   /* search for the matching file descriptor */
02214   if (!CacheProcessor::cache_ready)
02215     return EVENT_DONE;
02216   int disk_no = 0;
02217   AIOCallback *cb = (AIOCallback *) data;
02218 #if TS_USE_INTERIM_CACHE == 1
02219   for (; disk_no < gn_interim_disks; disk_no++) {
02220     CacheDisk *d = g_interim_disks[disk_no];
02221 
02222     if (d->fd == cb->aiocb.aio_fildes) {
02223       char message[256];
02224 
02225       d->num_errors++;
02226       if (!DISK_BAD(d)) {
02227         snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors);
02228         Warning("%s", message);
02229         RecSignalManager(REC_SIGNAL_CACHE_WARNING, message);
02230       } else if (!DISK_BAD_SIGNALLED(d)) {
02231         snprintf(message, sizeof(message),
02232                  "too many errors [%d] accessing disk %s: declaring disk bad", d->num_errors, d->path);
02233         Warning("%s", message);
02234         RecSignalManager(REC_SIGNAL_CACHE_ERROR, message);
02235         good_interim_disks--;
02236       }
02237     }
02238   }
02239 #endif
02240 
02241   for (; disk_no < gndisks; disk_no++) {
02242     CacheDisk *d = gdisks[disk_no];
02243 
02244     if (d->fd == cb->aiocb.aio_fildes) {
02245       char message[256];
02246       d->num_errors++;
02247 
02248       if (!DISK_BAD(d)) {
02249         snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors);
02250         Warning("%s", message);
02251         RecSignalManager(REC_SIGNAL_CACHE_WARNING, message);
02252       } else if (!DISK_BAD_SIGNALLED(d)) {
02253         snprintf(message, sizeof(message), "too many errors accessing disk %s [%d/%d]: declaring disk bad", d->path, d->num_errors, cache_config_max_disk_errors);
02254         Warning("%s", message);
02255         RecSignalManager(REC_SIGNAL_CACHE_ERROR, message);
02256         cacheProcessor.mark_storage_offline(d); // take it out of service
02257       }
02258       break;
02259     }
02260   }
02261 
02262   delete cb;
02263   return EVENT_DONE;
02264 }
02265 
02266 int
02267 Cache::open_done() {
02268   Action *register_ShowCache(Continuation * c, HTTPHdr * h);
02269   Action *register_ShowCacheInternal(Continuation *c, HTTPHdr *h);
02270   statPagesManager.register_http("cache", register_ShowCache);
02271   statPagesManager.register_http("cache-internal", register_ShowCacheInternal);
02272   if (total_good_nvol == 0) {
02273     ready = CACHE_INIT_FAILED;
02274     cacheProcessor.cacheInitialized();
02275     return 0;
02276   }
02277 
02278   hosttable = new CacheHostTable(this, scheme);
02279   hosttable->register_config_callback(&hosttable);
02280 
02281   if (hosttable->gen_host_rec.num_cachevols == 0)
02282     ready = CACHE_INIT_FAILED;
02283   else
02284     ready = CACHE_INITIALIZED;
02285   cacheProcessor.cacheInitialized();
02286 
02287   return 0;
02288 }
02289 
02290 int
02291 Cache::open(bool clear, bool /* fix ATS_UNUSED */) {
02292   int i;
02293   off_t blocks = 0;
02294   cache_read_done = 0;
02295   total_initialized_vol = 0;
02296   total_nvol = 0;
02297   total_good_nvol = 0;
02298 
02299   REC_EstablishStaticConfigInt32(cache_config_min_average_object_size, "proxy.config.cache.min_average_object_size");
02300   Debug("cache_init", "Cache::open - proxy.config.cache.min_average_object_size = %d",
02301         (int)cache_config_min_average_object_size);
02302 
02303   CacheVol *cp = cp_list.head;
02304   for (; cp; cp = cp->link.next) {
02305     if (cp->scheme == scheme) {
02306       cp->vols = (Vol **)ats_malloc(cp->num_vols * sizeof(Vol *));
02307       int vol_no = 0;
02308       for (i = 0; i < gndisks; i++) {
02309         if (cp->disk_vols[i] && !DISK_BAD(cp->disk_vols[i]->disk)) {
02310           DiskVolBlockQueue *q = cp->disk_vols[i]->dpb_queue.head;
02311           for (; q; q = q->link.next) {
02312             cp->vols[vol_no] = new Vol();
02313             CacheDisk *d = cp->disk_vols[i]->disk;
02314             cp->vols[vol_no]->disk = d;
02315             cp->vols[vol_no]->fd = d->fd;
02316             cp->vols[vol_no]->cache = this;
02317             cp->vols[vol_no]->cache_vol = cp;
02318             blocks = q->b->len;
02319 
02320             bool vol_clear = clear || d->cleared || q->new_block;
02321 #if AIO_MODE == AIO_MODE_NATIVE
02322             eventProcessor.schedule_imm(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear));
02323 #else
02324             cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear);
02325 #endif
02326             vol_no++;
02327             cache_size += blocks;
02328           }
02329         }
02330       }
02331       total_nvol += vol_no;
02332     }
02333   }
02334   if (total_nvol == 0)
02335     return open_done();
02336   cache_read_done = 1;
02337   return 0;
02338 }
02339 
02340 int
02341 Cache::close() {
02342   return -1;
02343 }
02344 
02345 int
02346 CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */) {
02347   ink_assert(0);
02348   return EVENT_DONE;
02349 }
02350 
02351 bool
02352 CacheVC::is_pread_capable()
02353 {
02354   return !f.read_from_writer_called;
02355 }
02356 
02357 #define STORE_COLLISION 1
02358 
02359 #ifdef HTTP_CACHE
02360 static void unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay) {
02361   char *tmp = doc->hdr();
02362   int len = doc->hlen;
02363   while (len > 0) {
02364     int r = HTTPInfo::unmarshal(tmp, len, buf._ptr());
02365     if (r < 0) {
02366       ink_assert(!"CacheVC::handleReadDone unmarshal failed");
02367       okay = 0;
02368       break;
02369     }
02370     len -= r;
02371     tmp += r;
02372   }
02373 }
02374 
02375 /** Upgrade a marshalled fragment buffer to the current version.
02376 
02377     @internal I looked at doing this in place (rather than a copy & modify) but
02378     - The in place logic would be even worse than this mess
02379     - It wouldn't save you that much, since you end up doing inserts early in the buffer.
02380       Without extreme care in the logic it could end up doing more copying thatn
02381       the simpler copy & modify.
02382 
02383     @internal This logic presumes the existence of some slack at the end of the buffer, which
02384     is usually the case (because lots of rounding is done). If there isn't enough slack then
02385     this fails and we have a cache miss. The assumption that this is sufficiently rare that
02386     code simplicity takes precedence should be checked at some point.
02387  */
02388 static bool upgrade_doc_version(Ptr<IOBufferData>& buf) {
02389   // Type definition is close enough to use for initial checking.
02390   cache_bc::Doc_v23* doc = reinterpret_cast<cache_bc::Doc_v23*>(buf->data());
02391   bool zret = true;
02392 
02393   if (DOC_MAGIC == doc->magic) {
02394     if (0 == doc->hlen) {
02395       Debug("cache_bc", "Doc %p without header, no upgrade needed.", doc);
02396     } else if (CACHE_FRAG_TYPE_HTTP_V23 == doc->doc_type) {
02397       cache_bc::HTTPCacheAlt_v21* alt = reinterpret_cast<cache_bc::HTTPCacheAlt_v21*>(doc->hdr());
02398       if (alt && alt->is_unmarshalled_format()) {
02399         Ptr<IOBufferData> d_buf(ioDataAllocator.alloc());
02400         Doc* d_doc;
02401         char* src;
02402         char* dst;
02403         char* hdr_limit = doc->data();
02404         HTTPInfo::FragOffset* frags = reinterpret_cast<HTTPInfo::FragOffset*>(static_cast<char*>(buf->data()) + cache_bc::sizeofDoc_v23);
02405         int frag_count = doc->_flen / sizeof(HTTPInfo::FragOffset);
02406         size_t n = 0;
02407         size_t content_size = doc->data_len();
02408 
02409         Debug("cache_bc", "Doc %p is 3.2", doc);
02410 
02411         // Use the same buffer size, fail if no fit.
02412         d_buf->alloc(buf->_size_index, buf->_mem_type); // Duplicate.
02413         d_doc = reinterpret_cast<Doc*>(d_buf->data());
02414         n = d_buf->block_size();
02415 
02416         src = buf->data();
02417         dst = d_buf->data();
02418         memcpy(dst, src, sizeofDoc);
02419         src += sizeofDoc + doc->_flen; dst += sizeofDoc; n -= sizeofDoc;
02420         
02421         // We copy the fragment table iff there is a fragment table and there is only one alternate.
02422         if (frag_count > 0 && cache_bc::HTTPInfo_v21::marshalled_length(src) > doc->hlen)
02423           frag_count = 0; // inhibit fragment table insertion.
02424 
02425         while (zret && src < hdr_limit) {
02426           zret = cache_bc::HTTPInfo_v21::copy_and_upgrade_unmarshalled_to_v23(dst, src, n, frag_count, frags);
02427         }
02428         if (zret && content_size <= n) {
02429           memcpy(dst, src, content_size); // content
02430           // Must update new Doc::len and Doc::hlen
02431           // dst points at the first byte of the content, or one past the last byte of the alt header.
02432           d_doc->len = (dst - reinterpret_cast<char*>(d_doc)) + content_size;
02433           d_doc->hlen = (dst - reinterpret_cast<char*>(d_doc)) - sizeofDoc;
02434           buf = d_buf; // replace original buffer with new buffer.
02435         } else {
02436           zret = false;
02437         }
02438       }
02439       Doc* n_doc = reinterpret_cast<Doc*>(buf->data()); // access as current version.
02440       // For now the base header size is the same. If that changes we'll need to handle the v22/23 case here
02441       // as with the v21 and shift the content down to accomodate the bigger header.
02442       ink_assert(sizeof(*n_doc) == sizeof(*doc));
02443 
02444       n_doc->doc_type = CACHE_FRAG_TYPE_HTTP; // We converted so adjust doc_type.
02445       // Set these to zero for debugging - they'll be updated to the current values if/when this is
02446       // put in the aggregation buffer.
02447       n_doc->v_major = 0;
02448       n_doc->v_minor = 0;
02449       n_doc->unused = 0; // force to zero to make future use easier.
02450     }
02451   }
02452   return zret;
02453 }
02454 #endif
02455 
02456 // [amc] I think this is where all disk reads from cache funnel through here.
02457 int
02458 CacheVC::handleReadDone(int event, Event *e)
02459 {
02460   cancel_trigger();
02461   ink_assert(this_ethread() == mutex->thread_holding);
02462 
02463   Doc *doc = NULL;
02464   if (event == AIO_EVENT_DONE)
02465     set_io_not_in_progress();
02466   else
02467     if (is_io_in_progress())
02468       return EVENT_CONT;
02469   {
02470     MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
02471     if (!lock)
02472       VC_SCHED_LOCK_RETRY();
02473     if ((!dir_valid(vol, &dir)) || (!io.ok())) {
02474       if (!io.ok()) {
02475         Debug("cache_disk_error", "Read error on disk %s\n \
02476             read range : [%" PRIu64 " - %" PRIu64 " bytes]  [%" PRIu64 " - %" PRIu64 " blocks] \n",
02477               vol->hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
02478               (uint64_t)io.aiocb.aio_offset / 512, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
02479       }
02480       goto Ldone;
02481     }
02482 
02483     doc = reinterpret_cast<Doc*>(buf->data());
02484     ink_assert(vol->mutex->nthread_holding < 1000);
02485     ink_assert(doc->magic == DOC_MAGIC);
02486 
02487     /* We've read the raw data from disk, time to deserialize it. We have to account for a variety of formats that
02488        may be present.
02489 
02490        As of cache version 24 we changed the @a doc_type to indicate a format change in the header which includes
02491        version data inside the header. Prior to that we must use heuristics to deduce the actual format. For this reason
02492        we send that header type off for special processing. Otherwise we can use the in object version to determine
02493        the format.
02494 
02495        All of this processing must produce a serialized header that is compliant with the current version. This includes
02496        updating the doc_type.
02497     */
02498 
02499     if (doc->doc_type == CACHE_FRAG_TYPE_HTTP_V23) {
02500       if (upgrade_doc_version(buf)) {
02501         doc = reinterpret_cast<Doc*>(buf->data()); // buf may be a new copy 
02502       } else {
02503         Debug("cache_bc", "Upgrade of fragment failed - disk %s - doc id = %" PRIx64 ":%" PRIx64 "\n"
02504               , vol->hash_text.get(), read_key->slice64(0), read_key->slice64(1));
02505         doc->magic = DOC_CORRUPT;
02506         // Should really trash the directory entry for this, as it's never going to work in the future.
02507         // Or does that happen later anyway?
02508         goto Ldone;
02509       }
02510     } // else if (doc->doc_type == CACHE_FRAG_TYPE_HTTP) // handle any version updates based on the object version in the header.
02511 
02512 #ifdef VERIFY_JTEST_DATA
02513     char xx[500];
02514     if (read_key && *read_key == doc->key && request.valid() && !dir_head(&dir) && !vio.ndone) {
02515       int ib = 0, xd = 0;
02516       request.url_get()->print(xx, 500, &ib, &xd);
02517       char *x = xx;
02518       for (int q = 0; q < 3; q++)
02519         x = strchr(x + 1, '/');
02520       ink_assert(!memcmp(doc->data(), x, ib - (x - xx)));
02521     }
02522 #endif
02523 
02524     if (is_debug_tag_set("cache_read")) {
02525       char xt[33];
02526       Debug("cache_read",
02527             "Read complete on fragment %s. Length: data payload=%d this fragment=%d total doc=%" PRId64" prefix=%d",
02528             doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len, doc->prefix_len());
02529     }
02530 
02531     // put into ram cache?
02532     if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) &&
02533         doc->magic == DOC_MAGIC) {
02534       int okay = 1;
02535       if (!f.doc_from_ram_cache)
02536         f.not_from_ram_cache = 1;
02537       if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
02538         // verify that the checksum matches
02539         uint32_t checksum = 0;
02540         for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
02541           checksum += *b;
02542         ink_assert(checksum == doc->checksum);
02543         if (checksum != doc->checksum) {
02544           Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
02545                doc->first_key.b[0], doc->first_key.b[1],
02546                doc->len, doc->hlen, vol->path, (uint64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
02547           doc->magic = DOC_CORRUPT;
02548           okay = 0;
02549         }
02550       }
02551 #if TS_USE_INTERIM_CACHE == 1
02552     ink_assert(vol->num_interim_vols >= good_interim_disks);
02553     if (mts && !f.doc_from_ram_cache) {
02554       int indx;
02555       do {
02556         indx = vol->interim_index++ % vol->num_interim_vols;
02557       } while (good_interim_disks > 0 && DISK_BAD(vol->interim_vols[indx].disk));
02558 
02559       if (good_interim_disks) {
02560         if (f.write_into_interim) {
02561           mts->interim_vol = interim_vol = &vol->interim_vols[indx];
02562           mts->agg_len = interim_vol->round_to_approx_size(doc->len);
02563           if (vol->sector_size != interim_vol->sector_size) {
02564             dir_set_approx_size(&mts->dir, mts->agg_len);
02565           }
02566         }
02567         if (f.transistor) {
02568           mts->interim_vol = interim_vol;
02569           mts->agg_len = interim_vol->round_to_approx_size(doc->len);
02570           ink_assert(mts->agg_len == dir_approx_size(&mts->dir));
02571         }
02572 
02573         if (!interim_vol->is_io_in_progress()) {
02574           mts->buf = buf;
02575           mts->copy = false;
02576           interim_vol->agg.enqueue(mts);
02577           interim_vol->aggWrite(event, e);
02578         } else {
02579           mts->buf = new_IOBufferData(iobuffer_size_to_index(mts->agg_len, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02580           mts->copy = true;
02581           memcpy(mts->buf->data(), buf->data(), doc->len);
02582           interim_vol->agg.enqueue(mts);
02583         }
02584       } else {
02585         vol->set_migrate_failed(mts);
02586         migrateToInterimCacheAllocator.free(mts);
02587       }
02588       mts = NULL;
02589     }
02590 #else
02591     (void)e; // Avoid compiler warnings
02592 #endif
02593       bool http_copy_hdr = false;
02594 #ifdef HTTP_CACHE
02595       http_copy_hdr = cache_config_ram_cache_compress && !f.doc_from_ram_cache &&
02596         doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
02597       // If http doc we need to unmarshal the headers before putting in the ram cache
02598       // unless it could be compressed
02599       if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay)
02600         unmarshal_helper(doc, buf, okay);
02601 #endif
02602       // Put the request in the ram cache only if its a open_read or lookup
02603       if (vio.op == VIO::READ && okay) {
02604         bool cutoff_check;
02605         // cutoff_check :
02606         // doc_len == 0 for the first fragment (it is set from the vector)
02607         //                The decision on the first fragment is based on
02608         //                doc->total_len
02609         // After that, the decision is based of doc_len (doc_len != 0)
02610         // (cache_config_ram_cache_cutoff == 0) : no cutoffs
02611         cutoff_check = ((!doc_len && (int64_t)doc->total_len < cache_config_ram_cache_cutoff)
02612                         || (doc_len && (int64_t)doc_len < cache_config_ram_cache_cutoff)
02613                         || !cache_config_ram_cache_cutoff);
02614         if (cutoff_check && !f.doc_from_ram_cache) {
02615 #if TS_USE_INTERIM_CACHE == 1
02616           if (!f.ram_fixup) {
02617             uint64_t o = dir_get_offset(&dir);
02618             vol->ram_cache->put(read_key, buf, doc->len, http_copy_hdr, (uint32_t)(o >> 32), (uint32_t)o);
02619           } else {
02620             vol->ram_cache->put(read_key, buf, doc->len, http_copy_hdr, (uint32_t)(dir_off>>32), (uint32_t)dir_off);
02621           }
02622 #else
02623           uint64_t o = dir_offset(&dir);
02624           vol->ram_cache->put(read_key, buf, doc->len, http_copy_hdr, (uint32_t)(o >> 32), (uint32_t)o);
02625 #endif
02626         }
02627         if (!doc_len) {
02628           // keep a pointer to it. In case the state machine decides to
02629           // update this document, we don't have to read it back in memory
02630           // again
02631           vol->first_fragment_key = *read_key;
02632 #if TS_USE_INTERIM_CACHE == 1
02633           if (!f.ram_fixup)
02634             vol->first_fragment_offset = dir_get_offset(&dir);
02635           else
02636             vol->first_fragment_offset = dir_off;
02637 #else
02638           vol->first_fragment_offset = dir_offset(&dir);
02639 #endif
02640           vol->first_fragment_data = buf;
02641         }
02642       }                           // end VIO::READ check
02643 #ifdef HTTP_CACHE
02644       // If it could be compressed, unmarshal after
02645       if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay)
02646         unmarshal_helper(doc, buf, okay);
02647 #endif
02648     }                             // end io.ok() check
02649 #if TS_USE_INTERIM_CACHE == 1
02650 Ldone:
02651     if (mts) {
02652       vol->set_migrate_failed(mts);
02653       migrateToInterimCacheAllocator.free(mts);
02654       mts = NULL;
02655     }
02656   }
02657 #else
02658   }
02659 Ldone:
02660 #endif
02661   POP_HANDLER;
02662   return handleEvent(AIO_EVENT_DONE, 0);
02663 }
02664 
02665 
02666 int
02667 CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
02668 {
02669   cancel_trigger();
02670 
02671   f.doc_from_ram_cache = false;
02672 
02673   // check ram cache
02674   ink_assert(vol->mutex->thread_holding == this_ethread());
02675 #if TS_USE_INTERIM_CACHE == 1
02676   uint64_t o = dir_get_offset(&dir);
02677   if (f.read_from_interim && mts && mts->rewrite) {
02678     goto LinterimRead;
02679   }
02680 #else
02681   int64_t o = dir_offset(&dir);
02682 #endif
02683   if (vol->ram_cache->get(read_key, &buf, (uint32_t)(o >> 32), (uint32_t)o)) {
02684     goto LramHit;
02685   }
02686 
02687   // check if it was read in the last open_read call
02688 #if TS_USE_INTERIM_CACHE == 1
02689   if (*read_key == vol->first_fragment_key && dir_get_offset(&dir) == vol->first_fragment_offset) {
02690 #else
02691   if (*read_key == vol->first_fragment_key && dir_offset(&dir) == vol->first_fragment_offset) {
02692 #endif
02693     buf = vol->first_fragment_data;
02694     goto LmemHit;
02695   }
02696 #if TS_USE_INTERIM_CACHE == 1
02697 LinterimRead:
02698   if (f.read_from_interim) {
02699     if (dir_agg_buf_valid(interim_vol, &dir)) {
02700       int interim_agg_offset = vol_offset(interim_vol, &dir) - interim_vol->header->write_pos;
02701       buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02702       ink_assert((interim_agg_offset + io.aiocb.aio_nbytes) <= (unsigned) interim_vol->agg_buf_pos);
02703       char *doc = buf->data();
02704       char *agg = interim_vol->agg_buffer + interim_agg_offset;
02705       memcpy(doc, agg, io.aiocb.aio_nbytes);
02706       io.aio_result = io.aiocb.aio_nbytes;
02707       SET_HANDLER(&CacheVC::handleReadDone);
02708       return EVENT_RETURN;
02709     }
02710 
02711     io.aiocb.aio_fildes = interim_vol->fd;
02712     io.aiocb.aio_offset = vol_offset(interim_vol, &dir);
02713     if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(interim_vol->skip + interim_vol->len))
02714       io.aiocb.aio_nbytes = interim_vol->skip + interim_vol->len - io.aiocb.aio_offset;
02715     buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02716     io.aiocb.aio_buf = buf->data();
02717     io.action = this;
02718     io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
02719 
02720     SET_HANDLER(&CacheVC::handleReadDone);
02721     ink_assert(ink_aio_read(&io) >= 0);
02722     CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
02723     return EVENT_CONT;
02724   }
02725 #endif
02726   // see if its in the aggregation buffer
02727   if (dir_agg_buf_valid(vol, &dir)) {
02728     int agg_offset = vol_offset(vol, &dir) - vol->header->write_pos;
02729     buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02730     ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned) vol->agg_buf_pos);
02731     char *doc = buf->data();
02732     char *agg = vol->agg_buffer + agg_offset;
02733     memcpy(doc, agg, io.aiocb.aio_nbytes);
02734     io.aio_result = io.aiocb.aio_nbytes;
02735     SET_HANDLER(&CacheVC::handleReadDone);
02736     return EVENT_RETURN;
02737   }
02738 
02739   io.aiocb.aio_fildes = vol->fd;
02740   io.aiocb.aio_offset = vol_offset(vol, &dir);
02741   if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(vol->skip + vol->len))
02742     io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
02743   buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02744   io.aiocb.aio_buf = buf->data();
02745   io.action = this;
02746   io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
02747   SET_HANDLER(&CacheVC::handleReadDone);
02748   ink_assert(ink_aio_read(&io) >= 0);
02749   CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
02750   return EVENT_CONT;
02751 
02752 LramHit: {
02753     f.doc_from_ram_cache = true;
02754     io.aio_result = io.aiocb.aio_nbytes;
02755     Doc *doc = (Doc*)buf->data();
02756     if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
02757       SET_HANDLER(&CacheVC::handleReadDone);
02758       return EVENT_RETURN;
02759     }
02760   }
02761 LmemHit:
02762   f.doc_from_ram_cache = true;
02763   io.aio_result = io.aiocb.aio_nbytes;
02764 #if TS_USE_INTERIM_CACHE == 1
02765   if (mts) { // for hit from memory, not migrate
02766     vol->set_migrate_failed(mts);
02767     migrateToInterimCacheAllocator.free(mts);
02768     mts = NULL;
02769   }
02770 #endif
02771   POP_HANDLER;
02772   return EVENT_RETURN; // allow the caller to release the volume lock
02773 }
02774 
02775 Action *
02776 Cache::lookup(Continuation *cont, CacheKey *key, CacheFragType type, char const* hostname, int host_len)
02777 {
02778   if (!CacheProcessor::IsCacheReady(type)) {
02779     cont->handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
02780     return ACTION_RESULT_DONE;
02781   }
02782 
02783   Vol *vol = key_to_vol(key, hostname, host_len);
02784   ProxyMutex *mutex = cont->mutex;
02785   CacheVC *c = new_CacheVC(cont);
02786   SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
02787   c->vio.op = VIO::READ;
02788   c->base_stat = cache_lookup_active_stat;
02789   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
02790   c->first_key = c->key = *key;
02791   c->frag_type = type;
02792   c->f.lookup = 1;
02793   c->vol = vol;
02794   c->last_collision = NULL;
02795 
02796   if (c->handleEvent(EVENT_INTERVAL, 0) == EVENT_CONT)
02797     return &c->_action;
02798   else
02799     return ACTION_RESULT_DONE;
02800 }
02801 
02802 Action *
02803 Cache::lookup(Continuation *cont, CacheURL *url, CacheFragType type)
02804 {
02805   CryptoHash id;
02806 
02807   url->hash_get(&id);
02808   int len = 0;
02809   char const* hostname = url->host_get(&len);
02810   return lookup(cont, &id, type, hostname, len);
02811 }
02812 
02813 int
02814 CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
02815 {
02816   cancel_trigger();
02817   set_io_not_in_progress();
02818   {
02819     MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
02820     if (!lock)
02821       VC_SCHED_LOCK_RETRY();
02822     if (_action.cancelled) {
02823       if (od) {
02824         vol->close_write(this);
02825         od = 0;
02826       }
02827       goto Lfree;
02828     }
02829     if (!f.remove_aborted_writers) {
02830       if (vol->open_write(this, true, 1)) {
02831         // writer  exists
02832         ink_assert(od = vol->open_read(&key));
02833         od->dont_update_directory = 1;
02834         od = NULL;
02835       } else {
02836         od->dont_update_directory = 1;
02837       }
02838       f.remove_aborted_writers = 1;
02839     }
02840   Lread:
02841     SET_HANDLER(&CacheVC::removeEvent);
02842     if (!buf)
02843       goto Lcollision;
02844     if (!dir_valid(vol, &dir)) {
02845       last_collision = NULL;
02846       goto Lcollision;
02847     }
02848     // check read completed correct FIXME: remove bad vols
02849     if ((size_t) io.aio_result != (size_t) io.aiocb.aio_nbytes)
02850       goto Ldone;
02851     {
02852       // verify that this is our document
02853       Doc *doc = (Doc *) buf->data();
02854       /* should be first_key not key..right?? */
02855       if (doc->first_key == key) {
02856         ink_assert(doc->magic == DOC_MAGIC);
02857         if (dir_delete(&key, vol, &dir) > 0) {
02858           if (od)
02859             vol->close_write(this);
02860           od = NULL;
02861           goto Lremoved;
02862         }
02863         goto Ldone;
02864       }
02865     }
02866   Lcollision:
02867     // check for collision
02868     if (dir_probe(&key, vol, &dir, &last_collision) > 0) {
02869 #if TS_USE_INTERIM_CACHE == 1
02870       if (dir_ininterim(&dir)) {
02871         dir_delete(&key, vol, &dir);
02872         last_collision = NULL;
02873         goto Lcollision;
02874       }
02875 #endif
02876       int ret = do_read_call(&key);
02877       if (ret == EVENT_RETURN)
02878         goto Lread;
02879       return ret;
02880     }
02881   Ldone:
02882     CACHE_INCREMENT_DYN_STAT(cache_remove_failure_stat);
02883     if (od)
02884       vol->close_write(this);
02885   }
02886   ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
02887   _action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void *) -ECACHE_NO_DOC);
02888   goto Lfree;
02889 Lremoved:
02890   _action.continuation->handleEvent(CACHE_EVENT_REMOVE, 0);
02891 Lfree:
02892   return free_CacheVC(this);
02893 }
02894 
02895 Action *
02896 Cache::remove(Continuation *cont, CacheKey *key, CacheFragType type, bool /* user_agents ATS_UNUSED */,
02897               bool /* link ATS_UNUSED */, char *hostname, int host_len)
02898 {
02899   if (!CacheProcessor::IsCacheReady(type)) {
02900     if (cont)
02901       cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, 0);
02902     return ACTION_RESULT_DONE;
02903   }
02904 
02905   ink_assert(this);
02906 
02907   Ptr<ProxyMutex> mutex;
02908   if (!cont)
02909     cont = new_CacheRemoveCont();
02910 
02911   CACHE_TRY_LOCK(lock, cont->mutex, this_ethread());
02912   ink_assert(lock);
02913   Vol *vol = key_to_vol(key, hostname, host_len);
02914   // coverity[var_decl]
02915   Dir result;
02916   dir_clear(&result);           // initialized here, set result empty so we can recognize missed lock
02917   mutex = cont->mutex;
02918 
02919   CacheVC *c = new_CacheVC(cont);
02920   c->vio.op = VIO::NONE;
02921   c->frag_type = type;
02922   c->base_stat = cache_remove_active_stat;
02923   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
02924   c->first_key = c->key = *key;
02925   c->vol = vol;
02926   c->dir = result;
02927   c->f.remove = 1;
02928 
02929   SET_CONTINUATION_HANDLER(c, &CacheVC::removeEvent);
02930   int ret = c->removeEvent(EVENT_IMMEDIATE, 0);
02931   if (ret == EVENT_DONE)
02932     return ACTION_RESULT_DONE;
02933   else
02934     return &c->_action;
02935 }
02936 // CacheVConnection
02937 
02938 CacheVConnection::CacheVConnection()
02939   : VConnection(NULL)
02940 { }
02941 
02942 
02943 void
02944 cplist_init()
02945 {
02946   cp_list_len = 0;
02947   for (int i = 0; i < gndisks; i++) {
02948     CacheDisk *d = gdisks[i];
02949     DiskVol **dp = d->disk_vols;
02950     for (unsigned int j = 0; j < d->header->num_volumes; j++) {
02951       ink_assert(dp[j]->dpb_queue.head);
02952       CacheVol *p = cp_list.head;
02953       while (p) {
02954         if (p->vol_number == dp[j]->vol_number) {
02955           ink_assert(p->scheme == (int) dp[j]->dpb_queue.head->b->type);
02956           p->size += dp[j]->size;
02957           p->num_vols += dp[j]->num_volblocks;
02958           p->disk_vols[i] = dp[j];
02959           break;
02960         }
02961         p = p->link.next;
02962       }
02963       if (!p) {
02964         // did not find a volume in the cache vol list...create
02965         // a new one
02966         CacheVol *new_p = new CacheVol();
02967         new_p->vol_number = dp[j]->vol_number;
02968         new_p->num_vols = dp[j]->num_volblocks;
02969         new_p->size = dp[j]->size;
02970         new_p->scheme = dp[j]->dpb_queue.head->b->type;
02971         new_p->disk_vols = (DiskVol **)ats_malloc(gndisks * sizeof(DiskVol *));
02972         memset(new_p->disk_vols, 0, gndisks * sizeof(DiskVol *));
02973         new_p->disk_vols[i] = dp[j];
02974         cp_list.enqueue(new_p);
02975         cp_list_len++;
02976       }
02977     }
02978   }
02979 }
02980 
02981 
02982 void
02983 cplist_update()
02984 {
02985   /* go through cplist and delete volumes that are not in the volume.config */
02986   CacheVol *cp = cp_list.head;
02987   ConfigVol *config_vol;
02988 
02989   while (cp) {
02990     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
02991       if (config_vol->number == cp->vol_number) {
02992         off_t size_in_blocks = config_vol->size << (20 - STORE_BLOCK_SHIFT);
02993         if ((cp->size <= size_in_blocks) && (cp->scheme == config_vol->scheme)) {
02994           config_vol->cachep = cp;
02995         } else {
02996           /* delete this volume from all the disks */
02997           int d_no;
02998           int clearCV = 1;
02999 
03000           for (d_no = 0; d_no < gndisks; d_no++) {
03001             if (cp->disk_vols[d_no]) {
03002               if (cp->disk_vols[d_no]->disk->forced_volume_num == cp->vol_number) {
03003                 clearCV = 0;
03004                 config_vol->cachep = cp;
03005               } else {
03006                 cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
03007               }
03008             }
03009           }
03010           if (clearCV) {
03011             config_vol = NULL;
03012           }
03013         }
03014         break;
03015       }
03016     }
03017 
03018     if (!config_vol) {
03019       // did not find a matching volume in the config file.
03020       //Delete hte volume from the cache vol list
03021       int d_no;
03022       for (d_no = 0; d_no < gndisks; d_no++) {
03023         if (cp->disk_vols[d_no])
03024           cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
03025       }
03026       CacheVol *temp_cp = cp;
03027       cp = cp->link.next;
03028       cp_list.remove(temp_cp);
03029       cp_list_len--;
03030       delete temp_cp;
03031       continue;
03032     } else
03033       cp = cp->link.next;
03034   }
03035 }
03036 
03037 static int
03038 fillExclusiveDisks(CacheVol *cp)
03039 {
03040   int diskCount = 0;
03041   int volume_number = cp->vol_number;
03042 
03043   Debug("cache_init", "volume %d", volume_number);
03044   for (int i = 0; i < gndisks; i++) {
03045     if (gdisks[i]->forced_volume_num != volume_number) {
03046       continue;
03047     }
03048     /* The user had created several volumes before - clear the disk
03049        and create one volume for http */
03050     for(int j = 0; j < (int)gdisks[i]->header->num_volumes; j++) {
03051       if (volume_number != gdisks[i]->disk_vols[j]->vol_number) {
03052         Note("Clearing Disk: %s", gdisks[i]->path);
03053         gdisks[i]->delete_all_volumes();
03054         break;
03055       }
03056     }
03057     diskCount++;
03058 
03059     int64_t size_diff = gdisks[i]->num_usable_blocks;
03060     DiskVolBlock *dpb;
03061 
03062      do {
03063        dpb = gdisks[i]->create_volume(volume_number, size_diff, cp->scheme);
03064        if (dpb) {
03065          if (!cp->disk_vols[i]) {
03066            cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
03067          }
03068          size_diff -= dpb->len;
03069          cp->size += dpb->len;
03070          cp->num_vols++;
03071        } else {
03072          Debug("cache_init", "create_volume failed");
03073          break;
03074        }
03075      } while ((size_diff > 0));
03076    }
03077    return diskCount;
03078 }
03079 
03080 
03081 int
03082 cplist_reconfigure()
03083 {
03084   int64_t size;
03085   int volume_number;
03086   off_t size_in_blocks;
03087   ConfigVol *config_vol;
03088 
03089   gnvol = 0;
03090   if (config_volumes.num_volumes == 0) {
03091     /* only the http cache */
03092     CacheVol *cp = new CacheVol();
03093     cp->vol_number = 0;
03094     cp->scheme = CACHE_HTTP_TYPE;
03095     cp->disk_vols = (DiskVol **)ats_malloc(gndisks * sizeof(DiskVol *));
03096     memset(cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
03097     cp_list.enqueue(cp);
03098     cp_list_len++;
03099     for (int i = 0; i < gndisks; i++) {
03100       if (gdisks[i]->header->num_volumes != 1 || gdisks[i]->disk_vols[0]->vol_number != 0) {
03101         /* The user had created several volumes before - clear the disk
03102            and create one volume for http */
03103         Note("Clearing Disk: %s", gdisks[i]->path);
03104         gdisks[i]->delete_all_volumes();
03105       }
03106       if (gdisks[i]->cleared) {
03107         uint64_t free_space = gdisks[i]->free_space * STORE_BLOCK_SIZE;
03108         int vols = (free_space / MAX_VOL_SIZE) + 1;
03109         for (int p = 0; p < vols; p++) {
03110           off_t b = gdisks[i]->free_space / (vols - p);
03111           Debug("cache_hosting", "blocks = %" PRId64, (int64_t)b);
03112           DiskVolBlock *dpb = gdisks[i]->create_volume(0, b, CACHE_HTTP_TYPE);
03113           ink_assert(dpb && dpb->len == (uint64_t)b);
03114         }
03115         ink_assert(gdisks[i]->free_space == 0);
03116       }
03117 
03118       ink_assert(gdisks[i]->header->num_volumes == 1);
03119       DiskVol **dp = gdisks[i]->disk_vols;
03120       gnvol += dp[0]->num_volblocks;
03121       cp->size += dp[0]->size;
03122       cp->num_vols += dp[0]->num_volblocks;
03123       cp->disk_vols[i] = dp[0];
03124     }
03125 
03126   } else {
03127     for (int i = 0; i < gndisks; i++) {
03128       if (gdisks[i]->header->num_volumes == 1 && gdisks[i]->disk_vols[0]->vol_number == 0) {
03129         /* The user had created several volumes before - clear the disk
03130            and create one volume for http */
03131         Note("Clearing Disk: %s", gdisks[i]->path);
03132         gdisks[i]->delete_all_volumes();
03133       }
03134     }
03135 
03136     /* change percentages in the config patitions to absolute value */
03137     off_t tot_space_in_blks = 0;
03138     off_t blocks_per_vol = VOL_BLOCK_SIZE / STORE_BLOCK_SIZE;
03139     /* sum up the total space available on all the disks.
03140        round down the space to 128 megabytes */
03141     for (int i = 0; i < gndisks; i++)
03142       tot_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol;
03143 
03144     double percent_remaining = 100.00;
03145     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
03146       if (config_vol->in_percent) {
03147         if (config_vol->percent > percent_remaining) {
03148           Warning("total volume sizes added up to more than 100%%!");
03149           Warning("no volumes created");
03150           return -1;
03151         }
03152         int64_t space_in_blks = (int64_t) (((double) (config_vol->percent / percent_remaining)) * tot_space_in_blks);
03153 
03154         space_in_blks = space_in_blks >> (20 - STORE_BLOCK_SHIFT);
03155         /* round down to 128 megabyte multiple */
03156         space_in_blks = (space_in_blks >> 7) << 7;
03157         config_vol->size = space_in_blks;
03158         tot_space_in_blks -= space_in_blks << (20 - STORE_BLOCK_SHIFT);
03159         percent_remaining -= (config_vol->size < 128) ? 0 : config_vol->percent;
03160       }
03161       if (config_vol->size < 128) {
03162         Warning("the size of volume %d (%" PRId64") is less than the minimum required volume size %d",
03163                 config_vol->number, (int64_t)config_vol->size, 128);
03164         Warning("volume %d is not created", config_vol->number);
03165       }
03166       Debug("cache_hosting", "Volume: %d Size: %" PRId64, config_vol->number, (int64_t)config_vol->size);
03167     }
03168     cplist_update();
03169     /* go through volume config and grow and create volumes */
03170 
03171     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
03172       // if volume is given exclusive disks, fill here and continue
03173       if (!config_vol->cachep) {
03174         continue;
03175       }
03176       fillExclusiveDisks(config_vol->cachep);
03177     }
03178 
03179     for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
03180 
03181       size = config_vol->size;
03182       if (size < 128)
03183         continue;
03184 
03185       volume_number = config_vol->number;
03186 
03187       size_in_blocks = ((off_t) size * 1024 * 1024) / STORE_BLOCK_SIZE;
03188 
03189       if (config_vol->cachep && config_vol->cachep->num_vols > 0) {
03190         gnvol += config_vol->cachep->num_vols;
03191         continue;
03192       }
03193 
03194       if (!config_vol->cachep) {
03195         // we did not find a corresponding entry in cache vol...creat one
03196 
03197         CacheVol *new_cp = new CacheVol();
03198         new_cp->disk_vols = (DiskVol **)ats_malloc(gndisks * sizeof(DiskVol *));
03199         memset(new_cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
03200         if (create_volume(config_vol->number, size_in_blocks, config_vol->scheme, new_cp)) {
03201                 delete new_cp;
03202                 return -1;
03203         }
03204         cp_list.enqueue(new_cp);
03205         cp_list_len++;
03206         config_vol->cachep = new_cp;
03207         gnvol += new_cp->num_vols;
03208         continue;
03209       }
03210 //    else
03211       CacheVol *cp = config_vol->cachep;
03212       ink_assert(cp->size <= size_in_blocks);
03213       if (cp->size == size_in_blocks) {
03214         gnvol += cp->num_vols;
03215         continue;
03216       }
03217       // else the size is greater...
03218       /* search the cp_list */
03219 
03220       int *sorted_vols = new int[gndisks];
03221       for (int i = 0; i < gndisks; i++)
03222         sorted_vols[i] = i;
03223       for (int i = 0; i < gndisks - 1; i++) {
03224         int smallest = sorted_vols[i];
03225         int smallest_ndx = i;
03226         for (int j = i + 1; j < gndisks; j++) {
03227           int curr = sorted_vols[j];
03228           DiskVol *dvol = cp->disk_vols[curr];
03229           if (gdisks[curr]->cleared) {
03230             ink_assert(!dvol);
03231             // disks that are cleared should be filled first
03232             smallest = curr;
03233             smallest_ndx = j;
03234           } else if (!dvol && cp->disk_vols[smallest]) {
03235 
03236             smallest = curr;
03237             smallest_ndx = j;
03238           } else if (dvol && cp->disk_vols[smallest] && (dvol->size < cp->disk_vols[smallest]->size)) {
03239             smallest = curr;
03240             smallest_ndx = j;
03241           }
03242         }
03243         sorted_vols[smallest_ndx] = sorted_vols[i];
03244         sorted_vols[i] = smallest;
03245       }
03246 
03247       int64_t size_to_alloc = size_in_blocks - cp->size;
03248       int disk_full = 0;
03249       for (int i = 0; (i < gndisks) && size_to_alloc; i++) {
03250 
03251         int disk_no = sorted_vols[i];
03252         ink_assert(cp->disk_vols[sorted_vols[gndisks - 1]]);
03253         int largest_vol = cp->disk_vols[sorted_vols[gndisks - 1]]->size;
03254 
03255         /* allocate storage on new disk. Find the difference
03256            between the biggest volume on any disk and
03257            the volume on this disk and try to make
03258            them equal */
03259         int64_t size_diff = (cp->disk_vols[disk_no]) ? largest_vol - cp->disk_vols[disk_no]->size : largest_vol;
03260         size_diff = (size_diff < size_to_alloc) ? size_diff : size_to_alloc;
03261         /* if size_diff == 0, then then the disks have volumes of the
03262            same sizes, so we don't need to balance the disks */
03263         if (size_diff == 0)
03264           break;
03265 
03266         DiskVolBlock *dpb;
03267         do {
03268           dpb = gdisks[disk_no]->create_volume(volume_number, size_diff, cp->scheme);
03269           if (dpb) {
03270             if (!cp->disk_vols[disk_no]) {
03271               cp->disk_vols[disk_no] = gdisks[disk_no]->get_diskvol(volume_number);
03272             }
03273             size_diff -= dpb->len;
03274             cp->size += dpb->len;
03275             cp->num_vols++;
03276           } else
03277             break;
03278         } while ((size_diff > 0));
03279 
03280         if (!dpb)
03281           disk_full++;
03282 
03283         size_to_alloc = size_in_blocks - cp->size;
03284       }
03285 
03286       delete[]sorted_vols;
03287 
03288       if (size_to_alloc) {
03289         if (create_volume(volume_number, size_to_alloc, cp->scheme, cp))
03290           return -1;
03291       }
03292       gnvol += cp->num_vols;
03293     }
03294   }
03295   return 0;
03296 }
03297 
03298 // This is some really bad code, and needs to be rewritten!
03299 int
03300 create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp)
03301 {
03302   static int curr_vol = 0;  // FIXME: this will not reinitialize correctly
03303   off_t to_create = size_in_blocks;
03304   off_t blocks_per_vol = VOL_BLOCK_SIZE >> STORE_BLOCK_SHIFT;
03305   int full_disks = 0;
03306 
03307   cp->vol_number = volume_number;
03308   cp->scheme = scheme;
03309   if (fillExclusiveDisks(cp)) {
03310     Debug("cache_init", "volume successfully filled from forced disks: volume_number=%d", volume_number);
03311     return 0;
03312   }
03313 
03314   int *sp = new int[gndisks];
03315   memset(sp, 0, gndisks * sizeof(int));
03316 
03317   int i = curr_vol;
03318   while (size_in_blocks > 0) {
03319     if (gdisks[i]->free_space >= (sp[i] + blocks_per_vol)) {
03320       sp[i] += blocks_per_vol;
03321       size_in_blocks -= blocks_per_vol;
03322       full_disks = 0;
03323     } else {
03324       full_disks += 1;
03325       if (full_disks == gndisks) {
03326         char config_file[PATH_NAME_MAX];
03327         REC_ReadConfigString(config_file, "proxy.config.cache.volume_filename", PATH_NAME_MAX);
03328         if (cp->size)
03329           Warning("not enough space to increase volume: [%d] to size: [%" PRId64 "]",
03330                   volume_number, (int64_t)((to_create + cp->size) >> (20 - STORE_BLOCK_SHIFT)));
03331         else
03332           Warning("not enough space to create volume: [%d], size: [%" PRId64 "]",
03333                   volume_number, (int64_t)(to_create >> (20 - STORE_BLOCK_SHIFT)));
03334 
03335         Note("edit the %s file and restart traffic_server", config_file);
03336         delete[]sp;
03337         return -1;
03338       }
03339     }
03340     i = (i + 1) % gndisks;
03341   }
03342   cp->vol_number = volume_number;
03343   cp->scheme = scheme;
03344   curr_vol = i;
03345   for (i = 0; i < gndisks; i++) {
03346     if (sp[i] > 0) {
03347       while (sp[i] > 0) {
03348         DiskVolBlock *p = gdisks[i]->create_volume(volume_number, sp[i], scheme);
03349         ink_assert(p && (p->len >= (unsigned int) blocks_per_vol));
03350         sp[i] -= p->len;
03351         cp->num_vols++;
03352         cp->size += p->len;
03353       }
03354       if (!cp->disk_vols[i])
03355         cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
03356     }
03357   }
03358   delete[]sp;
03359   return 0;
03360 }
03361 
03362 void
03363 rebuild_host_table(Cache *cache)
03364 {
03365   build_vol_hash_table(&cache->hosttable->gen_host_rec);
03366   if (cache->hosttable->m_numEntries != 0) {
03367     CacheHostMatcher *hm = cache->hosttable->getHostMatcher();
03368     CacheHostRecord *h_rec = hm->getDataArray();
03369     int h_rec_len = hm->getNumElements();
03370     int i;
03371     for (i = 0; i < h_rec_len; i++) {
03372       build_vol_hash_table(&h_rec[i]);
03373     }
03374   }
03375 }
03376 
03377 // if generic_host_rec.vols == NULL, what do we do???
03378 Vol *
03379 Cache::key_to_vol(CacheKey *key, char const* hostname, int host_len)
03380 {
03381   uint32_t h = (key->slice32(2) >> DIR_TAG_WIDTH) % VOL_HASH_TABLE_SIZE;
03382   unsigned short *hash_table = hosttable->gen_host_rec.vol_hash_table;
03383   CacheHostRecord *host_rec = &hosttable->gen_host_rec;
03384 
03385   if (hosttable->m_numEntries > 0 && host_len) {
03386     CacheHostResult res;
03387     hosttable->Match(hostname, host_len, &res);
03388     if (res.record) {
03389       unsigned short *host_hash_table = res.record->vol_hash_table;
03390       if (host_hash_table) {
03391         if (is_debug_tag_set("cache_hosting")) {
03392           char format_str[50];
03393           snprintf(format_str, sizeof(format_str), "Volume: %%xd for host: %%.%ds", host_len);
03394           Debug("cache_hosting", format_str, res.record, hostname);
03395         }
03396         return res.record->vols[host_hash_table[h]];
03397       }
03398     }
03399   }
03400   if (hash_table) {
03401     if (is_debug_tag_set("cache_hosting")) {
03402       char format_str[50];
03403       snprintf(format_str, sizeof(format_str), "Generic volume: %%xd for host: %%.%ds", host_len);
03404       Debug("cache_hosting", format_str, host_rec, hostname);
03405     }
03406     return host_rec->vols[hash_table[h]];
03407   } else
03408     return host_rec->vols[0];
03409 }
03410 
03411 static void reg_int(const char *str, int stat, RecRawStatBlock *rsb, const char *prefix, RecRawStatSyncCb sync_cb=RecRawStatSyncSum) {
03412   char stat_str[256];
03413   snprintf(stat_str, sizeof(stat_str), "%s.%s", prefix, str);
03414   RecRegisterRawStat(rsb, RECT_PROCESS, stat_str, RECD_INT, RECP_NON_PERSISTENT, stat, sync_cb);
03415   DOCACHE_CLEAR_DYN_STAT(stat)
03416 }
03417 #define REG_INT(_str, _stat) reg_int(_str, (int)_stat, rsb, prefix)
03418 
03419 // Register Stats
03420 void
03421 register_cache_stats(RecRawStatBlock *rsb, const char *prefix)
03422 {
03423   // Special case for this sucker, since it uses its own aggregator.
03424   reg_int("bytes_used", cache_bytes_used_stat, rsb, prefix, cache_stats_bytes_used_cb);
03425 
03426   REG_INT("bytes_total", cache_bytes_total_stat);
03427   REG_INT("ram_cache.total_bytes", cache_ram_cache_bytes_total_stat);
03428   REG_INT("ram_cache.bytes_used", cache_ram_cache_bytes_stat);
03429   REG_INT("ram_cache.hits", cache_ram_cache_hits_stat);
03430   REG_INT("ram_cache.misses", cache_ram_cache_misses_stat);
03431   REG_INT("pread_count", cache_pread_count_stat);
03432   REG_INT("percent_full", cache_percent_full_stat);
03433   REG_INT("lookup.active", cache_lookup_active_stat);
03434   REG_INT("lookup.success", cache_lookup_success_stat);
03435   REG_INT("lookup.failure", cache_lookup_failure_stat);
03436   REG_INT("read.active", cache_read_active_stat);
03437   REG_INT("read.success", cache_read_success_stat);
03438   REG_INT("read.failure", cache_read_failure_stat);
03439 #if TS_USE_INTERIM_CACHE == 1
03440   REG_INT("interim.read.success", cache_interim_read_success_stat);
03441   REG_INT("disk.read.success", cache_disk_read_success_stat);
03442   REG_INT("ram.read.success", cache_ram_read_success_stat);
03443 #endif
03444   REG_INT("write.active", cache_write_active_stat);
03445   REG_INT("write.success", cache_write_success_stat);
03446   REG_INT("write.failure", cache_write_failure_stat);
03447   REG_INT("write.backlog.failure", cache_write_backlog_failure_stat);
03448   REG_INT("update.active", cache_update_active_stat);
03449   REG_INT("update.success", cache_update_success_stat);
03450   REG_INT("update.failure", cache_update_failure_stat);
03451   REG_INT("remove.active", cache_remove_active_stat);
03452   REG_INT("remove.success", cache_remove_success_stat);
03453   REG_INT("remove.failure", cache_remove_failure_stat);
03454   REG_INT("evacuate.active", cache_evacuate_active_stat);
03455   REG_INT("evacuate.success", cache_evacuate_success_stat);
03456   REG_INT("evacuate.failure", cache_evacuate_failure_stat);
03457   REG_INT("scan.active", cache_scan_active_stat);
03458   REG_INT("scan.success", cache_scan_success_stat);
03459   REG_INT("scan.failure", cache_scan_failure_stat);
03460   REG_INT("direntries.total", cache_direntries_total_stat);
03461   REG_INT("direntries.used", cache_direntries_used_stat);
03462   REG_INT("directory_collision", cache_directory_collision_count_stat);
03463   REG_INT("frags_per_doc.1", cache_single_fragment_document_count_stat);
03464   REG_INT("frags_per_doc.2", cache_two_fragment_document_count_stat);
03465   REG_INT("frags_per_doc.3+", cache_three_plus_plus_fragment_document_count_stat);
03466   REG_INT("read_busy.success", cache_read_busy_success_stat);
03467   REG_INT("read_busy.failure", cache_read_busy_failure_stat);
03468   REG_INT("write_bytes_stat", cache_write_bytes_stat);
03469   REG_INT("vector_marshals", cache_hdr_vector_marshal_stat);
03470   REG_INT("hdr_marshals", cache_hdr_marshal_stat);
03471   REG_INT("hdr_marshal_bytes", cache_hdr_marshal_bytes_stat);
03472   REG_INT("gc_bytes_evacuated", cache_gc_bytes_evacuated_stat);
03473   REG_INT("gc_frags_evacuated", cache_gc_frags_evacuated_stat);
03474 }
03475 
03476 
03477 void
03478 ink_cache_init(ModuleVersion v)
03479 {
03480   ink_release_assert(!checkModuleVersion(v, CACHE_MODULE_VERSION));
03481 
03482   cache_rsb = RecAllocateRawStatBlock((int) cache_stat_count);
03483 
03484   REC_EstablishStaticConfigInteger(cache_config_ram_cache_size, "proxy.config.cache.ram_cache.size");
03485   Debug("cache_init", "proxy.config.cache.ram_cache.size = %" PRId64 " = %" PRId64 "Mb",
03486         cache_config_ram_cache_size, cache_config_ram_cache_size / (1024 * 1024));
03487 
03488   REC_EstablishStaticConfigInt32(cache_config_ram_cache_algorithm, "proxy.config.cache.ram_cache.algorithm");
03489   REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress, "proxy.config.cache.ram_cache.compress");
03490   REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress_percent, "proxy.config.cache.ram_cache.compress_percent");
03491   REC_EstablishStaticConfigInt32(cache_config_ram_cache_use_seen_filter, "proxy.config.cache.ram_cache.use_seen_filter");
03492 
03493   REC_EstablishStaticConfigInt32(cache_config_http_max_alts, "proxy.config.cache.limits.http.max_alts");
03494   Debug("cache_init", "proxy.config.cache.limits.http.max_alts = %d", cache_config_http_max_alts);
03495 
03496   REC_EstablishStaticConfigInteger(cache_config_ram_cache_cutoff, "proxy.config.cache.ram_cache_cutoff");
03497   Debug("cache_init", "cache_config_ram_cache_cutoff = %" PRId64 " = %" PRId64 "Mb",
03498         cache_config_ram_cache_cutoff, cache_config_ram_cache_cutoff / (1024 * 1024));
03499 
03500   REC_EstablishStaticConfigInt32(cache_config_permit_pinning, "proxy.config.cache.permit.pinning");
03501   Debug("cache_init", "proxy.config.cache.permit.pinning = %d", cache_config_permit_pinning);
03502 
03503   REC_EstablishStaticConfigInt32(cache_config_dir_sync_frequency, "proxy.config.cache.dir.sync_frequency");
03504   Debug("cache_init", "proxy.config.cache.dir.sync_frequency = %d", cache_config_dir_sync_frequency);
03505 
03506   REC_EstablishStaticConfigInt32(cache_config_vary_on_user_agent, "proxy.config.cache.vary_on_user_agent");
03507   Debug("cache_init", "proxy.config.cache.vary_on_user_agent = %d", cache_config_vary_on_user_agent);
03508 
03509   REC_EstablishStaticConfigInt32(cache_config_select_alternate, "proxy.config.cache.select_alternate");
03510   Debug("cache_init", "proxy.config.cache.select_alternate = %d", cache_config_select_alternate);
03511 
03512   REC_EstablishStaticConfigInt32(cache_config_max_doc_size, "proxy.config.cache.max_doc_size");
03513   Debug("cache_init", "proxy.config.cache.max_doc_size = %d = %dMb",
03514         cache_config_max_doc_size, cache_config_max_doc_size / (1024 * 1024));
03515 
03516   REC_EstablishStaticConfigInt32(cache_config_mutex_retry_delay, "proxy.config.cache.mutex_retry_delay");
03517   Debug("cache_init", "proxy.config.cache.mutex_retry_delay = %dms", cache_config_mutex_retry_delay);
03518 
03519   REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_percent, "proxy.config.cache.hit_evacuate_percent");
03520   Debug("cache_init", "proxy.config.cache.hit_evacuate_percent = %d", cache_config_hit_evacuate_percent);
03521 
03522   REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_size_limit, "proxy.config.cache.hit_evacuate_size_limit");
03523   Debug("cache_init", "proxy.config.cache.hit_evacuate_size_limit = %d", cache_config_hit_evacuate_size_limit);
03524 
03525   REC_EstablishStaticConfigInt32(cache_config_force_sector_size, "proxy.config.cache.force_sector_size");
03526   REC_EstablishStaticConfigInt32(cache_config_target_fragment_size, "proxy.config.cache.target_fragment_size");
03527 
03528   if (cache_config_target_fragment_size == 0)
03529     cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE;
03530 
03531 #ifdef HTTP_CACHE
03532   REC_EstablishStaticConfigInt32(enable_cache_empty_http_doc, "proxy.config.http.cache.allow_empty_doc");
03533 
03534   REC_EstablishStaticConfigInt32(cache_config_compatibility_4_2_0_fixup, "proxy.config.cache.http.compatibility.4-2-0-fixup");
03535 #endif
03536 
03537 #if TS_USE_INTERIM_CACHE == 1
03538   REC_EstablishStaticConfigInt32(migrate_threshold, "proxy.config.cache.interim.migrate_threshold");
03539   Debug("cache_init", "proxy.config.cache.migrate_threshold = %d", migrate_threshold);
03540 #endif
03541 
03542   REC_EstablishStaticConfigInt32(cache_config_max_disk_errors, "proxy.config.cache.max_disk_errors");
03543   Debug("cache_init", "proxy.config.cache.max_disk_errors = %d", cache_config_max_disk_errors);
03544 
03545   REC_EstablishStaticConfigInt32(cache_config_agg_write_backlog, "proxy.config.cache.agg_write_backlog");
03546   Debug("cache_init", "proxy.config.cache.agg_write_backlog = %d", cache_config_agg_write_backlog);
03547 
03548   REC_EstablishStaticConfigInt32(cache_config_enable_checksum, "proxy.config.cache.enable_checksum");
03549   Debug("cache_init", "proxy.config.cache.enable_checksum = %d", cache_config_enable_checksum);
03550 
03551   REC_EstablishStaticConfigInt32(cache_config_alt_rewrite_max_size, "proxy.config.cache.alt_rewrite_max_size");
03552   Debug("cache_init", "proxy.config.cache.alt_rewrite_max_size = %d", cache_config_alt_rewrite_max_size);
03553 
03554   REC_EstablishStaticConfigInt32(cache_config_read_while_writer, "proxy.config.cache.enable_read_while_writer");
03555   cache_config_read_while_writer = validate_rww(cache_config_read_while_writer);
03556   REC_RegisterConfigUpdateFunc("proxy.config.cache.enable_read_while_writer", update_cache_config, NULL);
03557   Debug("cache_init", "proxy.config.cache.enable_read_while_writer = %d", cache_config_read_while_writer);
03558 
03559   register_cache_stats(cache_rsb, "proxy.process.cache");
03560 
03561   const char *err = NULL;
03562   if ((err = theCacheStore.read_config())) {
03563     printf("%s  failed\n", err);
03564     exit(1);
03565   }
03566 
03567   if (theCacheStore.n_disks == 0) {
03568     ats_scoped_str path(RecConfigReadConfigPath("proxy.config.cache.storage_filename", "storage.config"));
03569     Warning("no cache disks specified in %s: cache disabled\n", (const char *)path);
03570     //exit(1);
03571   }
03572 #if TS_USE_INTERIM_CACHE == 1
03573   else {
03574     theCacheStore.read_interim_config();
03575     if (theCacheStore.n_interim_disks == 0)
03576       Warning("no interim disks specified in %s: \n", "proxy.config.cache.interim.storage");
03577   }
03578 #endif
03579 }
03580 
03581 //----------------------------------------------------------------------------
03582 Action *
03583 CacheProcessor::open_read(Continuation *cont, URL *url, bool cluster_cache_local, CacheHTTPHdr *request,
03584                           CacheLookupHttpConfig *params, time_t pin_in_cache, CacheFragType type)
03585 {
03586 #ifdef CLUSTER_CACHE
03587   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
03588     return open_read_internal(CACHE_OPEN_READ_LONG, cont, (MIOBuffer *) 0,
03589                               url, request, params, (CacheKey *) 0, pin_in_cache, type, (char *) 0, 0);
03590   }
03591 #endif
03592   return caches[type]->open_read(cont, url, request, params, type);
03593 }
03594 
03595 
03596 //----------------------------------------------------------------------------
03597 Action *
03598 CacheProcessor::open_write(Continuation *cont, int expected_size, URL *url, bool cluster_cache_local,
03599                            CacheHTTPHdr *request, CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type)
03600 {
03601 #ifdef CLUSTER_CACHE
03602   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
03603     INK_MD5 url_md5;
03604     Cache::generate_key(&url_md5, url);
03605     ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
03606 
03607     if (m) {
03608       // Do remote open_write()
03609       INK_MD5 url_only_md5;
03610       Cache::generate_key(&url_only_md5, url);
03611       return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
03612                            &url_only_md5, type,
03613                            false, pin_in_cache, CACHE_OPEN_WRITE_LONG,
03614                            (CacheKey *) 0, url, request, old_info, (char *) 0, 0);
03615     }
03616   }
03617 #endif
03618   return caches[type]->open_write(cont, url, request, old_info, pin_in_cache, type);
03619 }
03620 
03621 //----------------------------------------------------------------------------
03622 // Note: this should not be called from from the cluster processor, or bad
03623 // recursion could occur. This is merely a convenience wrapper.
03624 Action *
03625 CacheProcessor::remove(Continuation *cont, URL *url, bool cluster_cache_local, CacheFragType frag_type)
03626 {
03627   CryptoHash id;
03628   int len = 0;
03629   const char *hostname;
03630 
03631   url->hash_get(&id);
03632   hostname = url->host_get(&len);
03633 
03634   Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %s", url->string_get_ref());
03635 #ifdef CLUSTER_CACHE
03636   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
03637     // Remove from cluster
03638     return remove(cont, &id, cluster_cache_local, frag_type, true, false, const_cast<char *>(hostname), len);
03639   }
03640 #endif
03641 
03642   // Remove from local cache only.
03643   return caches[frag_type]->remove(cont, &id, frag_type, true, false, const_cast<char*>(hostname), len);
03644 }
03645 
03646 CacheDisk*
03647 CacheProcessor::find_by_path(char const* path, int len)
03648 {
03649   if (CACHE_INITIALIZED == initialized) {
03650     // If no length is passed in, assume it's null terminated.
03651     if (0 >= len && 0 != *path) len = strlen(path);
03652 
03653     for ( int i = 0 ; i < gndisks ; ++i ) {
03654       if (0 == strncmp(path, gdisks[i]->path, len))
03655         return gdisks[i];
03656     }
03657   }
03658 
03659   return 0;
03660 }
03661 
03662 // ----------------------------
03663 
03664 namespace cache_bc {
03665   static size_t const HTTP_ALT_MARSHAL_SIZE = ROUND(sizeof(HTTPCacheAlt), HDR_PTR_SIZE); // current size.
03666   size_t
03667   HTTPInfo_v21::marshalled_length(void* data)
03668   {
03669     size_t zret = ROUND(sizeof(HTTPCacheAlt_v21), HDR_PTR_SIZE);
03670     HTTPCacheAlt_v21* alt = static_cast<HTTPCacheAlt_v21*>(data);
03671     HdrHeap* hdr;
03672 
03673     hdr = reinterpret_cast<HdrHeap*>(reinterpret_cast<char*>(alt) + reinterpret_cast<uintptr_t>(alt->m_request_hdr.m_heap));
03674     zret += ROUND(hdr->unmarshal_size(), HDR_PTR_SIZE);
03675     hdr = reinterpret_cast<HdrHeap*>(reinterpret_cast<char*>(alt) + reinterpret_cast<uintptr_t>(alt->m_response_hdr.m_heap));
03676     zret += ROUND(hdr->unmarshal_size(), HDR_PTR_SIZE);
03677     return zret;
03678   }
03679 
03680   // Copy an unmarshalled instance from @a src to @a dst.
03681   // @a src is presumed to be Cache version 21 and the result
03682   // is Cache version 23. @a length is the buffer available in @a dst.
03683   // @return @c false if something went wrong (e.g., data overrun).
03684   bool
03685   HTTPInfo_v21::copy_and_upgrade_unmarshalled_to_v23(
03686     char*& dst, char*& src, size_t& length, int n_frags, FragOffset* frag_offsets
03687     )
03688   {
03689     // Offsets of the data after the new stuff.
03690     static const size_t OLD_OFFSET = offsetof(HTTPCacheAlt_v21, m_ext_buffer);
03691     static const size_t NEW_OFFSET = offsetof(HTTPCacheAlt_v23, m_ext_buffer);
03692 
03693     HTTPCacheAlt_v21* s_alt = reinterpret_cast<HTTPCacheAlt_v21*>(src);
03694     HTTPCacheAlt_v23* d_alt = reinterpret_cast<HTTPCacheAlt_v23*>(dst);
03695     HdrHeap_v23* s_hdr;
03696     HdrHeap_v23* d_hdr;
03697     size_t hdr_size;
03698 
03699     if (length < HTTP_ALT_MARSHAL_SIZE) return false; // Absolutely no hope in this case.
03700 
03701     memcpy(dst, src, OLD_OFFSET); // initially same data
03702     // Now data that's now after extra
03703     memcpy( static_cast<char*>(dst) + NEW_OFFSET
03704             , static_cast<char*>(src) + OLD_OFFSET
03705             , sizeof(HTTPCacheAlt_v21) - OLD_OFFSET
03706       );
03707     dst += HTTP_ALT_MARSHAL_SIZE; // move past fixed data.
03708     length -= HTTP_ALT_MARSHAL_SIZE;
03709 
03710     // Extra data is fragment table - set that if we have it.
03711     if (n_frags) {
03712       static size_t const IFT_SIZE = HTTPCacheAlt_v23::N_INTEGRAL_FRAG_OFFSETS * sizeof(FragOffset);
03713       size_t ift_actual = min(n_frags, HTTPCacheAlt_v23::N_INTEGRAL_FRAG_OFFSETS) * sizeof(FragOffset);
03714 
03715       if (length < (HTTP_ALT_MARSHAL_SIZE + n_frags * sizeof(FragOffset) - IFT_SIZE))
03716         return false; // can't place fragment table.
03717 
03718       d_alt->m_frag_offset_count = n_frags;
03719       d_alt->m_frag_offsets = reinterpret_cast<FragOffset*>(dst - reinterpret_cast<char*>(d_alt));
03720 
03721       memcpy(d_alt->m_integral_frag_offsets, frag_offsets, ift_actual);
03722       n_frags -= HTTPCacheAlt_v23::N_INTEGRAL_FRAG_OFFSETS;
03723       if (n_frags > 0) {
03724         size_t k = sizeof(FragOffset) * n_frags;
03725         memcpy(dst, frag_offsets + IFT_SIZE, k);
03726         dst += k;
03727         length -= k;
03728       } else if (n_frags < 0) {
03729         memset(dst + ift_actual, 0, IFT_SIZE - ift_actual);
03730       }
03731     } else {
03732       d_alt->m_frag_offset_count = 0;
03733       d_alt->m_frag_offsets = 0;
03734       ink_zero(d_alt->m_integral_frag_offsets);
03735     }
03736 
03737     // Copy over the headers, tweaking the swizzled pointers.
03738     s_hdr = reinterpret_cast<HdrHeap_v23*>(reinterpret_cast<char*>(s_alt) + reinterpret_cast<uintptr_t>(s_alt->m_request_hdr.m_heap));
03739     d_hdr = reinterpret_cast<HdrHeap_v23*>(dst);
03740     hdr_size = ROUND(s_hdr->unmarshal_size(), HDR_PTR_SIZE);
03741     if (hdr_size > length) return false;
03742     memcpy(d_hdr, s_hdr, hdr_size);
03743     d_alt->m_request_hdr.m_heap = reinterpret_cast<HdrHeap_v23*>(reinterpret_cast<char*>(d_hdr) - reinterpret_cast<char*>(d_alt));
03744     dst += hdr_size;
03745     length -= hdr_size;
03746 
03747     s_hdr = reinterpret_cast<HdrHeap_v23*>(reinterpret_cast<char*>(s_alt) + reinterpret_cast<uintptr_t>(s_alt->m_response_hdr.m_heap));
03748     d_hdr = reinterpret_cast<HdrHeap_v23*>(dst);
03749     hdr_size = ROUND(s_hdr->unmarshal_size(), HDR_PTR_SIZE);
03750     if (hdr_size > length) return false;
03751     memcpy(d_hdr, s_hdr, hdr_size);
03752     d_alt->m_response_hdr.m_heap = reinterpret_cast<HdrHeap_v23*>(reinterpret_cast<char*>(d_hdr) - reinterpret_cast<char*>(d_alt));
03753     dst += hdr_size;
03754     length -= hdr_size;
03755 
03756     src = reinterpret_cast<char*>(s_hdr) + hdr_size;
03757   
03758     return true;
03759   }
03760 
03761 } // cache_bc

Generated by  doxygen 1.7.1