• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

P_CacheVol.h

Go to the documentation of this file.
00001 /** @file
00002 
00003   Cache volume (Vol) definitions: sizing constants, on-disk header and document layouts, and inline helpers.
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 #ifndef _P_CACHE_VOL_H__
00026 #define _P_CACHE_VOL_H__
00027 
// Cache block geometry and rounding helpers.
// Macro arguments are parenthesized so that expression arguments expand safely.
#define CACHE_BLOCK_SHIFT               9
#define CACHE_BLOCK_SIZE                (1<<CACHE_BLOCK_SHIFT) // 512, smallest sector size
#define ROUND_TO_STORE_BLOCK(_x)        INK_ALIGN((_x), STORE_BLOCK_SIZE)
#define ROUND_TO_CACHE_BLOCK(_x)        INK_ALIGN((_x), CACHE_BLOCK_SIZE)
#define ROUND_TO_SECTOR(_p, _x)         INK_ALIGN((_x), (_p)->sector_size)
#define ROUND_TO(_x, _y)                INK_ALIGN((_x), (_y))

// Vol (volumes)
#define VOL_MAGIC                      0xF1D0F00D
#define START_BLOCKS                    16      // 8k, STORE_BLOCK_SIZE
#define START_POS                       ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE)
#define AGG_SIZE                        (4 * 1024 * 1024) // 4MB
#define AGG_HIGH_WATER                  (AGG_SIZE / 2) // 2MB
#define EVACUATION_SIZE                 (2 * AGG_SIZE)  // 8MB
#define MAX_VOL_SIZE                   ((off_t)512 * 1024 * 1024 * 1024 * 1024)
#define STORE_BLOCKS_PER_CACHE_BLOCK    (STORE_BLOCK_SIZE / CACHE_BLOCK_SIZE)
#define MAX_VOL_BLOCKS                 (MAX_VOL_SIZE / CACHE_BLOCK_SIZE)
#define MAX_FRAG_SIZE                   (AGG_SIZE - sizeofDoc) // true max
#define LEAVE_FREE                      DEFAULT_MAX_BUFFER_SIZE
#define PIN_SCAN_EVERY                  16      // scan every 1/16 of disk
#define VOL_HASH_TABLE_SIZE             32707
#define VOL_HASH_EMPTY                 0xFFFF
#define VOL_HASH_ALLOC_SIZE             (8 * 1024 * 1024)  // one chance per this unit
#define LOOKASIDE_SIZE                  256
#define EVACUATION_BUCKET_SIZE          (2 * EVACUATION_SIZE) // 16MB
#define RECOVERY_SIZE                   EVACUATION_SIZE // 8MB
#define AIO_NOT_IN_PROGRESS             0
// Negative constants are parenthesized so they stay a single operand wherever they are expanded.
#define AIO_AGG_WRITE_IN_PROGRESS       (-1)
#define AUTO_SIZE_RAM_CACHE             (-1)    // 1-1 with directory size
#define DEFAULT_TARGET_FRAGMENT_SIZE    (1048576 - sizeofDoc) // 1MB


// Evacuation bucket of a directory offset (offset is in CACHE_BLOCK_SIZE units;
// one bucket covers EVACUATION_BUCKET_SIZE bytes).
#define dir_offset_evac_bucket(_o) \
  ((_o) / (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE))
// Evacuation bucket of a directory entry.
#define dir_evac_bucket(_e) dir_offset_evac_bucket(dir_offset(_e))
// Evacuation bucket of a byte position _o within volume _d.
#define offset_evac_bucket(_d, _o) \
  dir_offset_evac_bucket((offset_to_vol_offset(_d, _o)))

// Documents

#define DOC_MAGIC                       ((uint32_t)0x5F129B13)
#define DOC_CORRUPT                     ((uint32_t)0xDEADBABE)
#define DOC_NO_CHECKSUM                 ((uint32_t)0xA0B0C0D0)

// Size of the fixed Doc header: offset of the last member (checksum) plus its size.
#define sizeofDoc (((uint32_t)(uintptr_t)&((Doc*)0)->checksum)+(uint32_t)sizeof(uint32_t))
00073 
00074 #if TS_USE_INTERIM_CACHE == 1
00075 struct InterimVolHeaderFooter   // header record of an interim cache volume; same leading fields as VolHeaderFooter
00076 {
00077   unsigned int magic;
00078   VersionNumber version;
00079   time_t create_time;
00080   off_t write_pos;                // read by the vol_in_phase_* macros
00081   off_t last_write_pos;
00082   off_t agg_pos;                  // read by the vol_out_of_phase_* macros
00083   uint32_t generation;            // token generation (vary), this cannot be 0
00084   uint32_t phase;                 // compared with dir_phase(e) by the dir_*valid() helpers
00085   uint32_t cycle;
00086   uint32_t sync_serial;
00087   uint32_t write_serial;
00088   uint32_t dirty;
00089   uint32_t sector_size;
00090   int32_t unused;                // pad out to 8 byte boundary
00091 };
00092 #endif
00093 
00094 struct Cache;         // not defined in this header
00095 struct Vol;           // defined below; InterimCacheVol refers to it before its definition
00096 struct CacheDisk;     // not defined in this header
00097 struct VolInitInfo;   // not defined in this header
00098 struct DiskVol;       // not defined in this header
00099 struct CacheVol;      // defined below, after Vol
00100 
00101 struct VolHeaderFooter          // header/footer record of a Vol (see Vol::header and Vol::footer)
00102 {
00103   unsigned int magic;             // NOTE(review): presumably holds VOL_MAGIC - confirm
00104   VersionNumber version;
00105   time_t create_time;
00106   off_t write_pos;                // read by vol_in_phase_valid() and Vol::within_hit_evacuate_window()
00107   off_t last_write_pos;
00108   off_t agg_pos;                  // read by vol_out_of_phase_valid() / vol_out_of_phase_agg_valid()
00109   uint32_t generation;            // token generation (vary), this cannot be 0
00110   uint32_t phase;                 // compared with dir_phase(e) by the dir_*valid() helpers
00111   uint32_t cycle;
00112   uint32_t sync_serial;
00113   uint32_t write_serial;
00114   uint32_t dirty;
00115   uint32_t sector_size;
00116   uint32_t unused;                // pad out to 8 byte boundary
00117 #if TS_USE_INTERIM_CACHE == 1
00118   InterimVolHeaderFooter interim_header[8];   // same element count as Vol::interim_vols
00119 #endif
00120   uint16_t freelist[1];           // first element of a trailing array; vol_headerlen() adds room for segments-1 more
00121 };
00122 
00123 // Key and Earliest key for each fragment that needs to be evacuated
00124 struct EvacuationKey
00125 {
00126   SLink<EvacuationKey> link;      // singly linked chain; walked and freed by free_EvacuationBlock()
00127   CryptoHash key;
00128   CryptoHash earliest_key;
00129 };
00130 
00131 struct EvacuationBlock            // one directory entry tracked for evacuation; see Vol::evacuate and evacuation_block_exists()
00132 {
00133   union
00134   {
00135     unsigned int init;                  // overlays all flag bits; new_EvacuationBlock() clears them by storing 0
00136     struct
00137     {
00138       unsigned int done:1;              // has been evacuated
00139       unsigned int pinned:1;            // check pinning timeout
00140       unsigned int evacuate_head:1;     // NOTE(review): presumably "also evacuate the head fragment" (cf. Vol::force_evacuate_head) - confirm
00141       unsigned int unused:29;
00142     } f;
00143   };
00144 
00145   int readers;
00146   Dir dir;                              // evacuation_block_exists() matches on dir_offset() of this entry
00147   Dir new_dir;
00148   // we need to have a list of evacuationkeys because of collision.
00149   EvacuationKey evac_frags;             // embedded first key; further keys hang off evac_frags.link
00150   CacheVC *earliest_evacuator;
00151   LINK(EvacuationBlock, link);
00152 };
00153 
00154 #if TS_USE_INTERIM_CACHE == 1
00155 #define MIGRATE_BUCKETS                 1021   // number of Vol::mig_hash buckets
00156 extern int migrate_threshold;                  // AccessHistory::is_hot() requires count >= this value
00157 extern int good_interim_disks;                 // dir_valid()/dir_agg_valid() reject interim entries when this is <= 0
00158 
00159 
00160 union AccessEntry {   // one slot of AccessHistory: either a free-list node (v) or a live record (item)
00161   uintptr_t v[2];     // free-list view: AccessHistory::freeEntry() stores the next free entry in v[0] and 0xABCD1234 in v[1]
00162   struct {
00163     uint32_t  next;   // index into AccessHistory::base; 0 refers to the sentinel base[0]
00164     uint32_t  prev;   // index into AccessHistory::base; 0 refers to the sentinel base[0]
00165     uint32_t  index;  // key->slice32(3) of the recorded key
00166     uint16_t  tag;    // low 16 bits of key->slice32(1)
00167     int16_t  count;   // access count; bit 0x8000 is set/cleared by set_in_progress()/set_not_in_progress()
00168   } item;
00169 };
00170 
00171 struct AccessHistory {
00172   AccessEntry *base;
00173   int size; // 1M
00174 
00175   uint32_t *hash;
00176   int hash_size; // 2097143
00177 
00178   AccessEntry *freelist;
00179 
00180   void freeEntry(AccessEntry *entry) {
00181     entry->v[0] = (uintptr_t) freelist;
00182     entry->v[1] = 0xABCD1234U;
00183     freelist = entry;
00184   }
00185 
00186   void init(int size, int hash_size) {
00187     this->size = size;
00188     this->hash_size = hash_size;
00189     freelist = NULL;
00190 
00191     base = (AccessEntry *) malloc(sizeof(AccessEntry) * size);
00192     hash = (uint32_t *) malloc (sizeof(uint32_t) * hash_size);
00193 
00194     memset(hash, 0, sizeof(uint32_t) * hash_size);
00195 
00196     base[0].item.next = base[0].item.prev = 0;
00197     base[0].v[1] = 0xABCD1234UL;
00198     for (int i = size; --i > 0;)
00199      freeEntry(&base[i]);
00200 
00201     return;
00202   }
00203 
00204   void remove(AccessEntry *entry) {
00205     if (entry == &(base[base[0].item.prev])) { // head
00206       base[0].item.prev = entry->item.next;
00207     } else {
00208       base[entry->item.prev].item.next = entry->item.next;
00209     }
00210     if (entry == &(base[base[0].item.next])) { // tail
00211       base[0].item.next = entry->item.prev;
00212     } else {
00213       base[entry->item.next].item.prev = entry->item.prev;
00214     }
00215     uint32_t hash_index = (uint32_t) (entry->item.index % hash_size);
00216     hash[hash_index] = 0;
00217   }
00218 
00219   void enqueue(AccessEntry *entry) {
00220     uint32_t hash_index = (uint32_t) (entry->item.index % hash_size);
00221     hash[hash_index] = entry - base;
00222 
00223     entry->item.prev = 0;
00224     entry->item.next = base[0].item.prev;
00225     base[base[0].item.prev].item.prev = entry - base;
00226     base[0].item.prev = entry - base;
00227     if (base[0].item.next == 0)
00228       base[0].item.next = entry - base;
00229   }
00230 
00231   AccessEntry* dequeue() {
00232     AccessEntry *tail = &base[base[0].item.next];
00233     if (tail != base)
00234       remove(tail);
00235 
00236     return tail;
00237   }
00238 
00239   void set_in_progress(CryptoHash *key) {
00240     uint32_t key_index = key->slice32(3);
00241     uint16_t tag = static_cast<uint16_t>(key->slice32(1));
00242     unsigned int hash_index = (uint32_t) (key_index % hash_size);
00243 
00244     uint32_t index = hash[hash_index];
00245     AccessEntry *entry = &base[index];
00246     if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) {
00247       entry->item.count |= 0x8000;
00248     }
00249   }
00250 
00251   void set_not_in_progress(CryptoHash *key) {
00252     uint32_t key_index = key->slice32(3);
00253     uint16_t tag = static_cast<uint16_t>(key->slice32(1));
00254     unsigned int hash_index = (uint32_t) (key_index % hash_size);
00255 
00256     uint32_t index = hash[hash_index];
00257     AccessEntry *entry = &base[index];
00258     if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) {
00259       entry->item.count &= 0x7FFF;
00260     }
00261   }
00262 
00263   void put_key(CryptoHash *key) {
00264     uint32_t key_index = key->slice32(3);
00265     uint16_t tag = static_cast<uint16_t>(key->slice32(1));
00266     unsigned int hash_index = (uint32_t) (key_index % hash_size);
00267 
00268     uint32_t index = hash[hash_index];
00269     AccessEntry *entry = &base[index];
00270     if (index != 0 && entry->item.tag == tag && entry->item.index == key_index) { // seen before
00271       remove(entry);
00272       enqueue(entry);
00273       ++entry->item.count;
00274     } else {
00275       if (index == 0) { // not seen before
00276         if (!freelist) {
00277           entry = dequeue();
00278           if (entry == base) {
00279             return;
00280           }
00281         } else {
00282           entry = freelist;
00283           freelist = (AccessEntry *) entry->v[0];
00284         }
00285       } else { // collation
00286         remove(entry);
00287       }
00288       entry->item.index = key_index;
00289       entry->item.tag = tag;
00290       entry->item.count = 1;
00291       enqueue(entry);
00292     }
00293   }
00294 
00295   bool remove_key(CryptoHash *key) {
00296     unsigned int hash_index = static_cast<uint32_t>(key->slice32(3) % hash_size);
00297     uint32_t index = hash[hash_index];
00298     AccessEntry *entry = &base[index];
00299     if (index != 0 && entry->item.tag == static_cast<uint16_t>(key->slice32(1)) && entry->item.index == key->slice32(3)) {
00300       remove(entry);
00301       freeEntry(entry);
00302       return true;
00303     }
00304     return false;
00305   }
00306 
00307   bool is_hot(CryptoHash *key) {
00308     uint32_t key_index = key->slice32(3);
00309     uint16_t tag = (uint16_t) key->slice32(1);
00310     unsigned int hash_index = (uint32_t) (key_index % hash_size);
00311 
00312     uint32_t index = hash[hash_index];
00313     AccessEntry *entry = &base[index];
00314 
00315     return (index != 0 && entry->item.tag == tag && entry->item.index == key_index
00316         && entry->item.count >= migrate_threshold);
00317   }
00318 };
00319 
00320 struct InterimCacheVol;
00321 
00322 struct MigrateToInterimCache
00323 {
00324   MigrateToInterimCache() { }
00325   Ptr<IOBufferData> buf;
00326   uint32_t agg_len;
00327   CacheKey  key;
00328   Dir dir;
00329   InterimCacheVol *interim_vol;
00330   CacheVC *vc;
00331   bool notMigrate;
00332   bool rewrite;
00333   bool copy;
00334   LINK(MigrateToInterimCache, link);
00335   LINK(MigrateToInterimCache, hash_link);
00336 };
00337 
00338 struct InterimCacheVol: public Continuation
00339 {
00340   ats_scoped_str hash_text;
00341   InterimVolHeaderFooter *header;
00342 
00343   off_t recover_pos;
00344   off_t prev_recover_pos;
00345   uint32_t last_sync_serial;
00346   uint32_t last_write_serial;
00347   bool recover_wrapped;
00348 
00349   off_t scan_pos;
00350   off_t skip; // start of headers
00351   off_t start; // start of data
00352   off_t len;
00353   off_t data_blocks;
00354   char *agg_buffer;
00355   int agg_todo_size;
00356   int agg_buf_pos;
00357   uint32_t sector_size;
00358   int fd;
00359   CacheDisk *disk;
00360   Vol *vol; // backpointer to vol
00361   AIOCallbackInternal io;
00362   Queue<MigrateToInterimCache, MigrateToInterimCache::Link_link> agg;
00363   int64_t transistor_range_threshold;
00364   bool sync;
00365   bool is_io_in_progress() {
00366     return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
00367   }
00368 
00369   int recover_data();
00370   int handle_recover_from_data(int event, void *data);
00371 
00372   void set_io_not_in_progress() {
00373     io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
00374   }
00375 
00376   int aggWrite(int event, void *e);
00377   int aggWriteDone(int event, void *e);
00378   uint32_t round_to_approx_size (uint32_t l) {
00379     uint32_t ll = round_to_approx_dir_size(l);
00380     return INK_ALIGN(ll, disk->hw_sector_size);
00381   }
00382 
00383   void init(off_t s, off_t l, CacheDisk *interim, Vol *v, InterimVolHeaderFooter *hptr) {
00384     char* seed_str = interim->hash_base_string ? interim->hash_base_string : interim->path;
00385     const size_t hash_seed_size = strlen(seed_str);
00386     const size_t hash_text_size = hash_seed_size + 32;
00387 
00388     hash_text = static_cast<char *>(ats_malloc(hash_text_size));
00389     snprintf(hash_text, hash_text_size, "%s %" PRIu64 ":%" PRIu64 "", seed_str, s, l);
00390 
00391     skip = start = s;
00392     len = l;
00393     disk = interim;
00394     fd = disk->fd;
00395     vol = v;
00396     transistor_range_threshold = len / 5; // 20% storage size for transistor
00397     sync = false;
00398 
00399     header = hptr;
00400 
00401     agg_todo_size = 0;
00402     agg_buf_pos = 0;
00403 
00404     agg_buffer = (char *) ats_memalign(sysconf(_SC_PAGESIZE), AGG_SIZE);
00405     memset(agg_buffer, 0, AGG_SIZE);
00406     this->mutex = ((Continuation *)vol)->mutex;
00407   }
00408 };
00409 
00410 
00411 void dir_clean_bucket(Dir *b, int s, InterimCacheVol *d);   // declared for InterimCacheVol; the definition is not in this header
00412 void dir_clean_segment(int s, InterimCacheVol *d);          // declared for InterimCacheVol; the definition is not in this header
00413 void dir_clean_interimvol(InterimCacheVol *d);              // declared for InterimCacheVol; the definition is not in this header
00414 
00415 #endif
00416 
00417 struct Vol: public Continuation
00418 {
00419   char *path;
00420   ats_scoped_str hash_text;
00421   CryptoHash hash_id;
00422   int fd;
00423 
00424   char *raw_dir;
00425   Dir *dir;
00426   VolHeaderFooter *header;
00427   VolHeaderFooter *footer;
00428   int segments;
00429   off_t buckets;
00430   off_t recover_pos;
00431   off_t prev_recover_pos;
00432   off_t scan_pos;
00433   off_t skip;               // start of headers
00434   off_t start;              // start of data
00435   off_t len;
00436   off_t data_blocks;
00437   int hit_evacuate_window;
00438   AIOCallbackInternal io;
00439 
00440   Queue<CacheVC, Continuation::Link_link> agg;
00441   Queue<CacheVC, Continuation::Link_link> stat_cache_vcs;
00442   Queue<CacheVC, Continuation::Link_link> sync;
00443   char *agg_buffer;
00444   int agg_todo_size;
00445   int agg_buf_pos;
00446 
00447   Event *trigger;
00448 
00449   OpenDir open_dir;
00450   RamCache *ram_cache;
00451   int evacuate_size;
00452   DLL<EvacuationBlock> *evacuate;
00453   DLL<EvacuationBlock> lookaside[LOOKASIDE_SIZE];
00454   CacheVC *doc_evacuator;
00455 
00456   VolInitInfo *init_info;
00457 
00458   CacheDisk *disk;
00459   Cache *cache;
00460   CacheVol *cache_vol;
00461   uint32_t last_sync_serial;
00462   uint32_t last_write_serial;
00463   uint32_t sector_size;
00464   bool recover_wrapped;
00465   bool dir_sync_waiting;
00466   bool dir_sync_in_progress;
00467   bool writing_end_marker;
00468 
00469   CacheKey first_fragment_key;
00470   int64_t first_fragment_offset;
00471   Ptr<IOBufferData> first_fragment_data;
00472 
00473 #if TS_USE_INTERIM_CACHE == 1
00474   int num_interim_vols;
00475   InterimCacheVol interim_vols[8];
00476   AccessHistory history;
00477   uint32_t interim_index;
00478   Queue<MigrateToInterimCache, MigrateToInterimCache::Link_hash_link> mig_hash[MIGRATE_BUCKETS];
00479   volatile int interim_done;
00480 
00481 
00482   bool migrate_probe(CacheKey *key, MigrateToInterimCache **result) {
00483     uint32_t indx = key->slice32(3) % MIGRATE_BUCKETS;
00484     MigrateToInterimCache *m = mig_hash[indx].head;
00485     while (m != NULL && !(m->key == *key)) {
00486       m = mig_hash[indx].next(m);
00487     }
00488     if (result != NULL)
00489       *result = m;
00490     return m != NULL;
00491   }
00492 
00493   void set_migrate_in_progress(MigrateToInterimCache *m) {
00494     uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
00495     mig_hash[indx].enqueue(m);
00496   }
00497 
00498   void set_migrate_failed(MigrateToInterimCache *m) {
00499     uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
00500     mig_hash[indx].remove(m);
00501   }
00502 
00503   void set_migrate_done(MigrateToInterimCache *m) {
00504     uint32_t indx = m->key.slice32(3) % MIGRATE_BUCKETS;
00505     mig_hash[indx].remove(m);
00506     history.remove_key(&m->key);
00507   }
00508 #endif
00509 
00510   void cancel_trigger();
00511 
00512   int recover_data();
00513 
00514   int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
00515   int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
00516   int close_write(CacheVC *cont);
00517   int close_write_lock(CacheVC *cont);
00518   int begin_read(CacheVC *cont);
00519   int begin_read_lock(CacheVC *cont);
00520   // unused read-write interlock code
00521   // currently http handles a write-lock failure by retrying the read
00522   OpenDirEntry *open_read(CryptoHash *key);
00523   OpenDirEntry *open_read_lock(CryptoHash *key, EThread *t);
00524   int close_read(CacheVC *cont);
00525   int close_read_lock(CacheVC *cont);
00526 
00527   int clear_dir();
00528 
00529   int init(char *s, off_t blocks, off_t dir_skip, bool clear);
00530 
00531   int handle_dir_clear(int event, void *data);
00532   int handle_dir_read(int event, void *data);
00533   int handle_recover_from_data(int event, void *data);
00534   int handle_recover_write_dir(int event, void *data);
00535   int handle_header_read(int event, void *data);
00536 
00537 #if TS_USE_INTERIM_CACHE == 1
00538   int recover_interim_vol();
00539 #endif
00540 
00541   int dir_init_done(int event, void *data);
00542 
00543   int dir_check(bool fix);
00544   int db_check(bool fix);
00545 
00546   int is_io_in_progress()
00547   {
00548     return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
00549   }
00550   int increment_generation()
00551   {
00552     // this is stored in the offset field of the directory (!=0)
00553     ink_assert(mutex->thread_holding == this_ethread());
00554     header->generation++;
00555     if (!header->generation)
00556       header->generation++;
00557     return header->generation;
00558   }
00559   void set_io_not_in_progress()
00560   {
00561     io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
00562   }
00563   
00564   int aggWriteDone(int event, Event *e);
00565   int aggWrite(int event, void *e);
00566   void agg_wrap();
00567 
00568   int evacuateWrite(CacheVC *evacuator, int event, Event *e);
00569   int evacuateDocReadDone(int event, Event *e);
00570   int evacuateDoc(int event, Event *e);
00571 
00572   int evac_range(off_t start, off_t end, int evac_phase);
00573   void periodic_scan();
00574   void scan_for_pinned_documents();
00575   void evacuate_cleanup_blocks(int i);
00576   void evacuate_cleanup();
00577   EvacuationBlock *force_evacuate_head(Dir *dir, int pinned);
00578   int within_hit_evacuate_window(Dir *dir);
00579   uint32_t round_to_approx_size(uint32_t l);
00580 
00581   Vol()
00582     : Continuation(new_ProxyMutex()), path(NULL), fd(-1),
00583       dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), skip(0), start(0),
00584       len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0),
00585       evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false),
00586       dir_sync_waiting(0), dir_sync_in_progress(0), writing_end_marker(0) {
00587     open_dir.mutex = mutex;
00588     agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE);
00589     memset(agg_buffer, 0, AGG_SIZE);
00590     SET_HANDLER(&Vol::aggWrite);
00591   }
00592 
00593   ~Vol() {
00594     ats_memalign_free(agg_buffer);
00595   }
00596 };
00597 
00598 struct AIO_Callback_handler: public Continuation
00599 {
00600   int handle_disk_failure(int event, void *data);
00601 
00602   AIO_Callback_handler():Continuation(new_ProxyMutex()) {
00603     SET_HANDLER(&AIO_Callback_handler::handle_disk_failure);
00604   }
00605 };
00606 
00607 struct CacheVol
00608 {
00609   int vol_number;
00610   int scheme;
00611   off_t size;
00612   int num_vols;
00613   Vol **vols;
00614   DiskVol **disk_vols;
00615   LINK(CacheVol, link);
00616   // per volume stats
00617   RecRawStatBlock *vol_rsb;
00618 
00619   CacheVol()
00620     : vol_number(-1), scheme(0), size(0), num_vols(0), vols(NULL), disk_vols(0), vol_rsb(0)
00621   { }
00622 };
00623 
00624 // Note : hdr() needs to be 8 byte aligned.
00625 // If you change this, change sizeofDoc above
00626 struct Doc              // fixed header that precedes each document fragment; its size is the sizeofDoc macro
00627 {
00628   uint32_t magic;         // DOC_MAGIC
00629   uint32_t len;           // length of this fragment (including hlen & sizeof(Doc), unrounded)
00630   uint64_t total_len;     // total length of document
00631   CryptoHash first_key;    ///< first key in object.
00632   CryptoHash key; ///< Key for this doc.
00633   uint32_t hlen; ///< Length of this header.
00634   uint32_t doc_type:8;       ///< Doc type - indicates the format of this structure and its content.
00635   uint32_t v_major:8;   ///< Major version number.
00636   uint32_t v_minor:8; ///< Minor version number.
00637   uint32_t unused:8; ///< Unused, forced to zero.
00638   uint32_t sync_serial;
00639   uint32_t write_serial;
00640   uint32_t pinned;        // pinned until
00641   uint32_t checksum;      // must stay the last member: sizeofDoc is computed from its offset
00642 
00643   uint32_t data_len();    // len - sizeofDoc - hlen
00644   uint32_t prefix_len();  // sizeofDoc + hlen
00645   int single_fragment();  // true when data_len() == total_len
00646   int no_data_in_fragment();  // declared only; not defined in this header
00647   char *hdr();            // address just past the fixed header (this + sizeofDoc)
00648   char *data();           // hdr() + hlen
00649 };
00650 
00651 // Global Data
00652 
00653 extern Vol **gvol;
00654 extern volatile int gnvol;   // NOTE(review): presumably the number of entries in gvol - confirm
00655 extern ClassAllocator<OpenDirEntry> openDirEntryAllocator;
00656 extern ClassAllocator<EvacuationBlock> evacuationBlockAllocator;   // used by new_EvacuationBlock() / free_EvacuationBlock()
00657 extern ClassAllocator<EvacuationKey> evacuationKeyAllocator;       // free_EvacuationBlock() releases chained keys through it
00658 extern unsigned short *vol_hash_table;
00659 
00660 // inline Functions
00661 
00662 TS_INLINE int
00663 vol_headerlen(Vol *d) {
00664   return ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter) + sizeof(uint16_t) * (d->segments-1));
00665 }
00666 
00667 TS_INLINE size_t
00668 vol_dirlen(Vol *d)
00669 {
00670   return vol_headerlen(d) + 
00671     ROUND_TO_STORE_BLOCK(((size_t)d->buckets) * DIR_DEPTH * d->segments * SIZEOF_DIR) +
00672     ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
00673 }
00674 
00675 TS_INLINE int
00676 vol_direntries(Vol *d)
00677 {
00678   return d->buckets * DIR_DEPTH * d->segments;
00679 }
00680 
00681 #if TS_USE_INTERIM_CACHE == 1
// Macro forms of the volume position helpers. They are macros so that `d` may
// be either a Vol* or an InterimCacheVol*. Arguments are parenthesized so that
// expression arguments (e.g. &vol, a + b) expand correctly.

// Out-of-phase entry is valid when it lies at or beyond the aggregation position.
#define vol_out_of_phase_valid(d, e)            \
    (dir_offset(e) - 1 >= (((d)->header->agg_pos - (d)->start) / CACHE_BLOCK_SIZE))

// As above, but leaving room for one aggregation buffer (AGG_SIZE).
#define vol_out_of_phase_agg_valid(d, e)        \
    (dir_offset(e) - 1 >= (((d)->header->agg_pos - (d)->start + AGG_SIZE) / CACHE_BLOCK_SIZE))

// Out-of-phase entry is valid for writing when it lies at or beyond the write
// position; this mirrors the inline vol_out_of_phase_write_valid(Vol *, Dir *).
#define vol_out_of_phase_write_valid(d, e)      \
    (dir_offset(e) - 1 >= (((d)->header->write_pos - (d)->start) / CACHE_BLOCK_SIZE))

// In-phase entry is valid when it lies before the write position plus the
// bytes pending in the aggregation buffer.
#define vol_in_phase_valid(d, e)                \
    (dir_offset(e) - 1 < (((d)->header->write_pos + (d)->agg_buf_pos - (d)->start) / CACHE_BLOCK_SIZE))

// Directory offset (1-based, in cache blocks) -> byte position in the volume.
#define vol_offset_to_offset(d, pos)            \
    ((d)->start + (off_t)(pos) * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE)

// First directory entry of segment `s`.
#define vol_dir_segment(d, s)                   \
    ((Dir *) (((char *) (d)->dir) + ((s) * (d)->buckets) * DIR_DEPTH * SIZEOF_DIR))

// Byte position in the volume -> directory offset (1-based, in cache blocks).
#define offset_to_vol_offset(d, pos)            \
    (((pos) - (d)->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE)

// Byte position of directory entry `e`.
#define vol_offset(d, e)                        \
    ((d)->start + (off_t) ((off_t)dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE)

// True when entry `e` points into the data still held in the aggregation buffer.
#define vol_in_phase_agg_buf_valid(d, e)        \
    ((vol_offset(d, e) >= (d)->header->write_pos) && vol_offset(d, e) < ((d)->header->write_pos + (d)->agg_buf_pos))

// True when entry `e` lies within transistor_range_threshold bytes after the
// aggregation position, taking wrap-around at the end of the volume into account.
#define vol_transistor_range_valid(d, e)    \
  (((d)->header->agg_pos + (d)->transistor_range_threshold < (d)->start + (d)->len) ? \
      (vol_out_of_phase_write_valid(d, e) && \
      (dir_offset(e) <= (((d)->header->agg_pos - (d)->start + (d)->transistor_range_threshold) / CACHE_BLOCK_SIZE))) : \
      ((dir_offset(e) <= (((d)->header->agg_pos - (d)->start + (d)->transistor_range_threshold - (d)->len) / CACHE_BLOCK_SIZE)) || \
          (dir_offset(e) > (((d)->header->agg_pos - (d)->start) / CACHE_BLOCK_SIZE))))

00715 
00716 
00717 #else
00718 TS_INLINE int
00719 vol_out_of_phase_valid(Vol *d, Dir *e)
00720 {
00721   return (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start) / CACHE_BLOCK_SIZE));
00722 }
00723 
00724 TS_INLINE int
00725 vol_out_of_phase_agg_valid(Vol *d, Dir *e)
00726 {
00727   return (dir_offset(e) - 1 >= ((d->header->agg_pos - d->start + AGG_SIZE) / CACHE_BLOCK_SIZE));
00728 }
00729 
00730 TS_INLINE int
00731 vol_out_of_phase_write_valid(Vol *d, Dir *e)
00732 {
00733   return (dir_offset(e) - 1 >= ((d->header->write_pos - d->start) / CACHE_BLOCK_SIZE));
00734 }
00735 
00736 TS_INLINE int
00737 vol_in_phase_valid(Vol *d, Dir *e)
00738 {
00739   return (dir_offset(e) - 1 < ((d->header->write_pos + d->agg_buf_pos - d->start) / CACHE_BLOCK_SIZE));
00740 }
00741 
00742 TS_INLINE off_t
00743 vol_offset(Vol *d, Dir *e)
00744 {
00745   return d->start + (off_t) dir_offset(e) * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
00746 }
00747 
00748 TS_INLINE off_t
00749 offset_to_vol_offset(Vol *d, off_t pos)
00750 {
00751   return ((pos - d->start + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE);
00752 }
00753 
00754 TS_INLINE off_t
00755 vol_offset_to_offset(Vol *d, off_t pos)
00756 {
00757   return d->start + pos * CACHE_BLOCK_SIZE - CACHE_BLOCK_SIZE;
00758 }
00759 
00760 TS_INLINE Dir *
00761 vol_dir_segment(Vol *d, int s)
00762 {
00763   return (Dir *) (((char *) d->dir) + (s * d->buckets) * DIR_DEPTH * SIZEOF_DIR);
00764 }
00765 
00766 TS_INLINE int
00767 vol_in_phase_agg_buf_valid(Vol *d, Dir *e)
00768 {
00769   return (vol_offset(d, e) >= d->header->write_pos && vol_offset(d, e) < (d->header->write_pos + d->agg_buf_pos));
00770 }
00771 #endif
00772 // length of the partition not including the offset of location 0.
00773 TS_INLINE off_t
00774 vol_relative_length(Vol *v, off_t start_offset)
00775 {
00776    return (v->len + v->skip) - start_offset;
00777 }
00778 
00779 TS_INLINE uint32_t
00780 Doc::prefix_len()
00781 {
00782   return sizeofDoc + hlen;
00783 }
00784 
00785 TS_INLINE uint32_t
00786 Doc::data_len()
00787 {
00788   return len - sizeofDoc - hlen;
00789 }
00790 
00791 TS_INLINE int
00792 Doc::single_fragment()
00793 {
00794   return data_len() == total_len;
00795 }
00796 
00797 TS_INLINE char *
00798 Doc::hdr()
00799 {
00800   return reinterpret_cast<char*>(this) + sizeofDoc;
00801 }
00802 
00803 TS_INLINE char *
00804 Doc::data()
00805 {
00806   return this->hdr() +  hlen;
00807 }
00808 
00809 int vol_dir_clear(Vol *d);                                             // declaration only; the definition is not in this header
00810 int vol_init(Vol *d, char *s, off_t blocks, off_t skip, bool clear);   // declaration only; parameters parallel Vol::init()
00811 
00812 // inline Functions
00813 
00814 TS_INLINE EvacuationBlock *
00815 evacuation_block_exists(Dir *dir, Vol *p)
00816 {
00817   EvacuationBlock *b = p->evacuate[dir_evac_bucket(dir)].head;
00818   for (; b; b = b->link.next)
00819     if (dir_offset(&b->dir) == dir_offset(dir))
00820       return b;
00821   return 0;
00822 }
00823 
00824 TS_INLINE void
00825 Vol::cancel_trigger()
00826 {
00827   if (trigger) {
00828     trigger->cancel_action();
00829     trigger = NULL;
00830   }
00831 }
00832 
00833 TS_INLINE EvacuationBlock *
00834 new_EvacuationBlock(EThread *t)
00835 {
00836   EvacuationBlock *b = THREAD_ALLOC(evacuationBlockAllocator, t);
00837   b->init = 0;
00838   b->readers = 0;
00839   b->earliest_evacuator = 0;
00840   b->evac_frags.link.next = 0;
00841   return b;
00842 }
00843 
00844 TS_INLINE void
00845 free_EvacuationBlock(EvacuationBlock *b, EThread *t)
00846 {
00847   EvacuationKey *e = b->evac_frags.link.next;
00848   while (e) {
00849     EvacuationKey *n = e->link.next;
00850     evacuationKeyAllocator.free(e);
00851     e = n;
00852   }
00853   THREAD_FREE(b, evacuationBlockAllocator, t);
00854 }
00855 
00856 TS_INLINE OpenDirEntry *
00857 Vol::open_read(CryptoHash *key)
00858 {
00859   return open_dir.open_read(key);
00860 }
00861 
00862 TS_INLINE int
00863 Vol::within_hit_evacuate_window(Dir *xdir)
00864 {
00865   off_t oft = dir_offset(xdir) - 1;
00866   off_t write_off = (header->write_pos + AGG_SIZE - start) / CACHE_BLOCK_SIZE;
00867   off_t delta = oft - write_off;
00868   if (delta >= 0)
00869     return delta < hit_evacuate_window;
00870   else
00871     return -delta > (data_blocks - hit_evacuate_window) && -delta < data_blocks;
00872 }
00873 
00874 TS_INLINE uint32_t
00875 Vol::round_to_approx_size(uint32_t l) {
00876   uint32_t ll = round_to_approx_dir_size(l);
00877   return ROUND_TO_SECTOR(this, ll);
00878 }
00879 
00880 #if TS_USE_INTERIM_CACHE == 1
00881 inline bool
00882 dir_valid(Vol *_d, Dir *_e) {
00883   if (!dir_ininterim(_e))
00884     return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
00885         vol_out_of_phase_valid(_d, _e);
00886   else {
00887     int idx = dir_get_index(_e);
00888     if (good_interim_disks <= 0 || idx >= _d->num_interim_vols) return false;
00889     InterimCacheVol *sv = &(_d->interim_vols[idx]);
00890     return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
00891         vol_out_of_phase_valid(sv, _e)) : false;
00892   }
00893 }
00894 
00895 inline bool
00896 dir_valid(InterimCacheVol *_d, Dir *_e) {
00897   if (!dir_ininterim(_e))
00898     return true;
00899   InterimCacheVol *sv = &(_d->vol->interim_vols[dir_get_index(_e)]);
00900   if (_d != sv)
00901     return true;
00902   return !DISK_BAD(sv->disk) ? (sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
00903       vol_out_of_phase_valid(sv, _e)) : false;
00904 
00905 }
00906 
00907 inline bool
00908 dir_agg_valid(Vol *_d, Dir *_e) {
00909   if (!dir_ininterim(_e))
00910     return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
00911         vol_out_of_phase_agg_valid(_d, _e);
00912   else {
00913     int idx = dir_get_index(_e);
00914     if(good_interim_disks <= 0 || idx >= _d->num_interim_vols) return false;
00915     InterimCacheVol *sv = &(_d->interim_vols[idx]);
00916     return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
00917         vol_out_of_phase_agg_valid(sv, _e);
00918   }
00919 }
00920 inline bool
00921 dir_write_valid(Vol *_d, Dir *_e) {
00922   if (!dir_ininterim(_e))
00923     return _d->header->phase == dir_phase(_e) ? vol_in_phase_valid(_d, _e) :
00924         vol_out_of_phase_write_valid(_d, _e);
00925   else {
00926     InterimCacheVol *sv = &(_d->interim_vols[dir_get_index(_e)]);
00927     return sv->header->phase == dir_phase(_e) ? vol_in_phase_valid(sv, _e) :
00928         vol_out_of_phase_write_valid(sv, _e);
00929   }
00930 }
00931 inline bool
00932 dir_agg_buf_valid(Vol *_d, Dir *_e) {
00933   if (!dir_ininterim(_e))
00934     return _d->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(_d, _e);
00935   else {
00936     InterimCacheVol *sv = &(_d->interim_vols[dir_get_index(_e)]);
00937     return sv->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(sv, _e);
00938   }
00939 }
00940 
00941 inline bool
00942 dir_agg_buf_valid(InterimCacheVol *_d, Dir *_e) {
00943   return _d->header->phase == dir_phase(_e) && vol_in_phase_agg_buf_valid(_d, _e);
00944 }
00945 
00946 #endif // TS_USE_INTERIM_CACHE
00947 #endif /* _P_CACHE_VOL_H__ */

Generated by  doxygen 1.7.1