#include "P_Cache.h"

#include "P_CacheTest.h"
#include "StatPages.h"

#include "I_Layout.h"

#ifdef HTTP_CACHE
#include "HttpTransactCache.h"
#include "HttpSM.h"
#include "HttpCacheSM.h"
#include "InkAPIInternal.h"
#include "P_CacheBC.h"
#endif

#define USELESS_REENABLES // allow them for now

static size_t DEFAULT_RAM_CACHE_MULTIPLIER = 10;

static short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;

// Note: no trailing semicolon after while (0), so the do/while(0) idiom
// stays usable inside if/else chains.
#define DOCACHE_CLEAR_DYN_STAT(x)  \
  do {                             \
    RecSetRawStatSum(rsb, x, 0);   \
    RecSetRawStatCount(rsb, x, 0); \
  } while (0)

int64_t cache_config_ram_cache_size = AUTO_SIZE_RAM_CACHE;
int cache_config_ram_cache_algorithm = 0;
int cache_config_ram_cache_compress = 0;
int cache_config_ram_cache_compress_percent = 90;
int cache_config_ram_cache_use_seen_filter = 0;
int cache_config_http_max_alts = 3;
int cache_config_dir_sync_frequency = 60;
int cache_config_permit_pinning = 0;
int cache_config_vary_on_user_agent = 0;
int cache_config_select_alternate = 1;
int cache_config_max_doc_size = 0;
int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
int64_t cache_config_ram_cache_cutoff = AGG_SIZE;
int cache_config_max_disk_errors = 5;
int cache_config_hit_evacuate_percent = 10;
int cache_config_hit_evacuate_size_limit = 0;
int cache_config_force_sector_size = 0;
int cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE;
int cache_config_agg_write_backlog = AGG_SIZE * 2;
int cache_config_enable_checksum = 0;
int cache_config_alt_rewrite_max_size = 4096;
int cache_config_read_while_writer = 0;
int cache_config_mutex_retry_delay = 2;
#ifdef HTTP_CACHE
static int enable_cache_empty_http_doc = 0;

int cache_config_compatibility_4_2_0_fixup = 1;
#endif

#if TS_USE_INTERIM_CACHE == 1
int migrate_threshold = 2;
#endif

RecRawStatBlock *cache_rsb = NULL;
Cache *theStreamCache = 0;
Cache *theCache = 0;
CacheDisk **gdisks = NULL;
int gndisks = 0;
static volatile int initialize_disk = 0;
Cache *caches[NUM_CACHE_FRAG_TYPES] = { 0 };
CacheSync *cacheDirSync = 0;
Store theCacheStore;
volatile int CacheProcessor::initialized = CACHE_INITIALIZING;
volatile uint32_t CacheProcessor::cache_ready = 0;
volatile int CacheProcessor::start_done = 0;
int CacheProcessor::clear = 0;
int CacheProcessor::fix = 0;
int CacheProcessor::start_internal_flags = 0;
int CacheProcessor::auto_clear_flag = 0;
CacheProcessor cacheProcessor;
Vol **gvol = NULL;
volatile int gnvol = 0;
#if TS_USE_INTERIM_CACHE == 1
CacheDisk **g_interim_disks = NULL;
int gn_interim_disks = 0;
int good_interim_disks = 0;
uint64_t total_cache_size = 0;
#endif
ClassAllocator<CacheVC> cacheVConnectionAllocator("cacheVConnection");
ClassAllocator<EvacuationBlock> evacuationBlockAllocator("evacuationBlock");
ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator("cacheRemoveCont");
ClassAllocator<EvacuationKey> evacuationKeyAllocator("evacuationKey");
int CacheVC::size_to_init = -1;
CacheKey zero_key;
#if TS_USE_INTERIM_CACHE == 1
ClassAllocator<MigrateToInterimCache> migrateToInterimCacheAllocator("migrateToInterimCache");
#endif
00129
00130 struct VolInitInfo
00131 {
00132 off_t recover_pos;
00133 AIOCallbackInternal vol_aio[4];
00134 char *vol_h_f;
00135
00136 VolInitInfo()
00137 {
00138 recover_pos = 0;
00139 vol_h_f = (char *)ats_memalign(ats_pagesize(), 4 * STORE_BLOCK_SIZE);
00140 memset(vol_h_f, 0, 4 * STORE_BLOCK_SIZE);
00141 }
00142
00143 ~VolInitInfo()
00144 {
00145 for (int i = 0; i < 4; i++) {
00146 vol_aio[i].action = NULL;
00147 vol_aio[i].mutex.clear();
00148 }
00149 free(vol_h_f);
00150 }
00151 };
00152
#if AIO_MODE == AIO_MODE_NATIVE
struct VolInit : public Continuation
{
  Vol *vol;
  char *path;
  off_t blocks;
  int64_t offset;
  bool vol_clear;

  int mainEvent(int, Event *) {
    vol->init(path, blocks, offset, vol_clear);
    mutex.clear();
    delete this;
    return EVENT_DONE;
  }

  VolInit(Vol *v, char *p, off_t b, int64_t o, bool c)
    : Continuation(v->mutex), vol(v), path(p), blocks(b), offset(o), vol_clear(c) {
    SET_HANDLER(&VolInit::mainEvent);
  }
};

struct DiskInit : public Continuation
{
  CacheDisk *disk;
  char *s;
  off_t blocks;
  off_t askip;
  int ahw_sector_size;
  int fildes;
  bool clear;

  int mainEvent(int, Event *) {
    disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
    ats_free(s);
    mutex.clear();
    delete this;
    return EVENT_DONE;
  }

  DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector, int f, bool c)
    : Continuation(d->mutex), disk(d), s(ats_strdup(str)), blocks(b), askip(skip), ahw_sector_size(sector),
      fildes(f), clear(c) {
    SET_HANDLER(&DiskInit::mainEvent);
  }
};
#endif
void cplist_init();
static void cplist_update();
int cplist_reconfigure();
static int create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp);
static void rebuild_host_table(Cache *cache);
void register_cache_stats(RecRawStatBlock *rsb, const char *prefix);

Queue<CacheVol> cp_list;
int cp_list_len = 0;
ConfigVolumes config_volumes;

#if TS_HAS_TESTS
void force_link_CacheTestCaller() {
  force_link_CacheTest();
}
#endif

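// Return the number of bytes in use across all stripes, or across the
// stripes of a single volume when 'volume' >= 0. Until the write cursor
// wraps for the first time (header->cycle == 0) only the region behind
// write_pos counts; afterwards the whole stripe minus the directory and
// the evacuation zone is considered used.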
int64_t
cache_bytes_used(int volume)
{
  uint64_t used = 0;

  for (int i = 0; i < gnvol; i++) {
    if (!DISK_BAD(gvol[i]->disk) && (volume == -1 || gvol[i]->cache_vol->vol_number == volume)) {
      if (!gvol[i]->header->cycle)
        used += gvol[i]->header->write_pos - gvol[i]->start;
      else
        used += gvol[i]->len - vol_dirlen(gvol[i]) - EVACUATION_SIZE;
    }
  }

  return used;
}

int
cache_stats_bytes_used_cb(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
  int volume = -1;
  char *p;

  // The volume number is not passed in, so parse it out of the stat name.
  p = strstr((char *)name, "volume_");
  if (p != NULL) {
    volume = strtol(p + strlen("volume_"), NULL, 10);
  }

  if (cacheProcessor.initialized == CACHE_INITIALIZED) {
    int64_t used, total = 0;
    float percent_full;

    used = cache_bytes_used(volume);
    RecSetGlobalRawStatSum(rsb, id, used);
    RecRawStatSyncSum(name, data_type, data, rsb, id);
    RecGetGlobalRawStatSum(rsb, (int)cache_bytes_total_stat, &total);
    percent_full = (float)used / (float)total * 100;

    RecSetGlobalRawStatSum(rsb, (int)cache_percent_full_stat, (int64_t)percent_full);
  }

  return 1;
}

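// Sanity check proxy.config.cache.enable_read_while_writer: serving a
// document while it is still being written conflicts with background fill
// and with a max_doc_size limit, so the feature is refused when either of
// those is configured.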
static int
validate_rww(int new_value)
{
  if (new_value) {
    float http_bg_fill;

    REC_ReadConfigFloat(http_bg_fill, "proxy.config.http.background_fill_completed_threshold");
    if (http_bg_fill > 0.0) {
      Note("to enable reading while writing a document, %s should be 0.0: read while writing disabled",
           "proxy.config.http.background_fill_completed_threshold");
      return 0;
    }
    if (cache_config_max_doc_size > 0) {
      Note("to enable reading while writing a document, %s should be 0: read while writing disabled",
           "proxy.config.cache.max_doc_size");
      return 0;
    }
    return new_value;
  }
  return 0;
}

static int
update_cache_config(const char *, RecDataT, RecData data, void *)
{
  volatile int new_value = validate_rww(data.rec_int);
  cache_config_read_while_writer = new_value;

  return 0;
}

CacheVC::CacheVC() : alternate_index(CACHE_ALT_INDEX_DEFAULT)
{
  size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *)0)->vio;
  memset((void *)&vio, 0, size_to_init);
}

#ifdef HTTP_CACHE
HTTPInfo::FragOffset *
CacheVC::get_frag_table()
{
  ink_assert(alternate.valid());
  return alternate.valid() ? alternate.get_frag_table() : 0;
}
#endif

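// The do_io_* methods below configure the VIO for this cache VC and then,
// unless an event is already pending (trigger) or we are being called from
// inside our own handler (recursive), schedule the VC on the calling
// continuation's thread.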
VIO *
CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
{
  ink_assert(vio.op == VIO::READ);
  vio.buffer.writer_for(abuf);
  vio.set_continuation(c);
  vio.ndone = 0;
  vio.nbytes = nbytes;
  vio.vc_server = this;
#ifdef DEBUG
  ink_assert(c->mutex->thread_holding);
#endif
  if (!trigger && !recursive)
    trigger = c->mutex->thread_holding->schedule_imm_local(this);
  return &vio;
}

VIO *
CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
{
  ink_assert(vio.op == VIO::READ);
  vio.buffer.writer_for(abuf);
  vio.set_continuation(c);
  vio.ndone = 0;
  vio.nbytes = nbytes;
  vio.vc_server = this;
  seek_to = offset;
#ifdef DEBUG
  ink_assert(c->mutex->thread_holding);
#endif
  if (!trigger && !recursive)
    trigger = c->mutex->thread_holding->schedule_imm_local(this);
  return &vio;
}

VIO *
CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
{
  ink_assert(vio.op == VIO::WRITE);
  ink_assert(!owner);
  vio.buffer.reader_for(abuf);
  vio.set_continuation(c);
  vio.ndone = 0;
  vio.nbytes = nbytes;
  vio.vc_server = this;
#ifdef DEBUG
  ink_assert(c->mutex->thread_holding);
#endif
  if (!trigger && !recursive)
    trigger = c->mutex->thread_holding->schedule_imm_local(this);
  return &vio;
}

void
CacheVC::do_io_close(int alerrno)
{
  ink_assert(mutex->thread_holding == this_ethread());
  int previous_closed = closed;
  closed = (alerrno == -1) ? 1 : -1;
  DDebug("cache_close", "do_io_close %p %d %d", this, alerrno, closed);
  if (!previous_closed && !recursive)
    die();
}

void
CacheVC::reenable(VIO *avio)
{
  DDebug("cache_reenable", "reenable %p", this);
  (void)avio;
#ifdef DEBUG
  ink_assert(avio->mutex->thread_holding);
#endif
  if (!trigger) {
#ifndef USELESS_REENABLES
    if (vio.op == VIO::READ) {
      if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark)
        ink_assert(!"useless reenable of cache read");
    } else if (!vio.buffer.reader()->read_avail())
      ink_assert(!"useless reenable of cache write");
#endif
    trigger = avio->mutex->thread_holding->schedule_imm_local(this);
  }
}

void
CacheVC::reenable_re(VIO *avio)
{
  DDebug("cache_reenable", "reenable_re %p", this);
  (void)avio;
#ifdef DEBUG
  ink_assert(avio->mutex->thread_holding);
#endif
  if (!trigger) {
    if (!is_io_in_progress() && !recursive) {
      handleEvent(EVENT_NONE, (void *)0);
    } else
      trigger = avio->mutex->thread_holding->schedule_imm_local(this);
  }
}

bool
CacheVC::get_data(int i, void *data)
{
  switch (i) {
#ifdef HTTP_CACHE
  case CACHE_DATA_HTTP_INFO:
    *((CacheHTTPInfo **)data) = &alternate;
    return true;
#endif
  case CACHE_DATA_RAM_CACHE_HIT_FLAG:
    *((int *)data) = !f.not_from_ram_cache;
    return true;
  default:
    break;
  }
  return false;
}

int64_t
CacheVC::get_object_size()
{
  return ((CacheVC *)this)->doc_len;
}

bool
CacheVC::set_data(int, void *)
{
  ink_assert(!"CacheVC::set_data should not be called!");
  return true;
}

#ifdef HTTP_CACHE
void
CacheVC::get_http_info(CacheHTTPInfo **ainfo)
{
  *ainfo = &((CacheVC *)this)->alternate;
}

void
CacheVC::set_http_info(CacheHTTPInfo *ainfo)
{
  ink_assert(!total_len);
  if (f.update) {
    ainfo->object_key_set(update_key);
    ainfo->object_size_set(update_len);
  } else {
    ainfo->object_key_set(earliest_key);
  }
  if (enable_cache_empty_http_doc) {
    MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
    if (field && !field->value_get_int64())
      f.allow_empty_doc = 1;
    else
      f.allow_empty_doc = 0;
  } else
    f.allow_empty_doc = 0;
  alternate.copy_shallow(ainfo);
  ainfo->clear();
}
#endif

bool
CacheVC::set_pin_in_cache(time_t time_pin)
{
  if (total_len) {
    ink_assert(!"should Pin the document before writing");
    return false;
  }
  if (vio.op != VIO::WRITE) {
    ink_assert(!"Pinning only allowed while writing objects to the cache");
    return false;
  }
  pin_in_cache = time_pin;
  return true;
}

bool
CacheVC::set_disk_io_priority(int priority)
{
  ink_assert(priority >= AIO_LOWEST_PRIORITY);
  io.aiocb.aio_reqprio = priority;
  return true;
}

time_t
CacheVC::get_pin_in_cache()
{
  return pin_in_cache;
}

int
CacheVC::get_disk_io_priority()
{
  return io.aiocb.aio_reqprio;
}

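// Register a reader of the earliest fragment of a multi-fragment document
// so that the aggregation writer will evacuate the fragment rather than
// overwrite it. Returns 1 if a new evacuation block was created, else 0.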
int
Vol::begin_read(CacheVC *cont)
{
  ink_assert(cont->mutex->thread_holding == this_ethread());
  ink_assert(mutex->thread_holding == this_ethread());
#ifdef CACHE_STAT_PAGES
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
  stat_cache_vcs.enqueue(cont, cont->stat_link);
#endif
  // a single-fragment document is read in one shot, so no evacuation is needed
  if (cont->f.single_fragment)
    return 0;
#if TS_USE_INTERIM_CACHE == 1
  if (dir_ininterim(&cont->earliest_dir))
    return 0;
#endif
  int i = dir_evac_bucket(&cont->earliest_dir);
  EvacuationBlock *b;
  for (b = evacuate[i].head; b; b = b->link.next) {
    if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir))
      continue;
    if (b->readers)
      b->readers++;
    return 0;
  }

  EThread *t = cont->mutex->thread_holding;
  b = new_EvacuationBlock(t);
  b->readers = 1;
  b->dir = cont->earliest_dir;
  b->evac_frags.key = cont->earliest_key;
  evacuate[i].push(b);
  return 1;
}

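// Drop the reader registered by begin_read(); the evacuation block is
// freed once its reader count drops to zero.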
int
Vol::close_read(CacheVC *cont)
{
  EThread *t = cont->mutex->thread_holding;
  ink_assert(t == this_ethread());
  ink_assert(t == mutex->thread_holding);
  if (dir_is_empty(&cont->earliest_dir))
    return 1;
  int i = dir_evac_bucket(&cont->earliest_dir);
  EvacuationBlock *b;
  for (b = evacuate[i].head; b;) {
    EvacuationBlock *next = b->link.next;
    if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
      b = next;
      continue;
    }
    if (b->readers && !--b->readers) {
      evacuate[i].remove(b);
      free_EvacuationBlock(b, t);
      break;
    }
    b = next;
  }
#ifdef CACHE_STAT_PAGES
  stat_cache_vcs.remove(cont, cont->stat_link);
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
  return 1;
}

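// Start the cache: open every span known to the store (creating and
// truncating a cache.db file for directory spans), then let each CacheDisk
// initialize asynchronously; diskInitialized() below picks up from there.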
int
CacheProcessor::start(int, size_t)
{
  return start_internal(0);
}

static const int DEFAULT_CACHE_OPTIONS = (O_RDWR | _O_ATTRIB_OVERLAPPED);

int
CacheProcessor::start_internal(int flags)
{
  ink_assert((int)TS_EVENT_CACHE_OPEN_READ == (int)CACHE_EVENT_OPEN_READ);
  ink_assert((int)TS_EVENT_CACHE_OPEN_READ_FAILED == (int)CACHE_EVENT_OPEN_READ_FAILED);
  ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE == (int)CACHE_EVENT_OPEN_WRITE);
  ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE_FAILED == (int)CACHE_EVENT_OPEN_WRITE_FAILED);
  ink_assert((int)TS_EVENT_CACHE_REMOVE == (int)CACHE_EVENT_REMOVE);
  ink_assert((int)TS_EVENT_CACHE_REMOVE_FAILED == (int)CACHE_EVENT_REMOVE_FAILED);
  ink_assert((int)TS_EVENT_CACHE_SCAN == (int)CACHE_EVENT_SCAN);
  ink_assert((int)TS_EVENT_CACHE_SCAN_FAILED == (int)CACHE_EVENT_SCAN_FAILED);
  ink_assert((int)TS_EVENT_CACHE_SCAN_OBJECT == (int)CACHE_EVENT_SCAN_OBJECT);
  ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_BLOCKED == (int)CACHE_EVENT_SCAN_OPERATION_BLOCKED);
  ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_FAILED == (int)CACHE_EVENT_SCAN_OPERATION_FAILED);
  ink_assert((int)TS_EVENT_CACHE_SCAN_DONE == (int)CACHE_EVENT_SCAN_DONE);

#if AIO_MODE == AIO_MODE_NATIVE
  int etype = ET_NET;
  int n_netthreads = eventProcessor.n_threads_for_type[etype];
  EThread **netthreads = eventProcessor.eventthread[etype];
  for (int i = 0; i < n_netthreads; ++i) {
    netthreads[i]->diskHandler = new DiskHandler();
    netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
  }
#endif

  start_internal_flags = flags;
  clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
  fix = !!(flags & PROCESSOR_FIX);
  start_done = 0;
  int diskok = 1;
  Span *sd;
#if TS_USE_INTERIM_CACHE == 1
  gn_interim_disks = theCacheStore.n_interim_disks;
  g_interim_disks = (CacheDisk **)ats_malloc(gn_interim_disks * sizeof(CacheDisk *));

  gn_interim_disks = 0;

  for (int i = 0; i < theCacheStore.n_interim_disks; i++) {
    sd = theCacheStore.interim_disk[i];
    char path[PATH_MAX];
    int opts = O_RDWR;
    ink_strlcpy(path, sd->pathname, sizeof(path));
    if (!sd->file_pathname) {
#if !defined(_WIN32)
      if (config_volumes.num_http_volumes && config_volumes.num_stream_volumes) {
        Warning("It is suggested that you use raw disks if streaming and http are in the same cache");
      }
#endif
      ink_strlcat(path, "/cache.db", sizeof(path));
      opts |= O_CREAT;
    }
    opts |= _O_ATTRIB_OVERLAPPED;
#ifdef O_DIRECT
    opts |= O_DIRECT;
#endif
#ifdef O_DSYNC
    opts |= O_DSYNC;
#endif

    int fd = open(path, opts, 0644);
    int blocks = sd->blocks;
    if (fd > 0) {
      if (!sd->file_pathname) {
        if (ftruncate(fd, ((uint64_t)blocks) * STORE_BLOCK_SIZE) < 0) {
          Warning("unable to truncate cache file '%s' to %d blocks", path, blocks);
          diskok = 0;
        }
      }
      if (diskok) {
        CacheDisk *disk = new CacheDisk();
        Debug("cache_hosting", "interim Disk: %d, blocks: %d", gn_interim_disks, blocks);
        int sector_size = sd->hw_sector_size;
        if (sector_size < cache_config_force_sector_size)
          sector_size = cache_config_force_sector_size;
        if (sd->hw_sector_size <= 0 || sector_size > STORE_BLOCK_SIZE) {
          Warning("bad hardware sector size %d, resetting to %d", sector_size, STORE_BLOCK_SIZE);
          sector_size = STORE_BLOCK_SIZE;
        }
        off_t skip = ROUND_TO_STORE_BLOCK(
          (sd->offset * STORE_BLOCK_SIZE < START_POS ? START_POS + sd->alignment : sd->offset * STORE_BLOCK_SIZE));
        blocks = blocks - (skip >> STORE_BLOCK_SHIFT);
        disk->path = ats_strdup(path);
        disk->hw_sector_size = sector_size;
        disk->fd = fd;
        disk->skip = skip;
        disk->start = skip;
        disk->len = blocks;
        disk->io.aiocb.aio_fildes = fd;
        disk->io.aiocb.aio_reqprio = 0;
        disk->io.action = disk;
        disk->io.thread = AIO_CALLBACK_THREAD_ANY;
        g_interim_disks[gn_interim_disks++] = disk;
      }
    } else
      Warning("cache unable to open '%s': %s", path, strerror(errno));
  }

  if (gn_interim_disks == 0) {
    Warning("unable to open cache disk(s): InterimCache Cache Disabled\n");
  }
  good_interim_disks = gn_interim_disks;
  diskok = 1;
#endif

  gndisks = theCacheStore.n_disks;
  gdisks = (CacheDisk **)ats_malloc(gndisks * sizeof(CacheDisk *));

  gndisks = 0;
  ink_aio_set_callback(new AIO_Callback_handler());

  config_volumes.read_config_file();
#if TS_USE_INTERIM_CACHE == 1
  total_cache_size = 0;
  for (unsigned i = 0; i < theCacheStore.n_disks; i++)
    total_cache_size += theCacheStore.disk[i]->blocks;
#endif
  for (unsigned i = 0; i < theCacheStore.n_disks; i++) {
    sd = theCacheStore.disk[i];
    char path[PATH_NAME_MAX];
    int opts = DEFAULT_CACHE_OPTIONS;

    ink_strlcpy(path, sd->pathname, sizeof(path));
    if (!sd->file_pathname) {
      if (config_volumes.num_http_volumes && config_volumes.num_stream_volumes) {
        Warning("It is suggested that you use raw disks if streaming and http are in the same cache");
      }
      ink_strlcat(path, "/cache.db", sizeof(path));
      opts |= O_CREAT;
    }

#ifdef O_DIRECT
    opts |= O_DIRECT;
#endif
#ifdef O_DSYNC
    opts |= O_DSYNC;
#endif

    int fd = open(path, opts, 0644);
    int blocks = sd->blocks;

    if (fd < 0 && (opts & O_CREAT))
      fd = open(path, DEFAULT_CACHE_OPTIONS | O_CREAT, 0644);

    if (fd > 0) {
      if (!sd->file_pathname) {
        if (ftruncate(fd, ((uint64_t)blocks) * STORE_BLOCK_SIZE) < 0) {
          Warning("unable to truncate cache file '%s' to %d blocks", path, blocks);
          diskok = 0;
        }
      }
      if (diskok) {
        int sector_size = sd->hw_sector_size;

        gdisks[gndisks] = new CacheDisk();
        gdisks[gndisks]->forced_volume_num = sd->forced_volume_num;
        if (sd->hash_base_string)
          gdisks[gndisks]->hash_base_string = ats_strdup(sd->hash_base_string);

        Debug("cache_hosting", "Disk: %d, blocks: %d", gndisks, blocks);

        if (sector_size < cache_config_force_sector_size)
          sector_size = cache_config_force_sector_size;
        if (sd->hw_sector_size <= 0 || sector_size > STORE_BLOCK_SIZE) {
          Warning("bad hardware sector size %d, resetting to %d", sector_size, STORE_BLOCK_SIZE);
          sector_size = STORE_BLOCK_SIZE;
        }
        off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset));
        blocks = blocks - (skip >> STORE_BLOCK_SHIFT);
#if AIO_MODE == AIO_MODE_NATIVE
        eventProcessor.schedule_imm(new DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd, clear));
#else
        gdisks[gndisks]->open(path, blocks, skip, sector_size, fd, clear);
#endif
        fd = 0;
        gndisks++;
      }
    } else {
      if (errno == EINVAL)
        Warning("cache unable to open '%s': It must be placed on a file system that supports direct I/O.", path);
      else
        Warning("cache unable to open '%s': %s", path, strerror(errno));
    }
    if (fd > 0) {
      close(fd);
    }
  }

  if (gndisks == 0) {
    Warning("unable to open cache disk(s): Cache Disabled\n");
    return -1;
  }
  start_done = 1;

  return 0;
}

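// Called once per disk as its header read (or clear) completes. The last
// disk to arrive prunes bad disks from gdisks, (re)builds the volume list,
// allocates the global stripe table (gvol) plus per-volume stat blocks,
// and opens theCache / theStreamCache.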
void
CacheProcessor::diskInitialized()
{
  int n_init = ink_atomic_increment(&initialize_disk, 1);
  int bad_disks = 0;
  int res = 0;
  if (n_init == gndisks - 1) {
    // this is the last disk to finish initializing
    int i;
    for (i = 0; i < gndisks; i++) {
      if (DISK_BAD(gdisks[i]))
        bad_disks++;
    }

    if (bad_disks != 0) {
      // rebuild the disk array with only the good disks
      CacheDisk **p_good_disks;
      if ((gndisks - bad_disks) > 0)
        p_good_disks = (CacheDisk **)ats_malloc((gndisks - bad_disks) * sizeof(CacheDisk *));
      else
        p_good_disks = 0;

      int insert_at = 0;
      for (i = 0; i < gndisks; i++) {
        if (DISK_BAD(gdisks[i])) {
          delete gdisks[i];
          continue;
        }
        if (p_good_disks != NULL) {
          p_good_disks[insert_at++] = gdisks[i];
        }
      }
      ats_free(gdisks);
      gdisks = p_good_disks;
      gndisks = gndisks - bad_disks;
    }

    if (config_volumes.num_volumes == 0) {
      res = cplist_reconfigure();
    } else {
      cplist_init();
      res = cplist_reconfigure();
    }

    if (res == -1) {
      // volume sizing failed; continue as if there is no cache
      gnvol = 0;
      cacheInitialized();
      return;
    } else {
      CacheVol *cp = cp_list.head;
      for (; cp; cp = cp->link.next) {
        cp->vol_rsb = RecAllocateRawStatBlock((int)cache_stat_count);
        char vol_stat_str_prefix[256];
        snprintf(vol_stat_str_prefix, sizeof(vol_stat_str_prefix), "proxy.process.cache.volume_%d", cp->vol_number);
        register_cache_stats(cp->vol_rsb, vol_stat_str_prefix);
      }
    }

    gvol = (Vol **)ats_malloc(gnvol * sizeof(Vol *));
    memset(gvol, 0, gnvol * sizeof(Vol *));
    gnvol = 0;
    for (i = 0; i < gndisks; i++) {
      CacheDisk *d = gdisks[i];
      if (is_debug_tag_set("cache_hosting")) {
        int j;
        Debug("cache_hosting", "Disk: %d: Vol Blocks: %u: Free space: %" PRIu64, i, d->header->num_diskvol_blks,
              d->free_space);
        for (j = 0; j < (int)d->header->num_volumes; j++) {
          Debug("cache_hosting", "\tVol: %d Size: %" PRIu64, d->disk_vols[j]->vol_number, d->disk_vols[j]->size);
        }
        for (j = 0; j < (int)d->header->num_diskvol_blks; j++) {
          Debug("cache_hosting", "\tBlock No: %d Size: %" PRIu64 " Free: %u", d->header->vol_info[j].number,
                d->header->vol_info[j].len, d->header->vol_info[j].free);
        }
      }
      d->sync();
    }
    if (config_volumes.num_volumes == 0) {
      theCache = new Cache();
      theCache->scheme = CACHE_HTTP_TYPE;
      theCache->open(clear, fix);
      return;
    }
    if (config_volumes.num_http_volumes != 0) {
      theCache = new Cache();
      theCache->scheme = CACHE_HTTP_TYPE;
      theCache->open(clear, fix);
    }

    if (config_volumes.num_stream_volumes != 0) {
      theStreamCache = new Cache();
      theStreamCache->scheme = CACHE_RTSP_TYPE;
      theStreamCache->open(clear, fix);
    }
  }
}

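// Called when every stripe of a cache has finished initializing. Sizes the
// per-stripe RAM caches (vol_dirlen() * DEFAULT_RAM_CACHE_MULTIPLIER each
// when auto-sized, otherwise a fixed budget split between the HTTP and
// stream caches in proportion to stripe size), publishes the global cache
// statistics, and marks the processor CACHE_INITIALIZED.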
void
CacheProcessor::cacheInitialized()
{
  int i;

  if ((theCache && (theCache->ready == CACHE_INITIALIZING)) ||
      (theStreamCache && (theStreamCache->ready == CACHE_INITIALIZING)))
    return;
  int caches_ready = 0;
  int cache_init_ok = 0;

  int64_t total_size = 0;
  uint64_t total_cache_bytes = 0;
  uint64_t total_direntries = 0;
  uint64_t used_direntries = 0;
  uint64_t vol_total_cache_bytes = 0;
  uint64_t vol_total_direntries = 0;
  uint64_t vol_used_direntries = 0;
  Vol *vol;

  ProxyMutex *mutex = this_ethread()->mutex;

  if (theCache) {
    total_size += theCache->cache_size;
    Debug("cache_init", "CacheProcessor::cacheInitialized - theCache, total_size = %" PRId64 " = %" PRId64 " MB",
          total_size, total_size / ((1024 * 1024) / STORE_BLOCK_SIZE));
  }
  if (theStreamCache) {
    total_size += theStreamCache->cache_size;
    Debug("cache_init", "CacheProcessor::cacheInitialized - theStreamCache, total_size = %" PRId64 " = %" PRId64 " MB",
          total_size, total_size / ((1024 * 1024) / STORE_BLOCK_SIZE));
  }

  if (theCache) {
    if (theCache->ready == CACHE_INIT_FAILED) {
      Debug("cache_init", "CacheProcessor::cacheInitialized - failed to initialize the cache for http: cache disabled");
      Warning("failed to initialize the cache for http: cache disabled\n");
    } else {
      caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
      caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
      caches[CACHE_FRAG_TYPE_HTTP] = theCache;
      caches[CACHE_FRAG_TYPE_NONE] = theCache;
    }
  }
  if (theStreamCache) {
    if (theStreamCache->ready == CACHE_INIT_FAILED) {
      Debug("cache_init",
            "CacheProcessor::cacheInitialized - failed to initialize the cache for streaming: cache disabled");
      Warning("failed to initialize the cache for streaming: cache disabled\n");
    } else {
      caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_RTSP);
      caches[CACHE_FRAG_TYPE_RTSP] = theStreamCache;
    }
  }

  // track the smallest and largest stripe versions seen on disk
  if (gnvol)
    cacheProcessor.min_stripe_version = cacheProcessor.max_stripe_version = gvol[0]->header->version;
  for (i = 1; i < gnvol; i++) {
    Vol *v = gvol[i];
    if (v->header->version < cacheProcessor.min_stripe_version)
      cacheProcessor.min_stripe_version = v->header->version;
    if (cacheProcessor.max_stripe_version < v->header->version)
      cacheProcessor.max_stripe_version = v->header->version;
  }

  if (caches_ready) {
    Debug("cache_init", "CacheProcessor::cacheInitialized - caches_ready=0x%0X, gnvol=%d", (unsigned int)caches_ready,
          gnvol);

    int64_t ram_cache_bytes = 0;

    if (gnvol) {
      for (i = 0; i < gnvol; i++) {
        switch (cache_config_ram_cache_algorithm) {
        default:
        case RAM_CACHE_ALGORITHM_CLFUS:
          gvol[i]->ram_cache = new_RamCacheCLFUS();
          break;
        case RAM_CACHE_ALGORITHM_LRU:
          gvol[i]->ram_cache = new_RamCacheLRU();
          break;
        }
      }

      if (cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE) {
        Debug("cache_init", "CacheProcessor::cacheInitialized - cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE");
        for (i = 0; i < gnvol; i++) {
          vol = gvol[i];
          gvol[i]->ram_cache->init(vol_dirlen(vol) * DEFAULT_RAM_CACHE_MULTIPLIER, vol);
#if TS_USE_INTERIM_CACHE == 1
          gvol[i]->history.init(1 << 20, 2097143);
#endif
          ram_cache_bytes += vol_dirlen(gvol[i]);
          Debug("cache_init", "CacheProcessor::cacheInitialized - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
                ram_cache_bytes, ram_cache_bytes / (1024 * 1024));
          CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)vol_dirlen(gvol[i]));

          vol_total_cache_bytes = gvol[i]->len - vol_dirlen(gvol[i]);
          total_cache_bytes += vol_total_cache_bytes;
          Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
                total_cache_bytes, total_cache_bytes / (1024 * 1024));

          CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);

          vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
          total_direntries += vol_total_direntries;
          CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);

          vol_used_direntries = dir_entries_used(gvol[i]);
          CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
          used_direntries += vol_used_direntries;
        }
      } else {
        // fixed RAM cache budget: split it between the HTTP and stream
        // caches, each stripe receiving a share proportional to its size
        Debug("cache_init", "CacheProcessor::cacheInitialized - %" PRId64 " != AUTO_SIZE_RAM_CACHE",
              cache_config_ram_cache_size);
        int64_t http_ram_cache_size =
          (theCache) ? (int64_t)(((double)theCache->cache_size / total_size) * cache_config_ram_cache_size) : 0;
        Debug("cache_init", "CacheProcessor::cacheInitialized - http_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
              http_ram_cache_size, http_ram_cache_size / (1024 * 1024));
        int64_t stream_ram_cache_size = cache_config_ram_cache_size - http_ram_cache_size;
        Debug("cache_init", "CacheProcessor::cacheInitialized - stream_ram_cache_size = %" PRId64 " = %" PRId64 "Mb",
              stream_ram_cache_size, stream_ram_cache_size / (1024 * 1024));

        Debug("ram_cache", "config: size = %" PRId64 ", cutoff = %" PRId64 "", cache_config_ram_cache_size,
              cache_config_ram_cache_cutoff);

        for (i = 0; i < gnvol; i++) {
          vol = gvol[i];
          double factor;
          if (gvol[i]->cache == theCache) {
            factor = (double)(int64_t)(gvol[i]->len >> STORE_BLOCK_SHIFT) / (int64_t)theCache->cache_size;
            Debug("cache_init", "CacheProcessor::cacheInitialized - factor = %f", factor);
            gvol[i]->ram_cache->init((int64_t)(http_ram_cache_size * factor), vol);
            ram_cache_bytes += (int64_t)(http_ram_cache_size * factor);
            CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)(http_ram_cache_size * factor));
          } else {
            factor = (double)(int64_t)(gvol[i]->len >> STORE_BLOCK_SHIFT) / (int64_t)theStreamCache->cache_size;
            Debug("cache_init", "CacheProcessor::cacheInitialized - factor = %f", factor);
            gvol[i]->ram_cache->init((int64_t)(stream_ram_cache_size * factor), vol);
            ram_cache_bytes += (int64_t)(stream_ram_cache_size * factor);
            CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)(stream_ram_cache_size * factor));
          }
          Debug("cache_init", "CacheProcessor::cacheInitialized[%d] - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", i,
                ram_cache_bytes, ram_cache_bytes / (1024 * 1024));
#if TS_USE_INTERIM_CACHE == 1
          gvol[i]->history.init(1 << 20, 2097143);
#endif
          vol_total_cache_bytes = gvol[i]->len - vol_dirlen(gvol[i]);
          total_cache_bytes += vol_total_cache_bytes;
          CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes);
          Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb",
                total_cache_bytes, total_cache_bytes / (1024 * 1024));

          vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH;
          total_direntries += vol_total_direntries;
          CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries);

          vol_used_direntries = dir_entries_used(gvol[i]);
          CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries);
          used_direntries += vol_used_direntries;
        }
      }
      switch (cache_config_ram_cache_compress) {
      default:
        Fatal("unknown RAM cache compression type: %d", cache_config_ram_cache_compress);
      case CACHE_COMPRESSION_NONE:
      case CACHE_COMPRESSION_FASTLZ:
        break;
      case CACHE_COMPRESSION_LIBZ:
#if !TS_HAS_LIBZ
        Fatal("libz not available for RAM cache compression");
#endif
        break;
      case CACHE_COMPRESSION_LIBLZMA:
#if !TS_HAS_LZMA
        Fatal("lzma not available for RAM cache compression");
#endif
        break;
      }

      GLOBAL_CACHE_SET_DYN_STAT(cache_ram_cache_bytes_total_stat, ram_cache_bytes);
      GLOBAL_CACHE_SET_DYN_STAT(cache_bytes_total_stat, total_cache_bytes);
      GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_total_stat, total_direntries);
      GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_used_stat, used_direntries);
      dir_sync_init();
      cache_init_ok = 1;
    } else
      Warning("cache unable to open any vols, disabled");
  }
  if (cache_init_ok) {
    CacheProcessor::initialized = CACHE_INITIALIZED;
    CacheProcessor::cache_ready = caches_ready;
    Note("cache enabled");
#ifdef CLUSTER_CACHE
    if (!(start_internal_flags & PROCESSOR_RECONFIGURE)) {
      CacheContinuation::init();
      clusterProcessor.start();
    }
#endif
  } else {
    CacheProcessor::initialized = CACHE_INIT_FAILED;
    Note("cache disabled");
  }

  if (cb_after_init)
    cb_after_init();
}

void
CacheProcessor::stop()
{
}

int
CacheProcessor::dir_check(bool afix)
{
  for (int i = 0; i < gnvol; i++)
    gvol[i]->dir_check(afix);
  return 0;
}

int
CacheProcessor::db_check(bool afix)
{
  for (int i = 0; i < gnvol; i++)
    gvol[i]->db_check(afix);
  return 0;
}

int
Vol::db_check(bool)
{
  char tt[256];
  printf(" Data for [%s]\n", hash_text.get());
  printf(" Length: %" PRIu64 "\n", (uint64_t)len);
  printf(" Write Position: %" PRIu64 "\n", (uint64_t)(header->write_pos - skip));
  printf(" Phase: %d\n", (int)!!header->phase);
  ink_ctime_r(&header->create_time, tt);
  tt[strlen(tt) - 1] = 0;
  printf(" Create Time: %s\n", tt);
  printf(" Sync Serial: %u\n", (unsigned int)header->sync_serial);
  printf(" Write Serial: %u\n", (unsigned int)header->write_serial);
  printf("\n");

  return 0;
}

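// Compute the directory geometry of a stripe: the bucket count follows
// from the usable length divided by the configured average object size,
// and buckets are grouped into segments of at most 2^16 entries
// (DIR_DEPTH entries per bucket). 'start' is then advanced past the two
// on-disk copies of the directory.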
static void
vol_init_data_internal(Vol *d)
{
  d->buckets = ((d->len - (d->start - d->skip)) / cache_config_min_average_object_size) / DIR_DEPTH;
  d->segments = (d->buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
  d->buckets = (d->buckets + d->segments - 1) / d->segments;
  d->start = d->skip + 2 * vol_dirlen(d);
}

static void
vol_init_data(Vol *d)
{
  // iterate to approach the fixed point: moving 'start' changes the usable
  // length, which changes the directory size, which moves 'start' again
  vol_init_data_internal(d);
  vol_init_data_internal(d);
  vol_init_data_internal(d);
}

void
vol_init_dir(Vol *d)
{
  int b, s, l;

  for (s = 0; s < d->segments; s++) {
    d->header->freelist[s] = 0;
    Dir *seg = dir_segment(s, d);
    for (l = 1; l < DIR_DEPTH; l++) {
      for (b = 0; b < d->buckets; b++) {
        Dir *bucket = dir_bucket(b, seg);
        dir_free_entry(dir_bucket_row(bucket, l), s, d);
      }
    }
  }
}

#if TS_USE_INTERIM_CACHE == 1
void
interimvol_clear_init(InterimCacheVol *d)
{
  memset(d->header, 0, sizeof(InterimVolHeaderFooter));
  d->header->magic = VOL_MAGIC;
  d->header->version.ink_major = CACHE_DB_MAJOR_VERSION;
  d->header->version.ink_minor = CACHE_DB_MINOR_VERSION;
  d->header->agg_pos = d->header->write_pos = d->start;
  d->header->last_write_pos = d->header->write_pos;
  d->header->phase = 0;
  d->header->cycle = 0;
  d->header->create_time = time(NULL);
  d->header->dirty = 0;
  d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
}
#endif

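// Reset the in-memory directory and the stripe header/footer to a pristine
// state. Used for explicit clears and whenever recovery gives up.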
void
vol_clear_init(Vol *d)
{
  size_t dir_len = vol_dirlen(d);
  memset(d->raw_dir, 0, dir_len);
  vol_init_dir(d);
  d->header->magic = VOL_MAGIC;
  d->header->version.ink_major = CACHE_DB_MAJOR_VERSION;
  d->header->version.ink_minor = CACHE_DB_MINOR_VERSION;
  d->scan_pos = d->header->agg_pos = d->header->write_pos = d->start;
  d->header->last_write_pos = d->header->write_pos;
  d->header->phase = 0;
  d->header->cycle = 0;
  d->header->create_time = time(NULL);
  d->header->dirty = 0;
  d->sector_size = d->header->sector_size = d->disk->hw_sector_size;
  *d->footer = *d->header;

#if TS_USE_INTERIM_CACHE == 1
  for (int i = 0; i < d->num_interim_vols; i++) {
    interimvol_clear_init(&(d->interim_vols[i]));
  }
#endif
}

int
vol_dir_clear(Vol *d)
{
  size_t dir_len = vol_dirlen(d);
  vol_clear_init(d);

  if (pwrite(d->fd, d->raw_dir, dir_len, d->skip) < 0) {
    Warning("unable to clear cache directory '%s'", d->hash_text.get());
    return -1;
  }
  return 0;
}

int
Vol::clear_dir()
{
  size_t dir_len = vol_dirlen(this);
  vol_clear_init(this);

  SET_HANDLER(&Vol::handle_dir_clear);

  io.aiocb.aio_fildes = fd;
  io.aiocb.aio_buf = raw_dir;
  io.aiocb.aio_nbytes = dir_len;
  io.aiocb.aio_offset = skip;
  io.action = this;
  io.thread = AIO_CALLBACK_THREAD_ANY;
  io.then = 0;
  ink_assert(ink_aio_write(&io));
  return 0;
}

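// Initialize a stripe: derive its identifying hash from the span path (or
// hash_base_string) plus offset and length, size the directory and the
// evacuation table, then either clear the directory or read the four
// header/footer blocks of the A and B directory copies to decide which
// copy to load.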
int
Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
{
  char *seed_str = disk->hash_base_string ? disk->hash_base_string : s;
  const size_t hash_seed_size = strlen(seed_str);
  const size_t hash_text_size = hash_seed_size + 32;

  hash_text = static_cast<char *>(ats_malloc(hash_text_size));
  ink_strlcpy(hash_text, seed_str, hash_text_size);
  snprintf(hash_text + hash_seed_size, (hash_text_size - hash_seed_size), " %" PRIu64 ":%" PRIu64 "",
           (uint64_t)dir_skip, (uint64_t)blocks);
  MD5Context().hash_immediate(hash_id, hash_text, strlen(hash_text));

  dir_skip = ROUND_TO_STORE_BLOCK((dir_skip < START_POS ? START_POS : dir_skip));
  path = ats_strdup(s);
  len = blocks * STORE_BLOCK_SIZE;
  ink_assert(len <= MAX_VOL_SIZE);
  skip = dir_skip;
  prev_recover_pos = 0;

  start = dir_skip;
  vol_init_data(this);
  data_blocks = (len - (start - skip)) / STORE_BLOCK_SIZE;
  hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 100;

  evacuate_size = (int)(len / EVACUATION_BUCKET_SIZE) + 2;
  int evac_len = (int)evacuate_size * sizeof(DLL<EvacuationBlock>);
  evacuate = (DLL<EvacuationBlock> *)ats_malloc(evac_len);
  memset(evacuate, 0, evac_len);

  Debug("cache_init", "allocating %zu directory bytes for a %lld byte volume (%lf%%)", vol_dirlen(this),
        (long long)this->len, (double)vol_dirlen(this) / (double)this->len * 100.0);
  raw_dir = (char *)ats_memalign(ats_pagesize(), vol_dirlen(this));
  dir = (Dir *)(raw_dir + vol_headerlen(this));
  header = (VolHeaderFooter *)raw_dir;
  footer = (VolHeaderFooter *)(raw_dir + vol_dirlen(this) - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)));

#if TS_USE_INTERIM_CACHE == 1
  num_interim_vols = good_interim_disks;
  ink_assert(num_interim_vols >= 0 && num_interim_vols <= 8);
  for (int i = 0; i < num_interim_vols; i++) {
    double r = (double)blocks / total_cache_size;
    off_t vlen = off_t(r * g_interim_disks[i]->len * STORE_BLOCK_SIZE);
    vlen = (vlen / STORE_BLOCK_SIZE) * STORE_BLOCK_SIZE;
    off_t start = ink_atomic_increment(&g_interim_disks[i]->skip, vlen);
    interim_vols[i].init(start, vlen, g_interim_disks[i], this, &(this->header->interim_header[i]));
    ink_assert(interim_vols[i].start + interim_vols[i].len <= g_interim_disks[i]->len * STORE_BLOCK_SIZE);
  }
#endif

  if (clear) {
    Note("clearing cache directory '%s'", hash_text.get());
    return clear_dir();
  }

  init_info = new VolInitInfo();
  int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
  off_t footer_offset = vol_dirlen(this) - footerlen;
  // 'as' and 'bs' are the offsets of the A and B directory copies
  off_t as = skip;
  if (is_debug_tag_set("cache_init"))
    Note("reading directory '%s'", hash_text.get());
  SET_HANDLER(&Vol::handle_header_read);
  init_info->vol_aio[0].aiocb.aio_offset = as;
  init_info->vol_aio[1].aiocb.aio_offset = as + footer_offset;
  off_t bs = skip + vol_dirlen(this);
  init_info->vol_aio[2].aiocb.aio_offset = bs;
  init_info->vol_aio[3].aiocb.aio_offset = bs + footer_offset;

  for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
    AIOCallback *aio = &(init_info->vol_aio[i]);
    aio->aiocb.aio_fildes = fd;
    aio->aiocb.aio_buf = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
    aio->aiocb.aio_nbytes = footerlen;
    aio->action = this;
    aio->thread = AIO_CALLBACK_THREAD_ANY;
    aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : 0;
  }
#if AIO_MODE == AIO_MODE_NATIVE
  ink_assert(ink_aio_readv(init_info->vol_aio));
#else
  ink_assert(ink_aio_read(init_info->vol_aio));
#endif
  return 0;
}

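// Completion handler for clear_dir(). After the full write of directory
// copy A finishes, one more small write invalidates the header of copy B,
// the apparent intent being that a stale directory is never picked up on
// restart.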
int
Vol::handle_dir_clear(int event, void *data)
{
  size_t dir_len = vol_dirlen(this);
  AIOCallback *op;

  if (event == AIO_EVENT_DONE) {
    op = (AIOCallback *)data;
    if ((size_t)op->aio_result != (size_t)op->aiocb.aio_nbytes) {
      Warning("unable to clear cache directory '%s'", hash_text.get());
      fd = -1;
    }

    if (op->aiocb.aio_nbytes == dir_len) {
      op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
      op->aiocb.aio_offset = skip + dir_len;
      ink_assert(ink_aio_write(op));
      return EVENT_DONE;
    }
    set_io_not_in_progress();
    SET_HANDLER(&Vol::dir_init_done);
    dir_init_done(EVENT_IMMEDIATE, 0);
  }
  return EVENT_DONE;
}

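// Completion handler for the directory read issued from handle_header_read.
// A short read, bad magic, or an incompatible version clears the stripe;
// otherwise recovery from the data starts (through the interim volumes
// first when those are configured).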
int
Vol::handle_dir_read(int event, void *data)
{
  AIOCallback *op = (AIOCallback *)data;

  if (event == AIO_EVENT_DONE) {
    if ((size_t)op->aio_result != (size_t)op->aiocb.aio_nbytes) {
      clear_dir();
      return EVENT_DONE;
    }
  }

  if (!(header->magic == VOL_MAGIC && footer->magic == VOL_MAGIC &&
        CACHE_DB_MAJOR_VERSION_COMPATIBLE <= header->version.ink_major &&
        header->version.ink_major <= CACHE_DB_MAJOR_VERSION)) {
    Warning("bad footer in cache directory for '%s', clearing", hash_text.get());
    Note("clearing cache directory '%s'", hash_text.get());
    clear_dir();
    return EVENT_DONE;
  }
  CHECK_DIR(this);

  sector_size = header->sector_size;

#if TS_USE_INTERIM_CACHE == 1
  if (num_interim_vols > 0) {
    interim_done = 0;
    for (int i = 0; i < num_interim_vols; i++) {
      interim_vols[i].recover_data();
    }
  } else {
#endif

    return this->recover_data();

#if TS_USE_INTERIM_CACHE == 1
  }
#endif

  return EVENT_CONT;
}

int
Vol::recover_data()
{
  SET_HANDLER(&Vol::handle_recover_from_data);
  return handle_recover_from_data(EVENT_IMMEDIATE, 0);
}

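// Data-driven crash recovery. The directory is only synced to disk every
// cache_config_dir_sync_frequency seconds, so after a crash the region
// between header->last_write_pos and the real end of the written data has
// to be rediscovered by walking Doc headers forward from last_write_pos.
// Each Doc records the sync serial current when it was written: while the
// serials follow the expected sequence the scan continues (possibly
// wrapping from the end of the stripe back to 'start'); the first bad
// magic or inconsistent serial marks the end of recoverable data. The
// directory entries covering the unrecoverable region are then cleared,
// the sync serial is bumped, and the header, body, and footer of the
// directory copy selected by the new serial's parity are rewritten.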
int
Vol::handle_recover_from_data(int event, void *)
{
  uint32_t got_len = 0;
  uint32_t max_sync_serial = header->sync_serial;
  char *s, *e;
  if (event == EVENT_IMMEDIATE) {
    if (header->sync_serial == 0) {
      io.aiocb.aio_buf = NULL;
      SET_HANDLER(&Vol::handle_recover_write_dir);
      return handle_recover_write_dir(EVENT_IMMEDIATE, 0);
    }
    // start the scan at the last sync'd write position
    recover_wrapped = 0;
    last_sync_serial = 0;
    last_write_serial = 0;
    recover_pos = header->last_write_pos;
    if (recover_pos >= skip + len) {
      recover_wrapped = 1;
      recover_pos = start;
    }
    io.aiocb.aio_buf = (char *)ats_memalign(ats_pagesize(), RECOVERY_SIZE);
    io.aiocb.aio_nbytes = RECOVERY_SIZE;
    if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
      io.aiocb.aio_nbytes = (skip + len) - recover_pos;
  } else if (event == AIO_EVENT_DONE) {
    if ((size_t)io.aiocb.aio_nbytes != (size_t)io.aio_result) {
      Warning("disk read error on recover '%s', clearing", hash_text.get());
      goto Lclear;
    }
    if (io.aiocb.aio_offset == header->last_write_pos) {
      // the first read covers [last_write_pos, write_pos): everything in
      // that region was written before the last directory sync and must
      // be intact, or the stripe cannot be trusted
      uint32_t to_check = header->write_pos - header->last_write_pos;
      ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
      uint32_t done = 0;
      s = (char *)io.aiocb.aio_buf;
      while (done < to_check) {
        Doc *doc = (Doc *)(s + done);
        if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
          Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
          goto Lclear;
        }
        done += round_to_approx_size(doc->len);
        if (doc->sync_serial > last_write_serial)
          last_sync_serial = doc->sync_serial;
      }
      ink_assert(done == to_check);

      got_len = io.aiocb.aio_nbytes - done;
      recover_pos += io.aiocb.aio_nbytes;
      s = (char *)io.aiocb.aio_buf + done;
      e = s + got_len;
    } else {
      got_len = io.aiocb.aio_nbytes;
      recover_pos += io.aiocb.aio_nbytes;
      s = (char *)io.aiocb.aio_buf;
      e = s + got_len;
    }
  }

  if (got_len) {
    Doc *doc = NULL;

    if (recover_wrapped && start == io.aiocb.aio_offset) {
      doc = (Doc *)s;
      if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
        recover_pos = skip + len - EVACUATION_SIZE;
        goto Ldone;
      }
    }

    while (s < e) {
      doc = (Doc *)s;

      if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
        if (doc->magic == DOC_MAGIC) {
          if (doc->sync_serial > header->sync_serial)
            max_sync_serial = doc->sync_serial;
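          // A Doc with good magic but an unexpected sync_serial: serials
          // written after the last directory sync may legitimately be
          // header->sync_serial or header->sync_serial + 1 (a sync may
          // have been in flight), so those are accepted and the scan
          // continues. Anything else means we have run past the valid
          // data; unless we are near the end of the stripe, where a wrap
          // is assumed, recovery ends here.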
          if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
            last_sync_serial = doc->sync_serial;
            s += round_to_approx_size(doc->len);
            continue;
          }
          else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
            recover_wrapped = 1;
            recover_pos = start;
            io.aiocb.aio_nbytes = RECOVERY_SIZE;
            break;
          }
          recover_pos -= e - s;
          goto Ldone;
        } else {
          // bad magic: either the end of the valid data or a wrap at the
          // end of the stripe
          recover_pos -= e - s;
          if (recover_pos > (skip + len) - AGG_SIZE) {
            recover_wrapped = 1;
            recover_pos = start;
            io.aiocb.aio_nbytes = RECOVERY_SIZE;
            break;
          }
          goto Ldone;
        }
      }

      last_write_serial = doc->write_serial;
      s += round_to_approx_size(doc->len);
    }

    // We ran off the end of this read; back up to the start of the
    // partially examined Doc and continue with the next read.
    if (s >= e) {
      if (s > e)
        s -= round_to_approx_size(doc->len);
      recover_pos -= e - s;
      if (recover_pos >= skip + len)
        recover_pos = start;
      io.aiocb.aio_nbytes = RECOVERY_SIZE;
      if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
        io.aiocb.aio_nbytes = (skip + len) - recover_pos;
    }
  }
  if (recover_pos == prev_recover_pos) // no progress, give up and clear
    goto Lclear;
  prev_recover_pos = recover_pos;
  io.aiocb.aio_offset = recover_pos;
  ink_assert(ink_aio_read(&io));
  return EVENT_CONT;

Ldone: {
  if (recover_pos == header->write_pos && recover_wrapped) {
    SET_HANDLER(&Vol::handle_recover_write_dir);
    if (is_debug_tag_set("cache_init"))
      Note("recovery wrapped around. nothing to clear\n");
    return handle_recover_write_dir(EVENT_IMMEDIATE, 0);
  }

  recover_pos += EVACUATION_SIZE; // safely cover the max write size
  if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
    Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos,
          recover_wrapped);
    Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
    goto Lclear;
  }

  if (recover_pos > skip + len)
    recover_pos -= skip + len;

  uint32_t next_sync_serial = max_sync_serial + 1;
  // force the parity of the new serial so that the rewrite below targets
  // the directory copy that does not hold the last good sync
  if (!(header->sync_serial & 1) == !(next_sync_serial & 1))
    next_sync_serial++;
  // clear the directory entries for the region that could not be recovered
  off_t clear_start = offset_to_vol_offset(this, header->write_pos);
  off_t clear_end = offset_to_vol_offset(this, recover_pos);
  if (clear_start <= clear_end)
    dir_clear_range(clear_start, clear_end, this);
  else {
    dir_clear_range(clear_end, DIR_OFFSET_MAX, this);
    dir_clear_range(1, clear_start, this);
  }
  if (is_debug_tag_set("cache_init"))
    Note("recovery clearing offsets [%" PRIu64 ", %" PRIu64 "] sync_serial %d next %d\n", header->write_pos,
         recover_pos, header->sync_serial, next_sync_serial);
  footer->sync_serial = header->sync_serial = next_sync_serial;

  for (int i = 0; i < 3; i++) {
    AIOCallback *aio = &(init_info->vol_aio[i]);
    aio->aiocb.aio_fildes = fd;
    aio->action = this;
    aio->thread = AIO_CALLBACK_THREAD_ANY;
    aio->then = (i < 2) ? &(init_info->vol_aio[i + 1]) : 0;
  }
  int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
  size_t dirlen = vol_dirlen(this);
  int B = header->sync_serial & 1;
  off_t ss = skip + (B ? dirlen : 0);

  init_info->vol_aio[0].aiocb.aio_buf = raw_dir;
  init_info->vol_aio[0].aiocb.aio_nbytes = footerlen;
  init_info->vol_aio[0].aiocb.aio_offset = ss;
  init_info->vol_aio[1].aiocb.aio_buf = raw_dir + footerlen;
  init_info->vol_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
  init_info->vol_aio[1].aiocb.aio_offset = ss + footerlen;
  init_info->vol_aio[2].aiocb.aio_buf = raw_dir + dirlen - footerlen;
  init_info->vol_aio[2].aiocb.aio_nbytes = footerlen;
  init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;

  SET_HANDLER(&Vol::handle_recover_write_dir);
#if AIO_MODE == AIO_MODE_NATIVE
  ink_assert(ink_aio_writev(init_info->vol_aio));
#else
  ink_assert(ink_aio_write(init_info->vol_aio));
#endif
  return EVENT_CONT;
}

Lclear:
  free((char *)io.aiocb.aio_buf);
  delete init_info;
  init_info = 0;
  clear_dir();
  return EVENT_CONT;
}

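// Final stage of recovery (also entered directly when the stripe was never
// synced): release the recovery buffer, prime the scan position, and
// finish through dir_init_done().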
int
Vol::handle_recover_write_dir(int, void *)
{
  if (io.aiocb.aio_buf)
    free((char *)io.aiocb.aio_buf);
  delete init_info;
  init_info = 0;
  set_io_not_in_progress();
  scan_pos = header->write_pos;
  periodic_scan();
  SET_HANDLER(&Vol::dir_init_done);
  return dir_init_done(EVENT_IMMEDIATE, 0);
}

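// Completion handler for the four header/footer reads issued by Vol::init.
// Copy A (at 'skip') is used when its header and footer agree on the sync
// serial and it is at least as recent as copy B; failing that, copy B is
// tried; if neither pair matches, the stripe is cleared.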
int
Vol::handle_header_read(int event, void *data)
{
  AIOCallback *op;
  VolHeaderFooter *hf[4];
  switch (event) {
  case AIO_EVENT_DONE:
    op = (AIOCallback *)data;
    for (int i = 0; i < 4; i++) {
      ink_assert(op != 0);
      hf[i] = (VolHeaderFooter *)(op->aiocb.aio_buf);
      if ((size_t)op->aio_result != (size_t)op->aiocb.aio_nbytes) {
        clear_dir();
        return EVENT_DONE;
      }
      op = op->then;
    }

    io.aiocb.aio_fildes = fd;
    io.aiocb.aio_nbytes = vol_dirlen(this);
    io.aiocb.aio_buf = raw_dir;
    io.action = this;
    io.thread = AIO_CALLBACK_THREAD_ANY;
    io.then = 0;

    if (hf[0]->sync_serial == hf[1]->sync_serial &&
        (hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != hf[3]->sync_serial)) {
      SET_HANDLER(&Vol::handle_dir_read);
      if (is_debug_tag_set("cache_init"))
        Note("using directory A for '%s'", hash_text.get());
      io.aiocb.aio_offset = skip;
      ink_assert(ink_aio_read(&io));
    }
    // try B
    else if (hf[2]->sync_serial == hf[3]->sync_serial) {
      SET_HANDLER(&Vol::handle_dir_read);
      if (is_debug_tag_set("cache_init"))
        Note("using directory B for '%s'", hash_text.get());
      io.aiocb.aio_offset = skip + vol_dirlen(this);
      ink_assert(ink_aio_read(&io));
    } else {
      Note("no good directory, clearing '%s'", hash_text.get());
      clear_dir();
      delete init_info;
      init_info = 0;
    }
    return EVENT_DONE;
  default:
    ink_assert(!"not reached");
  }
  return EVENT_DONE;
}

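// Wait (by rescheduling) until the owning Cache has finished reading disk
// headers, then publish this stripe in gvol[] and report success or
// failure (fd == -1) to the Cache.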
int
Vol::dir_init_done(int, void *)
{
  if (!cache->cache_read_done) {
    eventProcessor.schedule_in(this, HRTIME_MSECONDS(5), ET_CALL);
    return EVENT_CONT;
  } else {
    int vol_no = ink_atomic_increment(&gnvol, 1);
    ink_assert(!gvol[vol_no]);
    gvol[vol_no] = this;
    SET_HANDLER(&Vol::aggWrite);
    if (fd == -1)
      cache->vol_initialized(0);
    else
      cache->vol_initialized(1);
    return EVENT_DONE;
  }
}

#if TS_USE_INTERIM_CACHE == 1
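// Interim (SSD) stripes run the same data-driven recovery procedure as
// Vol::handle_recover_from_data above.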
int
InterimCacheVol::recover_data()
{
  io.aiocb.aio_fildes = fd;
  io.action = this;
  io.thread = AIO_CALLBACK_THREAD_ANY;
  io.then = 0;

  SET_HANDLER(&InterimCacheVol::handle_recover_from_data);
  return handle_recover_from_data(EVENT_IMMEDIATE, 0);
}

int
InterimCacheVol::handle_recover_from_data(int event, void *data)
{
  (void)data;
  uint32_t got_len = 0;
  uint32_t max_sync_serial = header->sync_serial;
  char *s, *e;
  int ndone, offset;

  if (event == EVENT_IMMEDIATE) {
    if (header->magic != VOL_MAGIC || header->version.ink_major != CACHE_DB_MAJOR_VERSION) {
      Warning("bad header in cache directory for '%s', clearing", hash_text.get());
      goto Lclear;
    } else if (header->sync_serial == 0) {
      io.aiocb.aio_buf = NULL;
      goto Lfinish;
    }

    recover_wrapped = 0;
    last_sync_serial = 0;
    last_write_serial = 0;
    recover_pos = header->last_write_pos;
    if (recover_pos >= skip + len) {
      recover_wrapped = 1;
      recover_pos = start;
    }

    io.aiocb.aio_buf = (char *)ats_memalign(sysconf(_SC_PAGESIZE), RECOVERY_SIZE);
    io.aiocb.aio_nbytes = RECOVERY_SIZE;
    if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
      io.aiocb.aio_nbytes = (skip + len) - recover_pos;
  } else if (event == AIO_EVENT_DONE) {
    if ((size_t)io.aiocb.aio_nbytes != (size_t)io.aio_result) {
      Warning("disk read error on recover '%s', clearing", hash_text.get());
      goto Lclear;
    }

    if (io.aiocb.aio_offset == header->last_write_pos) {
      uint32_t to_check = header->write_pos - header->last_write_pos;
      ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
      uint32_t done = 0;
      s = (char *)io.aiocb.aio_buf;
      while (done < to_check) {
        Doc *doc = (Doc *)(s + done);
        if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
          Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
          goto Lclear;
        }
        done += round_to_approx_size(doc->len);
        if (doc->sync_serial > last_write_serial)
          last_sync_serial = doc->sync_serial;
      }
      ink_assert(done == to_check);

      got_len = io.aiocb.aio_nbytes - done;
      recover_pos += io.aiocb.aio_nbytes;
      s = (char *)io.aiocb.aio_buf + done;
      e = s + got_len;
    } else {
      got_len = io.aiocb.aio_nbytes;
      recover_pos += io.aiocb.aio_nbytes;
      s = (char *)io.aiocb.aio_buf;
      e = s + got_len;
    }
  }

  if (got_len) {
    Doc *doc = NULL;

    if (recover_wrapped && start == io.aiocb.aio_offset) {
      doc = (Doc *)s;
      if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
        recover_pos = skip + len - EVACUATION_SIZE;
        goto Ldone;
      }
    }

    while (s < e) {
      doc = (Doc *)s;

      if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
        if (doc->magic == DOC_MAGIC) {
          if (doc->sync_serial > header->sync_serial)
            max_sync_serial = doc->sync_serial;

          if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
            last_sync_serial = doc->sync_serial;
            s += round_to_approx_size(doc->len);
            continue;
          } else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
            recover_wrapped = 1;
            recover_pos = start;
            io.aiocb.aio_nbytes = RECOVERY_SIZE;
            break;
          }

          recover_pos -= e - s;
          goto Ldone;
        } else {
          recover_pos -= e - s;
          if (recover_pos > (skip + len) - AGG_SIZE) {
            recover_wrapped = 1;
            recover_pos = start;
            io.aiocb.aio_nbytes = RECOVERY_SIZE;
            break;
          }
          goto Ldone;
        }
      }

      last_write_serial = doc->write_serial;
      s += round_to_approx_size(doc->len);
    }

    if (s >= e) {
      if (s > e)
        s -= round_to_approx_size(doc->len);
      recover_pos -= e - s;
      if (recover_pos >= skip + len)
        recover_pos = start;
01946
01947 io.aiocb.aio_nbytes = RECOVERY_SIZE;
01948 if ((off_t)(recover_pos + io.aiocb.aio_nbytes) > (off_t)(skip + len))
01949 io.aiocb.aio_nbytes = (skip + len) - recover_pos;
01950 }
01951 }
01952
01953 if (recover_pos == prev_recover_pos)
01954 goto Lclear;
01955
01956 prev_recover_pos = recover_pos;
01957 io.aiocb.aio_offset = recover_pos;
01958 ink_assert(ink_aio_read(&io));
01959 return EVENT_CONT;
01960
01961 Ldone: {
01962
01963 if (recover_pos == header->write_pos && recover_wrapped) {
01964 goto Lfinish;
01965 }
01966
01967 recover_pos += EVACUATION_SIZE;
01968 if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
01969 Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
01970 Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
01971 goto Lclear;
01972 }
01973
01974 if (recover_pos > skip + len)
01975 recover_pos -= skip + len;
01976
01977 uint32_t next_sync_serial = max_sync_serial + 1;
01978 if (!(header->sync_serial & 1) == !(next_sync_serial & 1))
01979 next_sync_serial++;
01980
01981 off_t clear_start = offset_to_vol_offset(this, header->write_pos);
01982 off_t clear_end = offset_to_vol_offset(this, recover_pos);
01983
01984 if (clear_start <= clear_end)
01985 dir_clean_range_interimvol(clear_start, clear_end, this);
01986 else {
01987 dir_clean_range_interimvol(clear_end, DIR_OFFSET_MAX, this);
01988 dir_clean_range_interimvol(1, clear_start, this);
01989 }
01990
01991 header->sync_serial = next_sync_serial;
01992
01993 goto Lfinish;
01994 }
01995
01996 Lclear:
01997
01998 interimvol_clear_init(this);
01999 offset = this - vol->interim_vols;
02000 clear_interimvol_dir(vol, offset);
02001
02002 Lfinish:
02003
02004 free((char*)io.aiocb.aio_buf);
02005 io.aiocb.aio_buf = NULL;
02006
02007 set_io_not_in_progress();
02008
02009 ndone = ink_atomic_increment(&vol->interim_done, 1);
02010 if (ndone == vol->num_interim_vols - 1) {
02011 return vol->recover_data();
02012 }
02013
02014 return EVENT_CONT;
02015 }
02016 #endif
02017
02018
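// (random value, volume index) pair used while building the volume
// assignment table below.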
02019 struct rtable_pair {
02020 unsigned int rval;
02021 unsigned int idx;
02022 };
02023
02024
02025
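// qsort() comparator: order rtable_pair entries by random value.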
02026 static int
02027 cmprtable(const void *aa, const void *bb) {
02028 rtable_pair *a = (rtable_pair*)aa;
02029 rtable_pair *b = (rtable_pair*)bb;
02030 if (a->rval < b->rval) return -1;
02031 if (a->rval > b->rval) return 1;
02032 return 0;
02033 }
02034
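// Build the hash-slot-to-volume assignment table for a host record.
// Every good volume contributes sample points in proportion to its size,
// generated from a PRNG seeded with the volume's hash id so the layout
// is stable across restarts. The sorted samples are then scanned at
// VOL_HASH_TABLE_SIZE evenly spaced positions to pick a volume for each
// slot, in the manner of consistent hashing, so losing or adding a
// volume reassigns relatively few slots.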
02035 void
02036 build_vol_hash_table(CacheHostRecord *cp)
02037 {
02038 int num_vols = cp->num_vols;
02039 unsigned int *mapping = (unsigned int *)ats_malloc(sizeof(unsigned int) * num_vols);
02040 Vol **p = (Vol **)ats_malloc(sizeof(Vol *) * num_vols);
02041
02042 memset(mapping, 0, num_vols * sizeof(unsigned int));
02043 memset(p, 0, num_vols * sizeof(Vol *));
02044 uint64_t total = 0;
02045 int bad_vols = 0;
02046 int map = 0;
02047 uint64_t used = 0;
02048
02049 for (int i = 0; i < num_vols; i++) {
02050 if (DISK_BAD(cp->vols[i]->disk)) {
02051 bad_vols++;
02052 continue;
02053 }
02054 mapping[map] = i;
02055 p[map++] = cp->vols[i];
02056 total += (cp->vols[i]->len >> STORE_BLOCK_SHIFT);
02057 }
02058
02059 num_vols -= bad_vols;
02060
02061 if (!num_vols) {
02062
02063 if (cp->vol_hash_table) {
02064 new_Freer(cp->vol_hash_table, CACHE_MEM_FREE_TIMEOUT);
02065 }
02066 cp->vol_hash_table = NULL;
02067 ats_free(mapping);
02068 ats_free(p);
02069 return;
02070 }
02071
02072 unsigned int *forvol = (unsigned int *) ats_malloc(sizeof(unsigned int) * num_vols);
02073 unsigned int *gotvol = (unsigned int *) ats_malloc(sizeof(unsigned int) * num_vols);
02074 unsigned int *rnd = (unsigned int *) ats_malloc(sizeof(unsigned int) * num_vols);
02075 unsigned short *ttable = (unsigned short *)ats_malloc(sizeof(unsigned short) * VOL_HASH_TABLE_SIZE);
02076 unsigned short *old_table;
02077 unsigned int *rtable_entries = (unsigned int *) ats_malloc(sizeof(unsigned int) * num_vols);
02078 unsigned int rtable_size = 0;
02079
02080
02081 for (int i = 0; i < num_vols; i++) {
02082 forvol[i] = (VOL_HASH_TABLE_SIZE * (p[i]->len >> STORE_BLOCK_SHIFT)) / total;
02083 used += forvol[i];
02084 rtable_entries[i] = p[i]->len / VOL_HASH_ALLOC_SIZE;
02085 rtable_size += rtable_entries[i];
02086 gotvol[i] = 0;
02087 }
02088
02089 int extra = VOL_HASH_TABLE_SIZE - used;
02090 for (int i = 0; i < extra; i++)
02091 forvol[i % num_vols]++;
02092
02093 for (int i = 0; i < num_vols; i++) {
02094 uint64_t x = p[i]->hash_id.fold();
02095 rnd[i] = (unsigned int) x;
02096 }
02097
02098 for (int i = 0; i < VOL_HASH_TABLE_SIZE; i++)
02099 ttable[i] = VOL_HASH_EMPTY;
02100
02101 rtable_pair *rtable = (rtable_pair *)ats_malloc(sizeof(rtable_pair) * rtable_size);
02102 int rindex = 0;
02103 for (int i = 0; i < num_vols; i++)
02104 for (int j = 0; j < (int)rtable_entries[i]; j++) {
02105 rtable[rindex].rval = next_rand(&rnd[i]);
02106 rtable[rindex].idx = i;
02107 rindex++;
02108 }
02109 ink_assert(rindex == (int)rtable_size);
02110
02111 qsort(rtable, rtable_size, sizeof(rtable_pair), cmprtable);
02112 unsigned int width = (1LL << 32) / VOL_HASH_TABLE_SIZE;
02113 unsigned int pos;
02114
02115 int i = 0;
02116 for (int j = 0; j < VOL_HASH_TABLE_SIZE; j++) {
02117 pos = width / 2 + j * width;
02118 while (pos > rtable[i].rval && i < (int)rtable_size - 1) i++;
02119 ttable[j] = mapping[rtable[i].idx];
02120 gotvol[rtable[i].idx]++;
02121 }
02122 for (int i = 0; i < num_vols; i++) {
02123 Debug("cache_init", "build_vol_hash_table index %d mapped to %d requested %d got %d", i, mapping[i], forvol[i], gotvol[i]);
02124 }
02125
02126 if (0 != (old_table = ink_atomic_swap(&(cp->vol_hash_table), ttable)))
02127 new_Freer(old_table, CACHE_MEM_FREE_TIMEOUT);
02128 ats_free(mapping);
02129 ats_free(p);
02130 ats_free(forvol);
02131 ats_free(gotvol);
02132 ats_free(rnd);
02133 ats_free(rtable_entries);
02134 ats_free(rtable);
02135 }
02136
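// Per-volume completion callback: count good volumes and finish opening
// the cache once every volume has reported.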
02137 void
02138 Cache::vol_initialized(bool result) {
02139 if (result)
02140 ink_atomic_increment(&total_good_nvol, 1);
02141 if (total_nvol == ink_atomic_increment(&total_initialized_vol, 1) + 1)
02142 open_done();
02143 }
02144
02145
02146
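// Take a disk out of service: mark it bad, subtract its byte and
// directory-entry totals from the global statistics, rebuild the volume
// assignment tables, and disable the affected cache(s) when no usable
// storage remains.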
02147 bool
CacheProcessor::mark_storage_offline(CacheDisk *d) {
02150 bool zret;
02151 int p;
02152 uint64_t total_bytes_delete = 0;
02153 uint64_t total_dir_delete = 0;
02154 uint64_t used_dir_delete = 0;
02155
02156 if (!DISK_BAD(d)) SET_DISK_BAD(d);
02157
02158 for (p = 0; p < gnvol; p++) {
02159 if (d->fd == gvol[p]->fd) {
02160 total_dir_delete += gvol[p]->buckets * gvol[p]->segments * DIR_DEPTH;
02161 used_dir_delete += dir_entries_used(gvol[p]);
02162 total_bytes_delete += gvol[p]->len - vol_dirlen(gvol[p]);
02163 }
02164 }
02165
02166 RecIncrGlobalRawStat(cache_rsb, cache_bytes_total_stat, -total_bytes_delete);
02167 RecIncrGlobalRawStat(cache_rsb, cache_direntries_total_stat, -total_dir_delete);
02168 RecIncrGlobalRawStat(cache_rsb, cache_direntries_used_stat, -used_dir_delete);
02169
02170 if (theCache) {
02171 rebuild_host_table(theCache);
02172 }
02173 if (theStreamCache) {
02174 rebuild_host_table(theStreamCache);
02175 }
02176
02177 zret = this->has_online_storage();
02178 if (!zret) {
02179 Warning("All storage devices offline, cache disabled");
02180 CacheProcessor::cache_ready = 0;
02181 } else {
02182 if (theCache && !theCache->hosttable->gen_host_rec.vol_hash_table) {
02183 unsigned int caches_ready = 0;
02184 caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP);
02185 caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_NONE);
02186 caches_ready = ~caches_ready;
02187 CacheProcessor::cache_ready &= caches_ready;
02188 Warning("all volumes for http cache are corrupt, http cache disabled");
02189 }
02190 if (theStreamCache && !theStreamCache->hosttable->gen_host_rec.vol_hash_table) {
02191 unsigned int caches_ready = 0;
02192 caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_RTSP);
02193 caches_ready = ~caches_ready;
02194 CacheProcessor::cache_ready &= caches_ready;
02195 Warning("all volumes for mixt cache are corrupt, mixt cache disabled");
02196 }
02197 }
02198
02199 return zret;
02200 }
02201
02202 bool
02203 CacheProcessor::has_online_storage() const {
02204 CacheDisk** dptr = gdisks;
02205 for (int disk_no = 0 ; disk_no < gndisks ; ++disk_no, ++dptr) {
02206 if (!DISK_BAD(*dptr)) return true;
02207 }
02208 return false;
02209 }
02210
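// AIO failure callback. Locate the disk that errored, bump its error
// count, and once the configured limit is reached signal it bad and take
// it offline.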
02211 int
AIO_Callback_handler::handle_disk_failure(int, void *data) {
02213
02214 if (!CacheProcessor::cache_ready)
02215 return EVENT_DONE;
02216 int disk_no = 0;
02217 AIOCallback *cb = (AIOCallback *) data;
02218 #if TS_USE_INTERIM_CACHE == 1
02219 for (; disk_no < gn_interim_disks; disk_no++) {
02220 CacheDisk *d = g_interim_disks[disk_no];
02221
02222 if (d->fd == cb->aiocb.aio_fildes) {
02223 char message[256];
02224
02225 d->num_errors++;
02226 if (!DISK_BAD(d)) {
02227 snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors);
02228 Warning("%s", message);
02229 RecSignalManager(REC_SIGNAL_CACHE_WARNING, message);
02230 } else if (!DISK_BAD_SIGNALLED(d)) {
02231 snprintf(message, sizeof(message),
02232 "too many errors [%d] accessing disk %s: declaring disk bad", d->num_errors, d->path);
02233 Warning("%s", message);
02234 RecSignalManager(REC_SIGNAL_CACHE_ERROR, message);
02235 good_interim_disks--;
02236 }
02237 }
02238 }
02239 #endif
02240
02241 for (; disk_no < gndisks; disk_no++) {
02242 CacheDisk *d = gdisks[disk_no];
02243
02244 if (d->fd == cb->aiocb.aio_fildes) {
02245 char message[256];
02246 d->num_errors++;
02247
02248 if (!DISK_BAD(d)) {
02249 snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors);
02250 Warning("%s", message);
02251 RecSignalManager(REC_SIGNAL_CACHE_WARNING, message);
02252 } else if (!DISK_BAD_SIGNALLED(d)) {
02253 snprintf(message, sizeof(message), "too many errors accessing disk %s [%d/%d]: declaring disk bad", d->path, d->num_errors, cache_config_max_disk_errors);
02254 Warning("%s", message);
02255 RecSignalManager(REC_SIGNAL_CACHE_ERROR, message);
02256 cacheProcessor.mark_storage_offline(d);
02257 }
02258 break;
02259 }
02260 }
02261
02262 delete cb;
02263 return EVENT_DONE;
02264 }
02265
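// All volumes have reported in: register the cache stat pages, build the
// host table, and mark the cache initialized or failed depending on how
// many volumes survived.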
02266 int
02267 Cache::open_done() {
02268 Action *register_ShowCache(Continuation * c, HTTPHdr * h);
02269 Action *register_ShowCacheInternal(Continuation *c, HTTPHdr *h);
02270 statPagesManager.register_http("cache", register_ShowCache);
02271 statPagesManager.register_http("cache-internal", register_ShowCacheInternal);
02272 if (total_good_nvol == 0) {
02273 ready = CACHE_INIT_FAILED;
02274 cacheProcessor.cacheInitialized();
02275 return 0;
02276 }
02277
02278 hosttable = new CacheHostTable(this, scheme);
02279 hosttable->register_config_callback(&hosttable);
02280
02281 if (hosttable->gen_host_rec.num_cachevols == 0)
02282 ready = CACHE_INIT_FAILED;
02283 else
02284 ready = CACHE_INITIALIZED;
02285 cacheProcessor.cacheInitialized();
02286
02287 return 0;
02288 }
02289
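// Create a Vol for every volume block belonging to this cache's scheme
// and start their (possibly asynchronous) initialization.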
02290 int
Cache::open(bool clear, bool) {
02292 int i;
02293 off_t blocks = 0;
02294 cache_read_done = 0;
02295 total_initialized_vol = 0;
02296 total_nvol = 0;
02297 total_good_nvol = 0;
02298
02299 REC_EstablishStaticConfigInt32(cache_config_min_average_object_size, "proxy.config.cache.min_average_object_size");
02300 Debug("cache_init", "Cache::open - proxy.config.cache.min_average_object_size = %d",
02301 (int)cache_config_min_average_object_size);
02302
02303 CacheVol *cp = cp_list.head;
02304 for (; cp; cp = cp->link.next) {
02305 if (cp->scheme == scheme) {
02306 cp->vols = (Vol **)ats_malloc(cp->num_vols * sizeof(Vol *));
02307 int vol_no = 0;
02308 for (i = 0; i < gndisks; i++) {
02309 if (cp->disk_vols[i] && !DISK_BAD(cp->disk_vols[i]->disk)) {
02310 DiskVolBlockQueue *q = cp->disk_vols[i]->dpb_queue.head;
02311 for (; q; q = q->link.next) {
02312 cp->vols[vol_no] = new Vol();
02313 CacheDisk *d = cp->disk_vols[i]->disk;
02314 cp->vols[vol_no]->disk = d;
02315 cp->vols[vol_no]->fd = d->fd;
02316 cp->vols[vol_no]->cache = this;
02317 cp->vols[vol_no]->cache_vol = cp;
02318 blocks = q->b->len;
02319
02320 bool vol_clear = clear || d->cleared || q->new_block;
02321 #if AIO_MODE == AIO_MODE_NATIVE
02322 eventProcessor.schedule_imm(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear));
02323 #else
02324 cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear);
02325 #endif
02326 vol_no++;
02327 cache_size += blocks;
02328 }
02329 }
02330 }
02331 total_nvol += vol_no;
02332 }
02333 }
02334 if (total_nvol == 0)
02335 return open_done();
02336 cache_read_done = 1;
02337 return 0;
02338 }
02339
02340 int
02341 Cache::close() {
02342 return -1;
02343 }
02344
02345 int
CacheVC::dead(int, Event *) {
02347 ink_assert(0);
02348 return EVENT_DONE;
02349 }
02350
02351 bool
02352 CacheVC::is_pread_capable()
02353 {
02354 return !f.read_from_writer_called;
02355 }
02356
02357 #define STORE_COLLISION 1
02358
02359 #ifdef HTTP_CACHE
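// Unmarshal each HTTPInfo stored in the Doc's header area in place; on
// failure flag the read as not okay.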
02360 static void unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay) {
02361 char *tmp = doc->hdr();
02362 int len = doc->hlen;
02363 while (len > 0) {
02364 int r = HTTPInfo::unmarshal(tmp, len, buf._ptr());
02365 if (r < 0) {
02366 ink_assert(!"CacheVC::handleReadDone unmarshal failed");
02367 okay = 0;
02368 break;
02369 }
02370 len -= r;
02371 tmp += r;
02372 }
02373 }
02374
02375
02376
02377
02378
02379
02380
02381
02382
02383
02384
02385
02386
02387
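// Backwards-compatibility fixup: convert a fragment written in the
// 3.2-era v23 on-disk format, whose HTTP headers may be stored
// unmarshalled, into the current layout. The doc is rebuilt in a fresh
// buffer, each alternate upgraded, and the type/version fields stamped.
// Returns false if the conversion fails, in which case the caller treats
// the doc as corrupt.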
02388 static bool upgrade_doc_version(Ptr<IOBufferData>& buf) {
02389
02390 cache_bc::Doc_v23* doc = reinterpret_cast<cache_bc::Doc_v23*>(buf->data());
02391 bool zret = true;
02392
02393 if (DOC_MAGIC == doc->magic) {
02394 if (0 == doc->hlen) {
02395 Debug("cache_bc", "Doc %p without header, no upgrade needed.", doc);
02396 } else if (CACHE_FRAG_TYPE_HTTP_V23 == doc->doc_type) {
02397 cache_bc::HTTPCacheAlt_v21* alt = reinterpret_cast<cache_bc::HTTPCacheAlt_v21*>(doc->hdr());
02398 if (alt && alt->is_unmarshalled_format()) {
02399 Ptr<IOBufferData> d_buf(ioDataAllocator.alloc());
02400 Doc* d_doc;
02401 char* src;
02402 char* dst;
02403 char* hdr_limit = doc->data();
02404 HTTPInfo::FragOffset* frags = reinterpret_cast<HTTPInfo::FragOffset*>(static_cast<char*>(buf->data()) + cache_bc::sizeofDoc_v23);
02405 int frag_count = doc->_flen / sizeof(HTTPInfo::FragOffset);
02406 size_t n = 0;
02407 size_t content_size = doc->data_len();
02408
02409 Debug("cache_bc", "Doc %p is 3.2", doc);
02410
02411
02412 d_buf->alloc(buf->_size_index, buf->_mem_type);
02413 d_doc = reinterpret_cast<Doc*>(d_buf->data());
02414 n = d_buf->block_size();
02415
02416 src = buf->data();
02417 dst = d_buf->data();
02418 memcpy(dst, src, sizeofDoc);
02419 src += sizeofDoc + doc->_flen; dst += sizeofDoc; n -= sizeofDoc;
02420
02421
02422 if (frag_count > 0 && cache_bc::HTTPInfo_v21::marshalled_length(src) > doc->hlen)
02423 frag_count = 0;
02424
02425 while (zret && src < hdr_limit) {
02426 zret = cache_bc::HTTPInfo_v21::copy_and_upgrade_unmarshalled_to_v23(dst, src, n, frag_count, frags);
02427 }
02428 if (zret && content_size <= n) {
02429 memcpy(dst, src, content_size);
02430
02431
02432 d_doc->len = (dst - reinterpret_cast<char*>(d_doc)) + content_size;
02433 d_doc->hlen = (dst - reinterpret_cast<char*>(d_doc)) - sizeofDoc;
02434 buf = d_buf;
02435 } else {
02436 zret = false;
02437 }
02438 }
02439 Doc* n_doc = reinterpret_cast<Doc*>(buf->data());
02440
02441
02442 ink_assert(sizeof(*n_doc) == sizeof(*doc));
02443
02444 n_doc->doc_type = CACHE_FRAG_TYPE_HTTP;
02445
02446
02447 n_doc->v_major = 0;
02448 n_doc->v_minor = 0;
02449 n_doc->unused = 0;
02450 }
02451 }
02452 return zret;
02453 }
02454 #endif
02455
02456
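// AIO completion for a fragment read. Validate the directory entry and
// I/O result, upgrade old-format docs, optionally verify the checksum,
// unmarshal HTTP headers, and admit qualifying fragments to the RAM
// cache before handing control back to the previous handler.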
02457 int
02458 CacheVC::handleReadDone(int event, Event *e)
02459 {
02460 cancel_trigger();
02461 ink_assert(this_ethread() == mutex->thread_holding);
02462
02463 Doc *doc = NULL;
02464 if (event == AIO_EVENT_DONE)
02465 set_io_not_in_progress();
else if (is_io_in_progress())
return EVENT_CONT;
02469 {
02470 MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
02471 if (!lock)
02472 VC_SCHED_LOCK_RETRY();
02473 if ((!dir_valid(vol, &dir)) || (!io.ok())) {
02474 if (!io.ok()) {
02475 Debug("cache_disk_error", "Read error on disk %s\n \
02476 read range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
02477 vol->hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
02478 (uint64_t)io.aiocb.aio_offset / 512, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
02479 }
02480 goto Ldone;
02481 }
02482
02483 doc = reinterpret_cast<Doc*>(buf->data());
02484 ink_assert(vol->mutex->nthread_holding < 1000);
02485 ink_assert(doc->magic == DOC_MAGIC);
02486
02487
02488
02489
02490
02491
02492
02493
02494
02495
02496
02497
02498
02499 if (doc->doc_type == CACHE_FRAG_TYPE_HTTP_V23) {
02500 if (upgrade_doc_version(buf)) {
02501 doc = reinterpret_cast<Doc*>(buf->data());
02502 } else {
02503 Debug("cache_bc", "Upgrade of fragment failed - disk %s - doc id = %" PRIx64 ":%" PRIx64 "\n"
02504 , vol->hash_text.get(), read_key->slice64(0), read_key->slice64(1));
02505 doc->magic = DOC_CORRUPT;
02506
02507
02508 goto Ldone;
02509 }
02510 }
02511
02512 #ifdef VERIFY_JTEST_DATA
02513 char xx[500];
02514 if (read_key && *read_key == doc->key && request.valid() && !dir_head(&dir) && !vio.ndone) {
02515 int ib = 0, xd = 0;
02516 request.url_get()->print(xx, 500, &ib, &xd);
02517 char *x = xx;
02518 for (int q = 0; q < 3; q++)
02519 x = strchr(x + 1, '/');
02520 ink_assert(!memcmp(doc->data(), x, ib - (x - xx)));
02521 }
02522 #endif
02523
02524 if (is_debug_tag_set("cache_read")) {
02525 char xt[33];
02526 Debug("cache_read",
02527 "Read complete on fragment %s. Length: data payload=%d this fragment=%d total doc=%" PRId64" prefix=%d",
02528 doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len, doc->prefix_len());
02529 }
02530
02531
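// Directory entries hold only a partial key, so with STORE_COLLISION set
// the key comparison below is effectively advisory; a colliding document
// passes here and the mismatch is resolved by the caller's re-probe,
// while this path screens for corruption via magic and checksum.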
02532 if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) &&
02533 doc->magic == DOC_MAGIC) {
02534 int okay = 1;
02535 if (!f.doc_from_ram_cache)
02536 f.not_from_ram_cache = 1;
02537 if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
02538
02539 uint32_t checksum = 0;
02540 for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
02541 checksum += *b;
02542 ink_assert(checksum == doc->checksum);
02543 if (checksum != doc->checksum) {
02544 Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
02545 doc->first_key.b[0], doc->first_key.b[1],
02546 doc->len, doc->hlen, vol->path, (uint64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
02547 doc->magic = DOC_CORRUPT;
02548 okay = 0;
02549 }
02550 }
02551 #if TS_USE_INTERIM_CACHE == 1
02552 ink_assert(vol->num_interim_vols >= good_interim_disks);
02553 if (mts && !f.doc_from_ram_cache) {
02554 int indx;
02555 do {
02556 indx = vol->interim_index++ % vol->num_interim_vols;
02557 } while (good_interim_disks > 0 && DISK_BAD(vol->interim_vols[indx].disk));
02558
02559 if (good_interim_disks) {
02560 if (f.write_into_interim) {
02561 mts->interim_vol = interim_vol = &vol->interim_vols[indx];
02562 mts->agg_len = interim_vol->round_to_approx_size(doc->len);
02563 if (vol->sector_size != interim_vol->sector_size) {
02564 dir_set_approx_size(&mts->dir, mts->agg_len);
02565 }
02566 }
02567 if (f.transistor) {
02568 mts->interim_vol = interim_vol;
02569 mts->agg_len = interim_vol->round_to_approx_size(doc->len);
02570 ink_assert(mts->agg_len == dir_approx_size(&mts->dir));
02571 }
02572
02573 if (!interim_vol->is_io_in_progress()) {
02574 mts->buf = buf;
02575 mts->copy = false;
02576 interim_vol->agg.enqueue(mts);
02577 interim_vol->aggWrite(event, e);
02578 } else {
02579 mts->buf = new_IOBufferData(iobuffer_size_to_index(mts->agg_len, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02580 mts->copy = true;
02581 memcpy(mts->buf->data(), buf->data(), doc->len);
02582 interim_vol->agg.enqueue(mts);
02583 }
02584 } else {
02585 vol->set_migrate_failed(mts);
02586 migrateToInterimCacheAllocator.free(mts);
02587 }
02588 mts = NULL;
02589 }
02590 #else
02591 (void)e;
02592 #endif
02593 bool http_copy_hdr = false;
02594 #ifdef HTTP_CACHE
02595 http_copy_hdr = cache_config_ram_cache_compress && !f.doc_from_ram_cache &&
02596 doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
02597
02598
02599 if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay)
02600 unmarshal_helper(doc, buf, okay);
02601 #endif
02602
02603 if (vio.op == VIO::READ && okay) {
02604 bool cutoff_check;
02605
02606
02607
02608
02609
02610
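// Admit to the RAM cache only when the object is small enough: the total
// document length (known once the first fragment is read) or the
// requested doc_len must be under the cutoff, unless the cutoff is
// disabled (0).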
02611 cutoff_check = ((!doc_len && (int64_t)doc->total_len < cache_config_ram_cache_cutoff)
02612 || (doc_len && (int64_t)doc_len < cache_config_ram_cache_cutoff)
02613 || !cache_config_ram_cache_cutoff);
02614 if (cutoff_check && !f.doc_from_ram_cache) {
02615 #if TS_USE_INTERIM_CACHE == 1
02616 if (!f.ram_fixup) {
02617 uint64_t o = dir_get_offset(&dir);
02618 vol->ram_cache->put(read_key, buf, doc->len, http_copy_hdr, (uint32_t)(o >> 32), (uint32_t)o);
02619 } else {
02620 vol->ram_cache->put(read_key, buf, doc->len, http_copy_hdr, (uint32_t)(dir_off>>32), (uint32_t)dir_off);
02621 }
02622 #else
02623 uint64_t o = dir_offset(&dir);
02624 vol->ram_cache->put(read_key, buf, doc->len, http_copy_hdr, (uint32_t)(o >> 32), (uint32_t)o);
02625 #endif
02626 }
02627 if (!doc_len) {
02628
02629
02630
02631 vol->first_fragment_key = *read_key;
02632 #if TS_USE_INTERIM_CACHE == 1
02633 if (!f.ram_fixup)
02634 vol->first_fragment_offset = dir_get_offset(&dir);
02635 else
02636 vol->first_fragment_offset = dir_off;
02637 #else
02638 vol->first_fragment_offset = dir_offset(&dir);
02639 #endif
02640 vol->first_fragment_data = buf;
02641 }
02642 }
02643 #ifdef HTTP_CACHE
02644
02645 if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay)
02646 unmarshal_helper(doc, buf, okay);
02647 #endif
02648 }
02649 #if TS_USE_INTERIM_CACHE == 1
02650 Ldone:
02651 if (mts) {
02652 vol->set_migrate_failed(mts);
02653 migrateToInterimCacheAllocator.free(mts);
02654 mts = NULL;
02655 }
02656 }
02657 #else
02658 }
02659 Ldone:
02660 #endif
02661 POP_HANDLER;
02662 return handleEvent(AIO_EVENT_DONE, 0);
02663 }
02664
02665
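// Start a fragment read, cheapest source first: the RAM cache, then the
// cached first fragment, then the volume's in-memory aggregation buffer,
// and finally disk I/O.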
02666 int
CacheVC::handleRead(int, Event *)
02668 {
02669 cancel_trigger();
02670
02671 f.doc_from_ram_cache = false;
02672
02673
02674 ink_assert(vol->mutex->thread_holding == this_ethread());
02675 #if TS_USE_INTERIM_CACHE == 1
02676 uint64_t o = dir_get_offset(&dir);
02677 if (f.read_from_interim && mts && mts->rewrite) {
02678 goto LinterimRead;
02679 }
02680 #else
02681 int64_t o = dir_offset(&dir);
02682 #endif
02683 if (vol->ram_cache->get(read_key, &buf, (uint32_t)(o >> 32), (uint32_t)o)) {
02684 goto LramHit;
02685 }
02686
02687
02688 #if TS_USE_INTERIM_CACHE == 1
02689 if (*read_key == vol->first_fragment_key && dir_get_offset(&dir) == vol->first_fragment_offset) {
02690 #else
02691 if (*read_key == vol->first_fragment_key && dir_offset(&dir) == vol->first_fragment_offset) {
02692 #endif
02693 buf = vol->first_fragment_data;
02694 goto LmemHit;
02695 }
02696 #if TS_USE_INTERIM_CACHE == 1
02697 LinterimRead:
02698 if (f.read_from_interim) {
02699 if (dir_agg_buf_valid(interim_vol, &dir)) {
02700 int interim_agg_offset = vol_offset(interim_vol, &dir) - interim_vol->header->write_pos;
02701 buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02702 ink_assert((interim_agg_offset + io.aiocb.aio_nbytes) <= (unsigned) interim_vol->agg_buf_pos);
02703 char *doc = buf->data();
02704 char *agg = interim_vol->agg_buffer + interim_agg_offset;
02705 memcpy(doc, agg, io.aiocb.aio_nbytes);
02706 io.aio_result = io.aiocb.aio_nbytes;
02707 SET_HANDLER(&CacheVC::handleReadDone);
02708 return EVENT_RETURN;
02709 }
02710
02711 io.aiocb.aio_fildes = interim_vol->fd;
02712 io.aiocb.aio_offset = vol_offset(interim_vol, &dir);
02713 if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(interim_vol->skip + interim_vol->len))
02714 io.aiocb.aio_nbytes = interim_vol->skip + interim_vol->len - io.aiocb.aio_offset;
02715 buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02716 io.aiocb.aio_buf = buf->data();
02717 io.action = this;
02718 io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
02719
02720 SET_HANDLER(&CacheVC::handleReadDone);
02721 ink_assert(ink_aio_read(&io) >= 0);
02722 CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
02723 return EVENT_CONT;
02724 }
02725 #endif
02726
02727 if (dir_agg_buf_valid(vol, &dir)) {
02728 int agg_offset = vol_offset(vol, &dir) - vol->header->write_pos;
02729 buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02730 ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned) vol->agg_buf_pos);
02731 char *doc = buf->data();
02732 char *agg = vol->agg_buffer + agg_offset;
02733 memcpy(doc, agg, io.aiocb.aio_nbytes);
02734 io.aio_result = io.aiocb.aio_nbytes;
02735 SET_HANDLER(&CacheVC::handleReadDone);
02736 return EVENT_RETURN;
02737 }
02738
02739 io.aiocb.aio_fildes = vol->fd;
02740 io.aiocb.aio_offset = vol_offset(vol, &dir);
02741 if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(vol->skip + vol->len))
02742 io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
02743 buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
02744 io.aiocb.aio_buf = buf->data();
02745 io.action = this;
02746 io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
02747 SET_HANDLER(&CacheVC::handleReadDone);
02748 ink_assert(ink_aio_read(&io) >= 0);
02749 CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
02750 return EVENT_CONT;
02751
02752 LramHit: {
02753 f.doc_from_ram_cache = true;
02754 io.aio_result = io.aiocb.aio_nbytes;
02755 Doc *doc = (Doc*)buf->data();
02756 if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
02757 SET_HANDLER(&CacheVC::handleReadDone);
02758 return EVENT_RETURN;
02759 }
02760 }
02761 LmemHit:
02762 f.doc_from_ram_cache = true;
02763 io.aio_result = io.aiocb.aio_nbytes;
02764 #if TS_USE_INTERIM_CACHE == 1
02765 if (mts) {
02766 vol->set_migrate_failed(mts);
02767 migrateToInterimCacheAllocator.free(mts);
02768 mts = NULL;
02769 }
02770 #endif
02771 POP_HANDLER;
02772 return EVENT_RETURN;
02773 }
02774
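// Look up a key without reading the document body. The CacheVC runs
// openReadStartHead with f.lookup set and reports the lookup result
// (e.g. CACHE_EVENT_LOOKUP_FAILED) to the continuation.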
02775 Action *
02776 Cache::lookup(Continuation *cont, CacheKey *key, CacheFragType type, char const* hostname, int host_len)
02777 {
02778 if (!CacheProcessor::IsCacheReady(type)) {
02779 cont->handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
02780 return ACTION_RESULT_DONE;
02781 }
02782
02783 Vol *vol = key_to_vol(key, hostname, host_len);
02784 ProxyMutex *mutex = cont->mutex;
02785 CacheVC *c = new_CacheVC(cont);
02786 SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
02787 c->vio.op = VIO::READ;
02788 c->base_stat = cache_lookup_active_stat;
02789 CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
02790 c->first_key = c->key = *key;
02791 c->frag_type = type;
02792 c->f.lookup = 1;
02793 c->vol = vol;
02794 c->last_collision = NULL;
02795
02796 if (c->handleEvent(EVENT_INTERVAL, 0) == EVENT_CONT)
02797 return &c->_action;
02798 else
02799 return ACTION_RESULT_DONE;
02800 }
02801
02802 Action *
02803 Cache::lookup(Continuation *cont, CacheURL *url, CacheFragType type)
02804 {
02805 CryptoHash id;
02806
02807 url->hash_get(&id);
02808 int len = 0;
02809 char const* hostname = url->host_get(&len);
02810 return lookup(cont, &id, type, hostname, len);
02811 }
02812
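// State machine for removing a document. Serialize with any writers by
// taking the write slot, probe the directory (skipping through
// collisions), read the head fragment to confirm the key, and delete the
// directory entry on a match.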
02813 int
CacheVC::removeEvent(int, Event *)
02815 {
02816 cancel_trigger();
02817 set_io_not_in_progress();
02818 {
02819 MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
02820 if (!lock)
02821 VC_SCHED_LOCK_RETRY();
02822 if (_action.cancelled) {
02823 if (od) {
02824 vol->close_write(this);
02825 od = 0;
02826 }
02827 goto Lfree;
02828 }
02829 if (!f.remove_aborted_writers) {
02830 if (vol->open_write(this, true, 1)) {
02831
// A writer now exists, so open_read() must succeed. Assign outside the
// assert so the call is not compiled out when assertions are disabled.
od = vol->open_read(&key);
ink_assert(od);
od->dont_update_directory = 1;
od = NULL;
02835 } else {
02836 od->dont_update_directory = 1;
02837 }
02838 f.remove_aborted_writers = 1;
02839 }
02840 Lread:
02841 SET_HANDLER(&CacheVC::removeEvent);
02842 if (!buf)
02843 goto Lcollision;
02844 if (!dir_valid(vol, &dir)) {
02845 last_collision = NULL;
02846 goto Lcollision;
02847 }
02848
02849 if ((size_t) io.aio_result != (size_t) io.aiocb.aio_nbytes)
02850 goto Ldone;
02851 {
02852
02853 Doc *doc = (Doc *) buf->data();
02854
02855 if (doc->first_key == key) {
02856 ink_assert(doc->magic == DOC_MAGIC);
02857 if (dir_delete(&key, vol, &dir) > 0) {
02858 if (od)
02859 vol->close_write(this);
02860 od = NULL;
02861 goto Lremoved;
02862 }
02863 goto Ldone;
02864 }
02865 }
02866 Lcollision:
02867
02868 if (dir_probe(&key, vol, &dir, &last_collision) > 0) {
02869 #if TS_USE_INTERIM_CACHE == 1
02870 if (dir_ininterim(&dir)) {
02871 dir_delete(&key, vol, &dir);
02872 last_collision = NULL;
02873 goto Lcollision;
02874 }
02875 #endif
02876 int ret = do_read_call(&key);
02877 if (ret == EVENT_RETURN)
02878 goto Lread;
02879 return ret;
02880 }
02881 Ldone:
02882 CACHE_INCREMENT_DYN_STAT(cache_remove_failure_stat);
02883 if (od)
02884 vol->close_write(this);
02885 }
02886 ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
02887 _action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void *) -ECACHE_NO_DOC);
02888 goto Lfree;
02889 Lremoved:
02890 _action.continuation->handleEvent(CACHE_EVENT_REMOVE, 0);
02891 Lfree:
02892 return free_CacheVC(this);
02893 }
02894
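// Public remove entry point. A temporary CacheRemoveCont is supplied when
// the caller passes no continuation so the operation can still complete.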
02895 Action *
Cache::remove(Continuation *cont, CacheKey *key, CacheFragType type, bool,
bool, char *hostname, int host_len)
02898 {
02899 if (!CacheProcessor::IsCacheReady(type)) {
02900 if (cont)
02901 cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, 0);
02902 return ACTION_RESULT_DONE;
02903 }
02904
02905 ink_assert(this);
02906
02907 Ptr<ProxyMutex> mutex;
02908 if (!cont)
02909 cont = new_CacheRemoveCont();
02910
02911 CACHE_TRY_LOCK(lock, cont->mutex, this_ethread());
02912 ink_assert(lock);
02913 Vol *vol = key_to_vol(key, hostname, host_len);
02914
02915 Dir result;
02916 dir_clear(&result);
02917 mutex = cont->mutex;
02918
02919 CacheVC *c = new_CacheVC(cont);
02920 c->vio.op = VIO::NONE;
02921 c->frag_type = type;
02922 c->base_stat = cache_remove_active_stat;
02923 CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
02924 c->first_key = c->key = *key;
02925 c->vol = vol;
02926 c->dir = result;
02927 c->f.remove = 1;
02928
02929 SET_CONTINUATION_HANDLER(c, &CacheVC::removeEvent);
02930 int ret = c->removeEvent(EVENT_IMMEDIATE, 0);
02931 if (ret == EVENT_DONE)
02932 return ACTION_RESULT_DONE;
02933 else
02934 return &c->_action;
02935 }
02936
02937
02938 CacheVConnection::CacheVConnection()
02939 : VConnection(NULL)
02940 { }
02941
02942
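// Build the global CacheVol list (cp_list) from the volume headers found
// on each disk, merging per-disk volumes that share a volume number.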
02943 void
02944 cplist_init()
02945 {
02946 cp_list_len = 0;
02947 for (int i = 0; i < gndisks; i++) {
02948 CacheDisk *d = gdisks[i];
02949 DiskVol **dp = d->disk_vols;
02950 for (unsigned int j = 0; j < d->header->num_volumes; j++) {
02951 ink_assert(dp[j]->dpb_queue.head);
02952 CacheVol *p = cp_list.head;
02953 while (p) {
02954 if (p->vol_number == dp[j]->vol_number) {
02955 ink_assert(p->scheme == (int) dp[j]->dpb_queue.head->b->type);
02956 p->size += dp[j]->size;
02957 p->num_vols += dp[j]->num_volblocks;
02958 p->disk_vols[i] = dp[j];
02959 break;
02960 }
02961 p = p->link.next;
02962 }
02963 if (!p) {
02964
02965
02966 CacheVol *new_p = new CacheVol();
02967 new_p->vol_number = dp[j]->vol_number;
02968 new_p->num_vols = dp[j]->num_volblocks;
02969 new_p->size = dp[j]->size;
02970 new_p->scheme = dp[j]->dpb_queue.head->b->type;
02971 new_p->disk_vols = (DiskVol **)ats_malloc(gndisks * sizeof(DiskVol *));
02972 memset(new_p->disk_vols, 0, gndisks * sizeof(DiskVol *));
02973 new_p->disk_vols[i] = dp[j];
02974 cp_list.enqueue(new_p);
02975 cp_list_len++;
02976 }
02977 }
02978 }
02979 }
02980
02981
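// Reconcile cp_list with volume.config: keep volumes whose size and
// scheme still match their configuration, preserve volumes forced onto a
// disk, and delete volumes that are no longer configured.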
02982 void
02983 cplist_update()
02984 {
02985
02986 CacheVol *cp = cp_list.head;
02987 ConfigVol *config_vol;
02988
02989 while (cp) {
02990 for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
02991 if (config_vol->number == cp->vol_number) {
02992 off_t size_in_blocks = config_vol->size << (20 - STORE_BLOCK_SHIFT);
02993 if ((cp->size <= size_in_blocks) && (cp->scheme == config_vol->scheme)) {
02994 config_vol->cachep = cp;
02995 } else {
02996
02997 int d_no;
02998 int clearCV = 1;
02999
03000 for (d_no = 0; d_no < gndisks; d_no++) {
03001 if (cp->disk_vols[d_no]) {
03002 if (cp->disk_vols[d_no]->disk->forced_volume_num == cp->vol_number) {
03003 clearCV = 0;
03004 config_vol->cachep = cp;
03005 } else {
03006 cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
03007 }
03008 }
03009 }
03010 if (clearCV) {
03011 config_vol = NULL;
03012 }
03013 }
03014 break;
03015 }
03016 }
03017
03018 if (!config_vol) {
03019
03020
03021 int d_no;
03022 for (d_no = 0; d_no < gndisks; d_no++) {
03023 if (cp->disk_vols[d_no])
03024 cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number);
03025 }
03026 CacheVol *temp_cp = cp;
03027 cp = cp->link.next;
03028 cp_list.remove(temp_cp);
03029 cp_list_len--;
03030 delete temp_cp;
03031 continue;
03032 } else
03033 cp = cp->link.next;
03034 }
03035 }
03036
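// Claim every disk whose configuration forces it to this volume: wipe any
// foreign volumes from the disk, then fill its usable space with blocks
// for this volume. Returns the number of disks claimed.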
03037 static int
03038 fillExclusiveDisks(CacheVol *cp)
03039 {
03040 int diskCount = 0;
03041 int volume_number = cp->vol_number;
03042
03043 Debug("cache_init", "volume %d", volume_number);
03044 for (int i = 0; i < gndisks; i++) {
03045 if (gdisks[i]->forced_volume_num != volume_number) {
03046 continue;
03047 }
03048
03049
03050 for(int j = 0; j < (int)gdisks[i]->header->num_volumes; j++) {
03051 if (volume_number != gdisks[i]->disk_vols[j]->vol_number) {
03052 Note("Clearing Disk: %s", gdisks[i]->path);
03053 gdisks[i]->delete_all_volumes();
03054 break;
03055 }
03056 }
03057 diskCount++;
03058
03059 int64_t size_diff = gdisks[i]->num_usable_blocks;
03060 DiskVolBlock *dpb;
03061
03062 do {
03063 dpb = gdisks[i]->create_volume(volume_number, size_diff, cp->scheme);
03064 if (dpb) {
03065 if (!cp->disk_vols[i]) {
03066 cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
03067 }
03068 size_diff -= dpb->len;
03069 cp->size += dpb->len;
03070 cp->num_vols++;
03071 } else {
03072 Debug("cache_init", "create_volume failed");
03073 break;
03074 }
03075 } while ((size_diff > 0));
03076 }
03077 return diskCount;
03078 }
03079
03080
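// Rebuild the volume list to match the current configuration. Without a
// volume.config all space becomes the single default volume 0; otherwise
// percentage sizes are converted to blocks (rounded to 128 MB), stale
// volumes are dropped via cplist_update(), forced disks are filled
// first, and any remaining space is spread across disks, growing the
// smallest allocations first.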
03081 int
03082 cplist_reconfigure()
03083 {
03084 int64_t size;
03085 int volume_number;
03086 off_t size_in_blocks;
03087 ConfigVol *config_vol;
03088
03089 gnvol = 0;
03090 if (config_volumes.num_volumes == 0) {
03091
03092 CacheVol *cp = new CacheVol();
03093 cp->vol_number = 0;
03094 cp->scheme = CACHE_HTTP_TYPE;
03095 cp->disk_vols = (DiskVol **)ats_malloc(gndisks * sizeof(DiskVol *));
03096 memset(cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
03097 cp_list.enqueue(cp);
03098 cp_list_len++;
03099 for (int i = 0; i < gndisks; i++) {
03100 if (gdisks[i]->header->num_volumes != 1 || gdisks[i]->disk_vols[0]->vol_number != 0) {
03101
03102
03103 Note("Clearing Disk: %s", gdisks[i]->path);
03104 gdisks[i]->delete_all_volumes();
03105 }
03106 if (gdisks[i]->cleared) {
03107 uint64_t free_space = gdisks[i]->free_space * STORE_BLOCK_SIZE;
03108 int vols = (free_space / MAX_VOL_SIZE) + 1;
03109 for (int p = 0; p < vols; p++) {
03110 off_t b = gdisks[i]->free_space / (vols - p);
03111 Debug("cache_hosting", "blocks = %" PRId64, (int64_t)b);
03112 DiskVolBlock *dpb = gdisks[i]->create_volume(0, b, CACHE_HTTP_TYPE);
03113 ink_assert(dpb && dpb->len == (uint64_t)b);
03114 }
03115 ink_assert(gdisks[i]->free_space == 0);
03116 }
03117
03118 ink_assert(gdisks[i]->header->num_volumes == 1);
03119 DiskVol **dp = gdisks[i]->disk_vols;
03120 gnvol += dp[0]->num_volblocks;
03121 cp->size += dp[0]->size;
03122 cp->num_vols += dp[0]->num_volblocks;
03123 cp->disk_vols[i] = dp[0];
03124 }
03125
03126 } else {
03127 for (int i = 0; i < gndisks; i++) {
03128 if (gdisks[i]->header->num_volumes == 1 && gdisks[i]->disk_vols[0]->vol_number == 0) {
03129
03130
03131 Note("Clearing Disk: %s", gdisks[i]->path);
03132 gdisks[i]->delete_all_volumes();
03133 }
03134 }
03135
03136
03137 off_t tot_space_in_blks = 0;
03138 off_t blocks_per_vol = VOL_BLOCK_SIZE / STORE_BLOCK_SIZE;
03139
03140
03141 for (int i = 0; i < gndisks; i++)
03142 tot_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol;
03143
03144 double percent_remaining = 100.00;
03145 for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
03146 if (config_vol->in_percent) {
03147 if (config_vol->percent > percent_remaining) {
03148 Warning("total volume sizes added up to more than 100%%!");
03149 Warning("no volumes created");
03150 return -1;
03151 }
03152 int64_t space_in_blks = (int64_t) (((double) (config_vol->percent / percent_remaining)) * tot_space_in_blks);
03153
03154 space_in_blks = space_in_blks >> (20 - STORE_BLOCK_SHIFT);
03155
03156 space_in_blks = (space_in_blks >> 7) << 7;
03157 config_vol->size = space_in_blks;
03158 tot_space_in_blks -= space_in_blks << (20 - STORE_BLOCK_SHIFT);
03159 percent_remaining -= (config_vol->size < 128) ? 0 : config_vol->percent;
03160 }
03161 if (config_vol->size < 128) {
03162 Warning("the size of volume %d (%" PRId64") is less than the minimum required volume size %d",
03163 config_vol->number, (int64_t)config_vol->size, 128);
03164 Warning("volume %d is not created", config_vol->number);
03165 }
03166 Debug("cache_hosting", "Volume: %d Size: %" PRId64, config_vol->number, (int64_t)config_vol->size);
03167 }
03168 cplist_update();
03169
03170
03171 for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
03172
03173 if (!config_vol->cachep) {
03174 continue;
03175 }
03176 fillExclusiveDisks(config_vol->cachep);
03177 }
03178
03179 for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) {
03180
03181 size = config_vol->size;
03182 if (size < 128)
03183 continue;
03184
03185 volume_number = config_vol->number;
03186
03187 size_in_blocks = ((off_t) size * 1024 * 1024) / STORE_BLOCK_SIZE;
03188
03189 if (config_vol->cachep && config_vol->cachep->num_vols > 0) {
03190 gnvol += config_vol->cachep->num_vols;
03191 continue;
03192 }
03193
03194 if (!config_vol->cachep) {
03195
03196
03197 CacheVol *new_cp = new CacheVol();
03198 new_cp->disk_vols = (DiskVol **)ats_malloc(gndisks * sizeof(DiskVol *));
03199 memset(new_cp->disk_vols, 0, gndisks * sizeof(DiskVol *));
03200 if (create_volume(config_vol->number, size_in_blocks, config_vol->scheme, new_cp)) {
03201 delete new_cp;
03202 return -1;
03203 }
03204 cp_list.enqueue(new_cp);
03205 cp_list_len++;
03206 config_vol->cachep = new_cp;
03207 gnvol += new_cp->num_vols;
03208 continue;
03209 }
03210
03211 CacheVol *cp = config_vol->cachep;
03212 ink_assert(cp->size <= size_in_blocks);
03213 if (cp->size == size_in_blocks) {
03214 gnvol += cp->num_vols;
03215 continue;
03216 }
03217
03218
03219
03220 int *sorted_vols = new int[gndisks];
03221 for (int i = 0; i < gndisks; i++)
03222 sorted_vols[i] = i;
03223 for (int i = 0; i < gndisks - 1; i++) {
03224 int smallest = sorted_vols[i];
03225 int smallest_ndx = i;
03226 for (int j = i + 1; j < gndisks; j++) {
03227 int curr = sorted_vols[j];
03228 DiskVol *dvol = cp->disk_vols[curr];
03229 if (gdisks[curr]->cleared) {
03230 ink_assert(!dvol);
03231
03232 smallest = curr;
03233 smallest_ndx = j;
03234 } else if (!dvol && cp->disk_vols[smallest]) {
03235
03236 smallest = curr;
03237 smallest_ndx = j;
03238 } else if (dvol && cp->disk_vols[smallest] && (dvol->size < cp->disk_vols[smallest]->size)) {
03239 smallest = curr;
03240 smallest_ndx = j;
03241 }
03242 }
03243 sorted_vols[smallest_ndx] = sorted_vols[i];
03244 sorted_vols[i] = smallest;
03245 }
03246
03247 int64_t size_to_alloc = size_in_blocks - cp->size;
03248 int disk_full = 0;
03249 for (int i = 0; (i < gndisks) && size_to_alloc; i++) {
03250
03251 int disk_no = sorted_vols[i];
03252 ink_assert(cp->disk_vols[sorted_vols[gndisks - 1]]);
03253 int largest_vol = cp->disk_vols[sorted_vols[gndisks - 1]]->size;
03254
03255
03256
03257
03258
03259 int64_t size_diff = (cp->disk_vols[disk_no]) ? largest_vol - cp->disk_vols[disk_no]->size : largest_vol;
03260 size_diff = (size_diff < size_to_alloc) ? size_diff : size_to_alloc;
03261
03262
03263 if (size_diff == 0)
03264 break;
03265
03266 DiskVolBlock *dpb;
03267 do {
03268 dpb = gdisks[disk_no]->create_volume(volume_number, size_diff, cp->scheme);
03269 if (dpb) {
03270 if (!cp->disk_vols[disk_no]) {
03271 cp->disk_vols[disk_no] = gdisks[disk_no]->get_diskvol(volume_number);
03272 }
03273 size_diff -= dpb->len;
03274 cp->size += dpb->len;
03275 cp->num_vols++;
03276 } else
03277 break;
03278 } while ((size_diff > 0));
03279
03280 if (!dpb)
03281 disk_full++;
03282
03283 size_to_alloc = size_in_blocks - cp->size;
03284 }
03285
delete[] sorted_vols;
03287
03288 if (size_to_alloc) {
03289 if (create_volume(volume_number, size_to_alloc, cp->scheme, cp))
03290 return -1;
03291 }
03292 gnvol += cp->num_vols;
03293 }
03294 }
03295 return 0;
03296 }
03297
03298
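// Create (or grow) a volume of the requested size by claiming
// VOL_BLOCK_SIZE chunks round-robin across disks with free space,
// resuming from the disk where the previous call left off.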
03299 int
03300 create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp)
03301 {
03302 static int curr_vol = 0;
03303 off_t to_create = size_in_blocks;
03304 off_t blocks_per_vol = VOL_BLOCK_SIZE >> STORE_BLOCK_SHIFT;
03305 int full_disks = 0;
03306
03307 cp->vol_number = volume_number;
03308 cp->scheme = scheme;
03309 if (fillExclusiveDisks(cp)) {
03310 Debug("cache_init", "volume successfully filled from forced disks: volume_number=%d", volume_number);
03311 return 0;
03312 }
03313
03314 int *sp = new int[gndisks];
03315 memset(sp, 0, gndisks * sizeof(int));
03316
03317 int i = curr_vol;
03318 while (size_in_blocks > 0) {
03319 if (gdisks[i]->free_space >= (sp[i] + blocks_per_vol)) {
03320 sp[i] += blocks_per_vol;
03321 size_in_blocks -= blocks_per_vol;
03322 full_disks = 0;
03323 } else {
03324 full_disks += 1;
03325 if (full_disks == gndisks) {
03326 char config_file[PATH_NAME_MAX];
03327 REC_ReadConfigString(config_file, "proxy.config.cache.volume_filename", PATH_NAME_MAX);
03328 if (cp->size)
03329 Warning("not enough space to increase volume: [%d] to size: [%" PRId64 "]",
03330 volume_number, (int64_t)((to_create + cp->size) >> (20 - STORE_BLOCK_SHIFT)));
03331 else
03332 Warning("not enough space to create volume: [%d], size: [%" PRId64 "]",
03333 volume_number, (int64_t)(to_create >> (20 - STORE_BLOCK_SHIFT)));
03334
03335 Note("edit the %s file and restart traffic_server", config_file);
delete[] sp;
03337 return -1;
03338 }
03339 }
03340 i = (i + 1) % gndisks;
03341 }
03342 cp->vol_number = volume_number;
03343 cp->scheme = scheme;
03344 curr_vol = i;
03345 for (i = 0; i < gndisks; i++) {
03346 if (sp[i] > 0) {
03347 while (sp[i] > 0) {
03348 DiskVolBlock *p = gdisks[i]->create_volume(volume_number, sp[i], scheme);
03349 ink_assert(p && (p->len >= (unsigned int) blocks_per_vol));
03350 sp[i] -= p->len;
03351 cp->num_vols++;
03352 cp->size += p->len;
03353 }
03354 if (!cp->disk_vols[i])
03355 cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number);
03356 }
03357 }
delete[] sp;
03359 return 0;
03360 }
03361
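// Rebuild the volume assignment tables for the generic host record and
// every host-specific record, e.g. after a disk is marked offline.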
03362 void
03363 rebuild_host_table(Cache *cache)
03364 {
03365 build_vol_hash_table(&cache->hosttable->gen_host_rec);
03366 if (cache->hosttable->m_numEntries != 0) {
03367 CacheHostMatcher *hm = cache->hosttable->getHostMatcher();
03368 CacheHostRecord *h_rec = hm->getDataArray();
03369 int h_rec_len = hm->getNumElements();
03370 int i;
03371 for (i = 0; i < h_rec_len; i++) {
03372 build_vol_hash_table(&h_rec[i]);
03373 }
03374 }
03375 }
03376
03377
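// Map a cache key to a volume: hash the key into a slot and consult the
// matching host record's assignment table, falling back to the generic
// table (or the first volume when no table exists).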
03378 Vol *
03379 Cache::key_to_vol(CacheKey *key, char const* hostname, int host_len)
03380 {
03381 uint32_t h = (key->slice32(2) >> DIR_TAG_WIDTH) % VOL_HASH_TABLE_SIZE;
03382 unsigned short *hash_table = hosttable->gen_host_rec.vol_hash_table;
03383 CacheHostRecord *host_rec = &hosttable->gen_host_rec;
03384
03385 if (hosttable->m_numEntries > 0 && host_len) {
03386 CacheHostResult res;
03387 hosttable->Match(hostname, host_len, &res);
03388 if (res.record) {
03389 unsigned short *host_hash_table = res.record->vol_hash_table;
03390 if (host_hash_table) {
03391 if (is_debug_tag_set("cache_hosting")) {
03392 char format_str[50];
snprintf(format_str, sizeof(format_str), "Volume: %%p for host: %%.%ds", host_len);
03394 Debug("cache_hosting", format_str, res.record, hostname);
03395 }
03396 return res.record->vols[host_hash_table[h]];
03397 }
03398 }
03399 }
03400 if (hash_table) {
03401 if (is_debug_tag_set("cache_hosting")) {
03402 char format_str[50];
snprintf(format_str, sizeof(format_str), "Generic volume: %%p for host: %%.%ds", host_len);
03404 Debug("cache_hosting", format_str, host_rec, hostname);
03405 }
03406 return host_rec->vols[hash_table[h]];
03407 } else
03408 return host_rec->vols[0];
03409 }
03410
03411 static void reg_int(const char *str, int stat, RecRawStatBlock *rsb, const char *prefix, RecRawStatSyncCb sync_cb=RecRawStatSyncSum) {
03412 char stat_str[256];
03413 snprintf(stat_str, sizeof(stat_str), "%s.%s", prefix, str);
03414 RecRegisterRawStat(rsb, RECT_PROCESS, stat_str, RECD_INT, RECP_NON_PERSISTENT, stat, sync_cb);
03415 DOCACHE_CLEAR_DYN_STAT(stat)
03416 }
03417 #define REG_INT(_str, _stat) reg_int(_str, (int)_stat, rsb, prefix)
03418
03419
03420 void
03421 register_cache_stats(RecRawStatBlock *rsb, const char *prefix)
03422 {
03423
03424 reg_int("bytes_used", cache_bytes_used_stat, rsb, prefix, cache_stats_bytes_used_cb);
03425
03426 REG_INT("bytes_total", cache_bytes_total_stat);
03427 REG_INT("ram_cache.total_bytes", cache_ram_cache_bytes_total_stat);
03428 REG_INT("ram_cache.bytes_used", cache_ram_cache_bytes_stat);
03429 REG_INT("ram_cache.hits", cache_ram_cache_hits_stat);
03430 REG_INT("ram_cache.misses", cache_ram_cache_misses_stat);
03431 REG_INT("pread_count", cache_pread_count_stat);
03432 REG_INT("percent_full", cache_percent_full_stat);
03433 REG_INT("lookup.active", cache_lookup_active_stat);
03434 REG_INT("lookup.success", cache_lookup_success_stat);
03435 REG_INT("lookup.failure", cache_lookup_failure_stat);
03436 REG_INT("read.active", cache_read_active_stat);
03437 REG_INT("read.success", cache_read_success_stat);
03438 REG_INT("read.failure", cache_read_failure_stat);
03439 #if TS_USE_INTERIM_CACHE == 1
03440 REG_INT("interim.read.success", cache_interim_read_success_stat);
03441 REG_INT("disk.read.success", cache_disk_read_success_stat);
03442 REG_INT("ram.read.success", cache_ram_read_success_stat);
03443 #endif
03444 REG_INT("write.active", cache_write_active_stat);
03445 REG_INT("write.success", cache_write_success_stat);
03446 REG_INT("write.failure", cache_write_failure_stat);
03447 REG_INT("write.backlog.failure", cache_write_backlog_failure_stat);
03448 REG_INT("update.active", cache_update_active_stat);
03449 REG_INT("update.success", cache_update_success_stat);
03450 REG_INT("update.failure", cache_update_failure_stat);
03451 REG_INT("remove.active", cache_remove_active_stat);
03452 REG_INT("remove.success", cache_remove_success_stat);
03453 REG_INT("remove.failure", cache_remove_failure_stat);
03454 REG_INT("evacuate.active", cache_evacuate_active_stat);
03455 REG_INT("evacuate.success", cache_evacuate_success_stat);
03456 REG_INT("evacuate.failure", cache_evacuate_failure_stat);
03457 REG_INT("scan.active", cache_scan_active_stat);
03458 REG_INT("scan.success", cache_scan_success_stat);
03459 REG_INT("scan.failure", cache_scan_failure_stat);
03460 REG_INT("direntries.total", cache_direntries_total_stat);
03461 REG_INT("direntries.used", cache_direntries_used_stat);
03462 REG_INT("directory_collision", cache_directory_collision_count_stat);
03463 REG_INT("frags_per_doc.1", cache_single_fragment_document_count_stat);
03464 REG_INT("frags_per_doc.2", cache_two_fragment_document_count_stat);
03465 REG_INT("frags_per_doc.3+", cache_three_plus_plus_fragment_document_count_stat);
03466 REG_INT("read_busy.success", cache_read_busy_success_stat);
03467 REG_INT("read_busy.failure", cache_read_busy_failure_stat);
03468 REG_INT("write_bytes_stat", cache_write_bytes_stat);
03469 REG_INT("vector_marshals", cache_hdr_vector_marshal_stat);
03470 REG_INT("hdr_marshals", cache_hdr_marshal_stat);
03471 REG_INT("hdr_marshal_bytes", cache_hdr_marshal_bytes_stat);
03472 REG_INT("gc_bytes_evacuated", cache_gc_bytes_evacuated_stat);
03473 REG_INT("gc_frags_evacuated", cache_gc_frags_evacuated_stat);
03474 }
03475
03476
03477 void
03478 ink_cache_init(ModuleVersion v)
03479 {
03480 ink_release_assert(!checkModuleVersion(v, CACHE_MODULE_VERSION));
03481
03482 cache_rsb = RecAllocateRawStatBlock((int) cache_stat_count);
03483
03484 REC_EstablishStaticConfigInteger(cache_config_ram_cache_size, "proxy.config.cache.ram_cache.size");
03485 Debug("cache_init", "proxy.config.cache.ram_cache.size = %" PRId64 " = %" PRId64 "Mb",
03486 cache_config_ram_cache_size, cache_config_ram_cache_size / (1024 * 1024));
03487
03488 REC_EstablishStaticConfigInt32(cache_config_ram_cache_algorithm, "proxy.config.cache.ram_cache.algorithm");
03489 REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress, "proxy.config.cache.ram_cache.compress");
03490 REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress_percent, "proxy.config.cache.ram_cache.compress_percent");
03491 REC_EstablishStaticConfigInt32(cache_config_ram_cache_use_seen_filter, "proxy.config.cache.ram_cache.use_seen_filter");
03492
03493 REC_EstablishStaticConfigInt32(cache_config_http_max_alts, "proxy.config.cache.limits.http.max_alts");
03494 Debug("cache_init", "proxy.config.cache.limits.http.max_alts = %d", cache_config_http_max_alts);
03495
03496 REC_EstablishStaticConfigInteger(cache_config_ram_cache_cutoff, "proxy.config.cache.ram_cache_cutoff");
03497 Debug("cache_init", "cache_config_ram_cache_cutoff = %" PRId64 " = %" PRId64 "Mb",
03498 cache_config_ram_cache_cutoff, cache_config_ram_cache_cutoff / (1024 * 1024));
03499
03500 REC_EstablishStaticConfigInt32(cache_config_permit_pinning, "proxy.config.cache.permit.pinning");
03501 Debug("cache_init", "proxy.config.cache.permit.pinning = %d", cache_config_permit_pinning);
03502
03503 REC_EstablishStaticConfigInt32(cache_config_dir_sync_frequency, "proxy.config.cache.dir.sync_frequency");
03504 Debug("cache_init", "proxy.config.cache.dir.sync_frequency = %d", cache_config_dir_sync_frequency);
03505
03506 REC_EstablishStaticConfigInt32(cache_config_vary_on_user_agent, "proxy.config.cache.vary_on_user_agent");
03507 Debug("cache_init", "proxy.config.cache.vary_on_user_agent = %d", cache_config_vary_on_user_agent);
03508
03509 REC_EstablishStaticConfigInt32(cache_config_select_alternate, "proxy.config.cache.select_alternate");
03510 Debug("cache_init", "proxy.config.cache.select_alternate = %d", cache_config_select_alternate);
03511
03512 REC_EstablishStaticConfigInt32(cache_config_max_doc_size, "proxy.config.cache.max_doc_size");
03513 Debug("cache_init", "proxy.config.cache.max_doc_size = %d = %dMb",
03514 cache_config_max_doc_size, cache_config_max_doc_size / (1024 * 1024));
03515
03516 REC_EstablishStaticConfigInt32(cache_config_mutex_retry_delay, "proxy.config.cache.mutex_retry_delay");
03517 Debug("cache_init", "proxy.config.cache.mutex_retry_delay = %dms", cache_config_mutex_retry_delay);
03518
03519 REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_percent, "proxy.config.cache.hit_evacuate_percent");
03520 Debug("cache_init", "proxy.config.cache.hit_evacuate_percent = %d", cache_config_hit_evacuate_percent);
03521
03522 REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_size_limit, "proxy.config.cache.hit_evacuate_size_limit");
03523 Debug("cache_init", "proxy.config.cache.hit_evacuate_size_limit = %d", cache_config_hit_evacuate_size_limit);
03524
03525 REC_EstablishStaticConfigInt32(cache_config_force_sector_size, "proxy.config.cache.force_sector_size");
03526 REC_EstablishStaticConfigInt32(cache_config_target_fragment_size, "proxy.config.cache.target_fragment_size");
03527
03528 if (cache_config_target_fragment_size == 0)
03529 cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE;
03530
03531 #ifdef HTTP_CACHE
03532 REC_EstablishStaticConfigInt32(enable_cache_empty_http_doc, "proxy.config.http.cache.allow_empty_doc");
03533
03534 REC_EstablishStaticConfigInt32(cache_config_compatibility_4_2_0_fixup, "proxy.config.cache.http.compatibility.4-2-0-fixup");
03535 #endif
03536
03537 #if TS_USE_INTERIM_CACHE == 1
03538 REC_EstablishStaticConfigInt32(migrate_threshold, "proxy.config.cache.interim.migrate_threshold");
03539 Debug("cache_init", "proxy.config.cache.migrate_threshold = %d", migrate_threshold);
03540 #endif
03541
  REC_EstablishStaticConfigInt32(cache_config_max_disk_errors, "proxy.config.cache.max_disk_errors");
  Debug("cache_init", "proxy.config.cache.max_disk_errors = %d", cache_config_max_disk_errors);

  REC_EstablishStaticConfigInt32(cache_config_agg_write_backlog, "proxy.config.cache.agg_write_backlog");
  Debug("cache_init", "proxy.config.cache.agg_write_backlog = %d", cache_config_agg_write_backlog);

  REC_EstablishStaticConfigInt32(cache_config_enable_checksum, "proxy.config.cache.enable_checksum");
  Debug("cache_init", "proxy.config.cache.enable_checksum = %d", cache_config_enable_checksum);

  REC_EstablishStaticConfigInt32(cache_config_alt_rewrite_max_size, "proxy.config.cache.alt_rewrite_max_size");
  Debug("cache_init", "proxy.config.cache.alt_rewrite_max_size = %d", cache_config_alt_rewrite_max_size);

  REC_EstablishStaticConfigInt32(cache_config_read_while_writer, "proxy.config.cache.enable_read_while_writer");
  cache_config_read_while_writer = validate_rww(cache_config_read_while_writer);
  REC_RegisterConfigUpdateFunc("proxy.config.cache.enable_read_while_writer", update_cache_config, NULL);
  Debug("cache_init", "proxy.config.cache.enable_read_while_writer = %d", cache_config_read_while_writer);

  register_cache_stats(cache_rsb, "proxy.process.cache");

  const char *err = NULL;
  if ((err = theCacheStore.read_config())) {
    printf("%s failed\n", err);
    exit(1);
  }

  if (theCacheStore.n_disks == 0) {
    ats_scoped_str path(RecConfigReadConfigPath("proxy.config.cache.storage_filename", "storage.config"));
    Warning("no cache disks specified in %s: cache disabled", (const char *)path);
  }
#if TS_USE_INTERIM_CACHE == 1
  else {
    theCacheStore.read_interim_config();
    if (theCacheStore.n_interim_disks == 0)
      Warning("no interim disks specified in %s: interim cache disabled", "proxy.config.cache.interim.storage");
  }
#endif
}

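// Open a cache read on the URL's key. When clustering is compiled in and
// enabled (and the caller has not forced a local lookup), the read is
// dispatched to the owning cluster node; otherwise it goes to the local
// cache for the given fragment type.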
Action *
CacheProcessor::open_read(Continuation *cont, URL *url, bool cluster_cache_local, CacheHTTPHdr *request,
                          CacheLookupHttpConfig *params, time_t pin_in_cache, CacheFragType type)
{
#ifdef CLUSTER_CACHE
  if (cache_clustering_enabled > 0 && !cluster_cache_local) {
    return open_read_internal(CACHE_OPEN_READ_LONG, cont, (MIOBuffer *) 0,
                              url, request, params, (CacheKey *) 0, pin_in_cache, type, (char *) 0, 0);
  }
#endif
  return caches[type]->open_read(cont, url, request, params, type);
}

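// Open a cache write on the URL's key. With clustering enabled, hash the key
// to find the owning cluster machine and issue a remote write there; if no
// machine is found (or clustering is off), fall back to the local cache.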
Action *
CacheProcessor::open_write(Continuation *cont, int expected_size, URL *url, bool cluster_cache_local,
                           CacheHTTPHdr *request, CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type)
{
#ifdef CLUSTER_CACHE
  if (cache_clustering_enabled > 0 && !cluster_cache_local) {
    INK_MD5 url_md5;
    Cache::generate_key(&url_md5, url);
    ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));

    if (m) {
      // Reuse the URL key computed above for the remote write.
      return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
                           &url_md5, type,
                           false, pin_in_cache, CACHE_OPEN_WRITE_LONG,
                           (CacheKey *) 0, url, request, old_info, (char *) 0, 0);
    }
  }
#endif
  return caches[type]->open_write(cont, url, request, old_info, pin_in_cache, type);
}

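// Remove the document for this URL, using the URL hash as the cache key and
// the host name (used for host-based volume selection). The clustered case
// forwards to the key-based remove() overload.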
Action *
CacheProcessor::remove(Continuation *cont, URL *url, bool cluster_cache_local, CacheFragType frag_type)
{
  CryptoHash id;
  int len = 0;
  const char *hostname;

  url->hash_get(&id);
  hostname = url->host_get(&len);

  Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %s", url->string_get_ref());
#ifdef CLUSTER_CACHE
  if (cache_clustering_enabled > 0 && !cluster_cache_local) {
    return remove(cont, &id, cluster_cache_local, frag_type, true, false, const_cast<char *>(hostname), len);
  }
#endif
  return caches[frag_type]->remove(cont, &id, frag_type, true, false, const_cast<char *>(hostname), len);
}

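// Find the cache disk that was loaded from the given path. Only valid once
// the cache is fully initialized; returns NULL if there is no match.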
CacheDisk*
CacheProcessor::find_by_path(char const* path, int len)
{
  if (CACHE_INITIALIZED == initialized) {
    // Default to the full string length if the caller didn't provide one.
    if (0 >= len && 0 != *path)
      len = strlen(path);

    for (int i = 0; i < gndisks; ++i) {
      if (0 == strncmp(path, gdisks[i]->path, len))
        return gdisks[i];
    }
  }

  return 0;
}

namespace cache_bc {
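// Marshalled size of the current (v23) alternate header, rounded up to
// pointer alignment.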
static size_t const HTTP_ALT_MARSHAL_SIZE = ROUND(sizeof(HTTPCacheAlt), HDR_PTR_SIZE);
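// Compute the total marshalled length of a v21 alternate: the fixed struct
// plus its two header heaps. In marshalled data the m_heap "pointers" are
// really byte offsets from the start of the alternate, hence the
// reinterpret_cast arithmetic below.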
size_t
HTTPInfo_v21::marshalled_length(void* data)
{
  size_t zret = ROUND(sizeof(HTTPCacheAlt_v21), HDR_PTR_SIZE);
  HTTPCacheAlt_v21* alt = static_cast<HTTPCacheAlt_v21*>(data);
  HdrHeap* hdr;

  hdr = reinterpret_cast<HdrHeap*>(reinterpret_cast<char*>(alt) + reinterpret_cast<uintptr_t>(alt->m_request_hdr.m_heap));
  zret += ROUND(hdr->unmarshal_size(), HDR_PTR_SIZE);
  hdr = reinterpret_cast<HdrHeap*>(reinterpret_cast<char*>(alt) + reinterpret_cast<uintptr_t>(alt->m_response_hdr.m_heap));
  zret += ROUND(hdr->unmarshal_size(), HDR_PTR_SIZE);
  return zret;
}

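// Copy an unmarshalled v21 alternate at @a src into @a dst, upgrading it to
// the v23 layout (which adds the fragment offset table). @a dst, @a src and
// @a length are advanced/consumed as data is written. Returns false if the
// destination buffer is too small.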
bool
HTTPInfo_v21::copy_and_upgrade_unmarshalled_to_v23(
  char*& dst, char*& src, size_t& length, int n_frags, FragOffset* frag_offsets
)
{
  // Offsets of the data that follows the fragment table fields added in v23.
  static const size_t OLD_OFFSET = offsetof(HTTPCacheAlt_v21, m_ext_buffer);
  static const size_t NEW_OFFSET = offsetof(HTTPCacheAlt_v23, m_ext_buffer);

  HTTPCacheAlt_v21* s_alt = reinterpret_cast<HTTPCacheAlt_v21*>(src);
  HTTPCacheAlt_v23* d_alt = reinterpret_cast<HTTPCacheAlt_v23*>(dst);
  HdrHeap_v23* s_hdr;
  HdrHeap_v23* d_hdr;
  size_t hdr_size;

  if (length < HTTP_ALT_MARSHAL_SIZE) return false; // not even the fixed struct fits.

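  // The v21 and v23 layouts are identical up to the fragment table fields, so
  // copy the head verbatim, then copy the tail to its new (shifted) offset.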
  memcpy(dst, src, OLD_OFFSET);
  memcpy(static_cast<char*>(dst) + NEW_OFFSET,
         static_cast<char*>(src) + OLD_OFFSET,
         sizeof(HTTPCacheAlt_v21) - OLD_OFFSET);
  dst += HTTP_ALT_MARSHAL_SIZE;
  length -= HTTP_ALT_MARSHAL_SIZE;

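  // The extra v23 data is the fragment offset table - fill it in if fragment
  // data was provided. The first N_INTEGRAL_FRAG_OFFSETS entries live inside
  // the struct; any overflow is appended immediately after it.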
  if (n_frags) {
    static size_t const IFT_SIZE = HTTPCacheAlt_v23::N_INTEGRAL_FRAG_OFFSETS * sizeof(FragOffset);
    size_t ift_actual = min(n_frags, HTTPCacheAlt_v23::N_INTEGRAL_FRAG_OFFSETS) * sizeof(FragOffset);

    if (length < (HTTP_ALT_MARSHAL_SIZE + n_frags * sizeof(FragOffset) - IFT_SIZE))
      return false; // not enough room for the overflow fragment table.

    d_alt->m_frag_offset_count = n_frags;
    // In marshalled data, pointers are stored as byte offsets from the start of the alternate.
    d_alt->m_frag_offsets = reinterpret_cast<FragOffset*>(dst - reinterpret_cast<char*>(d_alt));

    memcpy(d_alt->m_integral_frag_offsets, frag_offsets, ift_actual);
    n_frags -= HTTPCacheAlt_v23::N_INTEGRAL_FRAG_OFFSETS;
    if (n_frags > 0) {
      // frag_offsets is a FragOffset*, so advance by the element count
      // (N_INTEGRAL_FRAG_OFFSETS), not by the byte count IFT_SIZE.
      size_t k = sizeof(FragOffset) * n_frags;
      memcpy(dst, frag_offsets + HTTPCacheAlt_v23::N_INTEGRAL_FRAG_OFFSETS, k);
      dst += k;
      length -= k;
    } else if (n_frags < 0) {
      // All offsets fit in the integral array - zero its unused tail.
      memset(reinterpret_cast<char*>(d_alt->m_integral_frag_offsets) + ift_actual, 0, IFT_SIZE - ift_actual);
    }
  } else {
    d_alt->m_frag_offset_count = 0;
    d_alt->m_frag_offsets = 0;
    ink_zero(d_alt->m_integral_frag_offsets);
  }

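  // Copy the request and then the response header heap. As with the source,
  // the destination m_heap fields store byte offsets from the alternate, not
  // real pointers.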
  s_hdr = reinterpret_cast<HdrHeap_v23*>(reinterpret_cast<char*>(s_alt) + reinterpret_cast<uintptr_t>(s_alt->m_request_hdr.m_heap));
  d_hdr = reinterpret_cast<HdrHeap_v23*>(dst);
  hdr_size = ROUND(s_hdr->unmarshal_size(), HDR_PTR_SIZE);
  if (hdr_size > length) return false;
  memcpy(d_hdr, s_hdr, hdr_size);
  d_alt->m_request_hdr.m_heap = reinterpret_cast<HdrHeap_v23*>(reinterpret_cast<char*>(d_hdr) - reinterpret_cast<char*>(d_alt));
  dst += hdr_size;
  length -= hdr_size;

  s_hdr = reinterpret_cast<HdrHeap_v23*>(reinterpret_cast<char*>(s_alt) + reinterpret_cast<uintptr_t>(s_alt->m_response_hdr.m_heap));
  d_hdr = reinterpret_cast<HdrHeap_v23*>(dst);
  hdr_size = ROUND(s_hdr->unmarshal_size(), HDR_PTR_SIZE);
  if (hdr_size > length) return false;
  memcpy(d_hdr, s_hdr, hdr_size);
  d_alt->m_response_hdr.m_heap = reinterpret_cast<HdrHeap_v23*>(reinterpret_cast<char*>(d_hdr) - reinterpret_cast<char*>(d_alt));
  dst += hdr_size;
  length -= hdr_size;

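  // Mark how much of the v21 source was consumed: everything up to the end of
  // the response header heap.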
  src = reinterpret_cast<char*>(s_hdr) + hdr_size;

  return true;
}

} // namespace cache_bc