00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026 #include "P_Cache.h"
00027
00028 #define SCAN_BUF_SIZE RECOVERY_SIZE
00029 #define SCAN_WRITER_LOCK_MAX_RETRY 5
00030
00031 Action *
00032 Cache::scan(Continuation * cont, char *hostname, int host_len, int KB_per_second)
00033 {
00034 Debug("cache_scan_truss", "inside scan");
00035 if (!CacheProcessor::IsCacheReady(CACHE_FRAG_TYPE_HTTP)) {
00036 cont->handleEvent(CACHE_EVENT_SCAN_FAILED, 0);
00037 return ACTION_RESULT_DONE;
00038 }
00039
00040 CacheVC *c = new_CacheVC(cont);
00041 c->vol = NULL;
00042
00043 c->hostname = hostname;
00044 c->host_len = host_len;
00045 c->base_stat = cache_scan_active_stat;
00046 c->buf = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(SCAN_BUF_SIZE), MEMALIGNED);
00047 c->scan_msec_delay = (SCAN_BUF_SIZE / KB_per_second);
00048 c->offset = 0;
00049 SET_CONTINUATION_HANDLER(c, &CacheVC::scanVol);
00050 eventProcessor.schedule_in(c, HRTIME_MSECONDS(c->scan_msec_delay));
00051 cont->handleEvent(CACHE_EVENT_SCAN, c);
00052 return &c->_action;
00053 }
00054
00055 int
00056 CacheVC::scanVol(int , Event * )
00057 {
00058 Debug("cache_scan_truss", "inside %p:scanVol", this);
00059 if (_action.cancelled)
00060 return free_CacheVC(this);
00061 CacheHostRecord *rec = &theCache->hosttable->gen_host_rec;
00062 if (host_len) {
00063 CacheHostResult res;
00064 theCache->hosttable->Match(hostname, host_len, &res);
00065 if (res.record)
00066 rec = res.record;
00067 }
00068 if (!vol) {
00069 if (!rec->num_vols)
00070 goto Ldone;
00071 vol = rec->vols[0];
00072 } else {
00073 for (int i = 0; i < rec->num_vols - 1; i++)
00074 if (vol == rec->vols[i]) {
00075 vol = rec->vols[i + 1];
00076 goto Lcont;
00077 }
00078 goto Ldone;
00079 }
00080 Lcont:
00081 fragment = 0;
00082 SET_HANDLER(&CacheVC::scanObject);
00083 eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
00084 return EVENT_CONT;
00085 Ldone:
00086 _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, NULL);
00087 return free_CacheVC(this);
00088 }
00089
00090
00091
00092
00093
00094
00095
00096 static off_t next_in_map(Vol *d, char *vol_map, off_t offset)
00097 {
00098 off_t start_offset = vol_offset_to_offset(d, 0);
00099 off_t new_off = (offset - start_offset);
00100 off_t vol_len = vol_relative_length(d, start_offset);
00101
00102 while (new_off < vol_len && !vol_map[new_off / SCAN_BUF_SIZE]) new_off += SCAN_BUF_SIZE;
00103 if (new_off >= vol_len) return vol_len + start_offset;
00104 return new_off + start_offset;
00105 }
00106
00107
00108 int
00109 dir_bucket_loop_fix(Dir *start_dir, int s, Vol *d);
00110
00111
00112
00113
00114
00115
00116
00117 static char *make_vol_map(Vol *d)
00118 {
00119
00120 off_t start_offset = vol_offset_to_offset(d, 0);
00121 off_t vol_len = vol_relative_length(d, start_offset);
00122 size_t map_len = (vol_len + (SCAN_BUF_SIZE - 1)) / SCAN_BUF_SIZE;
00123 char *vol_map = (char *)ats_malloc(map_len);
00124
00125 memset(vol_map, 0, map_len);
00126
00127
00128
00129 for (int s = 0; s < d->segments; s++) {
00130 Dir *seg = dir_segment(s, d);
00131 for (int b = 0; b < d->buckets; b++) {
00132 Dir *e = dir_bucket(b, seg);
00133 if (dir_bucket_loop_fix(e, s, d)) {
00134 break;
00135 }
00136 while (e) {
00137 if (dir_offset(e)) {
00138 off_t offset = vol_offset(d, e) - start_offset;
00139 if (offset <= vol_len) vol_map[offset / SCAN_BUF_SIZE] = 1;
00140 }
00141 e = next_dir(e, seg);
00142 if (!e)
00143 break;
00144 }
00145 }
00146 }
00147 return vol_map;
00148 }
00149
00150 int
00151 CacheVC::scanObject(int , Event * )
00152 {
00153 Debug("cache_scan_truss", "inside %p:scanObject", this);
00154
00155 Doc *doc = NULL;
00156 void *result = NULL;
00157 #ifdef HTTP_CACHE
00158 int hlen = 0;
00159 char hname[500];
00160 bool hostinfo_copied = false;
00161 #endif
00162 off_t next_object_len = 0;
00163 bool might_need_overlap_read = false;
00164
00165 cancel_trigger();
00166 set_io_not_in_progress();
00167 if (_action.cancelled)
00168 return free_CacheVC(this);
00169
00170 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00171 if (!lock) {
00172 Debug("cache_scan_truss", "delay %p:scanObject", this);
00173 mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00174 return EVENT_CONT;
00175 }
00176
00177 if (!fragment) {
00178 fragment = 1;
00179 scan_vol_map = make_vol_map(vol);
00180 io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, vol_offset_to_offset(vol, 0));
00181 if (io.aiocb.aio_offset >= (off_t)(vol->skip + vol->len))
00182 goto Ldone;
00183 io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
00184 io.aiocb.aio_buf = buf->data();
00185 io.action = this;
00186 io.thread = AIO_CALLBACK_THREAD_ANY;
00187 Debug("cache_scan_truss", "read %p:scanObject", this);
00188 goto Lread;
00189 }
00190
00191 if ((size_t)io.aio_result != (size_t) io.aiocb.aio_nbytes) {
00192 result = (void *) -ECACHE_READ_FAIL;
00193 goto Ldone;
00194 }
00195
00196 doc = (Doc *) (buf->data() + offset);
00197
00198
00199 if (scan_fix_buffer_offset) {
00200 io.aio_result += scan_fix_buffer_offset;
00201 io.aiocb.aio_nbytes += scan_fix_buffer_offset;
00202 io.aiocb.aio_offset -= scan_fix_buffer_offset;
00203 io.aiocb.aio_buf = (char *)io.aiocb.aio_buf - scan_fix_buffer_offset;
00204 scan_fix_buffer_offset = 0;
00205 }
00206 while ((off_t)((char *) doc - buf->data()) + next_object_len < (off_t)io.aiocb.aio_nbytes) {
00207 might_need_overlap_read = false;
00208 doc = (Doc *) ((char *) doc + next_object_len);
00209 next_object_len = vol->round_to_approx_size(doc->len);
00210 #ifdef HTTP_CACHE
00211 int i;
00212 bool changed;
00213
00214 if (doc->magic != DOC_MAGIC) {
00215 next_object_len = CACHE_BLOCK_SIZE;
00216 Debug("cache_scan_truss", "blockskip %p:scanObject", this);
00217 continue;
00218 }
00219
00220 if (doc->doc_type != CACHE_FRAG_TYPE_HTTP || !doc->hlen)
00221 goto Lskip;
00222
00223 last_collision = NULL;
00224 while (1) {
00225 if (!dir_probe(&doc->first_key, vol, &dir, &last_collision))
00226 goto Lskip;
00227 if (!dir_agg_valid(vol, &dir) || !dir_head(&dir) ||
00228 (vol_offset(vol, &dir) != io.aiocb.aio_offset + ((char *) doc - buf->data())))
00229 continue;
00230 break;
00231 }
00232 if (doc->data() - buf->data() > (int) io.aiocb.aio_nbytes) {
00233 might_need_overlap_read = true;
00234 goto Lskip;
00235 }
00236 {
00237 char *tmp = doc->hdr();
00238 int len = doc->hlen;
00239 while (len > 0) {
00240 int r = HTTPInfo::unmarshal(tmp, len, buf._ptr());
00241 if (r < 0) {
00242 ink_assert(!"CacheVC::scanObject unmarshal failed");
00243 goto Lskip;
00244 }
00245 len -= r;
00246 tmp += r;
00247 }
00248 }
00249 if (this->load_http_info(&vector, doc) != doc->hlen)
00250 goto Lskip;
00251 changed = false;
00252 hostinfo_copied = 0;
00253 for (i = 0; i < vector.count(); i++) {
00254 if (!vector.get(i)->valid())
00255 goto Lskip;
00256 if (!hostinfo_copied) {
00257 memccpy(hname, vector.get(i)->request_get()->host_get(&hlen), 0, 500);
00258 hname[hlen] = 0;
00259 Debug("cache_scan", "hostname = '%s', hostlen = %d", hname, hlen);
00260 hostinfo_copied = 1;
00261 }
00262 vector.get(i)->object_key_get(&key);
00263 alternate_index = i;
00264
00265 if (!(key == doc->key)) {
00266 last_collision = NULL;
00267 if (!dir_probe(&key, vol, &earliest_dir, &last_collision))
00268 continue;
00269 }
00270 earliest_key = key;
00271 int result1 = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OBJECT, vector.get(i));
00272 switch (result1) {
00273 case CACHE_SCAN_RESULT_CONTINUE:
00274 continue;
00275 case CACHE_SCAN_RESULT_DELETE:
00276 changed = true;
00277 vector.remove(i, true);
00278 i--;
00279 continue;
00280 case CACHE_SCAN_RESULT_DELETE_ALL_ALTERNATES:
00281 changed = true;
00282 vector.clear();
00283 i = 0;
00284 break;
00285 case CACHE_SCAN_RESULT_UPDATE:
00286 ink_assert(alternate_index >= 0);
00287 vector.insert(&alternate, alternate_index);
00288 if (!vector.get(alternate_index)->valid())
00289 continue;
00290 changed = true;
00291 continue;
00292 case EVENT_DONE:
00293 goto Lcancel;
00294 default:
00295 ink_assert(!"unexpected CACHE_SCAN_RESULT");
00296 continue;
00297 }
00298 }
00299 if (changed) {
00300 if (!vector.count()) {
00301 ink_assert(hostinfo_copied);
00302 SET_HANDLER(&CacheVC::scanRemoveDone);
00303
00304 cacheProcessor.remove(this, &doc->first_key, true, CACHE_FRAG_TYPE_HTTP, true, false, (char *) hname, hlen);
00305 return EVENT_CONT;
00306 } else {
00307 offset = (char *) doc - buf->data();
00308 write_len = 0;
00309 frag_type = CACHE_FRAG_TYPE_HTTP;
00310 f.use_first_key = 1;
00311 f.evac_vector = 1;
00312 first_key = key = doc->first_key;
00313 alternate_index = CACHE_ALT_REMOVED;
00314 earliest_key = zero_key;
00315 writer_lock_retry = 0;
00316 SET_HANDLER(&CacheVC::scanOpenWrite);
00317 return scanOpenWrite(EVENT_NONE, 0);
00318 }
00319 }
00320 continue;
00321 Lskip:;
00322 #endif
00323 }
00324 #ifdef HTTP_CACHE
00325 vector.clear();
00326 #endif
00327
00328
00329 if (might_need_overlap_read &&
00330 ((off_t)((char *) doc - buf->data()) + next_object_len > (off_t)io.aiocb.aio_nbytes) &&
00331 next_object_len > 0) {
00332 off_t partial_object_len = io.aiocb.aio_nbytes - ((char *)doc - buf->data());
00333
00334 memmove(buf->data(), (char *)doc, partial_object_len);
00335 io.aiocb.aio_offset += io.aiocb.aio_nbytes;
00336 io.aiocb.aio_nbytes = SCAN_BUF_SIZE - partial_object_len;
00337 io.aiocb.aio_buf = buf->data() + partial_object_len;
00338 scan_fix_buffer_offset = partial_object_len;
00339 } else {
00340 io.aiocb.aio_offset += ((char *)doc - buf->data()) + next_object_len;
00341 Debug("cache_scan_truss", "next %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
00342 io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, io.aiocb.aio_offset);
00343 Debug("cache_scan_truss", "next_in_map %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
00344 io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
00345 io.aiocb.aio_buf = buf->data();
00346 scan_fix_buffer_offset = 0;
00347 }
00348
00349 if (io.aiocb.aio_offset >= vol->skip + vol->len) {
00350 SET_HANDLER(&CacheVC::scanVol);
00351 eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
00352 return EVENT_CONT;
00353 }
00354
00355 Lread:
00356 io.aiocb.aio_fildes = vol->fd;
00357 if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(vol->skip + vol->len))
00358 io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
00359 offset = 0;
00360 ink_assert(ink_aio_read(&io) >= 0);
00361 Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu", this,
00362 (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
00363 return EVENT_CONT;
00364
00365 Ldone:
00366 Debug("cache_scan_truss", "done %p:scanObject", this);
00367 _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, result);
00368 #ifdef HTTP_CACHE
00369 Lcancel:
00370 #endif
00371 return free_CacheVC(this);
00372 }
00373
00374 int
00375 CacheVC::scanRemoveDone(int , Event * )
00376 {
00377 Debug("cache_scan_truss", "inside %p:scanRemoveDone", this);
00378 Debug("cache_scan", "remove done.");
00379 #ifdef HTTP_CACHE
00380 alternate.destroy();
00381 #endif
00382 SET_HANDLER(&CacheVC::scanObject);
00383 return handleEvent(EVENT_IMMEDIATE, 0);
00384 }
00385
00386 int
00387 CacheVC::scanOpenWrite(int , Event * )
00388 {
00389 Debug("cache_scan_truss", "inside %p:scanOpenWrite", this);
00390 cancel_trigger();
00391
00392 if (writer_lock_retry > SCAN_WRITER_LOCK_MAX_RETRY) {
00393 int r = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_BLOCKED, 0);
00394 Debug("cache_scan", "still havent got the writer lock, asking user..");
00395 switch (r) {
00396 case CACHE_SCAN_RESULT_RETRY:
00397 writer_lock_retry = 0;
00398 break;
00399 case CACHE_SCAN_RESULT_CONTINUE:
00400 SET_HANDLER(&CacheVC::scanObject);
00401 return scanObject(EVENT_IMMEDIATE, 0);
00402 }
00403 }
00404 int ret = 0;
00405 {
00406 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00407 if (!lock) {
00408 Debug("cache_scan", "vol->mutex %p:scanOpenWrite", this);
00409 VC_SCHED_LOCK_RETRY();
00410 }
00411
00412 Debug("cache_scan", "trying for writer lock");
00413 if (vol->open_write(this, false, 1)) {
00414 writer_lock_retry++;
00415 SET_HANDLER(&CacheVC::scanOpenWrite);
00416 mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
00417 return EVENT_CONT;
00418 }
00419
00420 ink_assert(this->od);
00421
00422 int alt_count = vector.count();
00423 for (int i = 0; i < alt_count; i++) {
00424 write_vector->insert(vector.get(i));
00425 }
00426 od->writing_vec = 1;
00427 vector.clear(false);
00428
00429
00430 Debug("cache_scan", "got writer lock");
00431 Dir *l = NULL;
00432 Dir d;
00433 Doc *doc = (Doc *) (buf->data() + offset);
00434 offset = (char *) doc - buf->data() + vol->round_to_approx_size(doc->len);
00435
00436
00437
00438 dir_assign(&od->first_dir, &dir);
00439 if (doc->total_len) {
00440 dir_assign(&od->single_doc_dir, &dir);
00441 dir_set_tag(&od->single_doc_dir, doc->key.slice32(2));
00442 od->single_doc_key = doc->key;
00443 od->move_resident_alt = 1;
00444 }
00445
00446 while (1) {
00447 if (!dir_probe(&first_key, vol, &d, &l)) {
00448 vol->close_write(this);
00449 _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, 0);
00450 SET_HANDLER(&CacheVC::scanObject);
00451 return handleEvent(EVENT_IMMEDIATE, 0);
00452 }
00453 if (memcmp(&dir, &d, SIZEOF_DIR)) {
00454 Debug("cache_scan", "dir entry has changed");
00455 continue;
00456 }
00457 break;
00458 }
00459
00460
00461
00462
00463 if (f.evac_vector)
00464 header_len = write_vector->marshal_length();
00465 SET_HANDLER(&CacheVC::scanUpdateDone);
00466 ret = do_write_call();
00467 }
00468 if (ret == EVENT_RETURN)
00469 return handleEvent(AIO_EVENT_DONE, 0);
00470 return ret;
00471 }
00472
00473 int
00474 CacheVC::scanUpdateDone(int , Event * )
00475 {
00476 Debug("cache_scan_truss", "inside %p:scanUpdateDone", this);
00477 cancel_trigger();
00478
00479 CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00480 if (lock) {
00481
00482 dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
00483 if (od->move_resident_alt) {
00484 dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
00485 }
00486 ink_assert(vol->open_read(&first_key));
00487 ink_assert(this->od);
00488 vol->close_write(this);
00489 SET_HANDLER(&CacheVC::scanObject);
00490 return handleEvent(EVENT_IMMEDIATE, 0);
00491 } else {
00492 mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00493 return EVENT_CONT;
00494 }
00495 }
00496