• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

P_CacheInternal.h

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 #ifndef _P_CACHE_INTERNAL_H__
00026 #define _P_CACHE_INTERNAL_H__
00027 
00028 #include "libts.h"
00029 
00030 #ifdef HTTP_CACHE
00031 #include "HTTP.h"
00032 #include "P_CacheHttp.h"
00033 #endif
00034 
00035 struct EvacuationBlock;
00036 
00037 // Compilation Options
00038 
00039 #define ALTERNATES                      1
00040 // #define CACHE_LOCK_FAIL_RATE         0.001
00041 // #define CACHE_AGG_FAIL_RATE          0.005
00042 // #define CACHE_INSPECTOR_PAGES
00043 #define MAX_CACHE_VCS_PER_THREAD        500
00044 
00045 #define INTEGRAL_FRAGS                  4
00046 
00047 #ifdef CACHE_INSPECTOR_PAGES
00048 #ifdef DEBUG
00049 #define CACHE_STAT_PAGES
00050 #endif
00051 #endif
00052 
00053 #ifdef DEBUG
00054 #define DDebug Debug
00055 #else
00056 #define DDebug if (0) dummy_debug
00057 #endif
00058 
00059 #define AIO_SOFT_FAILURE                -100000
00060 // retry read from writer delay
00061 #define WRITER_RETRY_DELAY  HRTIME_MSECONDS(50)
00062 
00063 #ifndef CACHE_LOCK_FAIL_RATE
00064 #define CACHE_TRY_LOCK(_l, _m, _t) MUTEX_TRY_LOCK(_l, _m, _t)
00065 #else
00066 #define CACHE_TRY_LOCK(_l, _m, _t)                             \
00067   MUTEX_TRY_LOCK(_l, _m, _t);                                  \
00068   if ((uint32_t)_t->generator.random() <                         \
00069      (uint32_t)(UINT_MAX *CACHE_LOCK_FAIL_RATE))                 \
00070     CACHE_MUTEX_RELEASE(_l)
00071 #endif
00072 
00073 
00074 #define VC_LOCK_RETRY_EVENT() \
00075   do { \
00076     trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), event); \
00077     return EVENT_CONT; \
00078   } while (0)
00079 
00080 #define VC_SCHED_LOCK_RETRY() \
00081   do { \
00082     trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
00083     return EVENT_CONT; \
00084   } while (0)
00085 
00086 #define CONT_SCHED_LOCK_RETRY_RET(_c) \
00087   do { \
00088     _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
00089     return EVENT_CONT; \
00090   } while (0)
00091 
00092 #define CONT_SCHED_LOCK_RETRY(_c) \
00093   _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay))
00094 
00095 #define VC_SCHED_WRITER_RETRY() \
00096   do { \
00097     ink_assert(!trigger); \
00098     writer_lock_retry++; \
00099     ink_hrtime _t = WRITER_RETRY_DELAY; \
00100     if (writer_lock_retry > 2) \
00101       _t = WRITER_RETRY_DELAY * 2; \
00102     else if (writer_lock_retry > 5) \
00103       _t = WRITER_RETRY_DELAY * 10; \
00104     else if (writer_lock_retry > 10) \
00105       _t = WRITER_RETRY_DELAY * 100; \
00106     trigger = mutex->thread_holding->schedule_in_local(this, _t); \
00107     return EVENT_CONT; \
00108   } while (0)
00109 
00110 
00111   // cache stats definitions
00112 enum
00113 {
00114   cache_bytes_used_stat,
00115   cache_bytes_total_stat,
00116   cache_ram_cache_bytes_stat,
00117   cache_ram_cache_bytes_total_stat,
00118   cache_direntries_total_stat,
00119   cache_direntries_used_stat,
00120   cache_ram_cache_hits_stat,
00121   cache_ram_cache_misses_stat,
00122   cache_pread_count_stat,
00123   cache_percent_full_stat,
00124   cache_lookup_active_stat,
00125   cache_lookup_success_stat,
00126   cache_lookup_failure_stat,
00127   cache_read_active_stat,
00128   cache_read_success_stat,
00129   cache_read_failure_stat,
00130 #if TS_USE_INTERIM_CACHE == 1
00131   cache_interim_read_success_stat,
00132   cache_disk_read_success_stat,
00133   cache_ram_read_success_stat,
00134 #endif
00135   cache_write_active_stat,
00136   cache_write_success_stat,
00137   cache_write_failure_stat,
00138   cache_write_backlog_failure_stat,
00139   cache_update_active_stat,
00140   cache_update_success_stat,
00141   cache_update_failure_stat,
00142   cache_remove_active_stat,
00143   cache_remove_success_stat,
00144   cache_remove_failure_stat,
00145   cache_evacuate_active_stat,
00146   cache_evacuate_success_stat,
00147   cache_evacuate_failure_stat,
00148   cache_scan_active_stat,
00149   cache_scan_success_stat,
00150   cache_scan_failure_stat,
00151   cache_directory_collision_count_stat,
00152   cache_single_fragment_document_count_stat,
00153   cache_two_fragment_document_count_stat,
00154   cache_three_plus_plus_fragment_document_count_stat,
00155   cache_read_busy_success_stat,
00156   cache_read_busy_failure_stat,
00157   cache_gc_bytes_evacuated_stat,
00158   cache_gc_frags_evacuated_stat,
00159   cache_write_bytes_stat,
00160   cache_hdr_vector_marshal_stat,
00161   cache_hdr_marshal_stat,
00162   cache_hdr_marshal_bytes_stat,
00163   cache_stat_count
00164 };
00165 
00166 
00167 extern RecRawStatBlock *cache_rsb;
00168 
00169 #define GLOBAL_CACHE_SET_DYN_STAT(x,y) \
00170         RecSetGlobalRawStatSum(cache_rsb, (x), (y))
00171 
00172 #define CACHE_SET_DYN_STAT(x,y) \
00173         RecSetGlobalRawStatSum(cache_rsb, (x), (y)) \
00174         RecSetGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y))
00175 
00176 #define CACHE_INCREMENT_DYN_STAT(x) \
00177         RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), 1); \
00178         RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), 1);
00179 
00180 #define CACHE_DECREMENT_DYN_STAT(x) \
00181         RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), -1); \
00182         RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), -1);
00183 
00184 #define CACHE_VOL_SUM_DYN_STAT(x,y) \
00185         RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), (int64_t) y);
00186 
00187 #define CACHE_SUM_DYN_STAT(x, y) \
00188         RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), (int64_t) (y)); \
00189         RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), (int64_t) (y));
00190 
00191 #define CACHE_SUM_DYN_STAT_THREAD(x, y) \
00192         RecIncrRawStat(cache_rsb, this_ethread(), (int) (x), (int64_t) (y)); \
00193         RecIncrRawStat(vol->cache_vol->vol_rsb, this_ethread(), (int) (x), (int64_t) (y));
00194 
00195 #define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
00196         RecIncrGlobalRawStatSum(cache_rsb,(x),(y))
00197 
00198 #define CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
00199         RecIncrGlobalRawStatSum(cache_rsb,(x),(y)) \
00200         RecIncrGlobalRawStatSum(vol->cache_vol->vol_rsb,(x),(y))
00201 
00202 #define CACHE_CLEAR_DYN_STAT(x) \
00203 do { \
00204         RecSetRawStatSum(cache_rsb, (x), 0); \
00205         RecSetRawStatCount(cache_rsb, (x), 0); \
00206         RecSetRawStatSum(vol->cache_vol->vol_rsb, (x), 0); \
00207         RecSetRawStatCount(vol->cache_vol->vol_rsb, (x), 0); \
00208 } while (0);
00209 
00210 // Configuration
00211 extern int cache_config_dir_sync_frequency;
00212 extern int cache_config_http_max_alts;
00213 extern int cache_config_permit_pinning;
00214 extern int cache_config_select_alternate;
00215 extern int cache_config_vary_on_user_agent;
00216 extern int cache_config_max_doc_size;
00217 extern int cache_config_min_average_object_size;
00218 extern int cache_config_agg_write_backlog;
00219 extern int cache_config_enable_checksum;
00220 extern int cache_config_alt_rewrite_max_size;
00221 extern int cache_config_read_while_writer;
00222 extern int cache_clustering_enabled;
00223 extern int cache_config_agg_write_backlog;
00224 extern int cache_config_ram_cache_compress;
00225 extern int cache_config_ram_cache_compress_percent;
00226 extern int cache_config_ram_cache_use_seen_filter;
00227 extern int cache_config_hit_evacuate_percent;
00228 extern int cache_config_hit_evacuate_size_limit;
00229 extern int cache_config_force_sector_size;
00230 extern int cache_config_target_fragment_size;
00231 extern int cache_config_mutex_retry_delay;
00232 #if TS_USE_INTERIM_CACHE == 1
00233 extern int good_interim_disks;
00234 #endif
00235 // CacheVC
00236 struct CacheVC: public CacheVConnection
00237 {
00238   CacheVC();
00239 
00240   VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf);
00241   VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset);
00242   VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false);
00243   void do_io_close(int lerrno = -1);
00244   void reenable(VIO *avio);
00245   void reenable_re(VIO *avio);
00246   bool get_data(int i, void *data);
00247   bool set_data(int i, void *data);
00248 
00249   bool is_ram_cache_hit() const
00250   {
00251     ink_assert(vio.op == VIO::READ);
00252     return !f.not_from_ram_cache;
00253   }
00254   int get_header(void **ptr, int *len)
00255   {
00256     if (first_buf.m_ptr) {
00257       Doc *doc = (Doc*)first_buf->data();
00258       *ptr = doc->hdr();
00259       *len = doc->hlen;
00260       return 0;
00261     } else
00262       return -1;
00263   }
00264   int set_header(void *ptr, int len)
00265   {
00266     header_to_write = ptr;
00267     header_to_write_len = len;
00268     return 0;
00269   }
00270   int get_single_data(void **ptr, int *len)
00271   {
00272     if (first_buf.m_ptr) {
00273       Doc *doc = (Doc*)first_buf->data();
00274       if (doc->data_len() == doc->total_len) {
00275         *ptr = doc->data();
00276         *len = doc->data_len();
00277         return 0;
00278       }
00279     }
00280     return -1;
00281   }
00282 
00283   bool writer_done();
00284   int calluser(int event);
00285   int callcont(int event);
00286   int die();
00287   int dead(int event, Event *e);
00288 
00289   int handleReadDone(int event, Event *e);
00290   int handleRead(int event, Event *e);
00291   int do_read_call(CacheKey *akey);
00292   int handleWrite(int event, Event *e);
00293   int handleWriteLock(int event, Event *e);
00294   int do_write_call();
00295   int do_write_lock();
00296   int do_write_lock_call();
00297   int do_sync(uint32_t target_write_serial);
00298 
00299   int openReadClose(int event, Event *e);
00300   int openReadReadDone(int event, Event *e);
00301   int openReadMain(int event, Event *e);
00302   int openReadStartEarliest(int event, Event *e);
00303 #ifdef HTTP_CACHE
00304   int openReadVecWrite(int event, Event *e);
00305 #endif
00306   int openReadStartHead(int event, Event *e);
00307   int openReadFromWriter(int event, Event *e);
00308   int openReadFromWriterMain(int event, Event *e);
00309   int openReadFromWriterFailure(int event, Event *);
00310   int openReadChooseWriter(int event, Event *e);
00311 
00312   int openWriteCloseDir(int event, Event *e);
00313   int openWriteCloseHeadDone(int event, Event *e);
00314   int openWriteCloseHead(int event, Event *e);
00315   int openWriteCloseDataDone(int event, Event *e);
00316   int openWriteClose(int event, Event *e);
00317   int openWriteRemoveVector(int event, Event *e);
00318   int openWriteWriteDone(int event, Event *e);
00319   int openWriteOverwrite(int event, Event *e);
00320   int openWriteMain(int event, Event *e);
00321   int openWriteStartDone(int event, Event *e);
00322   int openWriteStartBegin(int event, Event *e);
00323 
00324   int updateVector(int event, Event *e);
00325   int updateReadDone(int event, Event *e);
00326   int updateVecWrite(int event, Event *e);
00327 
00328   int removeEvent(int event, Event *e);
00329 
00330   int linkWrite(int event, Event *e);
00331   int derefRead(int event, Event *e);
00332 
00333   int scanVol(int event, Event *e);
00334   int scanObject(int event, Event *e);
00335   int scanUpdateDone(int event, Event *e);
00336   int scanOpenWrite(int event, Event *e);
00337   int scanRemoveDone(int event, Event *e);
00338 
00339   int is_io_in_progress()
00340   {
00341     return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
00342   }
00343   void set_io_not_in_progress()
00344   {
00345     io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
00346   }
00347   void set_agg_write_in_progress()
00348   {
00349     io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
00350   }
00351   int evacuateDocDone(int event, Event *e);
00352   int evacuateReadHead(int event, Event *e);
00353 
00354   void cancel_trigger();
00355   virtual int64_t get_object_size();
00356 #ifdef HTTP_CACHE
00357   virtual void set_http_info(CacheHTTPInfo *info);
00358   virtual void get_http_info(CacheHTTPInfo ** info);
00359   /** Get the fragment table.
00360       @return The address of the start of the fragment table,
00361       or @c NULL if there is no fragment table.
00362   */
00363   virtual HTTPInfo::FragOffset* get_frag_table();
00364   /** Load alt pointers and do fixups if needed.
00365       @return Length of header data used for alternates.
00366    */
00367   virtual uint32_t load_http_info(CacheHTTPInfoVector* info, struct Doc* doc, RefCountObj * block_ptr = NULL);
00368 #endif
00369   virtual bool is_pread_capable();
00370   virtual bool set_pin_in_cache(time_t time_pin);
00371   virtual time_t get_pin_in_cache();
00372   virtual bool set_disk_io_priority(int priority);
00373   virtual int get_disk_io_priority();
00374 
00375   // offsets from the base stat
00376 #define CACHE_STAT_ACTIVE  0
00377 #define CACHE_STAT_SUCCESS 1
00378 #define CACHE_STAT_FAILURE 2
00379 
00380   // number of bytes to memset to 0 in the CacheVC when we free
00381   // it. All member variables starting from vio are memset to 0.
00382   // This variable is initialized in CacheVC constructor.
00383   static int size_to_init;
00384 
00385   // Start Region A
00386   // This set of variables are not reset when the cacheVC is freed.
00387   // A CacheVC must set these to the correct values whenever needed
00388   // These are variables that are always set to the correct values
00389   // before being used by the CacheVC
00390   CacheKey key, first_key, earliest_key, update_key;
00391   Dir dir, earliest_dir, overwrite_dir, first_dir;
00392   // end Region A
00393 
00394   // Start Region B
00395   // These variables are individually cleared or reset when the
00396   // CacheVC is freed. All these variables must be reset/cleared
00397   // in free_CacheVC.
00398   Action _action;
00399 #ifdef HTTP_CACHE
00400   CacheHTTPHdr request;
00401 #endif
00402   CacheHTTPInfoVector vector;
00403   CacheHTTPInfo alternate;
00404   Ptr<IOBufferData> buf;
00405   Ptr<IOBufferData> first_buf;
00406   Ptr<IOBufferBlock> blocks; // data available to write
00407   Ptr<IOBufferBlock> writer_buf;
00408 
00409   OpenDirEntry *od;
00410   AIOCallbackInternal io;
00411   int alternate_index;          // preferred position in vector
00412   LINK(CacheVC, opendir_link);
00413 #ifdef CACHE_STAT_PAGES
00414   LINK(CacheVC, stat_link);
00415 #endif
00416   // end Region B
00417 
00418   // Start Region C
00419   // These variables are memset to 0 when the structure is freed.
00420   // The size of this region is size_to_init which is initialized
00421   // in the CacheVC constuctor. It assumes that vio is the start
00422   // of this region.
00423   // NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
00424   // size_to_init initialization
00425   VIO vio;
00426   EThread *initial_thread;  // initial thread open_XX was called on
00427   CacheFragType frag_type;
00428   CacheHTTPInfo *info;
00429   CacheHTTPInfoVector *write_vector;
00430 #ifdef HTTP_CACHE
00431   CacheLookupHttpConfig *params;
00432 #endif
00433   int header_len;       // for communicating with agg_copy
00434   int frag_len;         // for communicating with agg_copy
00435   uint32_t write_len;     // for communicating with agg_copy
00436   uint32_t agg_len;       // for communicating with aggWrite
00437   uint32_t write_serial;  // serial of the final write for SYNC
00438   Vol *vol;
00439   Dir *last_collision;
00440   Event *trigger;
00441   CacheKey *read_key;
00442   ContinuationHandler save_handler;
00443   uint32_t pin_in_cache;
00444   ink_hrtime start_time;
00445   int base_stat;
00446   int recursive;
00447   int closed;
00448   uint64_t seek_to;               // pread offset
00449   int64_t offset;                 // offset into 'blocks' of data to write
00450   int64_t writer_offset;          // offset of the writer for reading from a writer
00451   int64_t length;                 // length of data available to write
00452   int64_t doc_pos;                // read position in 'buf'
00453   uint64_t write_pos;             // length written
00454   uint64_t total_len;             // total length written and available to write
00455   uint64_t doc_len;               // total_length (of the selected alternate for HTTP)
00456   uint64_t update_len;
00457   int fragment;
00458   int scan_msec_delay;
00459   CacheVC *write_vc;
00460   char *hostname;
00461   int host_len;
00462   int header_to_write_len;
00463   void *header_to_write;
00464   short writer_lock_retry;
00465 #if TS_USE_INTERIM_CACHE == 1
00466   InterimCacheVol *interim_vol;
00467   MigrateToInterimCache *mts;
00468   uint64_t dir_off;
00469 #endif
00470   union
00471   {
00472     uint32_t flags;
00473     struct
00474     {
00475       unsigned int use_first_key:1;
00476       unsigned int overwrite:1; // overwrite first_key Dir if it exists
00477       unsigned int close_complete:1; // WRITE_COMPLETE is final
00478       unsigned int sync:1; // write to be committed to durable storage before WRITE_COMPLETE
00479       unsigned int evacuator:1;
00480       unsigned int single_fragment:1;
00481       unsigned int evac_vector:1;
00482       unsigned int lookup:1;
00483       unsigned int update:1;
00484       unsigned int remove:1;
00485       unsigned int remove_aborted_writers:1;
00486       unsigned int open_read_timeout:1; // UNUSED
00487       unsigned int data_done:1;
00488       unsigned int read_from_writer_called:1;
00489       unsigned int not_from_ram_cache:1;        // entire object was from ram cache
00490       unsigned int rewrite_resident_alt:1;
00491       unsigned int readers:1;
00492       unsigned int doc_from_ram_cache:1;
00493       unsigned int hit_evacuate:1;
00494 #if TS_USE_INTERIM_CACHE == 1
00495       unsigned int read_from_interim:1;
00496       unsigned int write_into_interim:1;
00497       unsigned int ram_fixup:1;
00498       unsigned int transistor:1;
00499 #endif
00500 #ifdef HTTP_CACHE
00501       unsigned int allow_empty_doc:1; // used for cache empty http document
00502 #endif
00503     } f;
00504   };
00505   // BTF optimization used to skip reading stuff in cache partition that doesn't contain any
00506   // dir entries.
00507   char *scan_vol_map; 
00508   // BTF fix to handle objects that overlapped over two different reads,
00509   // this is how much we need to back up the buffer to get the start of the overlapping object.
00510   off_t scan_fix_buffer_offset;
00511   //end region C
00512 };
00513 
00514 #define PUSH_HANDLER(_x) do {                                           \
00515     ink_assert(handler != (ContinuationHandler)(&CacheVC::dead));       \
00516     save_handler = handler; handler = (ContinuationHandler)(_x);        \
00517 } while (0)
00518 
00519 #define POP_HANDLER do {                                          \
00520     handler = save_handler;                                       \
00521     ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
00522   } while (0)
00523 
00524 struct CacheRemoveCont: public Continuation
00525 {
00526   int event_handler(int event, void *data);
00527 
00528   CacheRemoveCont()
00529     : Continuation(NULL)
00530   { }
00531 };
00532 
00533 
00534 // Global Data
00535 #if TS_USE_INTERIM_CACHE == 1
00536 extern ClassAllocator<MigrateToInterimCache> migrateToInterimCacheAllocator;
00537 #endif
00538 extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
00539 extern CacheKey zero_key;
00540 extern CacheSync *cacheDirSync;
00541 // Function Prototypes
00542 #ifdef HTTP_CACHE
00543 int cache_write(CacheVC *, CacheHTTPInfoVector *);
00544 int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
00545 #endif
00546 CacheVC *new_DocEvacuator(int nbytes, Vol *d);
00547 
00548 // inline Functions
00549 
00550 TS_INLINE CacheVC *
00551 new_CacheVC(Continuation *cont)
00552 {
00553   EThread *t = cont->mutex->thread_holding;
00554   CacheVC *c = THREAD_ALLOC(cacheVConnectionAllocator, t);
00555 #ifdef HTTP_CACHE
00556   c->vector.data.data = &c->vector.data.fast_data[0];
00557 #endif
00558   c->_action = cont;
00559   c->initial_thread = t->tt == DEDICATED ? NULL : t;
00560   c->mutex = cont->mutex;
00561   c->start_time = ink_get_hrtime();
00562   ink_assert(c->trigger == NULL);
00563   Debug("cache_new", "new %p", c);
00564 #ifdef CACHE_STAT_PAGES
00565   ink_assert(!c->stat_link.next);
00566   ink_assert(!c->stat_link.prev);
00567 #endif
00568   dir_clear(&c->dir);
00569   return c;
00570 }
00571 
00572 TS_INLINE int
00573 free_CacheVC(CacheVC *cont)
00574 {
00575   Debug("cache_free", "free %p", cont);
00576   ProxyMutex *mutex = cont->mutex;
00577   Vol *vol = cont->vol;
00578   if (vol) {
00579     CACHE_DECREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_ACTIVE);
00580     if (cont->closed > 0) {
00581       CACHE_INCREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_SUCCESS);
00582 #if TS_USE_INTERIM_CACHE == 1
00583       if (cont->vio.op == VIO::READ) {
00584         if (cont->f.doc_from_ram_cache) {
00585           CACHE_INCREMENT_DYN_STAT(cache_ram_read_success_stat);
00586         } else if (cont->f.read_from_interim) {
00587           CACHE_INCREMENT_DYN_STAT(cache_interim_read_success_stat);
00588         } else {
00589           CACHE_INCREMENT_DYN_STAT(cache_disk_read_success_stat);
00590         }
00591       }
00592 #endif
00593     }                             // else abort,cancel
00594   }
00595   ink_assert(mutex->thread_holding == this_ethread());
00596   if (cont->trigger)
00597     cont->trigger->cancel();
00598   ink_assert(!cont->is_io_in_progress());
00599   ink_assert(!cont->od);
00600   /* calling cont->io.action = NULL causes compile problem on 2.6 solaris
00601      release build....wierd??? For now, null out continuation and mutex
00602      of the action separately */
00603   cont->io.action.continuation = NULL;
00604   cont->io.action.mutex = NULL;
00605   cont->io.mutex.clear();
00606   cont->io.aio_result = 0;
00607   cont->io.aiocb.aio_nbytes = 0;
00608   cont->io.aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
00609 #ifdef HTTP_CACHE
00610   cont->request.reset();
00611   cont->vector.clear();
00612 #endif
00613   cont->vio.buffer.clear();
00614   cont->vio.mutex.clear();
00615 #ifdef HTTP_CACHE
00616   if (cont->vio.op == VIO::WRITE && cont->alternate_index == CACHE_ALT_INDEX_DEFAULT)
00617     cont->alternate.destroy();
00618   else
00619     cont->alternate.clear();
00620 #endif
00621   cont->_action.cancelled = 0;
00622   cont->_action.mutex.clear();
00623   cont->mutex.clear();
00624   cont->buf.clear();
00625   cont->first_buf.clear();
00626   cont->blocks.clear();
00627   cont->writer_buf.clear();
00628   cont->alternate_index = CACHE_ALT_INDEX_DEFAULT;
00629   if (cont->scan_vol_map)
00630     ats_free(cont->scan_vol_map);
00631   memset((char *) &cont->vio, 0, cont->size_to_init);
00632 #ifdef CACHE_STAT_PAGES
00633   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00634 #endif
00635 #ifdef DEBUG
00636   SET_CONTINUATION_HANDLER(cont, &CacheVC::dead);
00637 #endif
00638   THREAD_FREE(cont, cacheVConnectionAllocator, this_thread());
00639   return EVENT_DONE;
00640 }
00641 
00642 TS_INLINE int
00643 CacheVC::calluser(int event)
00644 {
00645   recursive++;
00646   ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
00647   vio._cont->handleEvent(event, (void *) &vio);
00648   recursive--;
00649   if (closed) {
00650     die();
00651     return EVENT_DONE;
00652   }
00653   return EVENT_CONT;
00654 }
00655 
00656 TS_INLINE int
00657 CacheVC::callcont(int event)
00658 {
00659   recursive++;
00660   ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
00661   _action.continuation->handleEvent(event, this);
00662   recursive--;
00663   if (closed)
00664     die();
00665   else if (vio.vc_server)
00666     handleEvent(EVENT_IMMEDIATE, 0);
00667   return EVENT_DONE;
00668 }
00669 
00670 TS_INLINE int
00671 CacheVC::do_read_call(CacheKey *akey)
00672 {
00673   doc_pos = 0;
00674   read_key = akey;
00675   io.aiocb.aio_nbytes = dir_approx_size(&dir);
00676 #if TS_USE_INTERIM_CACHE == 1
00677   interim_vol = NULL;
00678   ink_assert(mts == NULL);
00679   mts = NULL;
00680   f.write_into_interim = 0;
00681   f.ram_fixup = 0;
00682   f.transistor = 0;
00683   f.read_from_interim = dir_ininterim(&dir);
00684 
00685   if (!f.read_from_interim && vio.op == VIO::READ && good_interim_disks > 0){
00686     vol->history.put_key(read_key);
00687     if (vol->history.is_hot(read_key) && !vol->migrate_probe(read_key, NULL) && !od) {
00688       f.write_into_interim = 1;
00689     }
00690   }
00691   if (f.read_from_interim) {
00692     interim_vol = &vol->interim_vols[dir_get_index(&dir)];
00693     if (vio.op == VIO::READ && vol_transistor_range_valid(interim_vol, &dir)
00694         && !vol->migrate_probe(read_key, NULL) && !od)
00695       f.transistor = 1;
00696   }
00697   if (f.write_into_interim || f.transistor) {
00698     mts = migrateToInterimCacheAllocator.alloc();
00699     mts->vc = this;
00700     mts->key = *read_key;
00701     mts->rewrite = (f.transistor == 1);
00702     dir_assign(&mts->dir, &dir);
00703     vol->set_migrate_in_progress(mts);
00704   }
00705 #endif
00706   PUSH_HANDLER(&CacheVC::handleRead);
00707   return handleRead(EVENT_CALL, 0);
00708 }
00709 
00710 TS_INLINE int
00711 CacheVC::do_write_call()
00712 {
00713   PUSH_HANDLER(&CacheVC::handleWrite);
00714   return handleWrite(EVENT_CALL, 0);
00715 }
00716 
00717 TS_INLINE void
00718 CacheVC::cancel_trigger()
00719 {
00720   if (trigger) {
00721     trigger->cancel_action();
00722     trigger = NULL;
00723   }
00724 }
00725 
00726 TS_INLINE int
00727 CacheVC::die()
00728 {
00729   if (vio.op == VIO::WRITE) {
00730 #ifdef HTTP_CACHE
00731     if (f.update && total_len) {
00732       alternate.object_key_set(earliest_key);
00733     }
00734 #endif
00735     if (!is_io_in_progress()) {
00736       SET_HANDLER(&CacheVC::openWriteClose);
00737       if (!recursive)
00738         openWriteClose(EVENT_NONE, NULL);
00739     }                           // else catch it at the end of openWriteWriteDone
00740     return EVENT_CONT;
00741   } else {
00742     if (is_io_in_progress())
00743       save_handler = (ContinuationHandler) & CacheVC::openReadClose;
00744     else {
00745       SET_HANDLER(&CacheVC::openReadClose);
00746       if (!recursive)
00747         openReadClose(EVENT_NONE, NULL);
00748     }
00749     return EVENT_CONT;
00750   }
00751 }
00752 
00753 TS_INLINE int
00754 CacheVC::handleWriteLock(int /* event ATS_UNUSED */, Event *e)
00755 {
00756   cancel_trigger();
00757   int ret = 0;
00758   {
00759     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00760     if (!lock) {
00761       set_agg_write_in_progress();
00762       trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00763       return EVENT_CONT;
00764     }
00765     ret = handleWrite(EVENT_CALL, e);
00766   }
00767   if (ret == EVENT_RETURN)
00768     return handleEvent(AIO_EVENT_DONE, 0);
00769   return EVENT_CONT;
00770 }
00771 
00772 TS_INLINE int
00773 CacheVC::do_write_lock()
00774 {
00775   PUSH_HANDLER(&CacheVC::handleWriteLock);
00776   return handleWriteLock(EVENT_NONE, 0);
00777 }
00778 
00779 TS_INLINE int
00780 CacheVC::do_write_lock_call()
00781 {
00782   PUSH_HANDLER(&CacheVC::handleWriteLock);
00783   return handleWriteLock(EVENT_CALL, 0);
00784 }
00785 
00786 TS_INLINE bool
00787 CacheVC::writer_done()
00788 {
00789   OpenDirEntry *cod = od;
00790   if (!cod)
00791     cod = vol->open_read(&first_key);
00792   CacheVC *w = (cod) ? cod->writers.head : NULL;
00793   // If the write vc started after the reader, then its not the
00794   // original writer, since we never choose a writer that started
00795   // after the reader. The original writer was deallocated and then
00796   // reallocated for the same first_key
00797   for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *) w->opendir_link.next);
00798   if (!w)
00799     return true;
00800   return false;
00801 }
00802 
00803 TS_INLINE int
00804 Vol::close_write(CacheVC *cont)
00805 {
00806 #ifdef CACHE_STAT_PAGES
00807   ink_assert(stat_cache_vcs.head);
00808   stat_cache_vcs.remove(cont, cont->stat_link);
00809   ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00810 #endif
00811   return open_dir.close_write(cont);
00812 }
00813 
00814 // Returns 0 on success or a positive error code on failure
00815 TS_INLINE int
00816 Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
00817 {
00818   Vol *vol = this;
00819   bool agg_error = false;
00820   if (!cont->f.remove) {
00821     agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog);
00822 #ifdef CACHE_AGG_FAIL_RATE
00823     agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
00824                               (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
00825 #endif
00826   }
00827   if (agg_error) {
00828     CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
00829     return ECACHE_WRITE_FAIL;
00830   }
00831 #if TS_USE_INTERIM_CACHE == 1
00832   MigrateToInterimCache *m_result = NULL;
00833   if (vol->migrate_probe(&cont->first_key, &m_result)) {
00834     m_result->notMigrate = true;
00835   }
00836 #endif
00837   if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
00838 #ifdef CACHE_STAT_PAGES
00839     ink_assert(cont->mutex->thread_holding == this_ethread());
00840     ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00841     stat_cache_vcs.enqueue(cont, cont->stat_link);
00842 #endif
00843     return 0;
00844   }
00845   return ECACHE_DOC_BUSY;
00846 }
00847 
00848 TS_INLINE int
00849 Vol::close_write_lock(CacheVC *cont)
00850 {
00851   EThread *t = cont->mutex->thread_holding;
00852   CACHE_TRY_LOCK(lock, mutex, t);
00853   if (!lock)
00854     return -1;
00855   return close_write(cont);
00856 }
00857 
00858 TS_INLINE int
00859 Vol::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
00860 {
00861   EThread *t = cont->mutex->thread_holding;
00862   CACHE_TRY_LOCK(lock, mutex, t);
00863   if (!lock)
00864     return -1;
00865   return open_write(cont, allow_if_writers, max_writers);
00866 }
00867 
00868 TS_INLINE OpenDirEntry *
00869 Vol::open_read_lock(INK_MD5 *key, EThread *t)
00870 {
00871   CACHE_TRY_LOCK(lock, mutex, t);
00872   if (!lock)
00873     return NULL;
00874   return open_dir.open_read(key);
00875 }
00876 
00877 TS_INLINE int
00878 Vol::begin_read_lock(CacheVC *cont)
00879 {
00880   // no need for evacuation as the entire document is already in memory
00881 #ifndef  CACHE_STAT_PAGES
00882   if (cont->f.single_fragment)
00883     return 0;
00884 #endif
00885   // VC is enqueued in stat_cache_vcs in the begin_read call
00886   EThread *t = cont->mutex->thread_holding;
00887   CACHE_TRY_LOCK(lock, mutex, t);
00888   if (!lock)
00889     return -1;
00890   return begin_read(cont);
00891 }
00892 
00893 TS_INLINE int
00894 Vol::close_read_lock(CacheVC *cont)
00895 {
00896   EThread *t = cont->mutex->thread_holding;
00897   CACHE_TRY_LOCK(lock, mutex, t);
00898   if (!lock)
00899     return -1;
00900   return close_read(cont);
00901 }
00902 
00903 TS_INLINE int
00904 dir_delete_lock(CacheKey *key, Vol *d, ProxyMutex *m, Dir *del)
00905 {
00906   EThread *thread = m->thread_holding;
00907   CACHE_TRY_LOCK(lock, d->mutex, thread);
00908   if (!lock)
00909     return -1;
00910   return dir_delete(key, d, del);
00911 }
00912 
00913 TS_INLINE int
00914 dir_insert_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m)
00915 {
00916   EThread *thread = m->thread_holding;
00917   CACHE_TRY_LOCK(lock, d->mutex, thread);
00918   if (!lock)
00919     return -1;
00920   return dir_insert(key, d, to_part);
00921 }
00922 
00923 TS_INLINE int
00924 dir_overwrite_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m, Dir *overwrite, bool must_overwrite = true)
00925 {
00926   EThread *thread = m->thread_holding;
00927   CACHE_TRY_LOCK(lock, d->mutex, thread);
00928   if (!lock)
00929     return -1;
00930   return dir_overwrite(key, d, to_part, overwrite, must_overwrite);
00931 }
00932 
00933 void TS_INLINE
00934 rand_CacheKey(CacheKey *next_key, ProxyMutex *mutex)
00935 {
00936   next_key->b[0] = mutex->thread_holding->generator.random();
00937   next_key->b[1] = mutex->thread_holding->generator.random();
00938 }
00939 
00940 extern uint8_t CacheKey_next_table[];
00941 void TS_INLINE
00942 next_CacheKey(CacheKey *next_key, CacheKey *key)
00943 {
00944   uint8_t *b = (uint8_t *) next_key;
00945   uint8_t *k = (uint8_t *) key;
00946   b[0] = CacheKey_next_table[k[0]];
00947   for (int i = 1; i < 16; i++)
00948     b[i] = CacheKey_next_table[(b[i - 1] + k[i]) & 0xFF];
00949 }
00950 extern uint8_t CacheKey_prev_table[];
00951 void TS_INLINE
00952 prev_CacheKey(CacheKey *prev_key, CacheKey *key)
00953 {
00954   uint8_t *b = (uint8_t *) prev_key;
00955   uint8_t *k = (uint8_t *) key;
00956   for (int i = 15; i > 0; i--)
00957     b[i] = 256 + CacheKey_prev_table[k[i]] - k[i - 1];
00958   b[0] = CacheKey_prev_table[k[0]];
00959 }
00960 
00961 TS_INLINE unsigned int
00962 next_rand(unsigned int *p)
00963 {
00964   unsigned int seed = *p;
00965   seed = 1103515145 * seed + 12345;
00966   *p = seed;
00967   return seed;
00968 }
00969 
00970 extern ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator;
00971 
00972 TS_INLINE CacheRemoveCont *
00973 new_CacheRemoveCont()
00974 {
00975   CacheRemoveCont *cache_rm = cacheRemoveContAllocator.alloc();
00976 
00977   cache_rm->mutex = new_ProxyMutex();
00978   SET_CONTINUATION_HANDLER(cache_rm, &CacheRemoveCont::event_handler);
00979   return cache_rm;
00980 }
00981 
00982 TS_INLINE void
00983 free_CacheRemoveCont(CacheRemoveCont *cache_rm)
00984 {
00985   cache_rm->mutex = NULL;
00986   cacheRemoveContAllocator.free(cache_rm);
00987 }
00988 
00989 TS_INLINE int
00990 CacheRemoveCont::event_handler(int event, void *data)
00991 {
00992   (void) event;
00993   (void) data;
00994   free_CacheRemoveCont(this);
00995   return EVENT_DONE;
00996 }
00997 
00998 int64_t cache_bytes_used(void);
00999 int64_t cache_bytes_total(void);
01000 
01001 #ifdef DEBUG
01002 #define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) CACHE_INCREMENT_DYN_STAT(_x)
01003 #define CACHE_DEBUG_SUM_DYN_STAT(_x,_y) CACHE_SUM_DYN_STAT(_x,_y)
01004 #else
01005 #define CACHE_DEBUG_INCREMENT_DYN_STAT(_x)
01006 #define CACHE_DEBUG_SUM_DYN_STAT(_x,_y)
01007 #endif
01008 
01009 struct CacheHostRecord;
01010 struct Vol;
01011 class CacheHostTable;
01012 
01013 struct Cache
01014 {
01015   volatile int cache_read_done;
01016   volatile int total_good_nvol;
01017   volatile int total_nvol;
01018   volatile int ready;
01019   int64_t cache_size;             //in store block size
01020   CacheHostTable *hosttable;
01021   volatile int total_initialized_vol;
01022   CacheType scheme;
01023 
01024   int open(bool reconfigure, bool fix);
01025   int close();
01026 
01027   Action *lookup(Continuation *cont, CacheKey *key, CacheFragType type, char const* hostname, int host_len);
01028   inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int len);
01029   inkcoreapi Action *open_write(Continuation *cont, CacheKey *key,
01030                                 CacheFragType frag_type, int options = 0,
01031                                 time_t pin_in_cache = (time_t) 0, char *hostname = 0, int host_len = 0);
01032   inkcoreapi Action *remove(Continuation *cont, CacheKey *key,
01033                             CacheFragType type = CACHE_FRAG_TYPE_HTTP,
01034                             bool user_agents = true, bool link = false,
01035                             char *hostname = 0, int host_len = 0);
01036   Action *scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500);
01037 
01038 #ifdef HTTP_CACHE
01039   Action *lookup(Continuation *cont, URL *url, CacheFragType type);
01040   inkcoreapi Action *open_read(Continuation *cont, CacheKey *key,
01041                                CacheHTTPHdr *request,
01042                                CacheLookupHttpConfig *params, CacheFragType type, char *hostname, int host_len);
01043   Action *open_read(Continuation *cont, URL *url, CacheHTTPHdr *request,
01044                     CacheLookupHttpConfig *params, CacheFragType type);
01045   Action *open_write(Continuation *cont, CacheKey *key,
01046                      CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0,
01047                      CacheKey *key1 = NULL,
01048                      CacheFragType type = CACHE_FRAG_TYPE_HTTP, char *hostname = 0, int host_len = 0);
01049   Action *open_write(Continuation *cont, URL *url, CacheHTTPHdr *request,
01050                      CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0,
01051                      CacheFragType type = CACHE_FRAG_TYPE_HTTP);
01052   static void generate_key(INK_MD5 *md5, URL *url);
01053 #endif
01054 
01055   Action *link(Continuation *cont, CacheKey *from, CacheKey *to, CacheFragType type, char *hostname, int host_len);
01056   Action *deref(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len);
01057 
01058   void vol_initialized(bool result);
01059 
01060   int open_done();
01061 
01062   Vol *key_to_vol(CacheKey *key, char const* hostname, int host_len);
01063 
01064   Cache()
01065     : cache_read_done(0), total_good_nvol(0), total_nvol(0), ready(CACHE_INITIALIZING), cache_size(0),  // in store block size
01066       hosttable(NULL), total_initialized_vol(0), scheme(CACHE_NONE_TYPE)
01067     { }
01068 };
01069 
01070 extern Cache *theCache;
01071 extern Cache *theStreamCache;
01072 inkcoreapi extern Cache *caches[NUM_CACHE_FRAG_TYPES];
01073 
01074 #ifdef HTTP_CACHE
01075 TS_INLINE Action *
01076 Cache::open_read(Continuation *cont, CacheURL *url, CacheHTTPHdr *request,
01077                  CacheLookupHttpConfig *params, CacheFragType type)
01078 {
01079   INK_MD5 md5;
01080   int len;
01081   url->hash_get(&md5);
01082   const char *hostname = url->host_get(&len);
01083   return open_read(cont, &md5, request, params, type, (char *) hostname, len);
01084 }
01085 
01086 TS_INLINE void
01087 Cache::generate_key(INK_MD5 *md5, URL *url)
01088 {
01089   url->hash_get(md5);
01090 }
01091 
01092 TS_INLINE Action *
01093 Cache::open_write(Continuation *cont, CacheURL *url, CacheHTTPHdr *request,
01094                   CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type)
01095 {
01096   (void) request;
01097   INK_MD5 url_md5;
01098   url->hash_get(&url_md5);
01099   int len;
01100   const char *hostname = url->host_get(&len);
01101 
01102   return open_write(cont, &url_md5, old_info, pin_in_cache, NULL, type, (char *) hostname, len);
01103 }
01104 #endif
01105 
01106 TS_INLINE unsigned int
01107 cache_hash(INK_MD5 & md5)
01108 {
01109   uint64_t f = md5.fold();
01110   unsigned int mhash = (unsigned int) (f >> 32);
01111   return mhash;
01112 }
01113 
01114 #ifdef HTTP_CACHE
01115 #define CLUSTER_CACHE
01116 #endif
01117 #ifdef CLUSTER_CACHE
01118 #include "P_Net.h"
01119 #include "P_ClusterInternal.h"
01120 // Note: This include must occur here in order to avoid numerous forward
01121 //       reference problems.
01122 #include "P_ClusterInline.h"
01123 #endif
01124 
01125 TS_INLINE Action *
01126 CacheProcessor::lookup(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED,
01127                        bool local_only ATS_UNUSED, CacheFragType frag_type, char *hostname, int host_len)
01128 {
01129 #ifdef CLUSTER_CACHE
01130   // Try to send remote, if not possible, handle locally
01131   if ((cache_clustering_enabled > 0) && !cluster_cache_local && !local_only) {
01132     Action *a = Cluster_lookup(cont, key, frag_type, hostname, host_len);
01133     if (a) {
01134       return a;
01135     }
01136   }
01137 #endif
01138   return caches[frag_type]->lookup(cont, key, frag_type, hostname, host_len);
01139 }
01140 
01141 TS_INLINE inkcoreapi Action *
01142 CacheProcessor::open_read(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED,
01143                           CacheFragType frag_type, char *hostname, int host_len)
01144 {
01145 #ifdef CLUSTER_CACHE
01146   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01147     return open_read_internal(CACHE_OPEN_READ, cont, (MIOBuffer *) 0,
01148                               (CacheURL *) 0, (CacheHTTPHdr *) 0,
01149                               (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len);
01150   }
01151 #endif
01152   return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01153 }
01154 
01155 TS_INLINE Action *
01156 CacheProcessor::open_read_buffer(Continuation *cont, MIOBuffer *buf ATS_UNUSED, CacheKey *key, CacheFragType frag_type,
01157                                  char *hostname, int host_len)
01158 {
01159 #ifdef CLUSTER_CACHE
01160   if (cache_clustering_enabled > 0) {
01161     return open_read_internal(CACHE_OPEN_READ_BUFFER, cont, buf,
01162                               (CacheURL *) 0, (CacheHTTPHdr *) 0,
01163                               (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len);
01164   }
01165 #endif
01166   return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01167 }
01168 
01169 
01170 TS_INLINE inkcoreapi Action *
01171 CacheProcessor::open_write(Continuation *cont, CacheKey *key, bool cluster_cache_local  ATS_UNUSED,
01172                            CacheFragType frag_type, int expected_size ATS_UNUSED, int options,
01173                            time_t pin_in_cache, char *hostname, int host_len)
01174 {
01175 #ifdef CLUSTER_CACHE
01176   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01177     ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01178     if (m)
01179       return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
01180                          key, frag_type, options, pin_in_cache,
01181                          CACHE_OPEN_WRITE, key, (CacheURL *) 0,
01182                          (CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len);
01183   }
01184 #endif
01185   return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
01186 }
01187 
01188 TS_INLINE Action *
01189 CacheProcessor::open_write_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key,
01190                                   CacheFragType frag_type, int options, time_t pin_in_cache,
01191                                   char *hostname, int host_len)
01192 {
01193   (void)pin_in_cache;
01194   (void)cont;
01195   (void)buf;
01196   (void)key;
01197   (void)frag_type;
01198   (void)options;
01199   (void)hostname;
01200   (void)host_len;
01201   ink_assert(!"implemented");
01202   return NULL;
01203 }
01204 
01205 TS_INLINE Action *
01206 CacheProcessor::remove(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, CacheFragType frag_type,
01207                        bool rm_user_agents, bool rm_link, char *hostname, int host_len)
01208 {
01209   Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %u", cache_hash(*key));
01210 #ifdef CLUSTER_CACHE
01211   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01212     ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01213 
01214     if (m) {
01215       return Cluster_remove(m, cont, key, rm_user_agents, rm_link, frag_type, hostname, host_len);
01216     }
01217   }
01218 #endif
01219   return caches[frag_type]->remove(cont, key, frag_type, rm_user_agents, rm_link, hostname, host_len);
01220 }
01221 
01222 # if 0
01223 TS_INLINE Action *
01224 scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500)
01225 {
01226   return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
01227 }
01228 # endif
01229 
01230 #ifdef HTTP_CACHE
01231 TS_INLINE Action *
01232 CacheProcessor::lookup(Continuation *cont, URL *url, bool cluster_cache_local, bool local_only, CacheFragType frag_type)
01233 {
01234   (void) local_only;
01235   INK_MD5 md5;
01236   url->hash_get(&md5);
01237   int host_len = 0;
01238   const char *hostname = url->host_get(&host_len);
01239 
01240   return lookup(cont, &md5, cluster_cache_local, local_only, frag_type, (char *) hostname, host_len);
01241 }
01242 
01243 TS_INLINE Action *
01244 CacheProcessor::open_read_buffer(Continuation *cont, MIOBuffer *buf,
01245                                  URL *url, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type)
01246 {
01247   (void) buf;
01248 #ifdef CLUSTER_CACHE
01249   if (cache_clustering_enabled > 0) {
01250     return open_read_internal(CACHE_OPEN_READ_BUFFER_LONG, cont, buf, url,
01251                               request, params, (CacheKey *) 0, 0, type, (char *) 0, 0);
01252   }
01253 #endif
01254   return caches[type]->open_read(cont, url, request, params, type);
01255 }
01256 
01257 TS_INLINE Action *
01258 CacheProcessor::open_write_buffer(Continuation * cont, MIOBuffer * buf, URL * url,
01259                                   CacheHTTPHdr * request, CacheHTTPHdr * response, CacheFragType type)
01260 {
01261   (void) cont;
01262   (void) buf;
01263   (void) url;
01264   (void) request;
01265   (void) response;
01266   (void) type;
01267   ink_assert(!"implemented");
01268   return NULL;
01269 }
01270 
01271 #endif
01272 
01273 
01274 #ifdef CLUSTER_CACHE
01275 TS_INLINE Action *
01276 CacheProcessor::open_read_internal(int opcode,
01277                                    Continuation *cont, MIOBuffer *buf,
01278                                    CacheURL *url,
01279                                    CacheHTTPHdr *request,
01280                                    CacheLookupHttpConfig *params,
01281                                    CacheKey *key,
01282                                    time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
01283 {
01284   INK_MD5 url_md5;
01285   if ((opcode == CACHE_OPEN_READ_LONG) || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
01286     Cache::generate_key(&url_md5, url);
01287   } else {
01288     url_md5 = *key;
01289   }
01290   ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
01291 
01292   if (m) {
01293     return Cluster_read(m, opcode, cont, buf, url,
01294                         request, params, key, pin_in_cache, frag_type, hostname, host_len);
01295   } else {
01296     if ((opcode == CACHE_OPEN_READ_LONG)
01297         || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
01298       return caches[frag_type]->open_read(cont, &url_md5, request, params, frag_type, hostname, host_len);
01299     } else {
01300       return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01301     }
01302   }
01303 }
01304 #endif
01305 
01306 #ifdef CLUSTER_CACHE
01307 TS_INLINE Action *
01308 CacheProcessor::link(Continuation *cont, CacheKey *from, CacheKey *to, bool cluster_cache_local,
01309                      CacheFragType type, char *hostname, int host_len)
01310 {
01311   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01312     // Use INK_MD5 in "from" to determine target machine
01313     ClusterMachine *m = cluster_machine_at_depth(cache_hash(*from));
01314     if (m) {
01315       return Cluster_link(m, cont, from, to, type, hostname, host_len);
01316     }
01317   }
01318   return caches[type]->link(cont, from, to, type, hostname, host_len);
01319 }
01320 
01321 TS_INLINE Action *
01322 CacheProcessor::deref(Continuation *cont, CacheKey *key, bool cluster_cache_local, CacheFragType type, char *hostname, int host_len)
01323 {
01324   if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01325     ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01326     if (m) {
01327       return Cluster_deref(m, cont, key, type, hostname, host_len);
01328     }
01329   }
01330   return caches[type]->deref(cont, key, type, hostname, host_len);
01331 }
01332 #endif
01333 
01334 TS_INLINE Action *
01335 CacheProcessor::scan(Continuation *cont, char *hostname, int host_len, int KB_per_second)
01336 {
01337   return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
01338 }
01339 
01340 TS_INLINE int
01341 CacheProcessor::IsCacheEnabled()
01342 {
01343   return CacheProcessor::initialized;
01344 }
01345 
01346 TS_INLINE bool
01347 CacheProcessor::IsCacheReady(CacheFragType type)
01348 {
01349   if (IsCacheEnabled() != CACHE_INITIALIZED)
01350     return 0;
01351   return (bool)(cache_ready & (1 << type));
01352 }
01353 
01354 TS_INLINE Cache *
01355 local_cache()
01356 {
01357   return theCache;
01358 }
01359 
01360 LINK_DEFINITION(CacheVC, opendir_link)
01361 
01362 #endif /* _P_CACHE_INTERNAL_H__ */

Generated by  doxygen 1.7.1