00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 #ifndef _P_CACHE_VOL_H__
00026 #define _P_CACHE_VOL_H__
00027 
// Fundamental sizing constants for a cache volume (stripe).
#define CACHE_BLOCK_SHIFT               9
#define CACHE_BLOCK_SIZE                (1<<CACHE_BLOCK_SHIFT) // 512, smallest sector size
#define ROUND_TO_STORE_BLOCK(_x)        INK_ALIGN((_x), STORE_BLOCK_SIZE)
#define ROUND_TO_CACHE_BLOCK(_x)        INK_ALIGN((_x), CACHE_BLOCK_SIZE)
#define ROUND_TO_SECTOR(_p, _x)         INK_ALIGN((_x), _p->sector_size)
#define ROUND_TO(_x, _y)                INK_ALIGN((_x), (_y))

// Volume header magic and geometry limits.
#define VOL_MAGIC                      0xF1D0F00D
#define START_BLOCKS                    16      // 8k, STORE_BLOCK_SIZE
#define START_POS                       ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE)
#define AGG_SIZE                        (4 * 1024 * 1024) // 4MB
#define AGG_HIGH_WATER                  (AGG_SIZE / 2) // 2MB
#define EVACUATION_SIZE                 (2 * AGG_SIZE)  // 8MB
#define MAX_VOL_SIZE                   ((off_t)512 * 1024 * 1024 * 1024 * 1024)
#define STORE_BLOCKS_PER_CACHE_BLOCK    (STORE_BLOCK_SIZE / CACHE_BLOCK_SIZE)
#define MAX_VOL_BLOCKS                 (MAX_VOL_SIZE / CACHE_BLOCK_SIZE)
#define MAX_FRAG_SIZE                   (AGG_SIZE - sizeofDoc) // true max
#define LEAVE_FREE                      DEFAULT_MAX_BUFFER_SIZE
#define PIN_SCAN_EVERY                  16      // scan every 1/16 of disk
#define VOL_HASH_TABLE_SIZE             32707
#define VOL_HASH_EMPTY                 0xFFFF
#define VOL_HASH_ALLOC_SIZE             (8 * 1024 * 1024)  // one chance per this unit
#define LOOKASIDE_SIZE                  256
#define EVACUATION_BUCKET_SIZE          (2 * EVACUATION_SIZE) // 16MB
#define RECOVERY_SIZE                   EVACUATION_SIZE // 8MB
#define AIO_NOT_IN_PROGRESS             0
#define AIO_AGG_WRITE_IN_PROGRESS       -1
#define AUTO_SIZE_RAM_CACHE             -1      // 1-1 with directory size
#define DEFAULT_TARGET_FRAGMENT_SIZE    (1048576 - sizeofDoc) // 1MB
00058 
00059 
// Map a directory offset (in CACHE_BLOCK_SIZE units) to its evacuation bucket.
#define dir_offset_evac_bucket(_o) \
  (_o / (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE))
// Evacuation bucket for a directory entry.
#define dir_evac_bucket(_e) dir_offset_evac_bucket(dir_offset(_e))
// Evacuation bucket for an absolute byte offset within volume _d.
// BUG FIX: the expansion previously had unbalanced parentheses
// ("dir_offset_evac_bucket((offset_to_vol_offset(_d, _o)" with no closing
// parens), which could not compile at any use site.
#define offset_evac_bucket(_d, _o) \
  dir_offset_evac_bucket(offset_to_vol_offset(_d, _o))
00065 
00066 
00067 
// Magic values stamped on on-disk fragments (struct Doc).
#define DOC_MAGIC                       ((uint32_t)0x5F129B13)
#define DOC_CORRUPT                     ((uint32_t)0xDEADBABE)
#define DOC_NO_CHECKSUM                 ((uint32_t)0xA0B0C0D0)

// Size of the fixed Doc header, up to and including the checksum field.
// NOTE(review): this computes offsetof(Doc, checksum) via a null-pointer
// dereference, which is formally undefined behavior; offsetof() from
// <stddef.h> is the well-defined spelling. This value participates in the
// on-disk format, so any change must preserve the computed constant.
#define sizeofDoc (((uint32_t)(uintptr_t)&((Doc*)0)->checksum)+(uint32_t)sizeof(uint32_t))
00073 
#if TS_USE_INTERIM_CACHE == 1
// Persistent metadata for one interim (e.g. SSD) cache stripe.  Mirrors the
// leading fields of VolHeaderFooter (which embeds an array of these), so the
// layout is part of the on-disk format and must not change.
struct InterimVolHeaderFooter
{
  unsigned int magic;             // VOL_MAGIC when valid
  VersionNumber version;
  time_t create_time;
  off_t write_pos;                // current write cursor
  off_t last_write_pos;
  off_t agg_pos;                  // end of aggregated (committed) writes
  uint32_t generation;            
  uint32_t phase;                 // which wrap-around phase we are in
  uint32_t cycle;
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t dirty;
  uint32_t sector_size;
  int32_t unused;                
};
#endif
00093 
00094 struct Cache;
00095 struct Vol;
00096 struct CacheDisk;
00097 struct VolInitInfo;
00098 struct DiskVol;
00099 struct CacheVol;
00100 
// Persistent header/footer of a cache stripe.  Written to disk at both ends
// of the directory region; the two copies' sync_serial values are compared
// to detect torn directory syncs.  Layout is part of the on-disk format.
struct VolHeaderFooter
{
  unsigned int magic;             // VOL_MAGIC when valid
  VersionNumber version;
  time_t create_time;
  off_t write_pos;                // current write cursor
  off_t last_write_pos;
  off_t agg_pos;                  // end of aggregated (committed) writes
  uint32_t generation;            
  uint32_t phase;                 // wrap-around phase bit for dir validity
  uint32_t cycle;
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t dirty;
  uint32_t sector_size;
  uint32_t unused;                
#if TS_USE_INTERIM_CACHE == 1
  InterimVolHeaderFooter interim_header[8];
#endif
  // Variable length: one free-list head per directory segment follows.
  uint16_t freelist[1];
};
00122 
00123 
// One (key, earliest_key) pair in the singly linked list of fragments being
// evacuated as part of an EvacuationBlock.
struct EvacuationKey
{
  SLink<EvacuationKey> link;
  CryptoHash key;
  CryptoHash earliest_key;
};
00130 
// Tracks one directory entry that must be rewritten (evacuated) before the
// write cursor overwrites its data.
struct EvacuationBlock
{
  union
  {
    unsigned int init;                  // zero to clear all flags at once
    struct
    {
      unsigned int done:1;              // evacuation completed
      unsigned int pinned:1;            // entry is pinned; must be preserved
      unsigned int evacuate_head:1;     // evacuate the first-fragment doc
      unsigned int unused:29;
    } f;
  };

  int readers;                  // readers currently using this block
  Dir dir;                      // original directory entry
  Dir new_dir;                  // entry after relocation

  EvacuationKey evac_frags;     // head of fragment-key list
  CacheVC *earliest_evacuator;
  LINK(EvacuationBlock, link);
};
00153 
00154 #if TS_USE_INTERIM_CACHE == 1
// Number of hash buckets used to track in-flight migrations to interim cache.
#define MIGRATE_BUCKETS                 1021
extern int migrate_threshold;     // access count needed before migration
extern int good_interim_disks;

// One slot of the AccessHistory LRU.  The two views overlay: v[] is used to
// thread the freelist (v[0] = next free, v[1] = poison marker), while item
// holds the live linked-list indices and key identification.
// NOTE(review): the overlay assumes sizeof(uintptr_t[2]) covers the item
// struct (true on 64-bit); confirm if 32-bit builds are supported.
union AccessEntry {
  uintptr_t v[2];
  struct {
    uint32_t  next;     // index of next entry (toward LRU tail)
    uint32_t  prev;     // index of previous entry (toward MRU head)
    uint32_t  index;    // key->slice32(3) of the tracked key
    uint16_t  tag;      // key->slice32(1) truncated, collision filter
    int16_t  count;     // low 15 bits: access count; top bit: in-progress
  } item;
};
00170 
// LRU of recently accessed keys with per-key access counts, used to decide
// which documents are hot enough to migrate to an interim (SSD) volume.
//
// Structure: base[0] is a sentinel; base[0].item.prev is the MRU end and
// base[0].item.next is the LRU end of a doubly linked list threaded through
// entry indices.  'hash' is a single-slot table mapping
// (key_index % hash_size) -> entry index, where index 0 means "empty".
struct AccessHistory {
  AccessEntry *base;      // entry pool; base[0] is the list sentinel
  int size; 

  uint32_t *hash;         // key hash slot -> entry index (0 == empty)
  int hash_size; 

  AccessEntry *freelist;  // free entries chained through v[0]

  // Push an entry onto the freelist; v[1] is set to a marker value.
  void freeEntry(AccessEntry *entry) {
    entry->v[0] = (uintptr_t) freelist;
    entry->v[1] = 0xABCD1234U;
    freelist = entry;
  }

  // Allocate the pool and hash table, reset the sentinel, and place every
  // entry except the sentinel on the freelist.
  // NOTE(review): malloc() results are unchecked -- confirm OOM policy.
  void init(int size, int hash_size) {
    this->size = size;
    this->hash_size = hash_size;
    freelist = NULL;

    base = (AccessEntry *) malloc(sizeof(AccessEntry) * size);
    hash = (uint32_t *) malloc (sizeof(uint32_t) * hash_size);

    memset(hash, 0, sizeof(uint32_t) * hash_size);

    base[0].item.next = base[0].item.prev = 0;
    base[0].v[1] = 0xABCD1234UL;
    for (int i = size; --i > 0;)
     freeEntry(&base[i]);

    return;
  }

  // Unlink an entry from the LRU list and clear its hash slot.
  void remove(AccessEntry *entry) {
    if (entry == &(base[base[0].item.prev])) { // entry is the MRU head
      base[0].item.prev = entry->item.next;
    } else {
      base[entry->item.prev].item.next = entry->item.next;
    }
    if (entry == &(base[base[0].item.next])) { // entry is the LRU tail
      base[0].item.next = entry->item.prev;
    } else {
      base[entry->item.next].item.prev = entry->item.prev;
    }
    uint32_t hash_index = (uint32_t) (entry->item.index % hash_size);
    hash[hash_index] = 0;
  }

  // Insert an entry at the MRU end and point its hash slot at it.
  void enqueue(AccessEntry *entry) {
    uint32_t hash_index = (uint32_t) (entry->item.index % hash_size);
    hash[hash_index] = entry - base;

    entry->item.prev = 0;
    entry->item.next = base[0].item.prev;
    base[base[0].item.prev].item.prev = entry - base;
    base[0].item.prev = entry - base;
    if (base[0].item.next == 0)   // list was empty: entry is also the tail
      base[0].item.next = entry - base;
  }

  // Remove and return the LRU tail; returns the sentinel when empty.
  AccessEntry* dequeue() {
    AccessEntry *tail = &base[base[0].item.next];
    if (tail != base)
      remove(tail);

    return tail;
  }

  // Mark a tracked key as having a migration in flight (top bit of count).
  void set_in_progress(CryptoHash *key) {
    uint32_t key_index = key->slice32(3);
    uint16_t tag = static_cast<uint16_t>(key->slice32(1));
    unsigned int hash_index = (uint32_t) (key_index % hash_size);

    uint32_t index = hash[hash_index];
    AccessEntry *entry = &base[index];
    if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) {
      entry->item.count |= 0x8000;
    }
  }

  // Clear the in-flight bit for a tracked key.
  void set_not_in_progress(CryptoHash *key) {
    uint32_t key_index = key->slice32(3);
    uint16_t tag = static_cast<uint16_t>(key->slice32(1));
    unsigned int hash_index = (uint32_t) (key_index % hash_size);

    uint32_t index = hash[hash_index];
    AccessEntry *entry = &base[index];
    if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) {
      entry->item.count &= 0x7FFF;
    }
  }

  // Record an access: bump the count and move to MRU.  On a hash collision
  // the colliding entry is evicted; on an empty slot a new entry comes from
  // the freelist or, failing that, by evicting the LRU tail.
  void put_key(CryptoHash *key) {
    uint32_t key_index = key->slice32(3);
    uint16_t tag = static_cast<uint16_t>(key->slice32(1));
    unsigned int hash_index = (uint32_t) (key_index % hash_size);

    uint32_t index = hash[hash_index];
    AccessEntry *entry = &base[index];
    if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) { 
      remove(entry);
      enqueue(entry);
      ++entry->item.count;
    } else {
      if (index == 0) { 
        if (!freelist) {
          entry = dequeue();
          if (entry == base) {   // nothing to evict: table unusable
            return;
          }
        } else {
          entry = freelist;
          freelist = (AccessEntry *) entry->v[0];
        }
      } else { 
        remove(entry);           // evict the colliding entry, reuse its slot
      }
      entry->item.index = key_index;
      entry->item.tag = tag;
      entry->item.count = 1;
      enqueue(entry);
    }
  }

  // Drop a key from the history (e.g. after migration completes).
  bool remove_key(CryptoHash *key) {
    unsigned int hash_index = static_cast<uint32_t>(key->slice32(3) % hash_size);
    uint32_t index = hash[hash_index];
    AccessEntry *entry = &base[index];
    if (index != 0 && entry->item.tag == static_cast<uint16_t>(key->slice32(1)) && entry->item.index == key->slice32(3)) {
      remove(entry);
      freeEntry(entry);
      return true;
    }
    return false;
  }

  // True when the key's access count has reached the migration threshold.
  // Note: the in-progress bit makes count negative (int16_t), so in-flight
  // keys never test hot.
  bool is_hot(CryptoHash *key) {
    uint32_t key_index = key->slice32(3);
    uint16_t tag = (uint16_t) key->slice32(1);
    unsigned int hash_index = (uint32_t) (key_index % hash_size);

    uint32_t index = hash[hash_index];
    AccessEntry *entry = &base[index];

    return (index != 0 && entry->item.tag == tag && entry->item.index == key_index
        && entry->item.count >= migrate_threshold);
  }
};
00319 
00320 struct InterimCacheVol;
00321 
// State for one document being copied from the main stripe to an interim
// (SSD) volume.  Linked both into the per-volume aggregation queue (link)
// and the migration hash (hash_link).
// NOTE(review): the default constructor leaves all members uninitialized --
// presumably the allocator/caller fills them in; confirm before relying on
// any default value.
struct MigrateToInterimCache
{
  MigrateToInterimCache() { }
  Ptr<IOBufferData> buf;          // document bytes to write
  uint32_t agg_len;               // rounded length in the aggregation buffer
  CacheKey  key;
  Dir dir;
  InterimCacheVol *interim_vol;   // destination stripe
  CacheVC *vc;
  bool notMigrate;
  bool rewrite;
  bool copy;
  LINK(MigrateToInterimCache, link);
  LINK(MigrateToInterimCache, hash_link);
};
00337 
// One interim (e.g. SSD) cache stripe attached to a parent Vol.  Hot
// documents are migrated here from the parent stripe; the recovery and
// aggregation-write machinery mirror Vol's.
struct InterimCacheVol: public Continuation
{
  ats_scoped_str hash_text;         // identity string: "<disk seed> start:len"
  InterimVolHeaderFooter *header;   // points into the parent Vol's header

  off_t recover_pos;
  off_t prev_recover_pos;
  uint32_t last_sync_serial;
  uint32_t last_write_serial;
  bool recover_wrapped;

  off_t scan_pos;
  off_t skip; 
  off_t start; 
  off_t len;
  off_t data_blocks;
  char *agg_buffer;                 // write-aggregation buffer (AGG_SIZE bytes)
  int agg_todo_size;                // bytes queued for aggregation
  int agg_buf_pos;                  // bytes currently in agg_buffer
  uint32_t sector_size;
  int fd;
  CacheDisk *disk;
  Vol *vol;                         // parent stripe
  AIOCallbackInternal io;
  Queue<MigrateToInterimCache, MigrateToInterimCache::Link_link> agg;  // pending migrations
  int64_t transistor_range_threshold;  // freshness window, len / 5
  bool sync;
  // An aggregation write is in flight while aio_fildes holds a real fd.
  bool is_io_in_progress() {
    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
  }

  int recover_data();
  int handle_recover_from_data(int event, void *data);

  void set_io_not_in_progress() {
    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
  }

  int aggWrite(int event, void *e);
  int aggWriteDone(int event, void *e);
  // Round a length up to directory-approximation granularity, then to the
  // disk's hardware sector size.
  uint32_t round_to_approx_size (uint32_t l) {
    uint32_t ll = round_to_approx_dir_size(l);
    return INK_ALIGN(ll, disk->hw_sector_size);
  }

  // Bind this stripe to [s, s+l) on 'interim', owned by parent 'v', with
  // persistent metadata at 'hptr'.  Shares the parent's mutex so the two
  // stripes serialize on the same lock.
  void init(off_t s, off_t l, CacheDisk *interim, Vol *v, InterimVolHeaderFooter *hptr) {
    char* seed_str = interim->hash_base_string ? interim->hash_base_string : interim->path;
    const size_t hash_seed_size = strlen(seed_str);
    const size_t hash_text_size = hash_seed_size + 32;

    hash_text = static_cast<char *>(ats_malloc(hash_text_size));
    // NOTE(review): s and l are signed off_t printed with PRIu64 -- assumes
    // non-negative values; confirm.
    snprintf(hash_text, hash_text_size, "%s %" PRIu64 ":%" PRIu64 "", seed_str, s, l);

    skip = start = s;
    len = l;
    disk = interim;
    fd = disk->fd;
    vol = v;
    transistor_range_threshold = len / 5; 
    sync = false;

    header = hptr;

    agg_todo_size = 0;
    agg_buf_pos = 0;

    agg_buffer = (char *) ats_memalign(sysconf(_SC_PAGESIZE), AGG_SIZE);
    memset(agg_buffer, 0, AGG_SIZE);
    this->mutex = ((Continuation *)vol)->mutex;
  }
};
00409 
00410 
// Directory cleanup helpers for interim cache volumes (remove stale entries
// that point at this interim stripe).
void dir_clean_bucket(Dir *b, int s, InterimCacheVol *d);
void dir_clean_segment(int s, InterimCacheVol *d);
void dir_clean_interimvol(InterimCacheVol *d);
00414 
00415 #endif
00416 
// Vol: a single stripe of the cache on one physical disk.  Owns the in-memory
// directory, the persistent header/footer, the write-aggregation buffer and
// the evacuation machinery for that stripe.
struct Vol: public Continuation
{
  char *path;                    // pathname of the underlying disk/file
  ats_scoped_str hash_text;      // seed string this stripe's identity hashes from
  CryptoHash hash_id;
  int fd;                        // file descriptor of the disk

  char *raw_dir;                 // raw allocation holding dir + header + footer
  Dir *dir;                      // directory entries (within raw_dir)
  VolHeaderFooter *header;       // live stripe metadata
  VolHeaderFooter *footer;       // second copy, compared to detect torn syncs
  int segments;                  // number of directory segments
  off_t buckets;                 // buckets per segment
  off_t recover_pos;             // crash-recovery scan position
  off_t prev_recover_pos;
  off_t scan_pos;
  off_t skip;               
  off_t start;              
  off_t len;
  off_t data_blocks;
  int hit_evacuate_window;
  AIOCallbackInternal io;

  Queue<CacheVC, Continuation::Link_link> agg;             // writers waiting to aggregate
  Queue<CacheVC, Continuation::Link_link> stat_cache_vcs;
  Queue<CacheVC, Continuation::Link_link> sync;            // writers waiting on dir sync
  char *agg_buffer;              // write-aggregation buffer (AGG_SIZE bytes)
  int agg_todo_size;             // bytes queued for aggregation
  int agg_buf_pos;               // bytes currently in agg_buffer

  Event *trigger;                // pending periodic event; see cancel_trigger()

  OpenDir open_dir;              // documents currently open for read/write
  RamCache *ram_cache;
  int evacuate_size;
  DLL<EvacuationBlock> *evacuate;                // per-bucket evacuation lists
  DLL<EvacuationBlock> lookaside[LOOKASIDE_SIZE];
  CacheVC *doc_evacuator;

  VolInitInfo *init_info;        // transient state used only during init

  CacheDisk *disk;
  Cache *cache;
  CacheVol *cache_vol;
  uint32_t last_sync_serial;
  uint32_t last_write_serial;
  uint32_t sector_size;
  bool recover_wrapped;
  bool dir_sync_waiting;
  bool dir_sync_in_progress;
  bool writing_end_marker;

  // Cached first fragment of the most recent write, to serve immediate reads.
  CacheKey first_fragment_key;
  int64_t first_fragment_offset;
  Ptr<IOBufferData> first_fragment_data;

#if TS_USE_INTERIM_CACHE == 1
  int num_interim_vols;
  InterimCacheVol interim_vols[8];
  AccessHistory history;              // hot-key tracking for migration
  uint32_t interim_index;
  Queue<MigrateToInterimCache, MigrateToInterimCache::Link_hash_link> mig_hash[MIGRATE_BUCKETS];
  volatile int interim_done;

  // True when a migration for 'key' is already queued; optionally returns it.
  bool migrate_probe(CacheKey *key, MigrateToInterimCache **result) {
    uint32_t indx = key->slice32(3) % MIGRATE_BUCKETS;
    MigrateToInterimCache *m = mig_hash[indx].head;
    while (m != NULL && !(m->key == *key)) {
      m = mig_hash[indx].next(m);
    }
    if (result != NULL)
      *result = m;
    return m != NULL;
  }

  void set_migrate_in_progress(MigrateToInterimCache *m) {
    uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
    mig_hash[indx].enqueue(m);
  }

  void set_migrate_failed(MigrateToInterimCache *m) {
    uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
    mig_hash[indx].remove(m);
  }

  // On success the key also leaves the access history.
  void set_migrate_done(MigrateToInterimCache *m) {
    uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
    mig_hash[indx].remove(m);
    history.remove_key(&m->key);
  }
#endif

  void cancel_trigger();

  int recover_data();

  // Open/close lifecycle for readers and writers ("_lock" variants take the
  // volume mutex themselves).
  int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
  int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
  int close_write(CacheVC *cont);
  int close_write_lock(CacheVC *cont);
  int begin_read(CacheVC *cont);
  int begin_read_lock(CacheVC *cont);
  
  
  OpenDirEntry *open_read(CryptoHash *key);
  OpenDirEntry *open_read_lock(CryptoHash *key, EThread *t);
  int close_read(CacheVC *cont);
  int close_read_lock(CacheVC *cont);

  int clear_dir();

  int init(char *s, off_t blocks, off_t dir_skip, bool clear);

  // AIO completion handlers for startup / recovery.
  int handle_dir_clear(int event, void *data);
  int handle_dir_read(int event, void *data);
  int handle_recover_from_data(int event, void *data);
  int handle_recover_write_dir(int event, void *data);
  int handle_header_read(int event, void *data);

#if TS_USE_INTERIM_CACHE == 1
  int recover_interim_vol();
#endif

  int dir_init_done(int event, void *data);

  int dir_check(bool fix);
  int db_check(bool fix);

  // An aggregation write is in flight while aio_fildes holds a real fd.
  int is_io_in_progress()
  {
    return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
  }
  int increment_generation()
  {
    // Must hold the volume mutex; generation 0 is reserved, skip it on wrap.
    ink_assert(mutex->thread_holding == this_ethread());
    header->generation++;
    if (!header->generation)
      header->generation++;
    return header->generation;
  }
  void set_io_not_in_progress()
  {
    io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
  }
  
  int aggWriteDone(int event, Event *e);
  int aggWrite(int event, void *e);
  void agg_wrap();

  // Evacuation pipeline: read the doc, then rewrite it ahead of the cursor.
  int evacuateWrite(CacheVC *evacuator, int event, Event *e);
  int evacuateDocReadDone(int event, Event *e);
  int evacuateDoc(int event, Event *e);

  int evac_range(off_t start, off_t end, int evac_phase);
  void periodic_scan();
  void scan_for_pinned_documents();
  void evacuate_cleanup_blocks(int i);
  void evacuate_cleanup();
  EvacuationBlock *force_evacuate_head(Dir *dir, int pinned);
  int within_hit_evacuate_window(Dir *dir);
  uint32_t round_to_approx_size(uint32_t l);

  // NOTE(review): members not listed below (raw_dir, header, footer,
  // segments, sector_size, ...) are left uninitialized here and are set up
  // later by init() -- confirm no path reads them before init().
  Vol()
    : Continuation(new_ProxyMutex()), path(NULL), fd(-1),
      dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), skip(0), start(0),
      len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0),
      evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false),
      dir_sync_waiting(0), dir_sync_in_progress(0), writing_end_marker(0) {
    open_dir.mutex = mutex;
    agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE);
    memset(agg_buffer, 0, AGG_SIZE);
    SET_HANDLER(&Vol::aggWrite);
  }

  ~Vol() {
    ats_memalign_free(agg_buffer);
  }
};
00597 
00598 struct AIO_Callback_handler: public Continuation
00599 {
00600   int handle_disk_failure(int event, void *data);
00601 
00602   AIO_Callback_handler():Continuation(new_ProxyMutex()) {
00603     SET_HANDLER(&AIO_Callback_handler::handle_disk_failure);
00604   }
00605 };
00606 
00607 struct CacheVol
00608 {
00609   int vol_number;
00610   int scheme;
00611   off_t size;
00612   int num_vols;
00613   Vol **vols;
00614   DiskVol **disk_vols;
00615   LINK(CacheVol, link);
00616   
00617   RecRawStatBlock *vol_rsb;
00618 
00619   CacheVol()
00620     : vol_number(-1), scheme(0), size(0), num_vols(0), vols(NULL), disk_vols(0), vol_rsb(0)
00621   { }
00622 };
00623 
00624 
00625 
// On-disk fragment header.  Every fragment written to a stripe starts with
// this structure; sizeofDoc measures it up to and including 'checksum'.
// Layout is part of the on-disk format and must not change.
struct Doc
{
  uint32_t magic;         // DOC_MAGIC / DOC_CORRUPT / DOC_NO_CHECKSUM
  uint32_t len;           // length of this fragment (incl. header & hlen)
  uint64_t total_len;     // total length of the whole document
  CryptoHash first_key;   // key of the document's first fragment
  CryptoHash key;         // key of this fragment
  uint32_t hlen;          // length of the variable header section after this
  uint32_t doc_type:8;       
  uint32_t v_major:8;     // on-disk version, major
  uint32_t v_minor:8;     // on-disk version, minor
  uint32_t unused:8; 
  uint32_t sync_serial;
  uint32_t write_serial;
  uint32_t pinned;        // pin expiry time (0 if not pinned)
  uint32_t checksum;

  uint32_t data_len();
  uint32_t prefix_len();
  int single_fragment();
  int no_data_in_fragment();
  char *hdr();
  char *data();
};
00650 
00651 
00652 
00653 extern Vol **gvol;
00654 extern volatile int gnvol;
00655 extern ClassAllocator<OpenDirEntry> openDirEntryAllocator;
00656 extern ClassAllocator<EvacuationBlock> evacuationBlockAllocator;
00657 extern ClassAllocator<EvacuationKey> evacuationKeyAllocator;
00658 extern unsigned short *vol_hash_table;
00659 
00660 
00661 
00662 TS_INLINE int
00663 vol_headerlen(Vol *d) {
00664   return ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter) + sizeof(uint16_t) * (d->segments-1));
00665 }
00666 
00667 TS_INLINE size_t
00668 vol_dirlen(Vol *d)
00669 {
00670   return vol_headerlen(d) + 
00671     ROUND_TO_STORE_BLOCK(((size_t)d->buckets) * DIR_DEPTH * d->segments * SIZEOF_DIR) +
00672     ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
00673 }
00674 
00675 TS_INLINE int
00676 vol_direntries(Vol *d)
00677 {
00678   return d->buckets * DIR_DEPTH * d->segments;
00679 }
00680 
00681 #if TS_USE_INTERIM_CACHE == 1
// Validity checks of directory entries against the write cursor -- macro
// variants used when interim caching is enabled, so they work for both Vol
// and InterimCacheVol arguments.  They mirror the inline functions in the
// #else branch below.
#define vol_out_of_phase_valid(d, e)            \
    (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE))

#define vol_out_of_phase_agg_valid(d, e)        \
    (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE))

// NOTE(review): identical to vol_out_of_phase_agg_valid above, whereas the
// non-interim inline version compares against write_pos without AGG_SIZE --
// verify this difference is intentional.
#define vol_out_of_phase_write_valid(d, e)      \
    (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE))

#define vol_in_phase_valid(d, e)                \
    (dir_offset(e) - 1 < ((d->header->write_pos + d->agg_buf_pos - d->start) / CACHE_BLOCK_SIZE))

// 1-based block offset -> absolute byte offset on disk.
#define vol_offset_to_offset(d, pos)            \
    (d->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE)

#define vol_dir_segment(d, s)                   \
    (Dir *) (((char *) d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR)

// Absolute byte offset -> 1-based block offset.
#define offset_to_vol_offset(d, pos)            \
    ((pos - d->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE)

// Byte offset on disk of the fragment a directory entry points at.
#define vol_offset(d, e)                        \
    ((d)->start + (off_t) ((off_t)dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE)

// Entry's bytes are still sitting in the aggregation buffer.
#define vol_in_phase_agg_buf_valid(d, e)        \
    ((vol_offset(d, e) >= d->header->write_pos) && vol_offset(d, e) < (d->header->write_pos + d->agg_buf_pos))

// Entry lies in the "fresh" window behind the interim write cursor, with or
// without wrap-around depending on how close agg_pos is to the end.
#define vol_transistor_range_valid(d, e)    \
  ((d->header->agg_pos + d->transistor_range_threshold < d->start + d->len) ? \
      (vol_out_of_phase_write_valid(d, e) && \
      (dir_offset(e) <= ((d->header->agg_pos - d->start + d->transistor_range_threshold) / CACHE_BLOCK_SIZE))) : \
      ((dir_offset(e) <= ((d->header->agg_pos - d->start + d->transistor_range_threshold - d->len) / CACHE_BLOCK_SIZE)) || \
          (dir_offset(e) > ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE))))
00716 
00717 #else
00718 TS_INLINE int
00719 vol_out_of_phase_valid(Vol *d, Dir *e)
00720 {
00721   return (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE));
00722 }
00723 
00724 TS_INLINE int
00725 vol_out_of_phase_agg_valid(Vol *d, Dir *e)
00726 {
00727   return (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE));
00728 }
00729 
00730 TS_INLINE int
00731 vol_out_of_phase_write_valid(Vol *d, Dir *e)
00732 {
00733   return (dir_offset(e) - 1 >= ((d->header->write_pos - d->start) / CACHE_BLOCK_SIZE));
00734 }
00735 
00736 TS_INLINE int
00737 vol_in_phase_valid(Vol *d, Dir *e)
00738 {
00739   return (dir_offset(e) - 1 < ((d->header->write_pos + d->agg_buf_pos - d->start) / CACHE_BLOCK_SIZE));
00740 }
00741 
00742 TS_INLINE off_t
00743 vol_offset(Vol *d, Dir *e)
00744 {
00745   return d->start + (off_t) dir_offset(e) * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
00746 }
00747 
00748 TS_INLINE off_t
00749 offset_to_vol_offset(Vol *d, off_t pos)
00750 {
00751   return ((pos - d->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE);
00752 }
00753 
00754 TS_INLINE off_t
00755 vol_offset_to_offset(Vol *d, off_t pos)
00756 {
00757   return d->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
00758 }
00759 
00760 TS_INLINE Dir *
00761 vol_dir_segment(Vol *d, int s)
00762 {
00763   return (Dir *) (((char *) d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR);
00764 }
00765 
00766 TS_INLINE int
00767 vol_in_phase_agg_buf_valid(Vol *d, Dir *e)
00768 {
00769   return (vol_offset(d, e) >= d->header->write_pos && vol_offset(d, e) < (d->header->write_pos + d->agg_buf_pos));
00770 }
00771 #endif
00772 
00773 TS_INLINE off_t
00774 vol_relative_length(Vol *v, off_t start_offset)
00775 {
00776    return (v->len + v->skip) - start_offset;
00777 }
00778 
00779 TS_INLINE uint32_t
00780 Doc::prefix_len()
00781 {
00782   return sizeofDoc + hlen;
00783 }
00784 
00785 TS_INLINE uint32_t
00786 Doc::data_len()
00787 {
00788   return len - sizeofDoc - hlen;
00789 }
00790 
00791 TS_INLINE int
00792 Doc::single_fragment()
00793 {
00794   return data_len() == total_len;
00795 }
00796 
00797 TS_INLINE char *
00798 Doc::hdr()
00799 {
00800   return reinterpret_cast<char*>(this) + sizeofDoc;
00801 }
00802 
00803 TS_INLINE char *
00804 Doc::data()
00805 {
00806   return this->hdr() +  hlen;
00807 }
00808 
// Stripe-level directory clear / initialization entry points (CacheVol.cc).
int vol_dir_clear(Vol *d);
int vol_init(Vol *d, char *s, off_t blocks, off_t skip, bool clear);
00811 
00812 
00813 
00814 TS_INLINE EvacuationBlock *
00815 evacuation_block_exists(Dir *dir, Vol *p)
00816 {
00817   EvacuationBlock *b = p->evacuate[dir_evac_bucket(dir)].head;
00818   for (; b; b = b->link.next)
00819     if (dir_offset(&b->dir) == dir_offset(dir))
00820       return b;
00821   return 0;
00822 }
00823 
00824 TS_INLINE void
00825 Vol::cancel_trigger()
00826 {
00827   if (trigger) {
00828     trigger->cancel_action();
00829     trigger = NULL;
00830   }
00831 }
00832 
00833 TS_INLINE EvacuationBlock *
00834 new_EvacuationBlock(EThread *t)
00835 {
00836   EvacuationBlock *b = THREAD_ALLOC(evacuationBlockAllocator, t);
00837   b->init = 0;
00838   b->readers = 0;
00839   b->earliest_evacuator = 0;
00840   b->evac_frags.link.next = 0;
00841   return b;
00842 }
00843 
00844 TS_INLINE void
00845 free_EvacuationBlock(EvacuationBlock *b, EThread *t)
00846 {
00847   EvacuationKey *e = b->evac_frags.link.next;
00848   while (e) {
00849     EvacuationKey *n = e->link.next;
00850     evacuationKeyAllocator.free(e);
00851     e = n;
00852   }
00853   THREAD_FREE(b, evacuationBlockAllocator, t);
00854 }
00855 
00856 TS_INLINE OpenDirEntry *
00857 Vol::open_read(CryptoHash *key)
00858 {
00859   return open_dir.open_read(key);
00860 }
00861 
00862 TS_INLINE int
00863 Vol::within_hit_evacuate_window(Dir *xdir)
00864 {
00865   off_t oft = dir_offset(xdir) - 1;
00866   off_t write_off = (header->write_pos + AGG_SIZE - start) / CACHE_BLOCK_SIZE;
00867   off_t delta = oft - write_off;
00868   if (delta >= 0)
00869     return delta < hit_evacuate_window;
00870   else
00871     return -delta > (data_blocks - hit_evacuate_window) && -delta < data_blocks;
00872 }
00873 
00874 TS_INLINE uint32_t
00875 Vol::round_to_approx_size(uint32_t l) {
00876   uint32_t ll = round_to_approx_dir_size(l);
00877   return ROUND_TO_SECTOR(this, ll);
00878 }
00879 
00880 #if TS_USE_INTERIM_CACHE == 1
00881 inline bool
00882 dir_valid(Vol *_d, Dir *_e) {
00883   if (!dir_ininterim(_e))
00884     return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
00885         vol_out_of_phase_valid(_d, _e);
00886   else {
00887     int idx = dir_get_index(_e);
00888     if (good_interim_disks <= 0 || idx >= _d->num_interim_vols) return false;
00889     InterimCacheVol *sv = &(_d->interim_vols[idx]);
00890     return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
00891         vol_out_of_phase_valid(sv, _e)) : false;
00892   }
00893 }
00894 
00895 inline bool
00896 dir_valid(InterimCacheVol *_d, Dir *_e) {
00897   if (!dir_ininterim(_e))
00898     return true;
00899   InterimCacheVol *sv = &(_d->vol->interim_vols[dir_get_index(_e)]);
00900   if (_d != sv)
00901     return true;
00902   return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
00903       vol_out_of_phase_valid(sv, _e)) : false;
00904 
00905 }
00906 
00907 inline bool
00908 dir_agg_valid(Vol *_d, Dir *_e) {
00909   if (!dir_ininterim(_e))
00910     return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
00911         vol_out_of_phase_agg_valid(_d, _e);
00912   else {
00913     int idx = dir_get_index(_e);
00914     if(good_interim_disks <= 0 || idx >= _d->num_interim_vols) return false;
00915     InterimCacheVol *sv = &(_d->interim_vols[idx]);
00916     return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
00917         vol_out_of_phase_agg_valid(sv, _e);
00918   }
00919 }
00920 inline bool
00921 dir_write_valid(Vol *_d, Dir *_e) {
00922   if (!dir_ininterim(_e))
00923     return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
00924         vol_out_of_phase_write_valid(_d, _e);
00925   else {
00926     InterimCacheVol *sv = &(_d->interim_vols[dir_get_index(_e)]);
00927     return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
00928         vol_out_of_phase_write_valid(sv, _e);
00929   }
00930 }
00931 inline bool
00932 dir_agg_buf_valid(Vol *_d, Dir *_e) {
00933   if (!dir_ininterim(_e))
00934     return _d->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(_d, _e);
00935   else {
00936     InterimCacheVol *sv = &(_d->interim_vols[dir_get_index(_e)]);
00937     return sv->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(sv, _e);
00938   }
00939 }
00940 
00941 inline bool
00942 dir_agg_buf_valid(InterimCacheVol *_d, Dir *_e) {
00943   return _d->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(_d, _e);
00944 }
00945 
00946 #endif // TS_USE_INTERIM_CACHE
00947 #endif