00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 #ifndef _P_CACHE_INTERNAL_H__
00026 #define _P_CACHE_INTERNAL_H__
00027 
00028 #include "libts.h"
00029 
00030 #ifdef HTTP_CACHE
00031 #include "HTTP.h"
00032 #include "P_CacheHttp.h"
00033 #endif
00034 
00035 struct EvacuationBlock;
00036 
00037 
00038 
00039 #define ALTERNATES                      1
00040 
00041 
00042 
00043 #define MAX_CACHE_VCS_PER_THREAD        500
00044 
00045 #define INTEGRAL_FRAGS                  4
00046 
00047 #ifdef CACHE_INSPECTOR_PAGES
00048 #ifdef DEBUG
00049 #define CACHE_STAT_PAGES
00050 #endif
00051 #endif
00052 
00053 #ifdef DEBUG
00054 #define DDebug Debug
00055 #else
00056 #define DDebug if (0) dummy_debug
00057 #endif
00058 
00059 #define AIO_SOFT_FAILURE                -100000
00060 
00061 #define WRITER_RETRY_DELAY  HRTIME_MSECONDS(50)
00062 
00063 #ifndef CACHE_LOCK_FAIL_RATE
00064 #define CACHE_TRY_LOCK(_l, _m, _t) MUTEX_TRY_LOCK(_l, _m, _t)
00065 #else
00066 #define CACHE_TRY_LOCK(_l, _m, _t)                             \
00067   MUTEX_TRY_LOCK(_l, _m, _t);                                  \
00068   if ((uint32_t)_t->generator.random() <                         \
00069      (uint32_t)(UINT_MAX *CACHE_LOCK_FAIL_RATE))                 \
00070     CACHE_MUTEX_RELEASE(_l)
00071 #endif
00072 
00073 
00074 #define VC_LOCK_RETRY_EVENT() \
00075   do { \
00076     trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), event); \
00077     return EVENT_CONT; \
00078   } while (0)
00079 
00080 #define VC_SCHED_LOCK_RETRY() \
00081   do { \
00082     trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
00083     return EVENT_CONT; \
00084   } while (0)
00085 
00086 #define CONT_SCHED_LOCK_RETRY_RET(_c) \
00087   do { \
00088     _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
00089     return EVENT_CONT; \
00090   } while (0)
00091 
00092 #define CONT_SCHED_LOCK_RETRY(_c) \
00093   _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay))
00094 
00095 #define VC_SCHED_WRITER_RETRY() \
00096   do { \
00097     ink_assert(!trigger); \
00098     writer_lock_retry++; \
00099     ink_hrtime _t = WRITER_RETRY_DELAY; \
00100     if (writer_lock_retry > 2) \
00101       _t = WRITER_RETRY_DELAY * 2; \
00102     else if (writer_lock_retry > 5) \
00103       _t = WRITER_RETRY_DELAY * 10; \
00104     else if (writer_lock_retry > 10) \
00105       _t = WRITER_RETRY_DELAY * 100; \
00106     trigger = mutex->thread_holding->schedule_in_local(this, _t); \
00107     return EVENT_CONT; \
00108   } while (0)
00109 
00110 
00111   
00112 enum
00113 {
00114   cache_bytes_used_stat,
00115   cache_bytes_total_stat,
00116   cache_ram_cache_bytes_stat,
00117   cache_ram_cache_bytes_total_stat,
00118   cache_direntries_total_stat,
00119   cache_direntries_used_stat,
00120   cache_ram_cache_hits_stat,
00121   cache_ram_cache_misses_stat,
00122   cache_pread_count_stat,
00123   cache_percent_full_stat,
00124   cache_lookup_active_stat,
00125   cache_lookup_success_stat,
00126   cache_lookup_failure_stat,
00127   cache_read_active_stat,
00128   cache_read_success_stat,
00129   cache_read_failure_stat,
00130 #if TS_USE_INTERIM_CACHE == 1
00131   cache_interim_read_success_stat,
00132   cache_disk_read_success_stat,
00133   cache_ram_read_success_stat,
00134 #endif
00135   cache_write_active_stat,
00136   cache_write_success_stat,
00137   cache_write_failure_stat,
00138   cache_write_backlog_failure_stat,
00139   cache_update_active_stat,
00140   cache_update_success_stat,
00141   cache_update_failure_stat,
00142   cache_remove_active_stat,
00143   cache_remove_success_stat,
00144   cache_remove_failure_stat,
00145   cache_evacuate_active_stat,
00146   cache_evacuate_success_stat,
00147   cache_evacuate_failure_stat,
00148   cache_scan_active_stat,
00149   cache_scan_success_stat,
00150   cache_scan_failure_stat,
00151   cache_directory_collision_count_stat,
00152   cache_single_fragment_document_count_stat,
00153   cache_two_fragment_document_count_stat,
00154   cache_three_plus_plus_fragment_document_count_stat,
00155   cache_read_busy_success_stat,
00156   cache_read_busy_failure_stat,
00157   cache_gc_bytes_evacuated_stat,
00158   cache_gc_frags_evacuated_stat,
00159   cache_write_bytes_stat,
00160   cache_hdr_vector_marshal_stat,
00161   cache_hdr_marshal_stat,
00162   cache_hdr_marshal_bytes_stat,
00163   cache_stat_count
00164 };
00165 
00166 
00167 extern RecRawStatBlock *cache_rsb;
00168 
00169 #define GLOBAL_CACHE_SET_DYN_STAT(x,y) \
00170         RecSetGlobalRawStatSum(cache_rsb, (x), (y))
00171 
00172 #define CACHE_SET_DYN_STAT(x,y) \
00173         RecSetGlobalRawStatSum(cache_rsb, (x), (y)) \
00174         RecSetGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y))
00175 
00176 #define CACHE_INCREMENT_DYN_STAT(x) \
00177         RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), 1); \
00178         RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), 1);
00179 
00180 #define CACHE_DECREMENT_DYN_STAT(x) \
00181         RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), -1); \
00182         RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), -1);
00183 
00184 #define CACHE_VOL_SUM_DYN_STAT(x,y) \
00185         RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), (int64_t) y);
00186 
00187 #define CACHE_SUM_DYN_STAT(x, y) \
00188         RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), (int64_t) (y)); \
00189         RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), (int64_t) (y));
00190 
00191 #define CACHE_SUM_DYN_STAT_THREAD(x, y) \
00192         RecIncrRawStat(cache_rsb, this_ethread(), (int) (x), (int64_t) (y)); \
00193         RecIncrRawStat(vol->cache_vol->vol_rsb, this_ethread(), (int) (x), (int64_t) (y));
00194 
00195 #define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
00196         RecIncrGlobalRawStatSum(cache_rsb,(x),(y))
00197 
00198 #define CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
00199         RecIncrGlobalRawStatSum(cache_rsb,(x),(y)) \
00200         RecIncrGlobalRawStatSum(vol->cache_vol->vol_rsb,(x),(y))
00201 
00202 #define CACHE_CLEAR_DYN_STAT(x) \
00203 do { \
00204         RecSetRawStatSum(cache_rsb, (x), 0); \
00205         RecSetRawStatCount(cache_rsb, (x), 0); \
00206         RecSetRawStatSum(vol->cache_vol->vol_rsb, (x), 0); \
00207         RecSetRawStatCount(vol->cache_vol->vol_rsb, (x), 0); \
00208 } while (0);
00209 
00210 
00211 extern int cache_config_dir_sync_frequency;
00212 extern int cache_config_http_max_alts;
00213 extern int cache_config_permit_pinning;
00214 extern int cache_config_select_alternate;
00215 extern int cache_config_vary_on_user_agent;
00216 extern int cache_config_max_doc_size;
00217 extern int cache_config_min_average_object_size;
00218 extern int cache_config_agg_write_backlog;
00219 extern int cache_config_enable_checksum;
00220 extern int cache_config_alt_rewrite_max_size;
00221 extern int cache_config_read_while_writer;
00222 extern int cache_clustering_enabled;
00223 extern int cache_config_agg_write_backlog;
00224 extern int cache_config_ram_cache_compress;
00225 extern int cache_config_ram_cache_compress_percent;
00226 extern int cache_config_ram_cache_use_seen_filter;
00227 extern int cache_config_hit_evacuate_percent;
00228 extern int cache_config_hit_evacuate_size_limit;
00229 extern int cache_config_force_sector_size;
00230 extern int cache_config_target_fragment_size;
00231 extern int cache_config_mutex_retry_delay;
00232 #if TS_USE_INTERIM_CACHE == 1
00233 extern int good_interim_disks;
00234 #endif
00235 
00236 struct CacheVC: public CacheVConnection
00237 {
00238   CacheVC();
00239 
00240   VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf);
00241   VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset);
00242   VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false);
00243   void do_io_close(int lerrno = -1);
00244   void reenable(VIO *avio);
00245   void reenable_re(VIO *avio);
00246   bool get_data(int i, void *data);
00247   bool set_data(int i, void *data);
00248 
00249   bool is_ram_cache_hit() const
00250   {
00251     ink_assert(vio.op == VIO::READ);
00252     return !f.not_from_ram_cache;
00253   }
00254   int get_header(void **ptr, int *len)
00255   {
00256     if (first_buf.m_ptr) {
00257       Doc *doc = (Doc*)first_buf->data();
00258       *ptr = doc->hdr();
00259       *len = doc->hlen;
00260       return 0;
00261     } else
00262       return -1;
00263   }
00264   int set_header(void *ptr, int len)
00265   {
00266     header_to_write = ptr;
00267     header_to_write_len = len;
00268     return 0;
00269   }
00270   int get_single_data(void **ptr, int *len)
00271   {
00272     if (first_buf.m_ptr) {
00273       Doc *doc = (Doc*)first_buf->data();
00274       if (doc->data_len() == doc->total_len) {
00275         *ptr = doc->data();
00276         *len = doc->data_len();
00277         return 0;
00278       }
00279     }
00280     return -1;
00281   }
00282 
00283   bool writer_done();
00284   int calluser(int event);
00285   int callcont(int event);
00286   int die();
00287   int dead(int event, Event *e);
00288 
00289   int handleReadDone(int event, Event *e);
00290   int handleRead(int event, Event *e);
00291   int do_read_call(CacheKey *akey);
00292   int handleWrite(int event, Event *e);
00293   int handleWriteLock(int event, Event *e);
00294   int do_write_call();
00295   int do_write_lock();
00296   int do_write_lock_call();
00297   int do_sync(uint32_t target_write_serial);
00298 
00299   int openReadClose(int event, Event *e);
00300   int openReadReadDone(int event, Event *e);
00301   int openReadMain(int event, Event *e);
00302   int openReadStartEarliest(int event, Event *e);
00303 #ifdef HTTP_CACHE
00304   int openReadVecWrite(int event, Event *e);
00305 #endif
00306   int openReadStartHead(int event, Event *e);
00307   int openReadFromWriter(int event, Event *e);
00308   int openReadFromWriterMain(int event, Event *e);
00309   int openReadFromWriterFailure(int event, Event *);
00310   int openReadChooseWriter(int event, Event *e);
00311 
00312   int openWriteCloseDir(int event, Event *e);
00313   int openWriteCloseHeadDone(int event, Event *e);
00314   int openWriteCloseHead(int event, Event *e);
00315   int openWriteCloseDataDone(int event, Event *e);
00316   int openWriteClose(int event, Event *e);
00317   int openWriteRemoveVector(int event, Event *e);
00318   int openWriteWriteDone(int event, Event *e);
00319   int openWriteOverwrite(int event, Event *e);
00320   int openWriteMain(int event, Event *e);
00321   int openWriteStartDone(int event, Event *e);
00322   int openWriteStartBegin(int event, Event *e);
00323 
00324   int updateVector(int event, Event *e);
00325   int updateReadDone(int event, Event *e);
00326   int updateVecWrite(int event, Event *e);
00327 
00328   int removeEvent(int event, Event *e);
00329 
00330   int linkWrite(int event, Event *e);
00331   int derefRead(int event, Event *e);
00332 
00333   int scanVol(int event, Event *e);
00334   int scanObject(int event, Event *e);
00335   int scanUpdateDone(int event, Event *e);
00336   int scanOpenWrite(int event, Event *e);
00337   int scanRemoveDone(int event, Event *e);
00338 
00339   int is_io_in_progress()
00340   {
00341     return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
00342   }
00343   void set_io_not_in_progress()
00344   {
00345     io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
00346   }
00347   void set_agg_write_in_progress()
00348   {
00349     io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
00350   }
00351   int evacuateDocDone(int event, Event *e);
00352   int evacuateReadHead(int event, Event *e);
00353 
00354   void cancel_trigger();
00355   virtual int64_t get_object_size();
00356 #ifdef HTTP_CACHE
00357   virtual void set_http_info(CacheHTTPInfo *info);
00358   virtual void get_http_info(CacheHTTPInfo ** info);
00359 
00360 
00361 
00362 
00363   virtual HTTPInfo::FragOffset* get_frag_table();
00364 
00365 
00366 
00367   virtual uint32_t load_http_info(CacheHTTPInfoVector* info, struct Doc* doc, RefCountObj * block_ptr = NULL);
00368 #endif
00369   virtual bool is_pread_capable();
00370   virtual bool set_pin_in_cache(time_t time_pin);
00371   virtual time_t get_pin_in_cache();
00372   virtual bool set_disk_io_priority(int priority);
00373   virtual int get_disk_io_priority();
00374 
00375   
00376 #define CACHE_STAT_ACTIVE  0
00377 #define CACHE_STAT_SUCCESS 1
00378 #define CACHE_STAT_FAILURE 2
00379 
00380   
00381   
00382   
00383   static int size_to_init;
00384 
00385   
00386   
00387   
00388   
00389   
00390   CacheKey key, first_key, earliest_key, update_key;
00391   Dir dir, earliest_dir, overwrite_dir, first_dir;
00392   
00393 
00394   
00395   
00396   
00397   
00398   Action _action;
00399 #ifdef HTTP_CACHE
00400   CacheHTTPHdr request;
00401 #endif
00402   CacheHTTPInfoVector vector;
00403   CacheHTTPInfo alternate;
00404   Ptr<IOBufferData> buf;
00405   Ptr<IOBufferData> first_buf;
00406   Ptr<IOBufferBlock> blocks; 
00407   Ptr<IOBufferBlock> writer_buf;
00408 
00409   OpenDirEntry *od;
00410   AIOCallbackInternal io;
00411   int alternate_index;          
00412   LINK(CacheVC, opendir_link);
00413 #ifdef CACHE_STAT_PAGES
00414   LINK(CacheVC, stat_link);
00415 #endif
00416   
00417 
00418   
00419   
00420   
00421   
00422   
00423   
00424   
00425   VIO vio;
00426   EThread *initial_thread;  
00427   CacheFragType frag_type;
00428   CacheHTTPInfo *info;
00429   CacheHTTPInfoVector *write_vector;
00430 #ifdef HTTP_CACHE
00431   CacheLookupHttpConfig *params;
00432 #endif
00433   int header_len;       
00434   int frag_len;         
00435   uint32_t write_len;     
00436   uint32_t agg_len;       
00437   uint32_t write_serial;  
00438   Vol *vol;
00439   Dir *last_collision;
00440   Event *trigger;
00441   CacheKey *read_key;
00442   ContinuationHandler save_handler;
00443   uint32_t pin_in_cache;
00444   ink_hrtime start_time;
00445   int base_stat;
00446   int recursive;
00447   int closed;
00448   uint64_t seek_to;               
00449   int64_t offset;                 
00450   int64_t writer_offset;          
00451   int64_t length;                 
00452   int64_t doc_pos;                
00453   uint64_t write_pos;             
00454   uint64_t total_len;             
00455   uint64_t doc_len;               
00456   uint64_t update_len;
00457   int fragment;
00458   int scan_msec_delay;
00459   CacheVC *write_vc;
00460   char *hostname;
00461   int host_len;
00462   int header_to_write_len;
00463   void *header_to_write;
00464   short writer_lock_retry;
00465 #if TS_USE_INTERIM_CACHE == 1
00466   InterimCacheVol *interim_vol;
00467   MigrateToInterimCache *mts;
00468   uint64_t dir_off;
00469 #endif
00470   union
00471   {
00472     uint32_t flags;
00473     struct
00474     {
00475       unsigned int use_first_key:1;
00476       unsigned int overwrite:1; 
00477       unsigned int close_complete:1; 
00478       unsigned int sync:1; 
00479       unsigned int evacuator:1;
00480       unsigned int single_fragment:1;
00481       unsigned int evac_vector:1;
00482       unsigned int lookup:1;
00483       unsigned int update:1;
00484       unsigned int remove:1;
00485       unsigned int remove_aborted_writers:1;
00486       unsigned int open_read_timeout:1; 
00487       unsigned int data_done:1;
00488       unsigned int read_from_writer_called:1;
00489       unsigned int not_from_ram_cache:1;        
00490       unsigned int rewrite_resident_alt:1;
00491       unsigned int readers:1;
00492       unsigned int doc_from_ram_cache:1;
00493       unsigned int hit_evacuate:1;
00494 #if TS_USE_INTERIM_CACHE == 1
00495       unsigned int read_from_interim:1;
00496       unsigned int write_into_interim:1;
00497       unsigned int ram_fixup:1;
00498       unsigned int transistor:1;
00499 #endif
00500 #ifdef HTTP_CACHE
00501       unsigned int allow_empty_doc:1; 
00502 #endif
00503     } f;
00504   };
00505   
00506   
00507   char *scan_vol_map; 
00508   
00509   
00510   off_t scan_fix_buffer_offset;
00511   
00512 };
00513 
00514 #define PUSH_HANDLER(_x) do {                                           \
00515     ink_assert(handler != (ContinuationHandler)(&CacheVC::dead));       \
00516     save_handler = handler; handler = (ContinuationHandler)(_x);        \
00517 } while (0)
00518 
00519 #define POP_HANDLER do {                                          \
00520     handler = save_handler;                                       \
00521     ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
00522   } while (0)
00523 
00524 struct CacheRemoveCont: public Continuation
00525 {
00526   int event_handler(int event, void *data);
00527 
00528   CacheRemoveCont()
00529     : Continuation(NULL)
00530   { }
00531 };
00532 
00533 
00534 
00535 #if TS_USE_INTERIM_CACHE == 1
00536 extern ClassAllocator<MigrateToInterimCache> migrateToInterimCacheAllocator;
00537 #endif
00538 extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
00539 extern CacheKey zero_key;
00540 extern CacheSync *cacheDirSync;
00541 
00542 #ifdef HTTP_CACHE
00543 int cache_write(CacheVC *, CacheHTTPInfoVector *);
00544 int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
00545 #endif
00546 CacheVC *new_DocEvacuator(int nbytes, Vol *d);
00547 
00548 
00549 
00550 TS_INLINE CacheVC *
00551 new_CacheVC(Continuation *cont)
00552 {
00553   EThread *t = cont->mutex->thread_holding;
00554   CacheVC *c = THREAD_ALLOC(cacheVConnectionAllocator, t);
00555 #ifdef HTTP_CACHE
00556   c->vector.data.data = &c->vector.data.fast_data[0];
00557 #endif
00558   c->_action = cont;
00559   c->initial_thread = t->tt == DEDICATED ? NULL : t;
00560   c->mutex = cont->mutex;
00561   c->start_time = ink_get_hrtime();
00562   ink_assert(c->trigger == NULL);
00563   Debug("cache_new", "new %p", c);
00564 #ifdef CACHE_STAT_PAGES
00565   ink_assert(!c->stat_link.next);
00566   ink_assert(!c->stat_link.prev);
00567 #endif
00568   dir_clear(&c->dir);
00569   return c;
00570 }
00571 
00572 TS_INLINE int
00573 free_CacheVC(CacheVC *cont)
00574 {
00575   Debug("cache_free", "free %p", cont);
00576   ProxyMutex *mutex = cont->mutex;
00577   Vol *vol = cont->vol;
00578   if (vol) {
00579     CACHE_DECREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_ACTIVE);
00580     if (cont->closed > 0) {
00581       CACHE_INCREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_SUCCESS);
00582 #if TS_USE_INTERIM_CACHE == 1
00583       if (cont->vio.op == VIO::READ) {
00584         if (cont->f.doc_from_ram_cache) {
00585           CACHE_INCREMENT_DYN_STAT(cache_ram_read_success_stat);
00586         } else if (cont->f.read_from_interim) {
00587           CACHE_INCREMENT_DYN_STAT(cache_interim_read_success_stat);
00588         } else {
00589           CACHE_INCREMENT_DYN_STAT(cache_disk_read_success_stat);
00590         }
00591       }
00592 #endif
00593     }                             
00594   }
00595   ink_assert(mutex->thread_holding == this_ethread());
00596   if (cont->trigger)
00597     cont->trigger->cancel();
00598   ink_assert(!cont->is_io_in_progress());
00599   ink_assert(!cont->od);
00600   
00601 
00602 
00603   cont->io.action.continuation = NULL;
00604   cont->io.action.mutex = NULL;
00605   cont->io.mutex.clear();
00606   cont->io.aio_result = 0;
00607   cont->io.aiocb.aio_nbytes = 0;
00608   cont->io.aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
00609 #ifdef HTTP_CACHE
00610   cont->request.reset();
00611   cont->vector.clear();
00612 #endif
00613   cont->vio.buffer.clear();
00614   cont->vio.mutex.clear();
00615 #ifdef HTTP_CACHE
00616   if (cont->vio.op == VIO::WRITE && cont->alternate_index == CACHE_ALT_INDEX_DEFAULT)
00617     cont->alternate.destroy();
00618   else
00619     cont->alternate.clear();
00620 #endif
00621   cont->_action.cancelled = 0;
00622   cont->_action.mutex.clear();
00623   cont->mutex.clear();
00624   cont->buf.clear();
00625   cont->first_buf.clear();
00626   cont->blocks.clear();
00627   cont->writer_buf.clear();
00628   cont->alternate_index = CACHE_ALT_INDEX_DEFAULT;
00629   if (cont->scan_vol_map)
00630     ats_free(cont->scan_vol_map);
00631   memset((char *) &cont->vio, 0, cont->size_to_init);
00632 #ifdef CACHE_STAT_PAGES
00633   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00634 #endif
00635 #ifdef DEBUG
00636   SET_CONTINUATION_HANDLER(cont, &CacheVC::dead);
00637 #endif
00638   THREAD_FREE(cont, cacheVConnectionAllocator, this_thread());
00639   return EVENT_DONE;
00640 }
00641 
00642 TS_INLINE int
00643 CacheVC::calluser(int event)
00644 {
00645   recursive++;
00646   ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
00647   vio._cont->handleEvent(event, (void *) &vio);
00648   recursive--;
00649   if (closed) {
00650     die();
00651     return EVENT_DONE;
00652   }
00653   return EVENT_CONT;
00654 }
00655 
00656 TS_INLINE int
00657 CacheVC::callcont(int event)
00658 {
00659   recursive++;
00660   ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
00661   _action.continuation->handleEvent(event, this);
00662   recursive--;
00663   if (closed)
00664     die();
00665   else if (vio.vc_server)
00666     handleEvent(EVENT_IMMEDIATE, 0);
00667   return EVENT_DONE;
00668 }
00669 
00670 TS_INLINE int
00671 CacheVC::do_read_call(CacheKey *akey)
00672 {
00673   doc_pos = 0;
00674   read_key = akey;
00675   io.aiocb.aio_nbytes = dir_approx_size(&dir);
00676 #if TS_USE_INTERIM_CACHE == 1
00677   interim_vol = NULL;
00678   ink_assert(mts == NULL);
00679   mts = NULL;
00680   f.write_into_interim = 0;
00681   f.ram_fixup = 0;
00682   f.transistor = 0;
00683   f.read_from_interim = dir_ininterim(&dir);
00684 
00685   if (!f.read_from_interim && vio.op == VIO::READ && good_interim_disks > 0){
00686     vol->history.put_key(read_key);
00687     if (vol->history.is_hot(read_key) && !vol->migrate_probe(read_key, NULL) && !od) {
00688       f.write_into_interim = 1;
00689     }
00690   }
00691   if (f.read_from_interim) {
00692     interim_vol = &vol->interim_vols[dir_get_index(&dir)];
00693     if (vio.op == VIO::READ && vol_transistor_range_valid(interim_vol, &dir)
00694         && !vol->migrate_probe(read_key, NULL) && !od)
00695       f.transistor = 1;
00696   }
00697   if (f.write_into_interim || f.transistor) {
00698     mts = migrateToInterimCacheAllocator.alloc();
00699     mts->vc = this;
00700     mts->key = *read_key;
00701     mts->rewrite = (f.transistor == 1);
00702     dir_assign(&mts->dir, &dir);
00703     vol->set_migrate_in_progress(mts);
00704   }
00705 #endif
00706   PUSH_HANDLER(&CacheVC::handleRead);
00707   return handleRead(EVENT_CALL, 0);
00708 }
00709 
00710 TS_INLINE int
00711 CacheVC::do_write_call()
00712 {
00713   PUSH_HANDLER(&CacheVC::handleWrite);
00714   return handleWrite(EVENT_CALL, 0);
00715 }
00716 
00717 TS_INLINE void
00718 CacheVC::cancel_trigger()
00719 {
00720   if (trigger) {
00721     trigger->cancel_action();
00722     trigger = NULL;
00723   }
00724 }
00725 
00726 TS_INLINE int
00727 CacheVC::die()
00728 {
00729   if (vio.op == VIO::WRITE) {
00730 #ifdef HTTP_CACHE
00731     if (f.update && total_len) {
00732       alternate.object_key_set(earliest_key);
00733     }
00734 #endif
00735     if (!is_io_in_progress()) {
00736       SET_HANDLER(&CacheVC::openWriteClose);
00737       if (!recursive)
00738         openWriteClose(EVENT_NONE, NULL);
00739     }                           
00740     return EVENT_CONT;
00741   } else {
00742     if (is_io_in_progress())
00743       save_handler = (ContinuationHandler) & CacheVC::openReadClose;
00744     else {
00745       SET_HANDLER(&CacheVC::openReadClose);
00746       if (!recursive)
00747         openReadClose(EVENT_NONE, NULL);
00748     }
00749     return EVENT_CONT;
00750   }
00751 }
00752 
00753 TS_INLINE int
00754 CacheVC::handleWriteLock(int , Event *e)
00755 {
00756   cancel_trigger();
00757   int ret = 0;
00758   {
00759     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00760     if (!lock) {
00761       set_agg_write_in_progress();
00762       trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00763       return EVENT_CONT;
00764     }
00765     ret = handleWrite(EVENT_CALL, e);
00766   }
00767   if (ret == EVENT_RETURN)
00768     return handleEvent(AIO_EVENT_DONE, 0);
00769   return EVENT_CONT;
00770 }
00771 
00772 TS_INLINE int
00773 CacheVC::do_write_lock()
00774 {
00775   PUSH_HANDLER(&CacheVC::handleWriteLock);
00776   return handleWriteLock(EVENT_NONE, 0);
00777 }
00778 
00779 TS_INLINE int
00780 CacheVC::do_write_lock_call()
00781 {
00782   PUSH_HANDLER(&CacheVC::handleWriteLock);
00783   return handleWriteLock(EVENT_CALL, 0);
00784 }
00785 
00786 TS_INLINE bool
00787 CacheVC::writer_done()
00788 {
00789   OpenDirEntry *cod = od;
00790   if (!cod)
00791     cod = vol->open_read(&first_key);
00792   CacheVC *w = (cod) ? cod->writers.head : NULL;
00793   
00794   
00795   
00796   
00797   for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *) w->opendir_link.next);
00798   if (!w)
00799     return true;
00800   return false;
00801 }
00802 
00803 TS_INLINE int
00804 Vol::close_write(CacheVC *cont)
00805 {
00806 #ifdef CACHE_STAT_PAGES
00807   ink_assert(stat_cache_vcs.head);
00808   stat_cache_vcs.remove(cont, cont->stat_link);
00809   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00810 #endif
00811   return open_dir.close_write(cont);
00812 }
00813 
00814 
00815 TS_INLINE int
00816 Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
00817 {
00818   Vol *vol = this;
00819   bool agg_error = false;
00820   if (!cont->f.remove) {
00821     agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog);
00822 #ifdef CACHE_AGG_FAIL_RATE
00823     agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
00824                               (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
00825 #endif
00826   }
00827   if (agg_error) {
00828     CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
00829     return ECACHE_WRITE_FAIL;
00830   }
00831 #if TS_USE_INTERIM_CACHE == 1
00832   MigrateToInterimCache *m_result = NULL;
00833   if (vol->migrate_probe(&cont->first_key, &m_result)) {
00834     m_result->notMigrate = true;
00835   }
00836 #endif
00837   if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
00838 #ifdef CACHE_STAT_PAGES
00839     ink_assert(cont->mutex->thread_holding == this_ethread());
00840     ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00841     stat_cache_vcs.enqueue(cont, cont->stat_link);
00842 #endif
00843     return 0;
00844   }
00845   return ECACHE_DOC_BUSY;
00846 }
00847 
00848 TS_INLINE int
00849 Vol::close_write_lock(CacheVC *cont)
00850 {
00851   EThread *t = cont->mutex->thread_holding;
00852   CACHE_TRY_LOCK(lock, mutex, t);
00853   if (!lock)
00854     return -1;
00855   return close_write(cont);
00856 }
00857 
00858 TS_INLINE int
00859 Vol::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
00860 {
00861   EThread *t = cont->mutex->thread_holding;
00862   CACHE_TRY_LOCK(lock, mutex, t);
00863   if (!lock)
00864     return -1;
00865   return open_write(cont, allow_if_writers, max_writers);
00866 }
00867 
00868 TS_INLINE OpenDirEntry *
00869 Vol::open_read_lock(INK_MD5 *key, EThread *t)
00870 {
00871   CACHE_TRY_LOCK(lock, mutex, t);
00872   if (!lock)
00873     return NULL;
00874   return open_dir.open_read(key);
00875 }
00876 
00877 TS_INLINE int
00878 Vol::begin_read_lock(CacheVC *cont)
00879 {
00880   
00881 #ifndef  CACHE_STAT_PAGES
00882   if (cont->f.single_fragment)
00883     return 0;
00884 #endif
00885   
00886   EThread *t = cont->mutex->thread_holding;
00887   CACHE_TRY_LOCK(lock, mutex, t);
00888   if (!lock)
00889     return -1;
00890   return begin_read(cont);
00891 }
00892 
00893 TS_INLINE int
00894 Vol::close_read_lock(CacheVC *cont)
00895 {
00896   EThread *t = cont->mutex->thread_holding;
00897   CACHE_TRY_LOCK(lock, mutex, t);
00898   if (!lock)
00899     return -1;
00900   return close_read(cont);
00901 }
00902 
00903 TS_INLINE int
00904 dir_delete_lock(CacheKey *key, Vol *d, ProxyMutex *m, Dir *del)
00905 {
00906   EThread *thread = m->thread_holding;
00907   CACHE_TRY_LOCK(lock, d->mutex, thread);
00908   if (!lock)
00909     return -1;
00910   return dir_delete(key, d, del);
00911 }
00912 
00913 TS_INLINE int
00914 dir_insert_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m)
00915 {
00916   EThread *thread = m->thread_holding;
00917   CACHE_TRY_LOCK(lock, d->mutex, thread);
00918   if (!lock)
00919     return -1;
00920   return dir_insert(key, d, to_part);
00921 }
00922 
00923 TS_INLINE int
00924 dir_overwrite_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m, Dir *overwrite, bool must_overwrite = true)
00925 {
00926   EThread *thread = m->thread_holding;
00927   CACHE_TRY_LOCK(lock, d->mutex, thread);
00928   if (!lock)
00929     return -1;
00930   return dir_overwrite(key, d, to_part, overwrite, must_overwrite);
00931 }
00932 
00933 void TS_INLINE
00934 rand_CacheKey(CacheKey *next_key, ProxyMutex *mutex)
00935 {
00936   next_key->b[0] = mutex->thread_holding->generator.random();
00937   next_key->b[1] = mutex->thread_holding->generator.random();
00938 }
00939 
00940 extern uint8_t CacheKey_next_table[];
00941 void TS_INLINE
00942 next_CacheKey(CacheKey *next_key, CacheKey *key)
00943 {
00944   uint8_t *b = (uint8_t *) next_key;
00945   uint8_t *k = (uint8_t *) key;
00946   b[0] = CacheKey_next_table[k[0]];
00947   for (int i = 1; i < 16; i++)
00948     b[i] = CacheKey_next_table[(b[i - 1] + k[i]) & 0xFF];
00949 }
00950 extern uint8_t CacheKey_prev_table[];
00951 void TS_INLINE
00952 prev_CacheKey(CacheKey *prev_key, CacheKey *key)
00953 {
00954   uint8_t *b = (uint8_t *) prev_key;
00955   uint8_t *k = (uint8_t *) key;
00956   for (int i = 15; i > 0; i--)
00957     b[i] = 256 + CacheKey_prev_table[k[i]] - k[i - 1];
00958   b[0] = CacheKey_prev_table[k[0]];
00959 }
00960 
00961 TS_INLINE unsigned int
00962 next_rand(unsigned int *p)
00963 {
00964   unsigned int seed = *p;
00965   seed = 1103515145 * seed + 12345;
00966   *p = seed;
00967   return seed;
00968 }
00969 
00970 extern ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator;
00971 
00972 TS_INLINE CacheRemoveCont *
00973 new_CacheRemoveCont()
00974 {
00975   CacheRemoveCont *cache_rm = cacheRemoveContAllocator.alloc();
00976 
00977   cache_rm->mutex = new_ProxyMutex();
00978   SET_CONTINUATION_HANDLER(cache_rm, &CacheRemoveCont::event_handler);
00979   return cache_rm;
00980 }
00981 
00982 TS_INLINE void
00983 free_CacheRemoveCont(CacheRemoveCont *cache_rm)
00984 {
00985   cache_rm->mutex = NULL;
00986   cacheRemoveContAllocator.free(cache_rm);
00987 }
00988 
00989 TS_INLINE int
00990 CacheRemoveCont::event_handler(int event, void *data)
00991 {
00992   (void) event;
00993   (void) data;
00994   free_CacheRemoveCont(this);
00995   return EVENT_DONE;
00996 }
00997 
00998 int64_t cache_bytes_used(void);
00999 int64_t cache_bytes_total(void);
01000 
01001 #ifdef DEBUG
01002 #define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) CACHE_INCREMENT_DYN_STAT(_x)
01003 #define CACHE_DEBUG_SUM_DYN_STAT(_x,_y) CACHE_SUM_DYN_STAT(_x,_y)
01004 #else
01005 #define CACHE_DEBUG_INCREMENT_DYN_STAT(_x)
01006 #define CACHE_DEBUG_SUM_DYN_STAT(_x,_y)
01007 #endif
01008 
01009 struct CacheHostRecord;
01010 struct Vol;
01011 class CacheHostTable;
01012 
01013 struct Cache
01014 {
01015   volatile int cache_read_done;
01016   volatile int total_good_nvol;
01017   volatile int total_nvol;
01018   volatile int ready;
01019   int64_t cache_size;             
01020   CacheHostTable *hosttable;
01021   volatile int total_initialized_vol;
01022   CacheType scheme;
01023 
01024   int open(bool reconfigure, bool fix);
01025   int close();
01026 
01027   Action *lookup(Continuation *cont, CacheKey *key, CacheFragType type, char const* hostname, int host_len);
01028   inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int len);
01029   inkcoreapi Action *open_write(Continuation *cont, CacheKey *key,
01030                                 CacheFragType frag_type, int options = 0,
01031                                 time_t pin_in_cache = (time_t) 0, char *hostname = 0, int host_len = 0);
01032   inkcoreapi Action *remove(Continuation *cont, CacheKey *key,
01033                             CacheFragType type = CACHE_FRAG_TYPE_HTTP,
01034                             bool user_agents = true, bool link = false,
01035                             char *hostname = 0, int host_len = 0);
01036   Action *scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500);
01037 
01038 #ifdef HTTP_CACHE
01039   Action *lookup(Continuation *cont, URL *url, CacheFragType type);
01040   inkcoreapi Action *open_read(Continuation *cont, CacheKey *key,
01041                                CacheHTTPHdr *request,
01042                                CacheLookupHttpConfig *params, CacheFragType type, char *hostname, int host_len);
01043   Action *open_read(Continuation *cont, URL *url, CacheHTTPHdr *request,
01044                     CacheLookupHttpConfig *params, CacheFragType type);
01045   Action *open_write(Continuation *cont, CacheKey *key,
01046                      CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0,
01047                      CacheKey *key1 = NULL,
01048                      CacheFragType type = CACHE_FRAG_TYPE_HTTP, char *hostname = 0, int host_len = 0);
01049   Action *open_write(Continuation *cont, URL *url, CacheHTTPHdr *request,
01050                      CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0,
01051                      CacheFragType type = CACHE_FRAG_TYPE_HTTP);
01052   static void generate_key(INK_MD5 *md5, URL *url);
01053 #endif
01054 
01055   Action *link(Continuation *cont, CacheKey *from, CacheKey *to, CacheFragType type, char *hostname, int host_len);
01056   Action *deref(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len);
01057 
01058   void vol_initialized(bool result);
01059 
01060   int open_done();
01061 
01062   Vol *key_to_vol(CacheKey *key, char const* hostname, int host_len);
01063 
01064   Cache()
01065     : cache_read_done(0), total_good_nvol(0), total_nvol(0), ready(CACHE_INITIALIZING), cache_size(0),  
01066       hosttable(NULL), total_initialized_vol(0), scheme(CACHE_NONE_TYPE)
01067     { }
01068 };
01069 
01070 extern Cache *theCache;
01071 extern Cache *theStreamCache;
01072 inkcoreapi extern Cache *caches[NUM_CACHE_FRAG_TYPES];
01073 
01074 #ifdef HTTP_CACHE
01075 TS_INLINE Action *
01076 Cache::open_read(Continuation *cont, CacheURL *url, CacheHTTPHdr *request,
01077                  CacheLookupHttpConfig *params, CacheFragType type)
01078 {
01079   INK_MD5 md5;
01080   int len;
01081   url->hash_get(&md5);
01082   const char *hostname = url->host_get(&len);
01083   return open_read(cont, &md5, request, params, type, (char *) hostname, len);
01084 }
01085 
01086 TS_INLINE void
01087 Cache::generate_key(INK_MD5 *md5, URL *url)
01088 {
01089   url->hash_get(md5);
01090 }
01091 
01092 TS_INLINE Action *
01093 Cache::open_write(Continuation *cont, CacheURL *url, CacheHTTPHdr *request,
01094                   CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type)
01095 {
01096   (void) request;
01097   INK_MD5 url_md5;
01098   url->hash_get(&url_md5);
01099   int len;
01100   const char *hostname = url->host_get(&len);
01101 
01102   return open_write(cont, &url_md5, old_info, pin_in_cache, NULL, type, (char *) hostname, len);
01103 }
01104 #endif
01105 
01106 TS_INLINE unsigned int
01107 cache_hash(INK_MD5 & md5)
01108 {
01109   uint64_t f = md5.fold();
01110   unsigned int mhash = (unsigned int) (f >> 32);
01111   return mhash;
01112 }
01113 
01114 #ifdef HTTP_CACHE
01115 #define CLUSTER_CACHE
01116 #endif
01117 #ifdef CLUSTER_CACHE
01118 #include "P_Net.h"
01119 #include "P_ClusterInternal.h"
01120 
01121 
01122 #include "P_ClusterInline.h"
01123 #endif
01124 
01125 TS_INLINE Action *
01126 CacheProcessor::lookup(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED,
01127                        bool local_only ATS_UNUSED, CacheFragType frag_type, char *hostname, int host_len)
01128 {
01129 #ifdef CLUSTER_CACHE
01130   
01131   if ((cache_clustering_enabled > 0) && !cluster_cache_local && !local_only) {
01132     Action *a = Cluster_lookup(cont, key, frag_type, hostname, host_len);
01133     if (a) {
01134       return a;
01135     }
01136   }
01137 #endif
01138   return caches[frag_type]->lookup(cont, key, frag_type, hostname, host_len);
01139 }
01140 
01141 TS_INLINE inkcoreapi Action *
01142 CacheProcessor::open_read(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED,
01143                           CacheFragType frag_type, char *hostname, int host_len)
01144 {
01145 #ifdef CLUSTER_CACHE
01146   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01147     return open_read_internal(CACHE_OPEN_READ, cont, (MIOBuffer *) 0,
01148                               (CacheURL *) 0, (CacheHTTPHdr *) 0,
01149                               (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len);
01150   }
01151 #endif
01152   return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01153 }
01154 
01155 TS_INLINE Action *
01156 CacheProcessor::open_read_buffer(Continuation *cont, MIOBuffer *buf ATS_UNUSED, CacheKey *key, CacheFragType frag_type,
01157                                  char *hostname, int host_len)
01158 {
01159 #ifdef CLUSTER_CACHE
01160   if (cache_clustering_enabled > 0) {
01161     return open_read_internal(CACHE_OPEN_READ_BUFFER, cont, buf,
01162                               (CacheURL *) 0, (CacheHTTPHdr *) 0,
01163                               (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len);
01164   }
01165 #endif
01166   return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01167 }
01168 
01169 
01170 TS_INLINE inkcoreapi Action *
01171 CacheProcessor::open_write(Continuation *cont, CacheKey *key, bool cluster_cache_local  ATS_UNUSED,
01172                            CacheFragType frag_type, int expected_size ATS_UNUSED, int options,
01173                            time_t pin_in_cache, char *hostname, int host_len)
01174 {
01175 #ifdef CLUSTER_CACHE
01176   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01177     ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01178     if (m)
01179       return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
01180                          key, frag_type, options, pin_in_cache,
01181                          CACHE_OPEN_WRITE, key, (CacheURL *) 0,
01182                          (CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len);
01183   }
01184 #endif
01185   return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
01186 }
01187 
01188 TS_INLINE Action *
01189 CacheProcessor::open_write_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key,
01190                                   CacheFragType frag_type, int options, time_t pin_in_cache,
01191                                   char *hostname, int host_len)
01192 {
01193   (void)pin_in_cache;
01194   (void)cont;
01195   (void)buf;
01196   (void)key;
01197   (void)frag_type;
01198   (void)options;
01199   (void)hostname;
01200   (void)host_len;
01201   ink_assert(!"implemented");
01202   return NULL;
01203 }
01204 
01205 TS_INLINE Action *
01206 CacheProcessor::remove(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, CacheFragType frag_type,
01207                        bool rm_user_agents, bool rm_link, char *hostname, int host_len)
01208 {
01209   Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %u", cache_hash(*key));
01210 #ifdef CLUSTER_CACHE
01211   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01212     ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01213 
01214     if (m) {
01215       return Cluster_remove(m, cont, key, rm_user_agents, rm_link, frag_type, hostname, host_len);
01216     }
01217   }
01218 #endif
01219   return caches[frag_type]->remove(cont, key, frag_type, rm_user_agents, rm_link, hostname, host_len);
01220 }
01221 
01222 # if 0
01223 TS_INLINE Action *
01224 scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500)
01225 {
01226   return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
01227 }
01228 # endif
01229 
01230 #ifdef HTTP_CACHE
01231 TS_INLINE Action *
01232 CacheProcessor::lookup(Continuation *cont, URL *url, bool cluster_cache_local, bool local_only, CacheFragType frag_type)
01233 {
01234   (void) local_only;
01235   INK_MD5 md5;
01236   url->hash_get(&md5);
01237   int host_len = 0;
01238   const char *hostname = url->host_get(&host_len);
01239 
01240   return lookup(cont, &md5, cluster_cache_local, local_only, frag_type, (char *) hostname, host_len);
01241 }
01242 
01243 TS_INLINE Action *
01244 CacheProcessor::open_read_buffer(Continuation *cont, MIOBuffer *buf,
01245                                  URL *url, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type)
01246 {
01247   (void) buf;
01248 #ifdef CLUSTER_CACHE
01249   if (cache_clustering_enabled > 0) {
01250     return open_read_internal(CACHE_OPEN_READ_BUFFER_LONG, cont, buf, url,
01251                               request, params, (CacheKey *) 0, 0, type, (char *) 0, 0);
01252   }
01253 #endif
01254   return caches[type]->open_read(cont, url, request, params, type);
01255 }
01256 
01257 TS_INLINE Action *
01258 CacheProcessor::open_write_buffer(Continuation * cont, MIOBuffer * buf, URL * url,
01259                                   CacheHTTPHdr * request, CacheHTTPHdr * response, CacheFragType type)
01260 {
01261   (void) cont;
01262   (void) buf;
01263   (void) url;
01264   (void) request;
01265   (void) response;
01266   (void) type;
01267   ink_assert(!"implemented");
01268   return NULL;
01269 }
01270 
01271 #endif
01272 
01273 
01274 #ifdef CLUSTER_CACHE
01275 TS_INLINE Action *
01276 CacheProcessor::open_read_internal(int opcode,
01277                                    Continuation *cont, MIOBuffer *buf,
01278                                    CacheURL *url,
01279                                    CacheHTTPHdr *request,
01280                                    CacheLookupHttpConfig *params,
01281                                    CacheKey *key,
01282                                    time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
01283 {
01284   INK_MD5 url_md5;
01285   if ((opcode == CACHE_OPEN_READ_LONG) || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
01286     Cache::generate_key(&url_md5, url);
01287   } else {
01288     url_md5 = *key;
01289   }
01290   ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
01291 
01292   if (m) {
01293     return Cluster_read(m, opcode, cont, buf, url,
01294                         request, params, key, pin_in_cache, frag_type, hostname, host_len);
01295   } else {
01296     if ((opcode == CACHE_OPEN_READ_LONG)
01297         || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
01298       return caches[frag_type]->open_read(cont, &url_md5, request, params, frag_type, hostname, host_len);
01299     } else {
01300       return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01301     }
01302   }
01303 }
01304 #endif
01305 
01306 #ifdef CLUSTER_CACHE
01307 TS_INLINE Action *
01308 CacheProcessor::link(Continuation *cont, CacheKey *from, CacheKey *to, bool cluster_cache_local,
01309                      CacheFragType type, char *hostname, int host_len)
01310 {
01311   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01312     
01313     ClusterMachine *m = cluster_machine_at_depth(cache_hash(*from));
01314     if (m) {
01315       return Cluster_link(m, cont, from, to, type, hostname, host_len);
01316     }
01317   }
01318   return caches[type]->link(cont, from, to, type, hostname, host_len);
01319 }
01320 
01321 TS_INLINE Action *
01322 CacheProcessor::deref(Continuation *cont, CacheKey *key, bool cluster_cache_local, CacheFragType type, char *hostname, int host_len)
01323 {
01324   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01325     ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01326     if (m) {
01327       return Cluster_deref(m, cont, key, type, hostname, host_len);
01328     }
01329   }
01330   return caches[type]->deref(cont, key, type, hostname, host_len);
01331 }
01332 #endif
01333 
01334 TS_INLINE Action *
01335 CacheProcessor::scan(Continuation *cont, char *hostname, int host_len, int KB_per_second)
01336 {
01337   return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
01338 }
01339 
01340 TS_INLINE int
01341 CacheProcessor::IsCacheEnabled()
01342 {
01343   return CacheProcessor::initialized;
01344 }
01345 
01346 TS_INLINE bool
01347 CacheProcessor::IsCacheReady(CacheFragType type)
01348 {
01349   if (IsCacheEnabled() != CACHE_INITIALIZED)
01350     return 0;
01351   return (bool)(cache_ready & (1 << type));
01352 }
01353 
01354 TS_INLINE Cache *
01355 local_cache()
01356 {
01357   return theCache;
01358 }
01359 
01360 LINK_DEFINITION(CacheVC, opendir_link)
01361 
01362 #endif