• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

CacheVol.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Cache scanning: walks the objects stored in each cache volume and reports them to a user continuation.
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 
00026 #include "P_Cache.h"
00027 
00028 #define SCAN_BUF_SIZE      RECOVERY_SIZE
00029 #define SCAN_WRITER_LOCK_MAX_RETRY 5
00030 
00031 Action *
00032 Cache::scan(Continuation * cont, char *hostname, int host_len, int KB_per_second)
00033 {
00034   Debug("cache_scan_truss", "inside scan");
00035   if (!CacheProcessor::IsCacheReady(CACHE_FRAG_TYPE_HTTP)) {
00036     cont->handleEvent(CACHE_EVENT_SCAN_FAILED, 0);
00037     return ACTION_RESULT_DONE;
00038   }
00039 
00040   CacheVC *c = new_CacheVC(cont);
00041   c->vol = NULL;
00042   /* do we need to make a copy */
00043   c->hostname = hostname;
00044   c->host_len = host_len;
00045   c->base_stat = cache_scan_active_stat;
00046   c->buf = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(SCAN_BUF_SIZE), MEMALIGNED);
00047   c->scan_msec_delay = (SCAN_BUF_SIZE / KB_per_second);
00048   c->offset = 0;
00049   SET_CONTINUATION_HANDLER(c, &CacheVC::scanVol);
00050   eventProcessor.schedule_in(c, HRTIME_MSECONDS(c->scan_msec_delay));
00051   cont->handleEvent(CACHE_EVENT_SCAN, c);
00052   return &c->_action;
00053 }
00054 
00055 int
00056 CacheVC::scanVol(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00057 {
00058   Debug("cache_scan_truss", "inside %p:scanVol", this);
00059   if (_action.cancelled)
00060     return free_CacheVC(this);
00061   CacheHostRecord *rec = &theCache->hosttable->gen_host_rec;
00062   if (host_len) {
00063     CacheHostResult res;
00064     theCache->hosttable->Match(hostname, host_len, &res);
00065     if (res.record)
00066       rec = res.record;
00067   }
00068   if (!vol) {
00069     if (!rec->num_vols)
00070       goto Ldone;
00071     vol = rec->vols[0];
00072   } else {
00073     for (int i = 0; i < rec->num_vols - 1; i++)
00074       if (vol == rec->vols[i]) {
00075         vol = rec->vols[i + 1];
00076         goto Lcont;
00077       }
00078     goto Ldone;
00079   }
00080 Lcont:
00081   fragment = 0;
00082   SET_HANDLER(&CacheVC::scanObject);
00083   eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
00084   return EVENT_CONT;
00085 Ldone:
00086   _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, NULL);
00087   return free_CacheVC(this);
00088 }
00089 
00090 /* Next block with some data in it in this partition.  Returns end of partition if no more
00091  * locations.
00092  *
00093  * d - Vol
00094  * vol_map - precalculated map
00095  * offset - offset to start looking at (and data at this location has not been read yet). */
00096 static off_t next_in_map(Vol *d, char *vol_map, off_t offset)
00097 {
00098   off_t start_offset = vol_offset_to_offset(d, 0);
00099   off_t new_off = (offset - start_offset);
00100   off_t vol_len = vol_relative_length(d, start_offset);
00101 
00102   while (new_off < vol_len && !vol_map[new_off / SCAN_BUF_SIZE]) new_off += SCAN_BUF_SIZE;
00103   if (new_off >= vol_len) return vol_len + start_offset;
00104   return new_off + start_offset;
00105 }
00106 
00107 // Function in CacheDir.cc that we need for make_vol_map().
00108 int
00109 dir_bucket_loop_fix(Dir *start_dir, int s, Vol *d);
00110 
00111 // TODO: If we used a bit vector, we could make a smaller map structure.
00112 // TODO: If we saved a high water mark we could have a smaller buf, and avoid searching it
00113 // when we are asked about the highest interesting offset.
00114 /* Make map of what blocks in partition are used.
00115  *
00116  * d - Vol to make a map of. */
00117 static char *make_vol_map(Vol *d)
00118 {
00119   // Map will be one byte for each SCAN_BUF_SIZE bytes.
00120   off_t start_offset = vol_offset_to_offset(d, 0);
00121   off_t vol_len = vol_relative_length(d, start_offset);
00122   size_t map_len = (vol_len + (SCAN_BUF_SIZE - 1)) / SCAN_BUF_SIZE;
00123   char *vol_map = (char *)ats_malloc(map_len);
00124 
00125   memset(vol_map, 0, map_len);
00126 
00127   // Scan directories.
00128   // Copied from dir_entries_used() and modified to fill in the map instead.
00129   for (int s = 0; s < d->segments; s++) {
00130     Dir *seg = dir_segment(s, d);
00131     for (int b = 0; b < d->buckets; b++) {
00132       Dir *e = dir_bucket(b, seg);
00133       if (dir_bucket_loop_fix(e, s, d)) {
00134         break;
00135       }
00136       while (e) {
00137         if (dir_offset(e)) {
00138             off_t offset = vol_offset(d, e) - start_offset;
00139             if (offset <= vol_len) vol_map[offset / SCAN_BUF_SIZE] = 1;
00140         }
00141         e = next_dir(e, seg);
00142         if (!e)
00143           break;
00144       }
00145     }
00146   }
00147   return vol_map;
00148 }
00149 
00150 int
00151 CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00152 {
00153   Debug("cache_scan_truss", "inside %p:scanObject", this);
00154 
00155   Doc *doc = NULL;
00156   void *result = NULL;
00157 #ifdef HTTP_CACHE
00158   int hlen = 0;
00159   char hname[500];
00160   bool hostinfo_copied = false;
00161 #endif
00162   off_t next_object_len = 0;
00163   bool might_need_overlap_read = false;
00164 
00165   cancel_trigger();
00166   set_io_not_in_progress();
00167   if (_action.cancelled)
00168     return free_CacheVC(this);
00169 
00170   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00171   if (!lock) {
00172     Debug("cache_scan_truss", "delay %p:scanObject", this);
00173     mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00174     return EVENT_CONT;
00175   }
00176 
00177   if (!fragment) {               // initialize for first read
00178     fragment = 1;
00179     scan_vol_map = make_vol_map(vol);
00180     io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, vol_offset_to_offset(vol, 0));
00181     if (io.aiocb.aio_offset >= (off_t)(vol->skip + vol->len))
00182       goto Ldone;
00183     io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
00184     io.aiocb.aio_buf = buf->data();
00185     io.action = this;
00186     io.thread = AIO_CALLBACK_THREAD_ANY;
00187     Debug("cache_scan_truss", "read %p:scanObject", this);
00188     goto Lread;
00189   }
00190 
00191   if ((size_t)io.aio_result != (size_t) io.aiocb.aio_nbytes) {
00192     result = (void *) -ECACHE_READ_FAIL;
00193     goto Ldone;
00194   }
00195 
00196   doc = (Doc *) (buf->data() + offset);
00197   // If there is data in the buffer before the start that is from a partial object read previously
00198   // Fix things as if we read it this time.
00199   if (scan_fix_buffer_offset) {
00200     io.aio_result += scan_fix_buffer_offset;
00201     io.aiocb.aio_nbytes += scan_fix_buffer_offset;
00202     io.aiocb.aio_offset -= scan_fix_buffer_offset;
00203     io.aiocb.aio_buf = (char *)io.aiocb.aio_buf - scan_fix_buffer_offset;
00204     scan_fix_buffer_offset = 0;
00205   }
00206   while ((off_t)((char *) doc - buf->data()) + next_object_len < (off_t)io.aiocb.aio_nbytes) {
00207     might_need_overlap_read = false;
00208     doc = (Doc *) ((char *) doc + next_object_len);
00209     next_object_len = vol->round_to_approx_size(doc->len);
00210 #ifdef HTTP_CACHE
00211     int i;
00212     bool changed;
00213 
00214     if (doc->magic != DOC_MAGIC) {
00215       next_object_len = CACHE_BLOCK_SIZE;
00216       Debug("cache_scan_truss", "blockskip %p:scanObject", this);
00217       continue;
00218     }
00219       
00220     if (doc->doc_type != CACHE_FRAG_TYPE_HTTP || !doc->hlen)
00221       goto Lskip;
00222 
00223     last_collision = NULL;
00224     while (1) {
00225       if (!dir_probe(&doc->first_key, vol, &dir, &last_collision))
00226         goto Lskip;
00227       if (!dir_agg_valid(vol, &dir) || !dir_head(&dir) ||
00228           (vol_offset(vol, &dir) != io.aiocb.aio_offset + ((char *) doc - buf->data())))
00229         continue;
00230       break;
00231     }
00232     if (doc->data() - buf->data() > (int) io.aiocb.aio_nbytes) {
00233       might_need_overlap_read = true;
00234       goto Lskip;
00235     }
00236     {
00237       char *tmp = doc->hdr();
00238       int len = doc->hlen;
00239       while (len > 0) {
00240         int r = HTTPInfo::unmarshal(tmp, len, buf._ptr());
00241         if (r < 0) {
00242           ink_assert(!"CacheVC::scanObject unmarshal failed");
00243           goto Lskip;
00244         }
00245         len -= r;
00246         tmp += r;
00247       }
00248     }
00249     if (this->load_http_info(&vector, doc) != doc->hlen)
00250       goto Lskip;
00251     changed = false;
00252     hostinfo_copied = 0;
00253     for (i = 0; i < vector.count(); i++) {
00254       if (!vector.get(i)->valid())
00255         goto Lskip;
00256       if (!hostinfo_copied) {
00257         memccpy(hname, vector.get(i)->request_get()->host_get(&hlen), 0, 500);
00258         hname[hlen] = 0;
00259         Debug("cache_scan", "hostname = '%s', hostlen = %d", hname, hlen);
00260         hostinfo_copied = 1;
00261       }
00262       vector.get(i)->object_key_get(&key);
00263       alternate_index = i;
00264       // verify that the earliest block exists, reducing 'false hit' callbacks
00265       if (!(key == doc->key)) {
00266         last_collision = NULL;
00267         if (!dir_probe(&key, vol, &earliest_dir, &last_collision))
00268           continue;
00269       }
00270       earliest_key = key;
00271       int result1 = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OBJECT, vector.get(i));
00272       switch (result1) {
00273       case CACHE_SCAN_RESULT_CONTINUE:
00274         continue;
00275       case CACHE_SCAN_RESULT_DELETE:
00276         changed = true;
00277         vector.remove(i, true);
00278         i--;
00279         continue;
00280       case CACHE_SCAN_RESULT_DELETE_ALL_ALTERNATES:
00281         changed = true;
00282         vector.clear();
00283         i = 0;
00284         break;
00285       case CACHE_SCAN_RESULT_UPDATE:
00286         ink_assert(alternate_index >= 0);
00287         vector.insert(&alternate, alternate_index);
00288         if (!vector.get(alternate_index)->valid())
00289           continue;
00290         changed = true;
00291         continue;
00292       case EVENT_DONE:
00293         goto Lcancel;
00294       default:
00295         ink_assert(!"unexpected CACHE_SCAN_RESULT");
00296         continue;
00297       }
00298     }
00299     if (changed) {
00300       if (!vector.count()) {
00301         ink_assert(hostinfo_copied);
00302         SET_HANDLER(&CacheVC::scanRemoveDone);
00303         // force remove even if there is a writer
00304         cacheProcessor.remove(this, &doc->first_key, true, CACHE_FRAG_TYPE_HTTP, true, false, (char *) hname, hlen);
00305         return EVENT_CONT;
00306       } else {
00307         offset = (char *) doc - buf->data();
00308         write_len = 0;
00309         frag_type = CACHE_FRAG_TYPE_HTTP;
00310         f.use_first_key = 1;
00311         f.evac_vector = 1;
00312         first_key = key = doc->first_key;
00313         alternate_index = CACHE_ALT_REMOVED;
00314         earliest_key = zero_key;
00315         writer_lock_retry = 0;
00316         SET_HANDLER(&CacheVC::scanOpenWrite);
00317         return scanOpenWrite(EVENT_NONE, 0);
00318       }
00319     }
00320     continue;
00321   Lskip:;
00322 #endif
00323   }
00324 #ifdef HTTP_CACHE
00325   vector.clear();
00326 #endif
00327     // If we had an object that went past the end of the buffer, and it is small enough to fix,
00328     // fix it.
00329   if (might_need_overlap_read &&
00330       ((off_t)((char *) doc - buf->data()) + next_object_len > (off_t)io.aiocb.aio_nbytes) &&
00331       next_object_len > 0) {
00332     off_t partial_object_len = io.aiocb.aio_nbytes - ((char *)doc - buf->data());
00333     // Copy partial object to beginning of the buffer.
00334     memmove(buf->data(), (char *)doc, partial_object_len);
00335     io.aiocb.aio_offset += io.aiocb.aio_nbytes;
00336     io.aiocb.aio_nbytes = SCAN_BUF_SIZE - partial_object_len;
00337     io.aiocb.aio_buf = buf->data() + partial_object_len;
00338     scan_fix_buffer_offset = partial_object_len;
00339   } else { // Normal case, where we ended on a object boundary.
00340     io.aiocb.aio_offset += ((char *)doc - buf->data()) + next_object_len;
00341     Debug("cache_scan_truss", "next %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
00342     io.aiocb.aio_offset = next_in_map(vol, scan_vol_map, io.aiocb.aio_offset);
00343     Debug("cache_scan_truss", "next_in_map %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
00344     io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
00345     io.aiocb.aio_buf = buf->data();
00346     scan_fix_buffer_offset = 0;
00347   }
00348 
00349   if (io.aiocb.aio_offset >= vol->skip + vol->len) {
00350     SET_HANDLER(&CacheVC::scanVol);
00351     eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
00352     return EVENT_CONT;
00353   }
00354 
00355 Lread:
00356   io.aiocb.aio_fildes = vol->fd;
00357   if ((off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (off_t)(vol->skip + vol->len))
00358     io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
00359   offset = 0;
00360   ink_assert(ink_aio_read(&io) >= 0);
00361   Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu", this,
00362         (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
00363   return EVENT_CONT;
00364 
00365 Ldone:
00366    Debug("cache_scan_truss", "done %p:scanObject", this);
00367   _action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, result);
00368 #ifdef HTTP_CACHE
00369 Lcancel:
00370 #endif
00371   return free_CacheVC(this);
00372 }
00373 
00374 int
00375 CacheVC::scanRemoveDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00376 {
00377   Debug("cache_scan_truss", "inside %p:scanRemoveDone", this);
00378   Debug("cache_scan", "remove done.");
00379 #ifdef HTTP_CACHE
00380   alternate.destroy();
00381 #endif
00382   SET_HANDLER(&CacheVC::scanObject);
00383   return handleEvent(EVENT_IMMEDIATE, 0);
00384 }
00385 
00386 int
00387 CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00388 {
00389   Debug("cache_scan_truss", "inside %p:scanOpenWrite", this);
00390   cancel_trigger();
00391   // get volume lock
00392   if (writer_lock_retry > SCAN_WRITER_LOCK_MAX_RETRY) {
00393     int r = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_BLOCKED, 0);
00394     Debug("cache_scan", "still havent got the writer lock, asking user..");
00395     switch (r) {
00396     case CACHE_SCAN_RESULT_RETRY:
00397       writer_lock_retry = 0;
00398       break;
00399     case CACHE_SCAN_RESULT_CONTINUE:
00400       SET_HANDLER(&CacheVC::scanObject);
00401       return scanObject(EVENT_IMMEDIATE, 0);
00402     }
00403   }
00404   int ret = 0;
00405   {
00406     CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00407     if (!lock) {
00408       Debug("cache_scan", "vol->mutex %p:scanOpenWrite", this);
00409       VC_SCHED_LOCK_RETRY();
00410     }
00411 
00412     Debug("cache_scan", "trying for writer lock");
00413     if (vol->open_write(this, false, 1)) {
00414       writer_lock_retry++;
00415       SET_HANDLER(&CacheVC::scanOpenWrite);
00416       mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
00417       return EVENT_CONT;
00418     }
00419 
00420     ink_assert(this->od);
00421     // put all the alternates in the open directory vector
00422     int alt_count = vector.count();
00423     for (int i = 0; i < alt_count; i++) {
00424       write_vector->insert(vector.get(i));
00425     }
00426     od->writing_vec = 1;
00427     vector.clear(false);
00428     // check that the directory entry was not overwritten
00429     // if so return failure
00430     Debug("cache_scan", "got writer lock");
00431     Dir *l = NULL;
00432     Dir d;
00433     Doc *doc = (Doc *) (buf->data() + offset);
00434     offset = (char *) doc - buf->data() + vol->round_to_approx_size(doc->len);
00435     // if the doc contains some data, then we need to create
00436     // a new directory entry for this fragment. Remember the
00437     // offset and the key in earliest_key
00438     dir_assign(&od->first_dir, &dir);
00439     if (doc->total_len) {
00440       dir_assign(&od->single_doc_dir, &dir);
00441       dir_set_tag(&od->single_doc_dir, doc->key.slice32(2));
00442       od->single_doc_key = doc->key;
00443       od->move_resident_alt = 1;
00444     }
00445 
00446     while (1) {
00447       if (!dir_probe(&first_key, vol, &d, &l)) {
00448         vol->close_write(this);
00449         _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, 0);
00450         SET_HANDLER(&CacheVC::scanObject);
00451         return handleEvent(EVENT_IMMEDIATE, 0);
00452       }
00453       if (memcmp(&dir, &d, SIZEOF_DIR)) {
00454         Debug("cache_scan", "dir entry has changed");
00455         continue;
00456       }
00457       break;
00458     }
00459 
00460     // the document was not modified
00461     // we are safe from now on as we hold the
00462     // writer lock on the doc
00463     if (f.evac_vector)
00464       header_len = write_vector->marshal_length();
00465     SET_HANDLER(&CacheVC::scanUpdateDone);
00466     ret = do_write_call();
00467   }
00468   if (ret == EVENT_RETURN)
00469     return handleEvent(AIO_EVENT_DONE, 0);
00470   return ret;
00471 }
00472 
00473 int
00474 CacheVC::scanUpdateDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00475 {
00476   Debug("cache_scan_truss", "inside %p:scanUpdateDone", this);
00477   cancel_trigger();
00478   // get volume lock
00479   CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
00480   if (lock) {
00481     // insert a directory entry for the previous fragment
00482     dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
00483     if (od->move_resident_alt) {
00484       dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
00485     }
00486     ink_assert(vol->open_read(&first_key));
00487     ink_assert(this->od);
00488     vol->close_write(this);
00489     SET_HANDLER(&CacheVC::scanObject);
00490     return handleEvent(EVENT_IMMEDIATE, 0);
00491   } else {
00492     mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
00493     return EVENT_CONT;
00494   }
00495 }
00496 

Generated by  doxygen 1.7.1