00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef _P_CACHE_INTERNAL_H__
00026 #define _P_CACHE_INTERNAL_H__
00027
00028 #include "libts.h"
00029
00030 #ifdef HTTP_CACHE
00031 #include "HTTP.h"
00032 #include "P_CacheHttp.h"
00033 #endif
00034
// Forward declaration only; the definition is provided elsewhere.
struct EvacuationBlock;

// Compile-time constants used by the cache implementation.
#define ALTERNATES 1

#define MAX_CACHE_VCS_PER_THREAD 500

#define INTEGRAL_FRAGS 4

// CACHE_STAT_PAGES is enabled only when both CACHE_INSPECTOR_PAGES and DEBUG
// are defined.
#ifdef CACHE_INSPECTOR_PAGES
#ifdef DEBUG
#define CACHE_STAT_PAGES
#endif
#endif

// DDebug(...) forwards to Debug(...) in DEBUG builds. In other builds the
// call is still parsed but never executed. The "if (1) {} else" form is used
// (rather than "if (0)") so that an `else` written after a DDebug(...)
// statement binds to the caller's own `if` and not to the one hidden inside
// the macro.
#ifdef DEBUG
#define DDebug Debug
#else
#define DDebug if (1) {} else dummy_debug
#endif

// Parenthesized so the negative literal is safe inside larger expressions.
#define AIO_SOFT_FAILURE (-100000)

// Base delay used when rescheduling a writer retry (see VC_SCHED_WRITER_RETRY).
#define WRITER_RETRY_DELAY HRTIME_MSECONDS(50)
00062
// CACHE_TRY_LOCK(_lock, _mtx, _thr) declares a try-lock named _lock on _mtx
// for thread _thr by expanding to MUTEX_TRY_LOCK.
//
// When CACHE_LOCK_FAIL_RATE is defined, the lock is additionally released
// whenever the thread's random generator yields a value below
// UINT_MAX * CACHE_LOCK_FAIL_RATE. Note that this variant expands to more
// than one statement, so it must not be used as the sole body of an
// unbraced `if`.
#if defined(CACHE_LOCK_FAIL_RATE)
#define CACHE_TRY_LOCK(_lock, _mtx, _thr)             \
  MUTEX_TRY_LOCK(_lock, _mtx, _thr);                  \
  if ((uint32_t)_thr->generator.random() <            \
      (uint32_t)(UINT_MAX *CACHE_LOCK_FAIL_RATE))     \
    CACHE_MUTEX_RELEASE(_lock)
#else
#define CACHE_TRY_LOCK(_lock, _mtx, _thr) MUTEX_TRY_LOCK(_lock, _mtx, _thr)
#endif
00072
00073
// Lock-retry helpers. Each one reschedules a continuation on the thread that
// currently holds its mutex after cache_config_mutex_retry_delay
// milliseconds.
//
// The VC_* forms are meant to be expanded inside a member function: they use
// `this`, `mutex` and `trigger` from the surrounding scope, store the
// scheduled event in `trigger` and return EVENT_CONT from the enclosing
// function. VC_LOCK_RETRY_EVENT additionally forwards the in-scope `event`.

#define VC_LOCK_RETRY_EVENT() \
  do { \
    trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), event); \
    return EVENT_CONT; \
  } while (0)

#define VC_SCHED_LOCK_RETRY() \
  do { \
    trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
    return EVENT_CONT; \
  } while (0)

// The CONT_* forms take the continuation as an argument. The argument is
// parenthesized so that expressions such as `&cont` or `a ? b : c` expand
// correctly. The _RET variant also returns EVENT_CONT from the enclosing
// function.

#define CONT_SCHED_LOCK_RETRY_RET(_c) \
  do { \
    (_c)->mutex->thread_holding->schedule_in_local((_c), HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
    return EVENT_CONT; \
  } while (0)

#define CONT_SCHED_LOCK_RETRY(_c) \
  (_c)->mutex->thread_holding->schedule_in_local((_c), HRTIME_MSECONDS(cache_config_mutex_retry_delay))
00094
// Reschedules the enclosing CacheVC after a writer-lock miss, backing off as
// the number of retries grows, and returns EVENT_CONT from the enclosing
// member function. Uses `trigger`, `writer_lock_retry`, `mutex` and `this`
// from the expansion site.
//
// Delay, as a multiple of WRITER_RETRY_DELAY, by retry count:
//   1..2  -> x1,   3..5 -> x2,   6..10 -> x10,   11+ -> x100
//
// The thresholds are tested from largest to smallest; testing "> 2" first
// would make the "> 5" and "> 10" branches unreachable.
#define VC_SCHED_WRITER_RETRY() \
  do { \
    ink_assert(!trigger); \
    writer_lock_retry++; \
    ink_hrtime _t = WRITER_RETRY_DELAY; \
    if (writer_lock_retry > 10) \
      _t = WRITER_RETRY_DELAY * 100; \
    else if (writer_lock_retry > 5) \
      _t = WRITER_RETRY_DELAY * 10; \
    else if (writer_lock_retry > 2) \
      _t = WRITER_RETRY_DELAY * 2; \
    trigger = mutex->thread_holding->schedule_in_local(this, _t); \
    return EVENT_CONT; \
  } while (0)
00109
00110
00111
// Indices of the cache statistics. The numeric order is significant:
// free_CacheVC addresses the per-operation counters as
// base_stat + CACHE_STAT_ACTIVE / CACHE_STAT_SUCCESS, so each
// "<op>_active, <op>_success, <op>_failure" triple must stay contiguous and
// in that order. cache_stat_count is the number of entries.
enum
{
  // Space, RAM cache and directory usage
  cache_bytes_used_stat = 0,
  cache_bytes_total_stat,
  cache_ram_cache_bytes_stat,
  cache_ram_cache_bytes_total_stat,
  cache_direntries_total_stat,
  cache_direntries_used_stat,
  cache_ram_cache_hits_stat,
  cache_ram_cache_misses_stat,
  cache_pread_count_stat,
  cache_percent_full_stat,

  // lookup
  cache_lookup_active_stat,
  cache_lookup_success_stat,
  cache_lookup_failure_stat,

  // read
  cache_read_active_stat,
  cache_read_success_stat,
  cache_read_failure_stat,
#if TS_USE_INTERIM_CACHE == 1
  // successful reads, by where the document came from
  cache_interim_read_success_stat,
  cache_disk_read_success_stat,
  cache_ram_read_success_stat,
#endif

  // write
  cache_write_active_stat,
  cache_write_success_stat,
  cache_write_failure_stat,
  cache_write_backlog_failure_stat,

  // update
  cache_update_active_stat,
  cache_update_success_stat,
  cache_update_failure_stat,

  // remove
  cache_remove_active_stat,
  cache_remove_success_stat,
  cache_remove_failure_stat,

  // evacuate
  cache_evacuate_active_stat,
  cache_evacuate_success_stat,
  cache_evacuate_failure_stat,

  // scan
  cache_scan_active_stat,
  cache_scan_success_stat,
  cache_scan_failure_stat,

  // miscellaneous counters
  cache_directory_collision_count_stat,
  cache_single_fragment_document_count_stat,
  cache_two_fragment_document_count_stat,
  cache_three_plus_plus_fragment_document_count_stat,
  cache_read_busy_success_stat,
  cache_read_busy_failure_stat,
  cache_gc_bytes_evacuated_stat,
  cache_gc_frags_evacuated_stat,
  cache_write_bytes_stat,
  cache_hdr_vector_marshal_stat,
  cache_hdr_marshal_stat,
  cache_hdr_marshal_bytes_stat,

  cache_stat_count
};
00165
00166
// Raw stat block for the cache-wide statistics; the stat macros in this file
// update it alongside the per-volume block (vol->cache_vol->vol_rsb).
// Defined elsewhere.
00167 extern RecRawStatBlock *cache_rsb;
00168
// Statistics helper macros.
//
// GLOBAL_* macros touch only the cache-wide block (cache_rsb). The CACHE_*
// macros update both cache_rsb and the per-volume block, and therefore
// require a variable named `vol` in scope at the expansion site; those that
// use mutex->thread_holding also require `mutex`.
//
// Every macro that performs two updates is wrapped in do { } while (0) so
// that it behaves as a single statement (e.g. as the body of an unbraced
// `if`). The macros that historically ended in ';' keep that trailing ';'
// so that existing call sites written with or without their own semicolon
// continue to compile.

#define GLOBAL_CACHE_SET_DYN_STAT(x,y) \
  RecSetGlobalRawStatSum(cache_rsb, (x), (y))

#define CACHE_SET_DYN_STAT(x,y) \
  do { \
    RecSetGlobalRawStatSum(cache_rsb, (x), (y)); \
    RecSetGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y)); \
  } while (0)

#define CACHE_INCREMENT_DYN_STAT(x) \
  do { \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), 1); \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), 1); \
  } while (0);

#define CACHE_DECREMENT_DYN_STAT(x) \
  do { \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), -1); \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), -1); \
  } while (0);

// Per-volume block only.
#define CACHE_VOL_SUM_DYN_STAT(x,y) \
  RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), (int64_t) (y));

#define CACHE_SUM_DYN_STAT(x, y) \
  do { \
    RecIncrRawStat(cache_rsb, mutex->thread_holding, (int) (x), (int64_t) (y)); \
    RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int) (x), (int64_t) (y)); \
  } while (0);

// Same as CACHE_SUM_DYN_STAT but uses this_ethread() instead of the thread
// holding `mutex`.
#define CACHE_SUM_DYN_STAT_THREAD(x, y) \
  do { \
    RecIncrRawStat(cache_rsb, this_ethread(), (int) (x), (int64_t) (y)); \
    RecIncrRawStat(vol->cache_vol->vol_rsb, this_ethread(), (int) (x), (int64_t) (y)); \
  } while (0);

#define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
  RecIncrGlobalRawStatSum(cache_rsb,(x),(y))

#define CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
  do { \
    RecIncrGlobalRawStatSum(cache_rsb,(x),(y)); \
    RecIncrGlobalRawStatSum(vol->cache_vol->vol_rsb,(x),(y)); \
  } while (0)

// Resets both the sum and the count of stat x in both blocks.
#define CACHE_CLEAR_DYN_STAT(x) \
do { \
  RecSetRawStatSum(cache_rsb, (x), 0); \
  RecSetRawStatCount(cache_rsb, (x), 0); \
  RecSetRawStatSum(vol->cache_vol->vol_rsb, (x), 0); \
  RecSetRawStatCount(vol->cache_vol->vol_rsb, (x), 0); \
} while (0);
00209
00210
// Cache configuration values shared across the cache implementation.
// They are only declared here; the definitions live elsewhere.
// (cache_config_agg_write_backlog used to be declared twice in this list;
// the redundant second declaration has been dropped.)
extern int cache_config_dir_sync_frequency;
extern int cache_config_http_max_alts;
extern int cache_config_permit_pinning;
extern int cache_config_select_alternate;
extern int cache_config_vary_on_user_agent;
extern int cache_config_max_doc_size;
extern int cache_config_min_average_object_size;
extern int cache_config_agg_write_backlog;
extern int cache_config_enable_checksum;
extern int cache_config_alt_rewrite_max_size;
extern int cache_config_read_while_writer;
extern int cache_clustering_enabled;
extern int cache_config_ram_cache_compress;
extern int cache_config_ram_cache_compress_percent;
extern int cache_config_ram_cache_use_seen_filter;
extern int cache_config_hit_evacuate_percent;
extern int cache_config_hit_evacuate_size_limit;
extern int cache_config_force_sector_size;
extern int cache_config_target_fragment_size;
// Delay, in milliseconds, used by the lock-retry macros in this file.
extern int cache_config_mutex_retry_delay;
#if TS_USE_INTERIM_CACHE == 1
extern int good_interim_disks;
#endif
00235
00236 struct CacheVC: public CacheVConnection
00237 {
00238 CacheVC();
00239
00240 VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf);
00241 VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset);
00242 VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false);
00243 void do_io_close(int lerrno = -1);
00244 void reenable(VIO *avio);
00245 void reenable_re(VIO *avio);
00246 bool get_data(int i, void *data);
00247 bool set_data(int i, void *data);
00248
00249 bool is_ram_cache_hit() const
00250 {
00251 ink_assert(vio.op == VIO::READ);
00252 return !f.not_from_ram_cache;
00253 }
00254 int get_header(void **ptr, int *len)
00255 {
00256 if (first_buf.m_ptr) {
00257 Doc *doc = (Doc*)first_buf->data();
00258 *ptr = doc->hdr();
00259 *len = doc->hlen;
00260 return 0;
00261 } else
00262 return -1;
00263 }
00264 int set_header(void *ptr, int len)
00265 {
00266 header_to_write = ptr;
00267 header_to_write_len = len;
00268 return 0;
00269 }
00270 int get_single_data(void **ptr, int *len)
00271 {
00272 if (first_buf.m_ptr) {
00273 Doc *doc = (Doc*)first_buf->data();
00274 if (doc->data_len() == doc->total_len) {
00275 *ptr = doc->data();
00276 *len = doc->data_len();
00277 return 0;
00278 }
00279 }
00280 return -1;
00281 }
00282
00283 bool writer_done();
00284 int calluser(int event);
00285 int callcont(int event);
00286 int die();
00287 int dead(int event, Event *e);
00288
00289 int handleReadDone(int event, Event *e);
00290 int handleRead(int event, Event *e);
00291 int do_read_call(CacheKey *akey);
00292 int handleWrite(int event, Event *e);
00293 int handleWriteLock(int event, Event *e);
00294 int do_write_call();
00295 int do_write_lock();
00296 int do_write_lock_call();
00297 int do_sync(uint32_t target_write_serial);
00298
00299 int openReadClose(int event, Event *e);
00300 int openReadReadDone(int event, Event *e);
00301 int openReadMain(int event, Event *e);
00302 int openReadStartEarliest(int event, Event *e);
00303 #ifdef HTTP_CACHE
00304 int openReadVecWrite(int event, Event *e);
00305 #endif
00306 int openReadStartHead(int event, Event *e);
00307 int openReadFromWriter(int event, Event *e);
00308 int openReadFromWriterMain(int event, Event *e);
00309 int openReadFromWriterFailure(int event, Event *);
00310 int openReadChooseWriter(int event, Event *e);
00311
00312 int openWriteCloseDir(int event, Event *e);
00313 int openWriteCloseHeadDone(int event, Event *e);
00314 int openWriteCloseHead(int event, Event *e);
00315 int openWriteCloseDataDone(int event, Event *e);
00316 int openWriteClose(int event, Event *e);
00317 int openWriteRemoveVector(int event, Event *e);
00318 int openWriteWriteDone(int event, Event *e);
00319 int openWriteOverwrite(int event, Event *e);
00320 int openWriteMain(int event, Event *e);
00321 int openWriteStartDone(int event, Event *e);
00322 int openWriteStartBegin(int event, Event *e);
00323
00324 int updateVector(int event, Event *e);
00325 int updateReadDone(int event, Event *e);
00326 int updateVecWrite(int event, Event *e);
00327
00328 int removeEvent(int event, Event *e);
00329
00330 int linkWrite(int event, Event *e);
00331 int derefRead(int event, Event *e);
00332
00333 int scanVol(int event, Event *e);
00334 int scanObject(int event, Event *e);
00335 int scanUpdateDone(int event, Event *e);
00336 int scanOpenWrite(int event, Event *e);
00337 int scanRemoveDone(int event, Event *e);
00338
00339 int is_io_in_progress()
00340 {
00341 return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
00342 }
00343 void set_io_not_in_progress()
00344 {
00345 io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
00346 }
00347 void set_agg_write_in_progress()
00348 {
00349 io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
00350 }
00351 int evacuateDocDone(int event, Event *e);
00352 int evacuateReadHead(int event, Event *e);
00353
00354 void cancel_trigger();
00355 virtual int64_t get_object_size();
00356 #ifdef HTTP_CACHE
00357 virtual void set_http_info(CacheHTTPInfo *info);
00358 virtual void get_http_info(CacheHTTPInfo ** info);
00359
00360
00361
00362
00363 virtual HTTPInfo::FragOffset* get_frag_table();
00364
00365
00366
00367 virtual uint32_t load_http_info(CacheHTTPInfoVector* info, struct Doc* doc, RefCountObj * block_ptr = NULL);
00368 #endif
00369 virtual bool is_pread_capable();
00370 virtual bool set_pin_in_cache(time_t time_pin);
00371 virtual time_t get_pin_in_cache();
00372 virtual bool set_disk_io_priority(int priority);
00373 virtual int get_disk_io_priority();
00374
00375
00376 #define CACHE_STAT_ACTIVE 0
00377 #define CACHE_STAT_SUCCESS 1
00378 #define CACHE_STAT_FAILURE 2
00379
00380
00381
00382
00383 static int size_to_init;
00384
00385
00386
00387
00388
00389
00390 CacheKey key, first_key, earliest_key, update_key;
00391 Dir dir, earliest_dir, overwrite_dir, first_dir;
00392
00393
00394
00395
00396
00397
00398 Action _action;
00399 #ifdef HTTP_CACHE
00400 CacheHTTPHdr request;
00401 #endif
00402 CacheHTTPInfoVector vector;
00403 CacheHTTPInfo alternate;
00404 Ptr<IOBufferData> buf;
00405 Ptr<IOBufferData> first_buf;
00406 Ptr<IOBufferBlock> blocks;
00407 Ptr<IOBufferBlock> writer_buf;
00408
00409 OpenDirEntry *od;
00410 AIOCallbackInternal io;
00411 int alternate_index;
00412 LINK(CacheVC, opendir_link);
00413 #ifdef CACHE_STAT_PAGES
00414 LINK(CacheVC, stat_link);
00415 #endif
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425 VIO vio;
00426 EThread *initial_thread;
00427 CacheFragType frag_type;
00428 CacheHTTPInfo *info;
00429 CacheHTTPInfoVector *write_vector;
00430 #ifdef HTTP_CACHE
00431 CacheLookupHttpConfig *params;
00432 #endif
00433 int header_len;
00434 int frag_len;
00435 uint32_t write_len;
00436 uint32_t agg_len;
00437 uint32_t write_serial;
00438 Vol *vol;
00439 Dir *last_collision;
00440 Event *trigger;
00441 CacheKey *read_key;
00442 ContinuationHandler save_handler;
00443 uint32_t pin_in_cache;
00444 ink_hrtime start_time;
00445 int base_stat;
00446 int recursive;
00447 int closed;
00448 uint64_t seek_to;
00449 int64_t offset;
00450 int64_t writer_offset;
00451 int64_t length;
00452 int64_t doc_pos;
00453 uint64_t write_pos;
00454 uint64_t total_len;
00455 uint64_t doc_len;
00456 uint64_t update_len;
00457 int fragment;
00458 int scan_msec_delay;
00459 CacheVC *write_vc;
00460 char *hostname;
00461 int host_len;
00462 int header_to_write_len;
00463 void *header_to_write;
00464 short writer_lock_retry;
00465 #if TS_USE_INTERIM_CACHE == 1
00466 InterimCacheVol *interim_vol;
00467 MigrateToInterimCache *mts;
00468 uint64_t dir_off;
00469 #endif
00470 union
00471 {
00472 uint32_t flags;
00473 struct
00474 {
00475 unsigned int use_first_key:1;
00476 unsigned int overwrite:1;
00477 unsigned int close_complete:1;
00478 unsigned int sync:1;
00479 unsigned int evacuator:1;
00480 unsigned int single_fragment:1;
00481 unsigned int evac_vector:1;
00482 unsigned int lookup:1;
00483 unsigned int update:1;
00484 unsigned int remove:1;
00485 unsigned int remove_aborted_writers:1;
00486 unsigned int open_read_timeout:1;
00487 unsigned int data_done:1;
00488 unsigned int read_from_writer_called:1;
00489 unsigned int not_from_ram_cache:1;
00490 unsigned int rewrite_resident_alt:1;
00491 unsigned int readers:1;
00492 unsigned int doc_from_ram_cache:1;
00493 unsigned int hit_evacuate:1;
00494 #if TS_USE_INTERIM_CACHE == 1
00495 unsigned int read_from_interim:1;
00496 unsigned int write_into_interim:1;
00497 unsigned int ram_fixup:1;
00498 unsigned int transistor:1;
00499 #endif
00500 #ifdef HTTP_CACHE
00501 unsigned int allow_empty_doc:1;
00502 #endif
00503 } f;
00504 };
00505
00506
00507 char *scan_vol_map;
00508
00509
00510 off_t scan_fix_buffer_offset;
00511
00512 };
00513
// PUSH_HANDLER(_x): remember the current handler in save_handler and install
// _x as the new handler. Asserts that the object is not already "dead".
#define PUSH_HANDLER(_x)                                            \
  do {                                                              \
    ink_assert(handler != (ContinuationHandler)(&CacheVC::dead));   \
    save_handler = handler;                                         \
    handler = (ContinuationHandler)(_x);                            \
  } while (0)

// POP_HANDLER: restore the handler saved by PUSH_HANDLER and assert that the
// restored handler is not the "dead" handler.
#define POP_HANDLER                                                 \
  do {                                                              \
    handler = save_handler;                                         \
    ink_assert(handler != (ContinuationHandler)(&CacheVC::dead));   \
  } while (0)
00523
00524 struct CacheRemoveCont: public Continuation
00525 {
00526 int event_handler(int event, void *data);
00527
00528 CacheRemoveCont()
00529 : Continuation(NULL)
00530 { }
00531 };
00532
00533
00534
00535 #if TS_USE_INTERIM_CACHE == 1
00536 extern ClassAllocator<MigrateToInterimCache> migrateToInterimCacheAllocator;
00537 #endif
00538 extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
00539 extern CacheKey zero_key;
00540 extern CacheSync *cacheDirSync;
00541
00542 #ifdef HTTP_CACHE
00543 int cache_write(CacheVC *, CacheHTTPInfoVector *);
00544 int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
00545 #endif
00546 CacheVC *new_DocEvacuator(int nbytes, Vol *d);
00547
00548
00549
00550 TS_INLINE CacheVC *
00551 new_CacheVC(Continuation *cont)
00552 {
00553 EThread *t = cont->mutex->thread_holding;
00554 CacheVC *c = THREAD_ALLOC(cacheVConnectionAllocator, t);
00555 #ifdef HTTP_CACHE
00556 c->vector.data.data = &c->vector.data.fast_data[0];
00557 #endif
00558 c->_action = cont;
00559 c->initial_thread = t->tt == DEDICATED ? NULL : t;
00560 c->mutex = cont->mutex;
00561 c->start_time = ink_get_hrtime();
00562 ink_assert(c->trigger == NULL);
00563 Debug("cache_new", "new %p", c);
00564 #ifdef CACHE_STAT_PAGES
00565 ink_assert(!c->stat_link.next);
00566 ink_assert(!c->stat_link.prev);
00567 #endif
00568 dir_clear(&c->dir);
00569 return c;
00570 }
00571
00572 TS_INLINE int
00573 free_CacheVC(CacheVC *cont)
00574 {
00575 Debug("cache_free", "free %p", cont);
00576 ProxyMutex *mutex = cont->mutex;
00577 Vol *vol = cont->vol;
00578 if (vol) {
00579 CACHE_DECREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_ACTIVE);
00580 if (cont->closed > 0) {
00581 CACHE_INCREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_SUCCESS);
00582 #if TS_USE_INTERIM_CACHE == 1
00583 if (cont->vio.op == VIO::READ) {
00584 if (cont->f.doc_from_ram_cache) {
00585 CACHE_INCREMENT_DYN_STAT(cache_ram_read_success_stat);
00586 } else if (cont->f.read_from_interim) {
00587 CACHE_INCREMENT_DYN_STAT(cache_interim_read_success_stat);
00588 } else {
00589 CACHE_INCREMENT_DYN_STAT(cache_disk_read_success_stat);
00590 }
00591 }
00592 #endif
00593 }
00594 }
00595 ink_assert(mutex->thread_holding == this_ethread());
00596 if (cont->trigger)
00597 cont->trigger->cancel();
00598 ink_assert(!cont->is_io_in_progress());
00599 ink_assert(!cont->od);
00600
00601
00602
00603 cont->io.action.continuation = NULL;
00604 cont->io.action.mutex = NULL;
00605 cont->io.mutex.clear();
00606 cont->io.aio_result = 0;
00607 cont->io.aiocb.aio_nbytes = 0;
00608 cont->io.aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
00609 #ifdef HTTP_CACHE
00610 cont->request.reset();
00611 cont->vector.clear();
00612 #endif
00613 cont->vio.buffer.clear();
00614 cont->vio.mutex.clear();
00615 #ifdef HTTP_CACHE
00616 if (cont->vio.op == VIO::WRITE && cont->alternate_index == CACHE_ALT_INDEX_DEFAULT)
00617 cont->alternate.destroy();
00618 else
00619 cont->alternate.clear();
00620 #endif
00621 cont->_action.cancelled = 0;
00622 cont->_action.mutex.clear();
00623 cont->mutex.clear();
00624 cont->buf.clear();
00625 cont->first_buf.clear();
00626 cont->blocks.clear();
00627 cont->writer_buf.clear();
00628 cont->alternate_index = CACHE_ALT_INDEX_DEFAULT;
00629 if (cont->scan_vol_map)
00630 ats_free(cont->scan_vol_map);
00631 memset((char *) &cont->vio, 0, cont->size_to_init);
00632 #ifdef CACHE_STAT_PAGES
00633 ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00634 #endif
00635 #ifdef DEBUG
00636 SET_CONTINUATION_HANDLER(cont, &CacheVC::dead);
00637 #endif
00638 THREAD_FREE(cont, cacheVConnectionAllocator, this_thread());
00639 return EVENT_DONE;
00640 }
00641
00642 TS_INLINE int
00643 CacheVC::calluser(int event)
00644 {
00645 recursive++;
00646 ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
00647 vio._cont->handleEvent(event, (void *) &vio);
00648 recursive--;
00649 if (closed) {
00650 die();
00651 return EVENT_DONE;
00652 }
00653 return EVENT_CONT;
00654 }
00655
00656 TS_INLINE int
00657 CacheVC::callcont(int event)
00658 {
00659 recursive++;
00660 ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
00661 _action.continuation->handleEvent(event, this);
00662 recursive--;
00663 if (closed)
00664 die();
00665 else if (vio.vc_server)
00666 handleEvent(EVENT_IMMEDIATE, 0);
00667 return EVENT_DONE;
00668 }
00669
00670 TS_INLINE int
00671 CacheVC::do_read_call(CacheKey *akey)
00672 {
00673 doc_pos = 0;
00674 read_key = akey;
00675 io.aiocb.aio_nbytes = dir_approx_size(&dir);
00676 #if TS_USE_INTERIM_CACHE == 1
00677 interim_vol = NULL;
00678 ink_assert(mts == NULL);
00679 mts = NULL;
00680 f.write_into_interim = 0;
00681 f.ram_fixup = 0;
00682 f.transistor = 0;
00683 f.read_from_interim = dir_ininterim(&dir);
00684
00685 if (!f.read_from_interim && vio.op == VIO::READ && good_interim_disks > 0){
00686 vol->history.put_key(read_key);
00687 if (vol->history.is_hot(read_key) && !vol->migrate_probe(read_key, NULL) && !od) {
00688 f.write_into_interim = 1;
00689 }
00690 }
00691 if (f.read_from_interim) {
00692 interim_vol = &vol->interim_vols[dir_get_index(&dir)];
00693 if (vio.op == VIO::READ && vol_transistor_range_valid(interim_vol, &dir)
00694 && !vol->migrate_probe(read_key, NULL) && !od)
00695 f.transistor = 1;
00696 }
00697 if (f.write_into_interim || f.transistor) {
00698 mts = migrateToInterimCacheAllocator.alloc();
00699 mts->vc = this;
00700 mts->key = *read_key;
00701 mts->rewrite = (f.transistor == 1);
00702 dir_assign(&mts->dir, &dir);
00703 vol->set_migrate_in_progress(mts);
00704 }
00705 #endif
00706 PUSH_HANDLER(&CacheVC::handleRead);
00707 return handleRead(EVENT_CALL, 0);
00708 }
00709
00710 TS_INLINE int
00711 CacheVC::do_write_call()
00712 {
00713 PUSH_HANDLER(&CacheVC::handleWrite);
00714 return handleWrite(EVENT_CALL, 0);
00715 }
00716
00717 TS_INLINE void
00718 CacheVC::cancel_trigger()
00719 {
00720 if (trigger) {
00721 trigger->cancel_action();
00722 trigger = NULL;
00723 }
00724 }
00725
00726 TS_INLINE int
00727 CacheVC::die()
00728 {
00729 if (vio.op == VIO::WRITE) {
00730 #ifdef HTTP_CACHE
00731 if (f.update && total_len) {
00732 alternate.object_key_set(earliest_key);
00733 }
00734 #endif
00735 if (!is_io_in_progress()) {
00736 SET_HANDLER(&CacheVC::openWriteClose);
00737 if (!recursive)
00738 openWriteClose(EVENT_NONE, NULL);
00739 }
00740 return EVENT_CONT;
00741 } else {
00742 if (is_io_in_progress())
00743 save_handler = (ContinuationHandler) & CacheVC::openReadClose;
00744 else {
00745 SET_HANDLER(&CacheVC::openReadClose);
00746 if (!recursive)
00747 openReadClose(EVENT_NONE, NULL);
00748 }
00749 return EVENT_CONT;
00750 }
00751 }
00752
00753 TS_INLINE int
00754 CacheVC::handleWriteLock(int , Event *e)
00755 {
00756 cancel_trigger();
00757 int ret = 0;
00758 {
00759 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00760 if (!lock) {
00761 set_agg_write_in_progress();
00762 trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00763 return EVENT_CONT;
00764 }
00765 ret = handleWrite(EVENT_CALL, e);
00766 }
00767 if (ret == EVENT_RETURN)
00768 return handleEvent(AIO_EVENT_DONE, 0);
00769 return EVENT_CONT;
00770 }
00771
00772 TS_INLINE int
00773 CacheVC::do_write_lock()
00774 {
00775 PUSH_HANDLER(&CacheVC::handleWriteLock);
00776 return handleWriteLock(EVENT_NONE, 0);
00777 }
00778
00779 TS_INLINE int
00780 CacheVC::do_write_lock_call()
00781 {
00782 PUSH_HANDLER(&CacheVC::handleWriteLock);
00783 return handleWriteLock(EVENT_CALL, 0);
00784 }
00785
00786 TS_INLINE bool
00787 CacheVC::writer_done()
00788 {
00789 OpenDirEntry *cod = od;
00790 if (!cod)
00791 cod = vol->open_read(&first_key);
00792 CacheVC *w = (cod) ? cod->writers.head : NULL;
00793
00794
00795
00796
00797 for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *) w->opendir_link.next);
00798 if (!w)
00799 return true;
00800 return false;
00801 }
00802
00803 TS_INLINE int
00804 Vol::close_write(CacheVC *cont)
00805 {
00806 #ifdef CACHE_STAT_PAGES
00807 ink_assert(stat_cache_vcs.head);
00808 stat_cache_vcs.remove(cont, cont->stat_link);
00809 ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00810 #endif
00811 return open_dir.close_write(cont);
00812 }
00813
00814
00815 TS_INLINE int
00816 Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
00817 {
00818 Vol *vol = this;
00819 bool agg_error = false;
00820 if (!cont->f.remove) {
00821 agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog);
00822 #ifdef CACHE_AGG_FAIL_RATE
00823 agg_error = agg_error || ((uint32_t) mutex->thread_holding->generator.random() <
00824 (uint32_t) (UINT_MAX * CACHE_AGG_FAIL_RATE));
00825 #endif
00826 }
00827 if (agg_error) {
00828 CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
00829 return ECACHE_WRITE_FAIL;
00830 }
00831 #if TS_USE_INTERIM_CACHE == 1
00832 MigrateToInterimCache *m_result = NULL;
00833 if (vol->migrate_probe(&cont->first_key, &m_result)) {
00834 m_result->notMigrate = true;
00835 }
00836 #endif
00837 if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
00838 #ifdef CACHE_STAT_PAGES
00839 ink_assert(cont->mutex->thread_holding == this_ethread());
00840 ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
00841 stat_cache_vcs.enqueue(cont, cont->stat_link);
00842 #endif
00843 return 0;
00844 }
00845 return ECACHE_DOC_BUSY;
00846 }
00847
00848 TS_INLINE int
00849 Vol::close_write_lock(CacheVC *cont)
00850 {
00851 EThread *t = cont->mutex->thread_holding;
00852 CACHE_TRY_LOCK(lock, mutex, t);
00853 if (!lock)
00854 return -1;
00855 return close_write(cont);
00856 }
00857
00858 TS_INLINE int
00859 Vol::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
00860 {
00861 EThread *t = cont->mutex->thread_holding;
00862 CACHE_TRY_LOCK(lock, mutex, t);
00863 if (!lock)
00864 return -1;
00865 return open_write(cont, allow_if_writers, max_writers);
00866 }
00867
00868 TS_INLINE OpenDirEntry *
00869 Vol::open_read_lock(INK_MD5 *key, EThread *t)
00870 {
00871 CACHE_TRY_LOCK(lock, mutex, t);
00872 if (!lock)
00873 return NULL;
00874 return open_dir.open_read(key);
00875 }
00876
00877 TS_INLINE int
00878 Vol::begin_read_lock(CacheVC *cont)
00879 {
00880
00881 #ifndef CACHE_STAT_PAGES
00882 if (cont->f.single_fragment)
00883 return 0;
00884 #endif
00885
00886 EThread *t = cont->mutex->thread_holding;
00887 CACHE_TRY_LOCK(lock, mutex, t);
00888 if (!lock)
00889 return -1;
00890 return begin_read(cont);
00891 }
00892
00893 TS_INLINE int
00894 Vol::close_read_lock(CacheVC *cont)
00895 {
00896 EThread *t = cont->mutex->thread_holding;
00897 CACHE_TRY_LOCK(lock, mutex, t);
00898 if (!lock)
00899 return -1;
00900 return close_read(cont);
00901 }
00902
00903 TS_INLINE int
00904 dir_delete_lock(CacheKey *key, Vol *d, ProxyMutex *m, Dir *del)
00905 {
00906 EThread *thread = m->thread_holding;
00907 CACHE_TRY_LOCK(lock, d->mutex, thread);
00908 if (!lock)
00909 return -1;
00910 return dir_delete(key, d, del);
00911 }
00912
00913 TS_INLINE int
00914 dir_insert_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m)
00915 {
00916 EThread *thread = m->thread_holding;
00917 CACHE_TRY_LOCK(lock, d->mutex, thread);
00918 if (!lock)
00919 return -1;
00920 return dir_insert(key, d, to_part);
00921 }
00922
00923 TS_INLINE int
00924 dir_overwrite_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m, Dir *overwrite, bool must_overwrite = true)
00925 {
00926 EThread *thread = m->thread_holding;
00927 CACHE_TRY_LOCK(lock, d->mutex, thread);
00928 if (!lock)
00929 return -1;
00930 return dir_overwrite(key, d, to_part, overwrite, must_overwrite);
00931 }
00932
00933 void TS_INLINE
00934 rand_CacheKey(CacheKey *next_key, ProxyMutex *mutex)
00935 {
00936 next_key->b[0] = mutex->thread_holding->generator.random();
00937 next_key->b[1] = mutex->thread_holding->generator.random();
00938 }
00939
00940 extern uint8_t CacheKey_next_table[];
00941 void TS_INLINE
00942 next_CacheKey(CacheKey *next_key, CacheKey *key)
00943 {
00944 uint8_t *b = (uint8_t *) next_key;
00945 uint8_t *k = (uint8_t *) key;
00946 b[0] = CacheKey_next_table[k[0]];
00947 for (int i = 1; i < 16; i++)
00948 b[i] = CacheKey_next_table[(b[i - 1] + k[i]) & 0xFF];
00949 }
00950 extern uint8_t CacheKey_prev_table[];
00951 void TS_INLINE
00952 prev_CacheKey(CacheKey *prev_key, CacheKey *key)
00953 {
00954 uint8_t *b = (uint8_t *) prev_key;
00955 uint8_t *k = (uint8_t *) key;
00956 for (int i = 15; i > 0; i--)
00957 b[i] = 256 + CacheKey_prev_table[k[i]] - k[i - 1];
00958 b[0] = CacheKey_prev_table[k[0]];
00959 }
00960
// Advances the linear congruential generator whose state is *p and returns
// the new state: state = 1103515145 * state + 12345 (unsigned wrap-around).
//
// NOTE: the multiplier differs from the commonly used 1103515245; it is kept
// unchanged because altering it would change every sequence generated from a
// given seed.
TS_INLINE unsigned int
next_rand(unsigned int *p)
{
  *p = 1103515145 * (*p) + 12345;
  return *p;
}
00969
00970 extern ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator;
00971
00972 TS_INLINE CacheRemoveCont *
00973 new_CacheRemoveCont()
00974 {
00975 CacheRemoveCont *cache_rm = cacheRemoveContAllocator.alloc();
00976
00977 cache_rm->mutex = new_ProxyMutex();
00978 SET_CONTINUATION_HANDLER(cache_rm, &CacheRemoveCont::event_handler);
00979 return cache_rm;
00980 }
00981
00982 TS_INLINE void
00983 free_CacheRemoveCont(CacheRemoveCont *cache_rm)
00984 {
00985 cache_rm->mutex = NULL;
00986 cacheRemoveContAllocator.free(cache_rm);
00987 }
00988
00989 TS_INLINE int
00990 CacheRemoveCont::event_handler(int event, void *data)
00991 {
00992 (void) event;
00993 (void) data;
00994 free_CacheRemoveCont(this);
00995 return EVENT_DONE;
00996 }
00997
// Implemented elsewhere.
int64_t cache_bytes_used(void);
int64_t cache_bytes_total(void);

// Debug-only statistics: these expand to the regular stat macros when DEBUG
// is defined and to nothing otherwise (the arguments are then not
// evaluated).
#ifndef DEBUG
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x)
#define CACHE_DEBUG_SUM_DYN_STAT(_x,_y)
#else
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) CACHE_INCREMENT_DYN_STAT(_x)
#define CACHE_DEBUG_SUM_DYN_STAT(_x,_y) CACHE_SUM_DYN_STAT(_x,_y)
#endif

// Forward declarations.
struct CacheHostRecord;
struct Vol;
class CacheHostTable;
01012
01013 struct Cache
01014 {
01015 volatile int cache_read_done;
01016 volatile int total_good_nvol;
01017 volatile int total_nvol;
01018 volatile int ready;
01019 int64_t cache_size;
01020 CacheHostTable *hosttable;
01021 volatile int total_initialized_vol;
01022 CacheType scheme;
01023
01024 int open(bool reconfigure, bool fix);
01025 int close();
01026
01027 Action *lookup(Continuation *cont, CacheKey *key, CacheFragType type, char const* hostname, int host_len);
01028 inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int len);
01029 inkcoreapi Action *open_write(Continuation *cont, CacheKey *key,
01030 CacheFragType frag_type, int options = 0,
01031 time_t pin_in_cache = (time_t) 0, char *hostname = 0, int host_len = 0);
01032 inkcoreapi Action *remove(Continuation *cont, CacheKey *key,
01033 CacheFragType type = CACHE_FRAG_TYPE_HTTP,
01034 bool user_agents = true, bool link = false,
01035 char *hostname = 0, int host_len = 0);
01036 Action *scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500);
01037
01038 #ifdef HTTP_CACHE
01039 Action *lookup(Continuation *cont, URL *url, CacheFragType type);
01040 inkcoreapi Action *open_read(Continuation *cont, CacheKey *key,
01041 CacheHTTPHdr *request,
01042 CacheLookupHttpConfig *params, CacheFragType type, char *hostname, int host_len);
01043 Action *open_read(Continuation *cont, URL *url, CacheHTTPHdr *request,
01044 CacheLookupHttpConfig *params, CacheFragType type);
01045 Action *open_write(Continuation *cont, CacheKey *key,
01046 CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0,
01047 CacheKey *key1 = NULL,
01048 CacheFragType type = CACHE_FRAG_TYPE_HTTP, char *hostname = 0, int host_len = 0);
01049 Action *open_write(Continuation *cont, URL *url, CacheHTTPHdr *request,
01050 CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t) 0,
01051 CacheFragType type = CACHE_FRAG_TYPE_HTTP);
01052 static void generate_key(INK_MD5 *md5, URL *url);
01053 #endif
01054
01055 Action *link(Continuation *cont, CacheKey *from, CacheKey *to, CacheFragType type, char *hostname, int host_len);
01056 Action *deref(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len);
01057
01058 void vol_initialized(bool result);
01059
01060 int open_done();
01061
01062 Vol *key_to_vol(CacheKey *key, char const* hostname, int host_len);
01063
01064 Cache()
01065 : cache_read_done(0), total_good_nvol(0), total_nvol(0), ready(CACHE_INITIALIZING), cache_size(0),
01066 hosttable(NULL), total_initialized_vol(0), scheme(CACHE_NONE_TYPE)
01067 { }
01068 };
01069
01070 extern Cache *theCache;
01071 extern Cache *theStreamCache;
01072 inkcoreapi extern Cache *caches[NUM_CACHE_FRAG_TYPES];
01073
01074 #ifdef HTTP_CACHE
01075 TS_INLINE Action *
01076 Cache::open_read(Continuation *cont, CacheURL *url, CacheHTTPHdr *request,
01077 CacheLookupHttpConfig *params, CacheFragType type)
01078 {
01079 INK_MD5 md5;
01080 int len;
01081 url->hash_get(&md5);
01082 const char *hostname = url->host_get(&len);
01083 return open_read(cont, &md5, request, params, type, (char *) hostname, len);
01084 }
01085
01086 TS_INLINE void
01087 Cache::generate_key(INK_MD5 *md5, URL *url)
01088 {
01089 url->hash_get(md5);
01090 }
01091
01092 TS_INLINE Action *
01093 Cache::open_write(Continuation *cont, CacheURL *url, CacheHTTPHdr *request,
01094 CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type)
01095 {
01096 (void) request;
01097 INK_MD5 url_md5;
01098 url->hash_get(&url_md5);
01099 int len;
01100 const char *hostname = url->host_get(&len);
01101
01102 return open_write(cont, &url_md5, old_info, pin_in_cache, NULL, type, (char *) hostname, len);
01103 }
01104 #endif
01105
01106 TS_INLINE unsigned int
01107 cache_hash(INK_MD5 & md5)
01108 {
01109 uint64_t f = md5.fold();
01110 unsigned int mhash = (unsigned int) (f >> 32);
01111 return mhash;
01112 }
01113
01114 #ifdef HTTP_CACHE
01115 #define CLUSTER_CACHE
01116 #endif
01117 #ifdef CLUSTER_CACHE
01118 #include "P_Net.h"
01119 #include "P_ClusterInternal.h"
01120
01121
01122 #include "P_ClusterInline.h"
01123 #endif
01124
01125 TS_INLINE Action *
01126 CacheProcessor::lookup(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED,
01127 bool local_only ATS_UNUSED, CacheFragType frag_type, char *hostname, int host_len)
01128 {
01129 #ifdef CLUSTER_CACHE
01130
01131 if ((cache_clustering_enabled > 0) && !cluster_cache_local && !local_only) {
01132 Action *a = Cluster_lookup(cont, key, frag_type, hostname, host_len);
01133 if (a) {
01134 return a;
01135 }
01136 }
01137 #endif
01138 return caches[frag_type]->lookup(cont, key, frag_type, hostname, host_len);
01139 }
01140
01141 TS_INLINE inkcoreapi Action *
01142 CacheProcessor::open_read(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED,
01143 CacheFragType frag_type, char *hostname, int host_len)
01144 {
01145 #ifdef CLUSTER_CACHE
01146 if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01147 return open_read_internal(CACHE_OPEN_READ, cont, (MIOBuffer *) 0,
01148 (CacheURL *) 0, (CacheHTTPHdr *) 0,
01149 (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len);
01150 }
01151 #endif
01152 return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01153 }
01154
01155 TS_INLINE Action *
01156 CacheProcessor::open_read_buffer(Continuation *cont, MIOBuffer *buf ATS_UNUSED, CacheKey *key, CacheFragType frag_type,
01157 char *hostname, int host_len)
01158 {
01159 #ifdef CLUSTER_CACHE
01160 if (cache_clustering_enabled > 0) {
01161 return open_read_internal(CACHE_OPEN_READ_BUFFER, cont, buf,
01162 (CacheURL *) 0, (CacheHTTPHdr *) 0,
01163 (CacheLookupHttpConfig *) 0, key, 0, frag_type, hostname, host_len);
01164 }
01165 #endif
01166 return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01167 }
01168
01169
01170 TS_INLINE inkcoreapi Action *
01171 CacheProcessor::open_write(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED,
01172 CacheFragType frag_type, int expected_size ATS_UNUSED, int options,
01173 time_t pin_in_cache, char *hostname, int host_len)
01174 {
01175 #ifdef CLUSTER_CACHE
01176 if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01177 ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01178 if (m)
01179 return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
01180 key, frag_type, options, pin_in_cache,
01181 CACHE_OPEN_WRITE, key, (CacheURL *) 0,
01182 (CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len);
01183 }
01184 #endif
01185 return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
01186 }
01187
01188 TS_INLINE Action *
01189 CacheProcessor::open_write_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key,
01190 CacheFragType frag_type, int options, time_t pin_in_cache,
01191 char *hostname, int host_len)
01192 {
01193 (void)pin_in_cache;
01194 (void)cont;
01195 (void)buf;
01196 (void)key;
01197 (void)frag_type;
01198 (void)options;
01199 (void)hostname;
01200 (void)host_len;
01201 ink_assert(!"implemented");
01202 return NULL;
01203 }
01204
01205 TS_INLINE Action *
01206 CacheProcessor::remove(Continuation *cont, CacheKey *key, bool cluster_cache_local ATS_UNUSED, CacheFragType frag_type,
01207 bool rm_user_agents, bool rm_link, char *hostname, int host_len)
01208 {
01209 Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %u", cache_hash(*key));
01210 #ifdef CLUSTER_CACHE
01211 if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01212 ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01213
01214 if (m) {
01215 return Cluster_remove(m, cont, key, rm_user_agents, rm_link, frag_type, hostname, host_len);
01216 }
01217 }
01218 #endif
01219 return caches[frag_type]->remove(cont, key, frag_type, rm_user_agents, rm_link, hostname, host_len);
01220 }
01221
#if 0
// Compiled out. Free-function form of scan() that forwards to the
// CACHE_FRAG_TYPE_HTTP cache, exactly as CacheProcessor::scan() does.
TS_INLINE Action *
scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = 2500)
{
  Cache *http_cache = caches[CACHE_FRAG_TYPE_HTTP];
  return http_cache->scan(cont, hostname, host_len, KB_per_second);
}
#endif
01229
01230 #ifdef HTTP_CACHE
01231 TS_INLINE Action *
01232 CacheProcessor::lookup(Continuation *cont, URL *url, bool cluster_cache_local, bool local_only, CacheFragType frag_type)
01233 {
01234 (void) local_only;
01235 INK_MD5 md5;
01236 url->hash_get(&md5);
01237 int host_len = 0;
01238 const char *hostname = url->host_get(&host_len);
01239
01240 return lookup(cont, &md5, cluster_cache_local, local_only, frag_type, (char *) hostname, host_len);
01241 }
01242
01243 TS_INLINE Action *
01244 CacheProcessor::open_read_buffer(Continuation *cont, MIOBuffer *buf,
01245 URL *url, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type)
01246 {
01247 (void) buf;
01248 #ifdef CLUSTER_CACHE
01249 if (cache_clustering_enabled > 0) {
01250 return open_read_internal(CACHE_OPEN_READ_BUFFER_LONG, cont, buf, url,
01251 request, params, (CacheKey *) 0, 0, type, (char *) 0, 0);
01252 }
01253 #endif
01254 return caches[type]->open_read(cont, url, request, params, type);
01255 }
01256
01257 TS_INLINE Action *
01258 CacheProcessor::open_write_buffer(Continuation * cont, MIOBuffer * buf, URL * url,
01259 CacheHTTPHdr * request, CacheHTTPHdr * response, CacheFragType type)
01260 {
01261 (void) cont;
01262 (void) buf;
01263 (void) url;
01264 (void) request;
01265 (void) response;
01266 (void) type;
01267 ink_assert(!"implemented");
01268 return NULL;
01269 }
01270
01271 #endif
01272
01273
01274 #ifdef CLUSTER_CACHE
01275 TS_INLINE Action *
01276 CacheProcessor::open_read_internal(int opcode,
01277 Continuation *cont, MIOBuffer *buf,
01278 CacheURL *url,
01279 CacheHTTPHdr *request,
01280 CacheLookupHttpConfig *params,
01281 CacheKey *key,
01282 time_t pin_in_cache, CacheFragType frag_type, char *hostname, int host_len)
01283 {
01284 INK_MD5 url_md5;
01285 if ((opcode == CACHE_OPEN_READ_LONG) || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
01286 Cache::generate_key(&url_md5, url);
01287 } else {
01288 url_md5 = *key;
01289 }
01290 ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
01291
01292 if (m) {
01293 return Cluster_read(m, opcode, cont, buf, url,
01294 request, params, key, pin_in_cache, frag_type, hostname, host_len);
01295 } else {
01296 if ((opcode == CACHE_OPEN_READ_LONG)
01297 || (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
01298 return caches[frag_type]->open_read(cont, &url_md5, request, params, frag_type, hostname, host_len);
01299 } else {
01300 return caches[frag_type]->open_read(cont, key, frag_type, hostname, host_len);
01301 }
01302 }
01303 }
01304 #endif
01305
01306 #ifdef CLUSTER_CACHE
01307 TS_INLINE Action *
01308 CacheProcessor::link(Continuation *cont, CacheKey *from, CacheKey *to, bool cluster_cache_local,
01309 CacheFragType type, char *hostname, int host_len)
01310 {
01311 if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01312
01313 ClusterMachine *m = cluster_machine_at_depth(cache_hash(*from));
01314 if (m) {
01315 return Cluster_link(m, cont, from, to, type, hostname, host_len);
01316 }
01317 }
01318 return caches[type]->link(cont, from, to, type, hostname, host_len);
01319 }
01320
01321 TS_INLINE Action *
01322 CacheProcessor::deref(Continuation *cont, CacheKey *key, bool cluster_cache_local, CacheFragType type, char *hostname, int host_len)
01323 {
01324 if (cache_clustering_enabled > 0 && !cluster_cache_local) {
01325 ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
01326 if (m) {
01327 return Cluster_deref(m, cont, key, type, hostname, host_len);
01328 }
01329 }
01330 return caches[type]->deref(cont, key, type, hostname, host_len);
01331 }
01332 #endif
01333
01334 TS_INLINE Action *
01335 CacheProcessor::scan(Continuation *cont, char *hostname, int host_len, int KB_per_second)
01336 {
01337 return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second);
01338 }
01339
01340 TS_INLINE int
01341 CacheProcessor::IsCacheEnabled()
01342 {
01343 return CacheProcessor::initialized;
01344 }
01345
01346 TS_INLINE bool
01347 CacheProcessor::IsCacheReady(CacheFragType type)
01348 {
01349 if (IsCacheEnabled() != CACHE_INITIALIZED)
01350 return 0;
01351 return (bool)(cache_ready & (1 << type));
01352 }
01353
01354 TS_INLINE Cache *
01355 local_cache()
01356 {
01357 return theCache;
01358 }
01359
01360 LINK_DEFINITION(CacheVC, opendir_link)
01361
01362 #endif