00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
#ifndef _P_CACHE_VOL_H__
#define _P_CACHE_VOL_H__

// Cache-block geometry: the cache addresses storage in 512-byte blocks.
#define CACHE_BLOCK_SHIFT 9
#define CACHE_BLOCK_SIZE (1<<CACHE_BLOCK_SHIFT) // 512, smallest sector size
#define ROUND_TO_STORE_BLOCK(_x) INK_ALIGN((_x), STORE_BLOCK_SIZE)
#define ROUND_TO_CACHE_BLOCK(_x) INK_ALIGN((_x), CACHE_BLOCK_SIZE)
#define ROUND_TO_SECTOR(_p, _x) INK_ALIGN((_x), _p->sector_size)
#define ROUND_TO(_x, _y) INK_ALIGN((_x), (_y))

// Volume (stripe) constants and aggregation-write tuning.
#define VOL_MAGIC 0xF1D0F00D
#define START_BLOCKS 16 // 8k, STORE_BLOCK_SIZE
#define START_POS ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE)
#define AGG_SIZE (4 * 1024 * 1024) // 4MB aggregation write buffer
#define AGG_HIGH_WATER (AGG_SIZE / 2) // 2MB
#define EVACUATION_SIZE (2 * AGG_SIZE) // 8MB
#define MAX_VOL_SIZE ((off_t)512 * 1024 * 1024 * 1024 * 1024) // 512TB
#define STORE_BLOCKS_PER_CACHE_BLOCK (STORE_BLOCK_SIZE / CACHE_BLOCK_SIZE)
#define MAX_VOL_BLOCKS (MAX_VOL_SIZE / CACHE_BLOCK_SIZE)
#define MAX_FRAG_SIZE (AGG_SIZE - sizeofDoc) // true max
#define LEAVE_FREE DEFAULT_MAX_BUFFER_SIZE
#define PIN_SCAN_EVERY 16 // scan every 1/16 of disk
#define VOL_HASH_TABLE_SIZE 32707
#define VOL_HASH_EMPTY 0xFFFF // sentinel for an unassigned vol_hash_table slot
#define VOL_HASH_ALLOC_SIZE (8 * 1024 * 1024) // one chance per this unit
#define LOOKASIDE_SIZE 256
#define EVACUATION_BUCKET_SIZE (2 * EVACUATION_SIZE) // 16MB
#define RECOVERY_SIZE EVACUATION_SIZE // 8MB
#define AIO_NOT_IN_PROGRESS 0 // value of io.aiocb.aio_fildes when no AIO is outstanding
#define AIO_AGG_WRITE_IN_PROGRESS -1
#define AUTO_SIZE_RAM_CACHE -1 // 1-1 with directory size
#define DEFAULT_TARGET_FRAGMENT_SIZE (1048576 - sizeofDoc) // 1MB
00059
// Map a directory offset (in CACHE_BLOCK_SIZE units) to its evacuation bucket.
#define dir_offset_evac_bucket(_o) \
  (_o / (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE))
// Evacuation bucket for a directory entry.
#define dir_evac_bucket(_e) dir_offset_evac_bucket(dir_offset(_e))
// Evacuation bucket for an absolute byte offset within vol _d.
// BUG FIX: the expansion was missing its two closing parentheses
// ("...(offset_to_vol_offset(_d, _o)"), which made any use site of
// offset_evac_bucket() a compile error.
#define offset_evac_bucket(_d, _o) \
  dir_offset_evac_bucket((offset_to_vol_offset(_d, _o)))
00065
00066
00067
// Magic values stored in Doc::magic / Doc::checksum on disk.
#define DOC_MAGIC ((uint32_t)0x5F129B13)
#define DOC_CORRUPT ((uint32_t)0xDEADBABE)
#define DOC_NO_CHECKSUM ((uint32_t)0xA0B0C0D0)

// Size of the fixed Doc header: byte offset of the 'checksum' member plus the
// checksum itself.  NOTE(review): this is a hand-rolled offsetof via a null
// pointer dereference (technically UB); offsetof(Doc, checksum) would be the
// standard spelling -- confirm Doc stays standard-layout before switching.
#define sizeofDoc (((uint32_t)(uintptr_t)&((Doc*)0)->checksum)+(uint32_t)sizeof(uint32_t))
00073
#if TS_USE_INTERIM_CACHE == 1
// On-disk header/footer for an interim (fast-device) cache volume.
// Field meanings parallel VolHeaderFooter below.
struct InterimVolHeaderFooter
{
  unsigned int magic;        // VOL_MAGIC when valid
  VersionNumber version;
  time_t create_time;
  off_t write_pos;           // current write cursor (absolute byte offset)
  off_t last_write_pos;
  off_t agg_pos;             // end of the region covered by aggregated writes
  uint32_t generation;
  uint32_t phase;            // write phase; compared against dir_phase() in validity checks
  uint32_t cycle;
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t dirty;
  uint32_t sector_size;
  int32_t unused;
};
#endif
00093
00094 struct Cache;
00095 struct Vol;
00096 struct CacheDisk;
00097 struct VolInitInfo;
00098 struct DiskVol;
00099 struct CacheVol;
00100
// On-disk header/footer of a cache volume (stripe).  Written at both ends of
// the directory region; 'sync_serial'/'generation' let recovery pick the
// newer copy.
struct VolHeaderFooter
{
  unsigned int magic;        // VOL_MAGIC when valid
  VersionNumber version;
  time_t create_time;
  off_t write_pos;           // current write cursor (absolute byte offset)
  off_t last_write_pos;
  off_t agg_pos;             // end of the region covered by aggregated writes
  uint32_t generation;       // monotonically bumped; never 0 (see Vol::increment_generation)
  uint32_t phase;            // write phase; compared against dir_phase() in validity checks
  uint32_t cycle;
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t dirty;
  uint32_t sector_size;
  uint32_t unused;
#if TS_USE_INTERIM_CACHE == 1
  InterimVolHeaderFooter interim_header[8]; // one per possible interim volume
#endif
  // Variable-length tail: one freelist slot per directory segment; the extra
  // segments-1 entries are allocated past the struct (see vol_headerlen()).
  uint16_t freelist[1];
};
00122
00123
// One (key, earliest_key) pair scheduled for evacuation.  Additional
// fragments chain through 'link'; the chain is released in
// free_EvacuationBlock().
struct EvacuationKey
{
  SLink<EvacuationKey> link;
  CryptoHash key;
  CryptoHash earliest_key;
};
00130
// Bookkeeping for one directory entry being evacuated (rewritten ahead of the
// advancing write cursor).
struct EvacuationBlock
{
  // Flag word: 'init' aliases the bitfields so the whole set can be cleared
  // with one store (see new_EvacuationBlock()).
  union
  {
    unsigned int init;
    struct
    {
      unsigned int done:1;
      unsigned int pinned:1;
      unsigned int evacuate_head:1;
      unsigned int unused:29;
    } f;
  };

  int readers;               // reference count; starts at 0 (see new_EvacuationBlock)
  Dir dir;                   // directory entry being evacuated
  Dir new_dir;               // entry at the new location

  EvacuationKey evac_frags;  // head of the fragment-key chain
  CacheVC *earliest_evacuator;
  LINK(EvacuationBlock, link);
};
00153
#if TS_USE_INTERIM_CACHE == 1
#define MIGRATE_BUCKETS 1021 // buckets in Vol::mig_hash
extern int migrate_threshold; // access count at which a key counts as hot (see AccessHistory::is_hot)
extern int good_interim_disks;


// One slot of AccessHistory.  A live slot uses 'item' (index-linked LRU plus
// key identity); a free slot reuses v[0] as the freelist link and v[1] as a
// guard stamp (see AccessHistory::freeEntry).
union AccessEntry {
  uintptr_t v[2];
  struct {
    uint32_t next;   // index of the next (older) entry in the LRU list
    uint32_t prev;   // index of the previous (newer) entry
    uint32_t index;  // key_index (CryptoHash slice32(3)) this entry tracks
    uint16_t tag;    // secondary hash (slice32(1)) to reject collisions
    int16_t count;   // access count; high bit = migration in progress
  } item;
};
00170
00171 struct AccessHistory {
00172 AccessEntry *base;
00173 int size;
00174
00175 uint32_t *hash;
00176 int hash_size;
00177
00178 AccessEntry *freelist;
00179
00180 void freeEntry(AccessEntry *entry) {
00181 entry->v[0] = (uintptr_t) freelist;
00182 entry->v[1] = 0xABCD1234U;
00183 freelist = entry;
00184 }
00185
00186 void init(int size, int hash_size) {
00187 this->size = size;
00188 this->hash_size = hash_size;
00189 freelist = NULL;
00190
00191 base = (AccessEntry *) malloc(sizeof(AccessEntry) * size);
00192 hash = (uint32_t *) malloc (sizeof(uint32_t) * hash_size);
00193
00194 memset(hash, 0, sizeof(uint32_t) * hash_size);
00195
00196 base[0].item.next = base[0].item.prev = 0;
00197 base[0].v[1] = 0xABCD1234UL;
00198 for (int i = size; --i > 0;)
00199 freeEntry(&base[i]);
00200
00201 return;
00202 }
00203
00204 void remove(AccessEntry *entry) {
00205 if (entry == &(base[base[0].item.prev])) {
00206 base[0].item.prev = entry->item.next;
00207 } else {
00208 base[entry->item.prev].item.next = entry->item.next;
00209 }
00210 if (entry == &(base[base[0].item.next])) {
00211 base[0].item.next = entry->item.prev;
00212 } else {
00213 base[entry->item.next].item.prev = entry->item.prev;
00214 }
00215 uint32_t hash_index = (uint32_t) (entry->item.index % hash_size);
00216 hash[hash_index] = 0;
00217 }
00218
00219 void enqueue(AccessEntry *entry) {
00220 uint32_t hash_index = (uint32_t) (entry->item.index % hash_size);
00221 hash[hash_index] = entry - base;
00222
00223 entry->item.prev = 0;
00224 entry->item.next = base[0].item.prev;
00225 base[base[0].item.prev].item.prev = entry - base;
00226 base[0].item.prev = entry - base;
00227 if (base[0].item.next == 0)
00228 base[0].item.next = entry - base;
00229 }
00230
00231 AccessEntry* dequeue() {
00232 AccessEntry *tail = &base[base[0].item.next];
00233 if (tail != base)
00234 remove(tail);
00235
00236 return tail;
00237 }
00238
00239 void set_in_progress(CryptoHash *key) {
00240 uint32_t key_index = key->slice32(3);
00241 uint16_t tag = static_cast<uint16_t>(key->slice32(1));
00242 unsigned int hash_index = (uint32_t) (key_index % hash_size);
00243
00244 uint32_t index = hash[hash_index];
00245 AccessEntry *entry = &base[index];
00246 if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) {
00247 entry->item.count |= 0x8000;
00248 }
00249 }
00250
00251 void set_not_in_progress(CryptoHash *key) {
00252 uint32_t key_index = key->slice32(3);
00253 uint16_t tag = static_cast<uint16_t>(key->slice32(1));
00254 unsigned int hash_index = (uint32_t) (key_index % hash_size);
00255
00256 uint32_t index = hash[hash_index];
00257 AccessEntry *entry = &base[index];
00258 if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) {
00259 entry->item.count &= 0x7FFF;
00260 }
00261 }
00262
00263 void put_key(CryptoHash *key) {
00264 uint32_t key_index = key->slice32(3);
00265 uint16_t tag = static_cast<uint16_t>(key->slice32(1));
00266 unsigned int hash_index = (uint32_t) (key_index % hash_size);
00267
00268 uint32_t index = hash[hash_index];
00269 AccessEntry *entry = &base[index];
00270 if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) {
00271 remove(entry);
00272 enqueue(entry);
00273 ++entry->item.count;
00274 } else {
00275 if (index == 0) {
00276 if (!freelist) {
00277 entry = dequeue();
00278 if (entry == base) {
00279 return;
00280 }
00281 } else {
00282 entry = freelist;
00283 freelist = (AccessEntry *) entry->v[0];
00284 }
00285 } else {
00286 remove(entry);
00287 }
00288 entry->item.index = key_index;
00289 entry->item.tag = tag;
00290 entry->item.count = 1;
00291 enqueue(entry);
00292 }
00293 }
00294
00295 bool remove_key(CryptoHash *key) {
00296 unsigned int hash_index = static_cast<uint32_t>(key->slice32(3) % hash_size);
00297 uint32_t index = hash[hash_index];
00298 AccessEntry *entry = &base[index];
00299 if (index != 0 && entry->item.tag == static_cast<uint16_t>(key->slice32(1)) && entry->item.index == key->slice32(3)) {
00300 remove(entry);
00301 freeEntry(entry);
00302 return true;
00303 }
00304 return false;
00305 }
00306
00307 bool is_hot(CryptoHash *key) {
00308 uint32_t key_index = key->slice32(3);
00309 uint16_t tag = (uint16_t) key->slice32(1);
00310 unsigned int hash_index = (uint32_t) (key_index % hash_size);
00311
00312 uint32_t index = hash[hash_index];
00313 AccessEntry *entry = &base[index];
00314
00315 return (index != 0 && entry->item.tag == tag && entry->item.index == key_index
00316 && entry->item.count >= migrate_threshold);
00317 }
00318 };
00319
00320 struct InterimCacheVol;
00321
00322 struct MigrateToInterimCache
00323 {
00324 MigrateToInterimCache() { }
00325 Ptr<IOBufferData> buf;
00326 uint32_t agg_len;
00327 CacheKey key;
00328 Dir dir;
00329 InterimCacheVol *interim_vol;
00330 CacheVC *vc;
00331 bool notMigrate;
00332 bool rewrite;
00333 bool copy;
00334 LINK(MigrateToInterimCache, link);
00335 LINK(MigrateToInterimCache, hash_link);
00336 };
00337
// A volume on an interim (fast) cache device.  Owned by a parent Vol, shares
// its mutex, and mirrors a subset of Vol's aggregation-write machinery.
struct InterimCacheVol: public Continuation
{
  ats_scoped_str hash_text;        // identity string: "<seed> skip:len"
  InterimVolHeaderFooter *header;  // header location supplied by the parent
                                   // (see VolHeaderFooter::interim_header)

  off_t recover_pos;
  off_t prev_recover_pos;
  uint32_t last_sync_serial;
  uint32_t last_write_serial;
  bool recover_wrapped;

  off_t scan_pos;
  off_t skip;           // bytes skipped at the head of the device (== start here)
  off_t start;          // absolute byte offset where data begins
  off_t len;            // usable length in bytes
  off_t data_blocks;
  char *agg_buffer;     // AGG_SIZE staging buffer for aggregated writes
  int agg_todo_size;
  int agg_buf_pos;      // bytes currently staged in agg_buffer
  uint32_t sector_size;
  int fd;               // copied from disk->fd in init()
  CacheDisk *disk;
  Vol *vol;             // parent volume
  AIOCallbackInternal io;
  Queue<MigrateToInterimCache, MigrateToInterimCache::Link_link> agg; // pending migrations
  int64_t transistor_range_threshold; // len/5; see vol_transistor_range_valid
  bool sync;
  // An AIO is outstanding iff aio_fildes holds a real descriptor.
  bool is_io_in_progress() {
    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
  }

  int recover_data();
  int handle_recover_from_data(int event, void *data);

  void set_io_not_in_progress() {
    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
  }

  int aggWrite(int event, void *e);
  int aggWriteDone(int event, void *e);
  // Round a length up to the approximate dir size, then to a hardware sector.
  uint32_t round_to_approx_size (uint32_t l) {
    uint32_t ll = round_to_approx_dir_size(l);
    return INK_ALIGN(ll, disk->hw_sector_size);
  }

  // Bind this interim volume to [s, s+l) on device 'interim', parented to
  // Vol 'v'; 'hptr' is where this volume's on-disk header lives.
  void init(off_t s, off_t l, CacheDisk *interim, Vol *v, InterimVolHeaderFooter *hptr) {
    char* seed_str = interim->hash_base_string ? interim->hash_base_string : interim->path;
    const size_t hash_seed_size = strlen(seed_str);
    // NOTE(review): 32 bytes of headroom may truncate -- two 64-bit values
    // can print up to 20 digits each (42 chars with separators); snprintf
    // bounds the write but the identity string would be cut short.
    const size_t hash_text_size = hash_seed_size + 32;

    hash_text = static_cast<char *>(ats_malloc(hash_text_size));
    snprintf(hash_text, hash_text_size, "%s %" PRIu64 ":%" PRIu64 "", seed_str, s, l);

    skip = start = s;
    len = l;
    disk = interim;
    fd = disk->fd;
    vol = v;
    transistor_range_threshold = len / 5;
    sync = false;

    header = hptr;

    agg_todo_size = 0;
    agg_buf_pos = 0;

    agg_buffer = (char *) ats_memalign(sysconf(_SC_PAGESIZE), AGG_SIZE);
    memset(agg_buffer, 0, AGG_SIZE);
    // Share the parent volume's lock.
    this->mutex = ((Continuation *)vol)->mutex;
  }
};
00409
00410
00411 void dir_clean_bucket(Dir *b, int s, InterimCacheVol *d);
00412 void dir_clean_segment(int s, InterimCacheVol *d);
00413 void dir_clean_interimvol(InterimCacheVol *d);
00414
00415 #endif
00416
// A cache volume (stripe): one contiguous region of a disk plus its in-memory
// directory, aggregation-write buffer, and evacuation state.  State is
// protected by this Continuation's mutex.
struct Vol: public Continuation
{
  char *path;                 // pathname of the underlying device/file
  ats_scoped_str hash_text;   // identity string hashed to assign objects here
  CryptoHash hash_id;
  int fd;

  char *raw_dir;              // raw allocation backing 'dir'
  Dir *dir;                   // in-memory directory entries
  VolHeaderFooter *header;
  VolHeaderFooter *footer;
  int segments;               // directory segment count
  off_t buckets;              // buckets per segment
  off_t recover_pos;
  off_t prev_recover_pos;
  off_t scan_pos;
  off_t skip;                 // bytes skipped at the head of the device
  off_t start;                // absolute byte offset where object data begins
  off_t len;                  // usable length in bytes
  off_t data_blocks;          // data-area size in CACHE_BLOCK_SIZE blocks
  int hit_evacuate_window;    // see within_hit_evacuate_window()
  AIOCallbackInternal io;

  Queue<CacheVC, Continuation::Link_link> agg;            // writers awaiting aggregation
  Queue<CacheVC, Continuation::Link_link> stat_cache_vcs;
  Queue<CacheVC, Continuation::Link_link> sync;
  char *agg_buffer;           // AGG_SIZE staging buffer for aggregated writes
  int agg_todo_size;
  int agg_buf_pos;            // bytes currently staged in agg_buffer

  Event *trigger;             // pending event; cleared via cancel_trigger()

  OpenDir open_dir;           // table of currently open documents
  RamCache *ram_cache;
  int evacuate_size;          // bucket count of 'evacuate'
  DLL<EvacuationBlock> *evacuate; // evacuation blocks bucketed by dir offset
                                  // (see dir_evac_bucket / evacuation_block_exists)
  DLL<EvacuationBlock> lookaside[LOOKASIDE_SIZE];
  CacheVC *doc_evacuator;

  VolInitInfo *init_info;

  CacheDisk *disk;
  Cache *cache;
  CacheVol *cache_vol;        // logical volume this stripe belongs to
  uint32_t last_sync_serial;
  uint32_t last_write_serial;
  uint32_t sector_size;
  bool recover_wrapped;
  bool dir_sync_waiting;
  bool dir_sync_in_progress;
  bool writing_end_marker;

  CacheKey first_fragment_key;
  int64_t first_fragment_offset;
  Ptr<IOBufferData> first_fragment_data;

#if TS_USE_INTERIM_CACHE == 1
  int num_interim_vols;       // live entries in interim_vols[]
  InterimCacheVol interim_vols[8];
  AccessHistory history;      // hot-key tracking used to drive migration
  uint32_t interim_index;
  Queue<MigrateToInterimCache, MigrateToInterimCache::Link_hash_link> mig_hash[MIGRATE_BUCKETS];
  volatile int interim_done;


  // True if a migration for 'key' is in flight; optionally returns it.
  bool migrate_probe(CacheKey *key, MigrateToInterimCache **result) {
    uint32_t indx = key->slice32(3) % MIGRATE_BUCKETS;
    MigrateToInterimCache *m = mig_hash[indx].head;
    while (m != NULL && !(m->key == *key)) {
      m = mig_hash[indx].next(m);
    }
    if (result != NULL)
      *result = m;
    return m != NULL;
  }

  // Register a migration as in flight.
  void set_migrate_in_progress(MigrateToInterimCache *m) {
    uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
    mig_hash[indx].enqueue(m);
  }

  // Drop a failed migration from the in-flight table.
  void set_migrate_failed(MigrateToInterimCache *m) {
    uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
    mig_hash[indx].remove(m);
  }

  // Complete a migration: drop it from the table and forget the key's history.
  void set_migrate_done(MigrateToInterimCache *m) {
    uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
    mig_hash[indx].remove(m);
    history.remove_key(&m->key);
  }
#endif

  void cancel_trigger();

  int recover_data();

  // Open/close a document for writing/reading; *_lock variants acquire the
  // volume mutex themselves.
  int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
  int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
  int close_write(CacheVC *cont);
  int close_write_lock(CacheVC *cont);
  int begin_read(CacheVC *cont);
  int begin_read_lock(CacheVC *cont);


  OpenDirEntry *open_read(CryptoHash *key);
  OpenDirEntry *open_read_lock(CryptoHash *key, EThread *t);
  int close_read(CacheVC *cont);
  int close_read_lock(CacheVC *cont);

  int clear_dir();

  int init(char *s, off_t blocks, off_t dir_skip, bool clear);

  // AIO completion handlers for initialization and crash recovery.
  int handle_dir_clear(int event, void *data);
  int handle_dir_read(int event, void *data);
  int handle_recover_from_data(int event, void *data);
  int handle_recover_write_dir(int event, void *data);
  int handle_header_read(int event, void *data);

#if TS_USE_INTERIM_CACHE == 1
  int recover_interim_vol();
#endif

  int dir_init_done(int event, void *data);

  int dir_check(bool fix);
  int db_check(bool fix);

  // An AIO is outstanding iff aio_fildes holds a real descriptor.
  int is_io_in_progress()
  {
    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
  }
  // Bump the header generation, skipping 0 (reserved).
  int increment_generation()
  {
    // the continuation is not always running fully protected
    ink_assert(mutex->thread_holding == this_ethread());
    header->generation++;
    if (!header->generation)
      header->generation++;
    return header->generation;
  }
  void set_io_not_in_progress()
  {
    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
  }

  int aggWriteDone(int event, Event *e);
  int aggWrite(int event, void *e);
  void agg_wrap();

  int evacuateWrite(CacheVC *evacuator, int event, Event *e);
  int evacuateDocReadDone(int event, Event *e);
  int evacuateDoc(int event, Event *e);

  int evac_range(off_t start, off_t end, int evac_phase);
  void periodic_scan();
  void scan_for_pinned_documents();
  void evacuate_cleanup_blocks(int i);
  void evacuate_cleanup();
  EvacuationBlock *force_evacuate_head(Dir *dir, int pinned);
  int within_hit_evacuate_window(Dir *dir);
  uint32_t round_to_approx_size(uint32_t l);

  // NOTE(review): several members (raw_dir, header, footer, segments,
  // sector_size, ram_cache, evacuate, doc_evacuator, init_info, cache,
  // cache_vol, first_fragment_offset) are not initialized here; presumably
  // init()/clear_dir() set them before use -- confirm before reading them
  // from a freshly constructed Vol.
  Vol()
    : Continuation(new_ProxyMutex()), path(NULL), fd(-1),
      dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), skip(0), start(0),
      len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0),
      evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false),
      dir_sync_waiting(0), dir_sync_in_progress(0), writing_end_marker(0) {
    open_dir.mutex = mutex;
    agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE);
    memset(agg_buffer, 0, AGG_SIZE);
    SET_HANDLER(&Vol::aggWrite);
  }

  ~Vol() {
    ats_memalign_free(agg_buffer);
  }
};
00597
// Continuation whose sole job is to route AIO callback events to
// handle_disk_failure().
struct AIO_Callback_handler: public Continuation
{
  int handle_disk_failure(int event, void *data);

  AIO_Callback_handler():Continuation(new_ProxyMutex()) {
    SET_HANDLER(&AIO_Callback_handler::handle_disk_failure);
  }
};
00606
// A logical cache volume: a numbered set of Vol stripes (possibly spanning
// several disks) sharing one scheme and total size.
struct CacheVol
{
  int vol_number;            // -1 until assigned
  int scheme;
  off_t size;
  int num_vols;              // entries in vols[] / disk_vols[]
  Vol **vols;
  DiskVol **disk_vols;
  LINK(CacheVol, link);

  // per-volume statistics block
  RecRawStatBlock *vol_rsb;

  CacheVol()
    : vol_number(-1), scheme(0), size(0), num_vols(0), vols(NULL), disk_vols(0), vol_rsb(0)
  { }
};
00623
00624
00625
// On-disk header preceding every fragment written to a volume.  The fixed
// prefix runs through 'checksum' (sizeofDoc); it is followed by 'hlen' bytes
// of metadata (hdr()) and then the payload (data()).
struct Doc
{
  uint32_t magic;        // DOC_MAGIC, or DOC_CORRUPT once detected bad
  uint32_t len;          // length of this fragment including this header and hlen
  uint64_t total_len;    // total length of the whole document's data
  CryptoHash first_key;  // presumably the key of the document's first fragment -- confirm
  CryptoHash key;        // key of this fragment
  uint32_t hlen;         // length of the metadata section after the fixed header
  uint32_t doc_type:8;
  uint32_t v_major:8;
  uint32_t v_minor:8;
  uint32_t unused:8;
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t pinned;
  uint32_t checksum;     // DOC_NO_CHECKSUM when checksumming is disabled

  uint32_t data_len();       // payload bytes: len - sizeofDoc - hlen
  uint32_t prefix_len();     // sizeofDoc + hlen
  int single_fragment();     // true when this fragment holds the entire doc
  int no_data_in_fragment();
  char *hdr();               // start of the metadata section
  char *data();              // start of the payload
};
00650
00651
00652
00653 extern Vol **gvol;
00654 extern volatile int gnvol;
00655 extern ClassAllocator<OpenDirEntry> openDirEntryAllocator;
00656 extern ClassAllocator<EvacuationBlock> evacuationBlockAllocator;
00657 extern ClassAllocator<EvacuationKey> evacuationKeyAllocator;
00658 extern unsigned short *vol_hash_table;
00659
00660
00661
00662 TS_INLINE int
00663 vol_headerlen(Vol *d) {
00664 return ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter) + sizeof(uint16_t) * (d->segments-1));
00665 }
00666
00667 TS_INLINE size_t
00668 vol_dirlen(Vol *d)
00669 {
00670 return vol_headerlen(d) +
00671 ROUND_TO_STORE_BLOCK(((size_t)d->buckets) * DIR_DEPTH * d->segments * SIZEOF_DIR) +
00672 ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
00673 }
00674
00675 TS_INLINE int
00676 vol_direntries(Vol *d)
00677 {
00678 return d->buckets * DIR_DEPTH * d->segments;
00679 }
00680
#if TS_USE_INTERIM_CACHE == 1
// Validity predicates, written as macros so they apply to both Vol and
// InterimCacheVol (both expose header/start/agg_buf_pos).
#define vol_out_of_phase_valid(d, e) \
  (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE))

#define vol_out_of_phase_agg_valid(d, e) \
  (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE))

// NOTE(review): identical to vol_out_of_phase_agg_valid above, whereas the
// non-interim inline version below tests header->write_pos instead of
// agg_pos + AGG_SIZE -- confirm this asymmetry is intentional.
#define vol_out_of_phase_write_valid(d, e) \
  (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE))

#define vol_in_phase_valid(d, e) \
  (dir_offset(e) - 1 < ((d->header->write_pos + d->agg_buf_pos - d->start) / CACHE_BLOCK_SIZE))

// Convert a 1-based volume block offset to an absolute byte offset.
#define vol_offset_to_offset(d, pos) \
  (d->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE)

#define vol_dir_segment(d, s) \
  (Dir *) (((char *) d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR)

// Convert an absolute byte offset to a 1-based volume block offset.
#define offset_to_vol_offset(d, pos) \
  ((pos - d->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE)

// Absolute byte offset of a directory entry's data.
#define vol_offset(d, e) \
  ((d)->start + (off_t) ((off_t)dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE)

#define vol_in_phase_agg_buf_valid(d, e) \
  ((vol_offset(d, e) >= d->header->write_pos) && vol_offset(d, e) < (d->header->write_pos + d->agg_buf_pos))

// Is the entry inside the "transition" window just past the aggregation
// position (wrapping at the end of the volume)?
#define vol_transistor_range_valid(d, e) \
  ((d->header->agg_pos + d->transistor_range_threshold < d->start + d->len) ? \
    (vol_out_of_phase_write_valid(d, e) && \
    (dir_offset(e) <= ((d->header->agg_pos - d->start + d->transistor_range_threshold) / CACHE_BLOCK_SIZE))) : \
    ((dir_offset(e) <= ((d->header->agg_pos - d->start + d->transistor_range_threshold - d->len) / CACHE_BLOCK_SIZE)) || \
    (dir_offset(e) > ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE))))
00715
00716
00717 #else
00718 TS_INLINE int
00719 vol_out_of_phase_valid(Vol *d, Dir *e)
00720 {
00721 return (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE));
00722 }
00723
00724 TS_INLINE int
00725 vol_out_of_phase_agg_valid(Vol *d, Dir *e)
00726 {
00727 return (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE));
00728 }
00729
00730 TS_INLINE int
00731 vol_out_of_phase_write_valid(Vol *d, Dir *e)
00732 {
00733 return (dir_offset(e) - 1 >= ((d->header->write_pos - d->start) / CACHE_BLOCK_SIZE));
00734 }
00735
00736 TS_INLINE int
00737 vol_in_phase_valid(Vol *d, Dir *e)
00738 {
00739 return (dir_offset(e) - 1 < ((d->header->write_pos + d->agg_buf_pos - d->start) / CACHE_BLOCK_SIZE));
00740 }
00741
00742 TS_INLINE off_t
00743 vol_offset(Vol *d, Dir *e)
00744 {
00745 return d->start + (off_t) dir_offset(e) * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
00746 }
00747
00748 TS_INLINE off_t
00749 offset_to_vol_offset(Vol *d, off_t pos)
00750 {
00751 return ((pos - d->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE);
00752 }
00753
00754 TS_INLINE off_t
00755 vol_offset_to_offset(Vol *d, off_t pos)
00756 {
00757 return d->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
00758 }
00759
00760 TS_INLINE Dir *
00761 vol_dir_segment(Vol *d, int s)
00762 {
00763 return (Dir *) (((char *) d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR);
00764 }
00765
00766 TS_INLINE int
00767 vol_in_phase_agg_buf_valid(Vol *d, Dir *e)
00768 {
00769 return (vol_offset(d, e) >= d->header->write_pos && vol_offset(d, e) < (d->header->write_pos + d->agg_buf_pos));
00770 }
00771 #endif
00772
00773 TS_INLINE off_t
00774 vol_relative_length(Vol *v, off_t start_offset)
00775 {
00776 return (v->len + v->skip) - start_offset;
00777 }
00778
00779 TS_INLINE uint32_t
00780 Doc::prefix_len()
00781 {
00782 return sizeofDoc + hlen;
00783 }
00784
00785 TS_INLINE uint32_t
00786 Doc::data_len()
00787 {
00788 return len - sizeofDoc - hlen;
00789 }
00790
00791 TS_INLINE int
00792 Doc::single_fragment()
00793 {
00794 return data_len() == total_len;
00795 }
00796
00797 TS_INLINE char *
00798 Doc::hdr()
00799 {
00800 return reinterpret_cast<char*>(this) + sizeofDoc;
00801 }
00802
00803 TS_INLINE char *
00804 Doc::data()
00805 {
00806 return this->hdr() + hlen;
00807 }
00808
00809 int vol_dir_clear(Vol *d);
00810 int vol_init(Vol *d, char *s, off_t blocks, off_t skip, bool clear);
00811
00812
00813
00814 TS_INLINE EvacuationBlock *
00815 evacuation_block_exists(Dir *dir, Vol *p)
00816 {
00817 EvacuationBlock *b = p->evacuate[dir_evac_bucket(dir)].head;
00818 for (; b; b = b->link.next)
00819 if (dir_offset(&b->dir) == dir_offset(dir))
00820 return b;
00821 return 0;
00822 }
00823
00824 TS_INLINE void
00825 Vol::cancel_trigger()
00826 {
00827 if (trigger) {
00828 trigger->cancel_action();
00829 trigger = NULL;
00830 }
00831 }
00832
00833 TS_INLINE EvacuationBlock *
00834 new_EvacuationBlock(EThread *t)
00835 {
00836 EvacuationBlock *b = THREAD_ALLOC(evacuationBlockAllocator, t);
00837 b->init = 0;
00838 b->readers = 0;
00839 b->earliest_evacuator = 0;
00840 b->evac_frags.link.next = 0;
00841 return b;
00842 }
00843
00844 TS_INLINE void
00845 free_EvacuationBlock(EvacuationBlock *b, EThread *t)
00846 {
00847 EvacuationKey *e = b->evac_frags.link.next;
00848 while (e) {
00849 EvacuationKey *n = e->link.next;
00850 evacuationKeyAllocator.free(e);
00851 e = n;
00852 }
00853 THREAD_FREE(b, evacuationBlockAllocator, t);
00854 }
00855
00856 TS_INLINE OpenDirEntry *
00857 Vol::open_read(CryptoHash *key)
00858 {
00859 return open_dir.open_read(key);
00860 }
00861
00862 TS_INLINE int
00863 Vol::within_hit_evacuate_window(Dir *xdir)
00864 {
00865 off_t oft = dir_offset(xdir) - 1;
00866 off_t write_off = (header->write_pos + AGG_SIZE - start) / CACHE_BLOCK_SIZE;
00867 off_t delta = oft - write_off;
00868 if (delta >= 0)
00869 return delta < hit_evacuate_window;
00870 else
00871 return -delta > (data_blocks - hit_evacuate_window) && -delta < data_blocks;
00872 }
00873
// Round a length up to the approximate dir size, then to a whole sector.
TS_INLINE uint32_t
Vol::round_to_approx_size(uint32_t l) {
  uint32_t ll = round_to_approx_dir_size(l);
  return ROUND_TO_SECTOR(this, ll);
}
00879
00880 #if TS_USE_INTERIM_CACHE == 1
// Is directory entry _e still pointing at live data?  Interim-resident
// entries are validated against their interim volume; others against the
// main volume, using in/out-of-phase checks on the write cursor.
inline bool
dir_valid(Vol *_d, Dir *_e) {
  if (!dir_ininterim(_e))
    return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
      vol_out_of_phase_valid(_d, _e);
  else {
    int idx = dir_get_index(_e);
    // Reject entries for missing or out-of-range interim volumes.
    if (good_interim_disks <= 0 || idx >= _d->num_interim_vols) return false;
    InterimCacheVol *sv = &(_d->interim_vols[idx]);
    return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
      vol_out_of_phase_valid(sv, _e)) : false;
  }
}
00894
// Validity check relative to a specific interim volume: entries living on
// the main volume or on a *different* interim volume are out of this
// volume's jurisdiction and treated as valid.
inline bool
dir_valid(InterimCacheVol *_d, Dir *_e) {
  if (!dir_ininterim(_e))
    return true;
  InterimCacheVol *sv = &(_d->vol->interim_vols[dir_get_index(_e)]);
  if (_d != sv)
    return true;
  return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
    vol_out_of_phase_valid(sv, _e)) : false;

}
00906
// Like dir_valid(), but out-of-phase entries must also clear the
// aggregation buffer (agg_pos + AGG_SIZE).
inline bool
dir_agg_valid(Vol *_d, Dir *_e) {
  if (!dir_ininterim(_e))
    return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
      vol_out_of_phase_agg_valid(_d, _e);
  else {
    int idx = dir_get_index(_e);
    if(good_interim_disks <= 0 || idx >= _d->num_interim_vols) return false;
    InterimCacheVol *sv = &(_d->interim_vols[idx]);
    return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
      vol_out_of_phase_agg_valid(sv, _e);
  }
}
// Like dir_valid(), but out-of-phase entries are checked against the write
// cursor.  NOTE(review): unlike dir_valid/dir_agg_valid, dir_get_index() is
// not bounds-checked against num_interim_vols here -- confirm callers
// guarantee a valid interim index.
inline bool
dir_write_valid(Vol *_d, Dir *_e) {
  if (!dir_ininterim(_e))
    return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
      vol_out_of_phase_write_valid(_d, _e);
  else {
    InterimCacheVol *sv = &(_d->interim_vols[dir_get_index(_e)]);
    return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
      vol_out_of_phase_write_valid(sv, _e);
  }
}
// True when the entry's data is still in the in-memory aggregation buffer of
// the appropriate volume.  NOTE(review): as with dir_write_valid, the interim
// index is not bounds-checked -- confirm callers guarantee validity.
inline bool
dir_agg_buf_valid(Vol *_d, Dir *_e) {
  if (!dir_ininterim(_e))
    return _d->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(_d, _e);
  else {
    InterimCacheVol *sv = &(_d->interim_vols[dir_get_index(_e)]);
    return sv->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(sv, _e);
  }
}
00940
// Interim-volume overload: entry data is still in this volume's agg buffer.
inline bool
dir_agg_buf_valid(InterimCacheVol *_d, Dir *_e) {
  return _d->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(_d, _e);
}
00945
00946 #endif // TS_USE_INTERIM_CACHE
00947 #endif