00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 #include "P_Cache.h"
00027 
// Size of every read the scan issues (and of the scan buffer allocated in
// Cache::scan()); also the chunk granularity of the per-volume map built by
// make_vol_map() and walked by next_in_map().
#define SCAN_BUF_SIZE      RECOVERY_SIZE
// Once more than this many writer-lock attempts in scanOpenWrite() have
// failed, the scan client is asked (CACHE_EVENT_SCAN_OPERATION_BLOCKED)
// whether to keep retrying or to continue the scan.
#define SCAN_WRITER_LOCK_MAX_RETRY 5
00030 
00031 Action *
00032 Cache::scan(Continuation * cont, char *hostname, int host_len, int KB_per_second)
00033 {
00034   Debug("cache_scan_truss", "inside scan");
00035   if (!CacheProcessor::IsCacheReady(CACHE_FRAG_TYPE_HTTP)) {
00036     cont->handleEvent(CACHE_EVENT_SCAN_FAILED, 0);
00037     return ACTION_RESULT_DONE;
00038   }
00039 
00040   CacheVC *c = new_CacheVC(cont);
00041   c->vol = NULL;
00042   
00043   c->hostname = hostname;
00044   c->host_len = host_len;
00045   c->base_stat = cache_scan_active_stat;
00046   c->buf = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(SCAN_BUF_SIZE), MEMALIGNED);
00047   c->scan_msec_delay = (SCAN_BUF_SIZE / KB_per_second);
00048   c->offset = 0;
00049   SET_CONTINUATION_HANDLER(c, &CacheVC::scanVol);
00050   eventProcessor.schedule_in(c, HRTIME_MSECONDS(c->scan_msec_delay));
00051   cont->handleEvent(CACHE_EVENT_SCAN, c);
00052   return &c->_action;
00053 }
00054 
00055 int
00056 CacheVC::scanVol(int , Event * )
00057 {
00058   Debug("cache_scan_truss", "inside %p:scanVol", this);
00059   if (_action.cancelled)
00060     return free_CacheVC(this);
00061   CacheHostRecord *rec = &theCache->hosttable->gen_host_rec;
00062   if (host_len) {
00063     CacheHostResult res;
00064     theCache->hosttable->Match(hostname, host_len, &res);
00065     if (res.record)
00066       rec = res.record;
00067   }
00068   if (!vol) {
00069     if (!rec->num_vols)
00070       goto Ldone;
00071     vol = rec->vols[0];
00072   } else {
00073     for (int i = 0; i < rec->num_vols - 1; i++)
00074       if (vol == rec->vols[i]) {
00075         vol = rec->vols[i + 1];
00076         goto Lcont;
00077       }
00078     goto Ldone;
00079   }
00080 Lcont:
00081   fragment = 0;
00082   SET_HANDLER(&CacheVC::scanObject);
00083   eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
00084   return EVENT_CONT;
00085 Ldone:
00086   _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, NULL);
00087   return free_CacheVC(this);
00088 }
00089 
00090 
00091 
00092 
00093 
00094 
00095 
00096 static off_t next_in_map(Vol *d, char *vol_map, off_t offset)
00097 {
00098   off_t start_offset = vol_offset_to_offset(d, 0);
00099   off_t new_off = (offset - start_offset);
00100   off_t vol_len = vol_relative_length(d, start_offset);
00101 
00102   while (new_off < vol_len && !vol_map[new_off / SCAN_BUF_SIZE]) new_off += SCAN_BUF_SIZE;
00103   if (new_off >= vol_len) return vol_len + start_offset;
00104   return new_off + start_offset;
00105 }
00106 
00107 
00108 int
00109 dir_bucket_loop_fix(Dir *start_dir, int s, Vol *d);
00110 
00111 
00112 
00113 
00114 
00115 
00116 
00117 static char *make_vol_map(Vol *d)
00118 {
00119   
00120   off_t start_offset = vol_offset_to_offset(d, 0);
00121   off_t vol_len = vol_relative_length(d, start_offset);
00122   size_t map_len = (vol_len + (SCAN_BUF_SIZE - 1)) / SCAN_BUF_SIZE;
00123   char *vol_map = (char *)ats_malloc(map_len);
00124 
00125   memset(vol_map, 0, map_len);
00126 
00127   
00128   
00129   for (int s = 0; s < d->segments; s++) {
00130     Dir *seg = dir_segment(s, d);
00131     for (int b = 0; b < d->buckets; b++) {
00132       Dir *e = dir_bucket(b, seg);
00133       if (dir_bucket_loop_fix(e, s, d)) {
00134         break;
00135       }
00136       while (e) {
00137         if (dir_offset(e)) {
00138             off_t offset = vol_offset(d, e) - start_offset;
00139             if (offset <= vol_len) vol_map[offset / SCAN_BUF_SIZE] = 1;
00140         }
00141         e = next_dir(e, seg);
00142         if (!e)
00143           break;
00144       }
00145     }
00146   }
00147   return vol_map;
00148 }
00149 
00150 int
00151 CacheVC::scanObject(int , Event * )
00152 {
00153   Debug("cache_scan_truss", "inside %p:scanObject", this);
00154 
00155   Doc *doc = NULL;
00156   void *result = NULL;
00157 #ifdef HTTP_CACHE
00158   int hlen = 0;
00159   char hname[500];
00160   bool hostinfo_copied = false;
00161 #endif
00162   off_t next_object_len = 0;
00163   bool might_need_overlap_read = false;
00164 
00165   cancel_trigger();
00166   set_io_not_in_progress();
00167   if (_action.cancelled)
00168     return free_CacheVC(this);
00169 
00170   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00171   if (!lock) {
00172     Debug("cache_scan_truss", "delay %p:scanObject", this);
00173     mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00174     return EVENT_CONT;
00175   }
00176 
00177   if (!fragment) {               
00178     fragment = 1;
00179     scan_vol_map = make_vol_map(vol);
00180     io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, vol_offset_to_offset(vol, 0));
00181     if (io.aiocb.aio_offset >= (off_t)(vol->skip + vol->len))
00182       goto Ldone;
00183     io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
00184     io.aiocb.aio_buf = buf->data();
00185     io.action = this;
00186     io.thread = AIO_CALLBACK_THREAD_ANY;
00187     Debug("cache_scan_truss", "read %p:scanObject", this);
00188     goto Lread;
00189   }
00190 
00191   if ((size_t)io.aio_result != (size_t) io.aiocb.aio_nbytes) {
00192     result = (void *) -ECACHE_READ_FAIL;
00193     goto Ldone;
00194   }
00195 
00196   doc = (Doc *) (buf->data() + offset);
00197   
00198   
00199   if (scan_fix_buffer_offset) {
00200     io.aio_result += scan_fix_buffer_offset;
00201     io.aiocb.aio_nbytes += scan_fix_buffer_offset;
00202     io.aiocb.aio_offset -= scan_fix_buffer_offset;
00203     io.aiocb.aio_buf = (char *)io.aiocb.aio_buf - scan_fix_buffer_offset;
00204     scan_fix_buffer_offset = 0;
00205   }
00206   while ((off_t)((char *) doc - buf->data()) + next_object_len < (off_t)io.aiocb.aio_nbytes) {
00207     might_need_overlap_read = false;
00208     doc = (Doc *) ((char *) doc + next_object_len);
00209     next_object_len = vol->round_to_approx_size(doc->len);
00210 #ifdef HTTP_CACHE
00211     int i;
00212     bool changed;
00213 
00214     if (doc->magic != DOC_MAGIC) {
00215       next_object_len = CACHE_BLOCK_SIZE;
00216       Debug("cache_scan_truss", "blockskip %p:scanObject", this);
00217       continue;
00218     }
00219       
00220     if (doc->doc_type != CACHE_FRAG_TYPE_HTTP || !doc->hlen)
00221       goto Lskip;
00222 
00223     last_collision = NULL;
00224     while (1) {
00225       if (!dir_probe(&doc->first_key, vol, &dir, &last_collision))
00226         goto Lskip;
00227       if (!dir_agg_valid(vol, &dir) || !dir_head(&dir) ||
00228           (vol_offset(vol, &dir) != io.aiocb.aio_offset + ((char *) doc - buf->data())))
00229         continue;
00230       break;
00231     }
00232     if (doc->data() - buf->data() > (int) io.aiocb.aio_nbytes) {
00233       might_need_overlap_read = true;
00234       goto Lskip;
00235     }
00236     {
00237       char *tmp = doc->hdr();
00238       int len = doc->hlen;
00239       while (len > 0) {
00240         int r = HTTPInfo::unmarshal(tmp, len, buf._ptr());
00241         if (r < 0) {
00242           ink_assert(!"CacheVC::scanObject unmarshal failed");
00243           goto Lskip;
00244         }
00245         len -= r;
00246         tmp += r;
00247       }
00248     }
00249     if (this->load_http_info(&vector, doc) != doc->hlen)
00250       goto Lskip;
00251     changed = false;
00252     hostinfo_copied = 0;
00253     for (i = 0; i < vector.count(); i++) {
00254       if (!vector.get(i)->valid())
00255         goto Lskip;
00256       if (!hostinfo_copied) {
00257         memccpy(hname, vector.get(i)->request_get()->host_get(&hlen), 0, 500);
00258         hname[hlen] = 0;
00259         Debug("cache_scan", "hostname = '%s', hostlen = %d", hname, hlen);
00260         hostinfo_copied = 1;
00261       }
00262       vector.get(i)->object_key_get(&key);
00263       alternate_index = i;
00264       
00265       if (!(key == doc->key)) {
00266         last_collision = NULL;
00267         if (!dir_probe(&key, vol, &earliest_dir, &last_collision))
00268           continue;
00269       }
00270       earliest_key = key;
00271       int result1 = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OBJECT, vector.get(i));
00272       switch (result1) {
00273       case CACHE_SCAN_RESULT_CONTINUE:
00274         continue;
00275       case CACHE_SCAN_RESULT_DELETE:
00276         changed = true;
00277         vector.remove(i, true);
00278         i--;
00279         continue;
00280       case CACHE_SCAN_RESULT_DELETE_ALL_ALTERNATES:
00281         changed = true;
00282         vector.clear();
00283         i = 0;
00284         break;
00285       case CACHE_SCAN_RESULT_UPDATE:
00286         ink_assert(alternate_index >= 0);
00287         vector.insert(&alternate, alternate_index);
00288         if (!vector.get(alternate_index)->valid())
00289           continue;
00290         changed = true;
00291         continue;
00292       case EVENT_DONE:
00293         goto Lcancel;
00294       default:
00295         ink_assert(!"unexpected CACHE_SCAN_RESULT");
00296         continue;
00297       }
00298     }
00299     if (changed) {
00300       if (!vector.count()) {
00301         ink_assert(hostinfo_copied);
00302         SET_HANDLER(&CacheVC::scanRemoveDone);
00303         
00304         cacheProcessor.remove(this, &doc->first_key, true, CACHE_FRAG_TYPE_HTTP, true, false, (char *) hname, hlen);
00305         return EVENT_CONT;
00306       } else {
00307         offset = (char *) doc - buf->data();
00308         write_len = 0;
00309         frag_type = CACHE_FRAG_TYPE_HTTP;
00310         f.use_first_key = 1;
00311         f.evac_vector = 1;
00312         first_key = key = doc->first_key;
00313         alternate_index = CACHE_ALT_REMOVED;
00314         earliest_key = zero_key;
00315         writer_lock_retry = 0;
00316         SET_HANDLER(&CacheVC::scanOpenWrite);
00317         return scanOpenWrite(EVENT_NONE, 0);
00318       }
00319     }
00320     continue;
00321   Lskip:;
00322 #endif
00323   }
00324 #ifdef HTTP_CACHE
00325   vector.clear();
00326 #endif
00327     
00328     
00329   if (might_need_overlap_read &&
00330       ((off_t)((char *) doc - buf->data()) + next_object_len > (off_t)io.aiocb.aio_nbytes) &&
00331       next_object_len > 0) {
00332     off_t partial_object_len = io.aiocb.aio_nbytes - ((char *)doc - buf->data());
00333     
00334     memmove(buf->data(), (char *)doc, partial_object_len);
00335     io.aiocb.aio_offset += io.aiocb.aio_nbytes;
00336     io.aiocb.aio_nbytes = SCAN_BUF_SIZE - partial_object_len;
00337     io.aiocb.aio_buf = buf->data() + partial_object_len;
00338     scan_fix_buffer_offset = partial_object_len;
00339   } else { 
00340     io.aiocb.aio_offset += ((char *)doc - buf->data()) + next_object_len;
00341     Debug("cache_scan_truss", "next %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
00342     io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, io.aiocb.aio_offset);
00343     Debug("cache_scan_truss", "next_in_map %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
00344     io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
00345     io.aiocb.aio_buf = buf->data();
00346     scan_fix_buffer_offset = 0;
00347   }
00348 
00349   if (io.aiocb.aio_offset >= vol->skip + vol->len) {
00350     SET_HANDLER(&CacheVC::scanVol);
00351     eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
00352     return EVENT_CONT;
00353   }
00354 
00355 Lread:
00356   io.aiocb.aio_fildes = vol->fd;
00357   if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(vol->skip + vol->len))
00358     io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
00359   offset = 0;
00360   ink_assert(ink_aio_read(&io) >= 0);
00361   Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu", this,
00362         (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
00363   return EVENT_CONT;
00364 
00365 Ldone:
00366    Debug("cache_scan_truss", "done %p:scanObject", this);
00367   _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, result);
00368 #ifdef HTTP_CACHE
00369 Lcancel:
00370 #endif
00371   return free_CacheVC(this);
00372 }
00373 
00374 int
00375 CacheVC::scanRemoveDone(int , Event * )
00376 {
00377   Debug("cache_scan_truss", "inside %p:scanRemoveDone", this);
00378   Debug("cache_scan", "remove done.");
00379 #ifdef HTTP_CACHE
00380   alternate.destroy();
00381 #endif
00382   SET_HANDLER(&CacheVC::scanObject);
00383   return handleEvent(EVENT_IMMEDIATE, 0);
00384 }
00385 
00386 int
00387 CacheVC::scanOpenWrite(int , Event * )
00388 {
00389   Debug("cache_scan_truss", "inside %p:scanOpenWrite", this);
00390   cancel_trigger();
00391   
00392   if (writer_lock_retry > SCAN_WRITER_LOCK_MAX_RETRY) {
00393     int r = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_BLOCKED, 0);
00394     Debug("cache_scan", "still havent got the writer lock, asking user..");
00395     switch (r) {
00396     case CACHE_SCAN_RESULT_RETRY:
00397       writer_lock_retry = 0;
00398       break;
00399     case CACHE_SCAN_RESULT_CONTINUE:
00400       SET_HANDLER(&CacheVC::scanObject);
00401       return scanObject(EVENT_IMMEDIATE, 0);
00402     }
00403   }
00404   int ret = 0;
00405   {
00406     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00407     if (!lock) {
00408       Debug("cache_scan", "vol->mutex %p:scanOpenWrite", this);
00409       VC_SCHED_LOCK_RETRY();
00410     }
00411 
00412     Debug("cache_scan", "trying for writer lock");
00413     if (vol->open_write(this, false, 1)) {
00414       writer_lock_retry++;
00415       SET_HANDLER(&CacheVC::scanOpenWrite);
00416       mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
00417       return EVENT_CONT;
00418     }
00419 
00420     ink_assert(this->od);
00421     
00422     int alt_count = vector.count();
00423     for (int i = 0; i < alt_count; i++) {
00424       write_vector->insert(vector.get(i));
00425     }
00426     od->writing_vec = 1;
00427     vector.clear(false);
00428     
00429     
00430     Debug("cache_scan", "got writer lock");
00431     Dir *l = NULL;
00432     Dir d;
00433     Doc *doc = (Doc *) (buf->data() + offset);
00434     offset = (char *) doc - buf->data() + vol->round_to_approx_size(doc->len);
00435     
00436     
00437     
00438     dir_assign(&od->first_dir, &dir);
00439     if (doc->total_len) {
00440       dir_assign(&od->single_doc_dir, &dir);
00441       dir_set_tag(&od->single_doc_dir, doc->key.slice32(2));
00442       od->single_doc_key = doc->key;
00443       od->move_resident_alt = 1;
00444     }
00445 
00446     while (1) {
00447       if (!dir_probe(&first_key, vol, &d, &l)) {
00448         vol->close_write(this);
00449         _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, 0);
00450         SET_HANDLER(&CacheVC::scanObject);
00451         return handleEvent(EVENT_IMMEDIATE, 0);
00452       }
00453       if (memcmp(&dir, &d, SIZEOF_DIR)) {
00454         Debug("cache_scan", "dir entry has changed");
00455         continue;
00456       }
00457       break;
00458     }
00459 
00460     
00461     
00462     
00463     if (f.evac_vector)
00464       header_len = write_vector->marshal_length();
00465     SET_HANDLER(&CacheVC::scanUpdateDone);
00466     ret = do_write_call();
00467   }
00468   if (ret == EVENT_RETURN)
00469     return handleEvent(AIO_EVENT_DONE, 0);
00470   return ret;
00471 }
00472 
00473 int
00474 CacheVC::scanUpdateDone(int , Event * )
00475 {
00476   Debug("cache_scan_truss", "inside %p:scanUpdateDone", this);
00477   cancel_trigger();
00478   
00479   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00480   if (lock) {
00481     
00482     dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
00483     if (od->move_resident_alt) {
00484       dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
00485     }
00486     ink_assert(vol->open_read(&first_key));
00487     ink_assert(this->od);
00488     vol->close_write(this);
00489     SET_HANDLER(&CacheVC::scanObject);
00490     return handleEvent(EVENT_IMMEDIATE, 0);
00491   } else {
00492     mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00493     return EVENT_CONT;
00494   }
00495 }
00496