• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

RamCacheCLFUS.cc

Go to the documentation of this file.
00001 /** @file
00002 
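00003   RAM cache using the Clocked Least Frequently Used by Size (CLFUS) replacement policy, with optional background compression of cached entries.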
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 // Clocked Least Frequently Used by Size (CLFUS) replacement policy
00025 // See https://cwiki.apache.org/confluence/display/TS/RamCache
00026 
00027 #include "P_Cache.h"
00028 #include "I_Tasks.h"
00029 #if TS_HAS_LIBZ
00030 #include <zlib.h>
00031 #endif
00032 #if TS_HAS_LZMA
00033 #include <lzma.h>
00034 #endif
00035 
// Tuning constants for the CLFUS policy and the background compressor.
// REQUIRED_COMPRESSION is compared against the entry's data length and
// REQUIRED_SHRINK against the entry's accounted size (see compress_entries()).
#define REQUIRED_COMPRESSION 0.9 // must get to this size or declared incompressible
#define REQUIRED_SHRINK 0.8 // must get to this size or keep original buffer (with padding)
#define HISTORY_HYSTERIA 10 // extra temporary history
#define ENTRY_OVERHEAD 256 // per-entry overhead to consider when computing cache value/size
// Fixed part of the memory limit handed to the LZMA decoder; twice the
// object length is added on top of it in get().
#define LZMA_BASE_MEMLIMIT (64 * 1024 * 1024)
//#define CHECK_ACOUNTING 1 // very expensive double checking of all sizes

// Collapse a hit count to 1 when it is non-zero and to 0 otherwise.
#define REQUEUE_HITS(_h) ((_h) ? 1 : 0)
// Cache value of an object: (hits + 1) divided by (size + ENTRY_OVERHEAD), as a float.
#define CACHE_VALUE_HITS_SIZE(_h, _s) ((float)((_h)+1) / ((_s) + ENTRY_OVERHEAD))
// Cache value of an entry pointer, computed from its hits and size fields.
#define CACHE_VALUE(_x) CACHE_VALUE_HITS_SIZE((_x)->hits, (_x)->size)
00046 
// One cache entry. The same structure is used for resident objects (queued
// on lru[0], data attached) and for history records (queued on lru[1], data
// released by victimize()).
struct RamCacheCLFUSEntry {
  INK_MD5 key;       // object key; slice32(3) of it selects the hash bucket
  uint32_t auxkey1;  // auxiliary keys, both must match on lookup
  uint32_t auxkey2;
  uint64_t hits;     // hit counter: bumped by get()/put(), shifted and collapsed during eviction
  uint32_t size; // memory used including padding in buffer
  uint32_t len;  // actual data length
  uint32_t compressed_len; // set to size before a compression attempt, to the compressed length on success
  union {
    struct {
      uint32_t compressed:3; // compression type
      uint32_t incompressible:1; // set once a compression attempt failed or did not save enough
      uint32_t lru:1; // index of the lru[] queue the entry is on (0 = resident, 1 = history)
      uint32_t copy:1; // copy-in-copy-out
    } flag_bits;
    uint32_t flags; // all flag bits as one word, used to clear them together
  };
  LINK(RamCacheCLFUSEntry, lru_link);  // linkage for the lru[] queues
  LINK(RamCacheCLFUSEntry, hash_link); // linkage for the hash bucket chain
  Ptr<IOBufferData> data; // cached payload; NULL for history entries
};
00068 
// RamCache implementation using the CLFUS replacement policy.
// Resident entries live on lru[0], history entries (key only, data released)
// on lru[1]; both kinds are reachable through the hash buckets.
struct RamCacheCLFUS : public RamCache {
  int64_t max_bytes; // capacity; 0 disables the cache (get/put/fixup return 0)
  int64_t bytes;     // sum of size + ENTRY_OVERHEAD over the resident entries
  int64_t objects;   // number of resident entries

  // returns 1 on found/stored, 0 on not found/stored, if provided auxkey1 and auxkey2 must match
  int get(INK_MD5 *key, Ptr<IOBufferData> *ret_data, uint32_t auxkey1 = 0, uint32_t auxkey2 = 0);
  int put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool copy = false, uint32_t auxkey1 = 0, uint32_t auxkey2 = 0);
  // rewrites the aux keys of a matching entry; returns 1 if one was found
  int fixup(INK_MD5 *key, uint32_t old_auxkey1, uint32_t old_auxkey2, uint32_t new_auxkey1, uint32_t new_auxkey2);

  // records the capacity and volume; when the capacity is non-zero it also
  // builds the hash table and schedules the periodic compressor
  void init(int64_t max_bytes, Vol *vol);

  // private
  Vol *vol; // for stats
  int64_t history; // number of history entries
  int ibuckets;    // index into bucket_sizes[] of the current table size
  int nbuckets;    // current number of hash buckets
  DList(RamCacheCLFUSEntry, hash_link) *bucket; // hash table, nbuckets chains
  Que(RamCacheCLFUSEntry, lru_link) lru[2];     // [0] resident entries, [1] history
  uint16_t *seen;  // seen filter: upper 16 bits of the last key hashed to each slot
  int ncompressed; // number of lru[0] entries the compressor has already passed
  RamCacheCLFUSEntry *compressed; // first uncompressed lru[0] entry
  // compresses entries starting at `compressed`, attempting at most do_at_most of them
  void compress_entries(EThread *thread, int do_at_most = INT_MAX);
  // allocates the table for bucket_sizes[ibuckets], rehashes, and resets the seen filter
  void resize_hashtable();
  // turns a resident entry into a history entry (drops data, queues on lru[1])
  void victimize(RamCacheCLFUSEntry *e);
  // moves the compressor cursor off e before e is unlinked from lru[0]
  void move_compressed(RamCacheCLFUSEntry *e);
  // unlinks and frees e; returns the next entry in its hash chain
  RamCacheCLFUSEntry *destroy(RamCacheCLFUSEntry *e);
  // puts every entry of victims back on c->lru[0] and re-adds its bytes
  void requeue_victims(RamCacheCLFUS *c, Que(RamCacheCLFUSEntry, lru_link) &victims);
  void tick(); // move CLOCK on history
  RamCacheCLFUS(): max_bytes(0), bytes(0), objects(0), vol(0), history(0), ibuckets(0), nbuckets(0), bucket(0),
              seen(0), ncompressed(0), compressed(0) { }
};
00101 
00102 class RamCacheCLFUSCompressor : public Continuation {
00103 public:
00104   RamCacheCLFUS *rc;
00105   int mainEvent(int event, Event *e);
00106 
00107   RamCacheCLFUSCompressor(RamCacheCLFUS *arc)
00108     : rc(arc)
00109   { 
00110     SET_HANDLER(&RamCacheCLFUSCompressor::mainEvent); 
00111   }
00112 };
00113 
00114 int
00115 RamCacheCLFUSCompressor::mainEvent(int /* event ATS_UNUSED */, Event *e)
00116 {
00117   switch (cache_config_ram_cache_compress) {
00118     default:
00119       Warning("unknown RAM cache compression type: %d", cache_config_ram_cache_compress);
00120     case CACHE_COMPRESSION_NONE: 
00121     case CACHE_COMPRESSION_FASTLZ:
00122       break;
00123     case CACHE_COMPRESSION_LIBZ:
00124 #if ! TS_HAS_LIBZ
00125       Warning("libz not available for RAM cache compression");
00126 #endif
00127       break;
00128     case CACHE_COMPRESSION_LIBLZMA:
00129 #if ! TS_HAS_LZMA
00130       Warning("lzma not available for RAM cache compression");
00131 #endif
00132       break;
00133   }
00134   if (cache_config_ram_cache_compress_percent)
00135     rc->compress_entries(e->ethread);
00136   return EVENT_CONT;
00137 }
00138 
// Allocator for cache entries; entries are obtained with THREAD_ALLOC and
// returned with THREAD_FREE.
ClassAllocator<RamCacheCLFUSEntry> ramCacheCLFUSEntryAllocator("RamCacheCLFUSEntry");

// Successive hash table sizes, indexed by RamCacheCLFUS::ibuckets. Each step
// roughly doubles the previous one; the same size is used for the seen filter.
static const int bucket_sizes[] = {
  127, 251, 509, 1021, 2039, 4093, 8191, 16381, 32749, 65521, 131071, 262139,
  524287, 1048573, 2097143, 4194301, 8388593, 16777213, 33554393, 67108859,
  134217689, 268435399, 536870909, 1073741789, 2147483647
};
00146 
00147 void
00148 RamCacheCLFUS::resize_hashtable()
00149 {
00150   int anbuckets = bucket_sizes[ibuckets];
00151   DDebug("ram_cache", "resize hashtable %d", anbuckets);
00152   int64_t s = anbuckets * sizeof(DList(RamCacheCLFUSEntry, hash_link));
00153   DList(RamCacheCLFUSEntry, hash_link) *new_bucket = (DList(RamCacheCLFUSEntry, hash_link) *)ats_malloc(s);
00154   memset(new_bucket, 0, s);
00155   if (bucket) {
00156     for (int64_t i = 0; i < nbuckets; i++) {
00157       RamCacheCLFUSEntry *e = 0;
00158       while ((e = bucket[i].pop()))
00159         new_bucket[e->key.slice32(3) % anbuckets].push(e);
00160     }
00161     ats_free(bucket);
00162   }
00163   bucket = new_bucket;
00164   nbuckets = anbuckets;
00165   ats_free(seen);
00166   if (cache_config_ram_cache_use_seen_filter) {
00167     int size = bucket_sizes[ibuckets] * sizeof(uint16_t);
00168     seen = (uint16_t*)ats_malloc(size);
00169     memset(seen, 0, size);
00170   }
00171 }
00172 
00173 void
00174 RamCacheCLFUS::init(int64_t abytes, Vol *avol)
00175 {
00176   ink_assert(avol != 0);
00177   vol = avol;
00178   max_bytes = abytes;
00179   DDebug("ram_cache", "initializing ram_cache %" PRId64 " bytes", abytes);
00180   if (!max_bytes)
00181     return;
00182   resize_hashtable();
00183   eventProcessor.schedule_every(new RamCacheCLFUSCompressor(this), HRTIME_SECOND, ET_TASK);
00184 }
00185 
#ifdef CHECK_ACOUNTING
// Debug-only consistency check: walks both queues and asserts that the
// counters kept by the cache (objects, bytes, history) match what is queued.
static void check_accounting(RamCacheCLFUS *c)
{
  int64_t resident = 0, resident_bytes = 0, in_history = 0;
  for (RamCacheCLFUSEntry *p = c->lru[0].head; p; p = p->lru_link.next) {
    ++resident;
    resident_bytes += p->size + ENTRY_OVERHEAD;
  }
  for (RamCacheCLFUSEntry *p = c->lru[1].head; p; p = p->lru_link.next) {
    ++in_history;
  }
  ink_assert(resident == c->objects);
  ink_assert(resident_bytes == c->bytes);
  ink_assert(in_history == c->history);
}
#else
#define check_accounting(_c)
#endif
00201 
00202 int
00203 RamCacheCLFUS::get(INK_MD5 *key, Ptr<IOBufferData> *ret_data, uint32_t auxkey1, uint32_t auxkey2)
00204 {
00205   if (!max_bytes)
00206     return 0;
00207   int64_t i = key->slice32(3) % nbuckets;
00208   RamCacheCLFUSEntry *e = bucket[i].head;
00209   char *b = 0;
00210   while (e) {
00211     if (e->key == *key && e->auxkey1 == auxkey1 && e->auxkey2 == auxkey2) {
00212       move_compressed(e);
00213       lru[e->flag_bits.lru].remove(e);
00214       lru[e->flag_bits.lru].enqueue(e);
00215       if (!e->flag_bits.lru) { // in memory
00216         e->hits++;
00217         if (e->flag_bits.compressed) {
00218           b = (char*)ats_malloc(e->len);
00219           switch (e->flag_bits.compressed) {
00220             default: goto Lfailed;
00221             case CACHE_COMPRESSION_FASTLZ: {
00222               int l = (int)e->len;
00223               if ((l != (int)fastlz_decompress(e->data->data(), e->compressed_len, b, l)))
00224                 goto Lfailed;
00225               break;
00226             }
00227 #if TS_HAS_LIBZ
00228             case CACHE_COMPRESSION_LIBZ: {
00229               uLongf l = e->len;
00230               if (Z_OK != uncompress((Bytef*)b, &l, (Bytef*)e->data->data(), e->compressed_len))
00231                 goto Lfailed;
00232               break;
00233             }
00234 #endif
00235 #if TS_HAS_LZMA
00236             case CACHE_COMPRESSION_LIBLZMA: {
00237               size_t l = (size_t)e->len, ipos = 0, opos = 0;
00238               uint64_t memlimit = e->len * 2 + LZMA_BASE_MEMLIMIT;
00239               if (LZMA_OK != lzma_stream_buffer_decode(
00240                     &memlimit, 0, NULL, (uint8_t*)e->data->data(), &ipos, e->compressed_len, (uint8_t*)b, &opos, l))
00241                 goto Lfailed;
00242               break;
00243             }
00244 #endif
00245           }
00246           IOBufferData *data = new_xmalloc_IOBufferData(b, e->len);
00247           data->_mem_type = DEFAULT_ALLOC;
00248           if (!e->flag_bits.copy) { // don't bother if we have to copy anyway
00249             int64_t delta = ((int64_t)e->compressed_len) - (int64_t)e->size;
00250             bytes += delta;
00251             CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, delta);
00252             e->size = e->compressed_len;
00253             check_accounting(this);
00254             e->flag_bits.compressed = 0;
00255             e->data = data;
00256           }
00257           (*ret_data) = data;
00258         } else {
00259           IOBufferData *data = e->data;
00260           if (e->flag_bits.copy) {
00261             data = new_IOBufferData(iobuffer_size_to_index(e->len, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
00262             memcpy(data->data(), e->data->data(), e->len);
00263           }
00264           (*ret_data) = data;
00265         }
00266         CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_hits_stat, 1);
00267         DDebug("ram_cache", "get %X %d %d size %d HIT", key->slice32(3), auxkey1, auxkey2, e->size);
00268         return 1;
00269       } else {
00270         CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_misses_stat, 1);
00271         DDebug("ram_cache", "get %X %d %d HISTORY", key->slice32(3), auxkey1, auxkey2);
00272         return 0;
00273       }
00274     }
00275     assert(e != e->hash_link.next);
00276     e = e->hash_link.next;
00277   }
00278   DDebug("ram_cache", "get %X %d %d MISS", key->slice32(3), auxkey1, auxkey2);
00279 Lerror:
00280   CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_misses_stat, 1);
00281   return 0;
00282 Lfailed:
00283   ats_free(b);
00284   destroy(e);
00285   DDebug("ram_cache", "get %X %d %d Z_ERR", key->slice32(3), auxkey1, auxkey2);
00286   goto Lerror;
00287 }
00288 
00289 void RamCacheCLFUS::tick() {
00290   RamCacheCLFUSEntry *e = lru[1].dequeue();
00291   if (!e)
00292     return;
00293   e->hits <<= 1;
00294   if (e->hits) {
00295     e->hits = REQUEUE_HITS(e->hits);
00296     lru[1].enqueue(e);
00297   } else
00298     goto Lfree;
00299   if (history <= objects + HISTORY_HYSTERIA)
00300     return;
00301   e = lru[1].dequeue();
00302 Lfree:
00303   e->flag_bits.lru = 0;
00304   history--;
00305   uint32_t b = e->key.slice32(3) % nbuckets;
00306   bucket[b].remove(e);
00307   DDebug("ram_cache", "put %X %d %d size %d FREED", e->key.slice32(3), e->auxkey1, e->auxkey2, e->size);
00308   THREAD_FREE(e, ramCacheCLFUSEntryAllocator, this_thread());
00309 }
00310 
00311 void
00312 RamCacheCLFUS::victimize(RamCacheCLFUSEntry *e)
00313 {
00314   objects--;
00315   DDebug("ram_cache", "put %X %d %d size %d VICTIMIZED", e->key.slice32(3), e->auxkey1, e->auxkey2, e->size);
00316   e->data = NULL;
00317   e->flag_bits.lru = 1;
00318   lru[1].enqueue(e);
00319   history++;
00320 }
00321 
00322 void
00323 RamCacheCLFUS::move_compressed(RamCacheCLFUSEntry *e)
00324 {
00325   if (e == compressed) {
00326     if (compressed->lru_link.next)
00327       compressed = compressed->lru_link.next;
00328     else {
00329       ncompressed--;
00330       compressed = compressed->lru_link.prev;
00331     }
00332   }
00333 }
00334 
00335 RamCacheCLFUSEntry *
00336 RamCacheCLFUS::destroy(RamCacheCLFUSEntry *e)
00337 {
00338   RamCacheCLFUSEntry *ret = e->hash_link.next;
00339   move_compressed(e);
00340   lru[e->flag_bits.lru].remove(e);
00341   if (!e->flag_bits.lru) {
00342     objects--;
00343     bytes -= e->size + ENTRY_OVERHEAD;
00344     CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, -(int64_t)e->size);
00345     e->data = NULL;
00346   } else
00347     history--;
00348   uint32_t b = e->key.slice32(3) % nbuckets;
00349   bucket[b].remove(e);
00350   DDebug("ram_cache", "put %X %d %d DESTROYED", e->key.slice32(3), e->auxkey1, e->auxkey2);
00351   THREAD_FREE(e, ramCacheCLFUSEntryAllocator, this_thread());
00352   return ret;
00353 }
00354 
00355 void
00356 RamCacheCLFUS::compress_entries(EThread *thread, int do_at_most)
00357 {
00358   if (!cache_config_ram_cache_compress)
00359     return;
00360   ink_assert(vol != 0);
00361   MUTEX_TAKE_LOCK(vol->mutex, thread);
00362   if (!compressed) {
00363     compressed = lru[0].head;
00364     ncompressed = 0;
00365   }
00366   float target = (cache_config_ram_cache_compress_percent / 100.0) * objects;
00367   int n = 0;
00368   char *b = 0, *bb = 0;
00369   while (compressed && target > ncompressed) {
00370     RamCacheCLFUSEntry *e = compressed;
00371     if (e->flag_bits.incompressible || e->flag_bits.compressed)
00372       goto Lcontinue;
00373     n++;
00374     if (do_at_most < n)
00375       break;
00376     {
00377       e->compressed_len = e->size;
00378       uint32_t l = 0;
00379       int ctype = cache_config_ram_cache_compress;
00380       switch (ctype) {
00381         default: goto Lcontinue;
00382         case CACHE_COMPRESSION_FASTLZ: l = (uint32_t)((double)e->len * 1.05 + 66); break;
00383 #if TS_HAS_LIBZ
00384         case CACHE_COMPRESSION_LIBZ: l = (uint32_t)compressBound(e->len); break;
00385 #endif
00386 #if TS_HAS_LZMA
00387         case CACHE_COMPRESSION_LIBLZMA: l = e->len; break;
00388 #endif
00389       }
00390       // store transient data for lock release
00391       Ptr<IOBufferData> edata = e->data;
00392       uint32_t elen = e->len;
00393       INK_MD5 key = e->key;
00394       MUTEX_UNTAKE_LOCK(vol->mutex, thread);
00395       b = (char*)ats_malloc(l);
00396       bool failed = false;
00397       switch (ctype) {
00398         default: goto Lfailed;
00399         case CACHE_COMPRESSION_FASTLZ:
00400           if (e->len < 16) goto Lfailed;
00401           if ((l = fastlz_compress(edata->data(), elen, b)) <= 0)
00402             failed = true;
00403           break;
00404 #if TS_HAS_LIBZ
00405         case CACHE_COMPRESSION_LIBZ: {
00406           uLongf ll = l;
00407           if ((Z_OK != compress((Bytef*)b, &ll, (Bytef*)edata->data(), elen)))
00408             failed = true;
00409           l = (int)ll;
00410           break;
00411         }
00412 #endif
00413 #if TS_HAS_LZMA
00414         case CACHE_COMPRESSION_LIBLZMA: {
00415           size_t pos = 0, ll = l;
00416           if (LZMA_OK != lzma_easy_buffer_encode(LZMA_PRESET_DEFAULT, LZMA_CHECK_NONE, NULL,
00417                                                  (uint8_t*)edata->data(), elen, (uint8_t*)b, &pos, ll))
00418             failed = true;
00419           l = (int)pos;
00420           break;
00421         }
00422 #endif
00423       }
00424       MUTEX_TAKE_LOCK(vol->mutex, thread);
00425       // see if the entry is till around
00426       {
00427         uint32_t i = key.slice32(3) % nbuckets;
00428         RamCacheCLFUSEntry *ee = bucket[i].head;
00429         while (ee) {
00430           if (ee->key == key && ee->data == edata) break;
00431           ee = ee->hash_link.next;
00432         }
00433         if (!ee || ee != e) {
00434           e = compressed;
00435           goto Lcontinue;
00436         }
00437         if (failed)
00438           goto Lfailed;
00439       }
00440       if (l > REQUIRED_COMPRESSION * e->len)
00441         e->flag_bits.incompressible = true;
00442       if (l > REQUIRED_SHRINK * e->size)
00443         goto Lfailed;
00444       if (l < e->len) {
00445         e->flag_bits.compressed = cache_config_ram_cache_compress;
00446         bb = (char*)ats_malloc(l);
00447         memcpy(bb, b, l);
00448         ats_free(b);
00449         e->compressed_len = l;
00450         int64_t delta = ((int64_t)l) - (int64_t)e->size;
00451         bytes += delta;
00452         CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, delta);
00453         e->size = l;
00454       } else {
00455         ats_free(b);
00456         e->flag_bits.compressed = 0;
00457         bb = (char*)ats_malloc(e->len);
00458         memcpy(bb, e->data->data(), e->len);
00459         int64_t delta = ((int64_t)e->len) - (int64_t)e->size;
00460         bytes += delta;
00461         CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, delta);
00462         e->size = e->len;
00463         l = e->len;
00464       }
00465       e->data = new_xmalloc_IOBufferData(bb, l);
00466       e->data->_mem_type = DEFAULT_ALLOC;
00467       check_accounting(this);
00468     }
00469     goto Lcontinue;
00470   Lfailed:
00471     ats_free(b);
00472     e->flag_bits.incompressible = 1;
00473   Lcontinue:;
00474     DDebug("ram_cache", "compress %X %d %d %d %d %d %d %d",
00475            e->key.slice32(3), e->auxkey1, e->auxkey2,
00476            e->flag_bits.incompressible, e->flag_bits.compressed,
00477            e->len, e->compressed_len, ncompressed);
00478     if (!e->lru_link.next)
00479       break;
00480     compressed = e->lru_link.next;
00481     ncompressed++;
00482   }
00483   MUTEX_UNTAKE_LOCK(vol->mutex, thread);
00484   return;
00485 }
00486 
00487 void
00488 RamCacheCLFUS::requeue_victims(RamCacheCLFUS *c, Que(RamCacheCLFUSEntry, lru_link) &victims)
00489 {
00490   RamCacheCLFUSEntry *victim = 0;
00491   while ((victim = victims.dequeue())) {
00492     c->bytes += victim->size + ENTRY_OVERHEAD;
00493     CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, victim->size);
00494     victim->hits = REQUEUE_HITS(victim->hits);
00495     c->lru[0].enqueue(victim);
00496   }
00497 }
00498 
00499 int
00500 RamCacheCLFUS::put(INK_MD5 *key, IOBufferData *data, uint32_t len, bool copy, uint32_t auxkey1, uint32_t auxkey2)
00501 {
00502   if (!max_bytes)
00503     return 0;
00504   uint32_t i = key->slice32(3) % nbuckets;
00505   RamCacheCLFUSEntry *e = bucket[i].head;
00506   uint32_t size = copy ? len : data->block_size();
00507   while (e) {
00508     if (e->key == *key) {
00509       if (e->auxkey1 == auxkey1 && e->auxkey2 == auxkey2)
00510         break;
00511       else {
00512         e = destroy(e); // discard when aux keys conflict
00513         continue;
00514       }
00515     }
00516     e = e->hash_link.next;
00517   }
00518   if (e) {
00519     e->hits++;
00520     if (!e->flag_bits.lru) { // already in cache
00521       move_compressed(e);
00522       lru[e->flag_bits.lru].remove(e);
00523       lru[e->flag_bits.lru].enqueue(e);
00524       int64_t delta = ((int64_t)size) - (int64_t)e->size;
00525       bytes += delta;
00526       CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, delta);
00527       if (!copy) {
00528         e->size = size;
00529         e->data = data;
00530       } else {
00531         char *b = (char*)ats_malloc(len);
00532         memcpy(b, data->data(), len);
00533         e->data = new_xmalloc_IOBufferData(b, len);
00534         e->data->_mem_type = DEFAULT_ALLOC;
00535         e->size = size;
00536       }
00537       check_accounting(this);
00538       e->flag_bits.copy = copy;
00539       e->flag_bits.compressed = 0;
00540       DDebug("ram_cache", "put %X %d %d size %d HIT", key->slice32(3), auxkey1, auxkey2, e->size);
00541       return 1;
00542     } else
00543       lru[1].remove(e);
00544   }
00545   Que(RamCacheCLFUSEntry, lru_link) victims;
00546   RamCacheCLFUSEntry *victim = 0;
00547   if (!lru[1].head) // initial fill
00548     if (bytes + size <= max_bytes)
00549       goto Linsert;
00550   if (!e && cache_config_ram_cache_use_seen_filter) {
00551     uint32_t s = key->slice32(3) % bucket_sizes[ibuckets];
00552     uint16_t k = key->slice32(3) >> 16;
00553     uint16_t kk = seen[s];
00554     seen[s] = k;
00555     if (history >= objects && kk != k) {
00556       DDebug("ram_cache", "put %X %d %d size %d UNSEEN", key->slice32(3), auxkey1, auxkey2, size);
00557       return 0;
00558     }
00559   }
00560   while (1) {
00561     victim = lru[0].dequeue();
00562     if (!victim) {
00563       if (bytes + size <= max_bytes)
00564         goto Linsert;
00565       if (e)
00566         lru[1].enqueue(e);
00567       requeue_victims(this, victims);
00568       DDebug("ram_cache", "put %X %d %d NO VICTIM", key->slice32(3), auxkey1, auxkey2);
00569       return 0;
00570     }
00571     bytes -= victim->size + ENTRY_OVERHEAD;
00572     CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, -(int64_t)victim->size);
00573     victims.enqueue(victim);
00574     if (victim == compressed)
00575       compressed = 0;
00576     else
00577       ncompressed--;
00578     victim->hits <<= 1;
00579     tick();
00580     if (!e)
00581       goto Lhistory;
00582     else { // e from history
00583       DDebug("ram_cache_compare", "put %f %f", CACHE_VALUE(victim), CACHE_VALUE(e));
00584       if (bytes + victim->size + size > max_bytes && CACHE_VALUE(victim) > CACHE_VALUE(e)) {
00585         requeue_victims(this, victims);
00586         lru[1].enqueue(e);
00587         DDebug("ram_cache", "put %X %d %d size %d INC %" PRId64" HISTORY",
00588                key->slice32(3), auxkey1, auxkey2, e->size, e->hits);
00589         return 0;
00590       }
00591     }
00592     if (bytes + size <= max_bytes)
00593       goto Linsert;
00594   }
00595 Linsert:
00596   while ((victim = victims.dequeue())) {
00597     if (bytes + size + victim->size <= max_bytes) {
00598       bytes += victim->size + ENTRY_OVERHEAD;
00599       CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, victim->size);
00600       victim->hits = REQUEUE_HITS(victim->hits);
00601       lru[0].enqueue(victim);
00602     } else
00603       victimize(victim);
00604   }
00605   if (e) {
00606     history--; // move from history
00607   } else {
00608     e = THREAD_ALLOC(ramCacheCLFUSEntryAllocator, this_ethread());
00609     e->key = *key;
00610     e->auxkey1 = auxkey1;
00611     e->auxkey2 = auxkey2;
00612     e->hits = 1;
00613     bucket[i].push(e);
00614     if (objects > nbuckets) {
00615       ++ibuckets;
00616       resize_hashtable();
00617     }
00618   }
00619   check_accounting(this);
00620   e->flags = 0;
00621   if (!copy)
00622     e->data = data;
00623   else {
00624     char *b = (char*)ats_malloc(len);
00625     memcpy(b, data->data(), len);
00626     e->data = new_xmalloc_IOBufferData(b, len);
00627     e->data->_mem_type = DEFAULT_ALLOC;
00628   }
00629   e->flag_bits.copy = copy;
00630   bytes += size + ENTRY_OVERHEAD;
00631   CACHE_SUM_DYN_STAT_THREAD(cache_ram_cache_bytes_stat, size);
00632   e->size = size;
00633   objects++;
00634   lru[0].enqueue(e);
00635   e->len = len;
00636   check_accounting(this);
00637   DDebug("ram_cache", "put %X %d %d size %d INSERTED", key->slice32(3), auxkey1, auxkey2, e->size);
00638   return 1;
00639 Lhistory:
00640   requeue_victims(this, victims);
00641   check_accounting(this);
00642   e = THREAD_ALLOC(ramCacheCLFUSEntryAllocator, this_ethread());
00643   e->key = *key;
00644   e->auxkey1 = auxkey1;
00645   e->auxkey2 = auxkey2;
00646   e->hits = 1;
00647   e->size = data->block_size();
00648   e->flags = 0;
00649   bucket[i].push(e);
00650   e->flag_bits.lru = 1;
00651   lru[1].enqueue(e);
00652   history++;
00653   DDebug("ram_cache", "put %X %d %d HISTORY", key->slice32(3), auxkey1, auxkey2);
00654   return 0;
00655 }
00656 
00657 int
00658 RamCacheCLFUS::fixup(INK_MD5 * key, uint32_t old_auxkey1, uint32_t old_auxkey2, uint32_t new_auxkey1,
00659                      uint32_t new_auxkey2)
00660 {
00661   if (!max_bytes)
00662     return 0;
00663   uint32_t i = key->slice32(3) % nbuckets;
00664   RamCacheCLFUSEntry *e = bucket[i].head;
00665   while (e) {
00666     if (e->key == *key && e->auxkey1 == old_auxkey1 && e->auxkey2 == old_auxkey2) {
00667       e->auxkey1 = new_auxkey1;
00668       e->auxkey2 = new_auxkey2;
00669       return 1;
00670     }
00671     e = e->hash_link.next;
00672   }
00673   return 0;
00674 }
00675 
00676 RamCache *
00677 new_RamCacheCLFUS()
00678 {
00679   RamCacheCLFUS *r = new RamCacheCLFUS;
00680   return r;
00681 }

Generated by  doxygen 1.7.1