00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 #include "P_Cache.h"
00026 #include "P_CacheTest.h"
00027 #include "api/ts/ts.h"
00028 
00029 CacheTestSM::CacheTestSM(RegressionTest *t) :
00030   RegressionSM(t),
00031   timeout(0),
00032   cache_action(0),
00033   start_time(0),
00034   cache_vc(0),
00035   cvio(0),
00036   buffer(0),
00037   buffer_reader(0),
00038   nbytes(-1),
00039   repeat_count(0),
00040   expect_event(EVENT_NONE),
00041   expect_initial_event(EVENT_NONE),
00042   initial_event(EVENT_NONE),
00043   content_salt(0)
00044 {
00045   SET_HANDLER(&CacheTestSM::event_handler);
00046 }
00047 
00048 CacheTestSM::~CacheTestSM() {
00049   ink_assert(!cache_action);
00050   ink_assert(!cache_vc);
00051   if (buffer_reader)
00052     buffer->dealloc_reader(buffer_reader);
00053   if (buffer)
00054     free_MIOBuffer(buffer);
00055 }
00056 
00057 int CacheTestSM::open_read_callout() {
00058   cvio = cache_vc->do_io_read(this, nbytes, buffer);
00059   return 1;
00060 }
00061 
00062 int CacheTestSM::open_write_callout() {
00063   cvio = cache_vc->do_io_write(this, nbytes, buffer_reader);
00064   return 1;
00065 }
00066 
00067 int CacheTestSM::event_handler(int event, void *data) {
00068 
00069   switch (event) {
00070 
00071     case EVENT_INTERVAL:
00072     case EVENT_IMMEDIATE:
00073       cancel_timeout();
00074       if (cache_action) {
00075         cache_action->cancel();
00076         cache_action = 0;
00077       }
00078       if (cache_vc) {
00079         cache_vc->do_io_close();
00080         cache_vc = 0;
00081       }
00082       cvio = 0;
00083       make_request();
00084       return EVENT_DONE;
00085 
00086     case CACHE_EVENT_LOOKUP_FAILED:
00087     case CACHE_EVENT_LOOKUP:
00088       goto Lcancel_next;
00089 
00090     case CACHE_EVENT_OPEN_READ:
00091       initial_event = event;
00092       cancel_timeout();
00093       cache_action = 0;
00094       cache_vc = (CacheVConnection*)data;
00095       buffer = new_empty_MIOBuffer();
00096       buffer_reader = buffer->alloc_reader();
00097       if (open_read_callout() < 0)
00098         goto Lclose_error_next;
00099       else
00100         return EVENT_DONE;
00101 
00102     case CACHE_EVENT_OPEN_READ_FAILED:
00103       goto Lcancel_next;
00104 
00105     case VC_EVENT_READ_READY:
00106       if (!check_buffer())
00107         goto Lclose_error_next;
00108       buffer_reader->consume(buffer_reader->read_avail());
00109       ((VIO*)data)->reenable();
00110       return EVENT_CONT;
00111 
00112     case VC_EVENT_READ_COMPLETE:
00113       if (!check_buffer())
00114         goto Lclose_error_next;
00115       goto Lclose_next;
00116 
00117     case VC_EVENT_ERROR:
00118     case VC_EVENT_EOS:
00119       goto Lclose_error_next;
00120 
00121     case CACHE_EVENT_OPEN_WRITE:
00122       initial_event = event;
00123       cancel_timeout();
00124       cache_action = 0;
00125       cache_vc = (CacheVConnection*)data;
00126       buffer = new_empty_MIOBuffer();
00127       buffer_reader = buffer->alloc_reader();
00128       if (open_write_callout() < 0)
00129         goto Lclose_error_next;
00130       else
00131         return EVENT_DONE;
00132 
00133     case CACHE_EVENT_OPEN_WRITE_FAILED:
00134       goto Lcancel_next;
00135 
00136     case VC_EVENT_WRITE_READY:
00137       fill_buffer();
00138       cvio->reenable();
00139       return EVENT_CONT;
00140 
00141     case VC_EVENT_WRITE_COMPLETE:
00142       if (nbytes != cvio->ndone)
00143         goto Lclose_error_next;
00144       goto Lclose_next;
00145 
00146     case CACHE_EVENT_REMOVE:
00147     case CACHE_EVENT_REMOVE_FAILED:
00148       goto Lcancel_next;
00149 
00150     case CACHE_EVENT_SCAN:
00151       initial_event = event;
00152       cache_vc = (CacheVConnection*)data;
00153       return EVENT_CONT;
00154 
00155     case CACHE_EVENT_SCAN_OBJECT:
00156       return CACHE_SCAN_RESULT_CONTINUE;
00157 
00158     case CACHE_EVENT_SCAN_OPERATION_FAILED:
00159       return CACHE_SCAN_RESULT_CONTINUE;
00160 
00161     case CACHE_EVENT_SCAN_OPERATION_BLOCKED:
00162       return CACHE_SCAN_RESULT_CONTINUE;
00163 
00164     case CACHE_EVENT_SCAN_DONE:
00165       return EVENT_CONT;
00166 
00167     case CACHE_EVENT_SCAN_FAILED:
00168       return EVENT_CONT;
00169 
00170     case AIO_EVENT_DONE:
00171       goto Lnext;
00172 
00173     default:
00174       ink_assert(!"case");
00175       break;
00176   }
00177   return EVENT_DONE;
00178 
00179 Lcancel_next:
00180   cancel_timeout();
00181   cache_action = 0;
00182   goto Lnext;
00183 Lclose_error_next:
00184   cache_vc->do_io_close(1);
00185   goto Lclose_next_internal;
00186 Lclose_next:
00187   cache_vc->do_io_close();
00188 Lclose_next_internal:
00189   cache_vc = 0;
00190   if (buffer_reader) {
00191     buffer->dealloc_reader(buffer_reader);
00192     buffer_reader = 0;
00193   }
00194   if (buffer) {
00195     free_MIOBuffer(buffer);
00196     buffer = 0;
00197   }
00198 Lnext:
00199   if (check_result(event) && repeat_count) {
00200     repeat_count--;
00201     timeout = eventProcessor.schedule_imm(this);
00202     return EVENT_DONE;
00203   } else
00204     return complete(event);
00205 }
00206 
00207 void CacheTestSM::fill_buffer() {
00208   int64_t avail = buffer->write_avail();
00209   CacheKey k = key;
00210   k.b[1] += content_salt;
00211   int64_t sk = (int64_t)sizeof(key);
00212   while (avail > 0) {
00213     int64_t l = avail;
00214     if (l > sk)
00215       l = sk;
00216 
00217     int64_t pos = cvio->ndone +  buffer_reader->read_avail();
00218     int64_t o = pos % sk;
00219 
00220     if (l > sk - o)
00221       l = sk - o;
00222     k.b[0] = pos / sk;
00223     char *x = ((char*)&k) + o;
00224     buffer->write(x, l);
00225     buffer->fill(l);
00226     avail -= l;
00227   }
00228 }
00229 
00230 int CacheTestSM::check_buffer() {
00231   int64_t avail = buffer_reader->read_avail();
00232   CacheKey k = key;
00233   k.b[1] += content_salt;
00234   char b[sizeof(key)];
00235   int64_t sk = (int64_t)sizeof(key);
00236   int64_t pos = cvio->ndone -  buffer_reader->read_avail();
00237   while (avail > 0) {
00238     int64_t l = avail;
00239     if (l > sk)
00240       l = sk;
00241     int64_t o = pos % sk;
00242     if (l > sk - o)
00243       l = sk - o;
00244     k.b[0] = pos / sk;
00245     char *x = ((char*)&k) + o;
00246     buffer_reader->read(&b[0], l);
00247     if (::memcmp(b, x, l))
00248       return 0;
00249     buffer_reader->consume(l);
00250     pos += l;
00251     avail -= l;
00252   }
00253   return 1;
00254 }
00255 
00256 int CacheTestSM::check_result(int event) {
00257   return
00258     initial_event == expect_initial_event &&
00259     event == expect_event;
00260 }
00261 
00262 int CacheTestSM::complete(int event) {
00263   if (!check_result(event))
00264     done(REGRESSION_TEST_FAILED);
00265   else
00266     done(REGRESSION_TEST_PASSED);
00267   delete this;
00268   return EVENT_DONE;
00269 }
00270 
00271 CacheTestSM::CacheTestSM(const CacheTestSM &ao) : RegressionSM(ao) {
00272   int o = (int)(((char*)&start_memcpy_on_clone) - ((char*)this));
00273   int s = (int)(((char*)&end_memcpy_on_clone) - ((char*)&start_memcpy_on_clone));
00274   memcpy(((char*)this)+o, ((char*)&ao)+o, s);
00275   SET_HANDLER(&CacheTestSM::event_handler);
00276 }
00277 
00278 EXCLUSIVE_REGRESSION_TEST(cache)(RegressionTest *t, int , int *pstatus) {
00279   if (cacheProcessor.IsCacheEnabled() != CACHE_INITIALIZED) {
00280     rprintf(t, "cache not initialized");
00281     *pstatus = REGRESSION_TEST_FAILED;
00282     return;
00283   }
00284 
00285   EThread *thread = this_ethread();
00286 
00287   CACHE_SM(t, write_test, { cacheProcessor.open_write(
00288         this, &key, false, CACHE_FRAG_TYPE_NONE, 100,
00289         CACHE_WRITE_OPT_SYNC); } );
00290   write_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
00291   write_test.expect_event = VC_EVENT_WRITE_COMPLETE;
00292   write_test.nbytes = 100;
00293   rand_CacheKey(&write_test.key, thread->mutex);
00294 
00295   CACHE_SM(t, lookup_test, { cacheProcessor.lookup(this, &key, false); } );
00296   lookup_test.expect_event = CACHE_EVENT_LOOKUP;
00297   lookup_test.key = write_test.key;
00298 
00299   CACHE_SM(t, read_test, { cacheProcessor.open_read(this, &key, false); } );
00300   read_test.expect_initial_event = CACHE_EVENT_OPEN_READ;
00301   read_test.expect_event = VC_EVENT_READ_COMPLETE;
00302   read_test.nbytes = 100;
00303   read_test.key = write_test.key;
00304 
00305   CACHE_SM(t, remove_test, { cacheProcessor.remove(this, &key, false); } );
00306   remove_test.expect_event = CACHE_EVENT_REMOVE;
00307   remove_test.key = write_test.key;
00308 
00309   CACHE_SM(t, lookup_fail_test, { cacheProcessor.lookup(this, &key, false); } );
00310   lookup_fail_test.expect_event = CACHE_EVENT_LOOKUP_FAILED;
00311   lookup_fail_test.key = write_test.key;
00312 
00313   CACHE_SM(t, read_fail_test, { cacheProcessor.open_read(this, &key, false); } );
00314   read_fail_test.expect_event = CACHE_EVENT_OPEN_READ_FAILED;
00315   read_fail_test.key = write_test.key;
00316 
00317   CACHE_SM(t, remove_fail_test, { cacheProcessor.remove(this, &key, false); } );
00318   remove_fail_test.expect_event = CACHE_EVENT_REMOVE_FAILED;
00319   rand_CacheKey(&remove_fail_test.key, thread->mutex);
00320 
00321   CACHE_SM(t, replace_write_test, {
00322       cacheProcessor.open_write(this, &key, false, CACHE_FRAG_TYPE_NONE, 100,
00323                                 CACHE_WRITE_OPT_SYNC);
00324     }
00325     int open_write_callout() {
00326       header.serial = 10;
00327       cache_vc->set_header(&header, sizeof(header));
00328       cvio = cache_vc->do_io_write(this, nbytes, buffer_reader);
00329       return 1;
00330     });
00331   replace_write_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
00332   replace_write_test.expect_event = VC_EVENT_WRITE_COMPLETE;
00333   replace_write_test.nbytes = 100;
00334   rand_CacheKey(&replace_write_test.key, thread->mutex);
00335 
00336   CACHE_SM(t, replace_test, {
00337       cacheProcessor.open_write(this, &key, false, CACHE_FRAG_TYPE_NONE, 100,
00338                                 CACHE_WRITE_OPT_OVERWRITE_SYNC);
00339     }
00340     int open_write_callout() {
00341       CacheTestHeader *h = 0;
00342       int hlen = 0;
00343       if (cache_vc->get_header((void**)&h, &hlen) < 0)
00344         return -1;
00345       if (h->serial != 10)
00346         return -1;
00347       header.serial = 11;
00348       cache_vc->set_header(&header, sizeof(header));
00349       cvio = cache_vc->do_io_write(this, nbytes, buffer_reader);
00350       return 1;
00351     });
00352   replace_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
00353   replace_test.expect_event = VC_EVENT_WRITE_COMPLETE;
00354   replace_test.nbytes = 100;
00355   replace_test.key = replace_write_test.key;
00356   replace_test.content_salt = 1;
00357 
00358   CACHE_SM(t, replace_read_test, {
00359       cacheProcessor.open_read(this, &key, false);
00360     }
00361     int open_read_callout() {
00362       CacheTestHeader *h = 0;
00363       int hlen = 0;
00364       if (cache_vc->get_header((void**)&h, &hlen) < 0)
00365         return -1;
00366       if (h->serial != 11)
00367         return -1;
00368       cvio = cache_vc->do_io_read(this, nbytes, buffer);
00369       return 1;
00370     });
00371   replace_read_test.expect_initial_event = CACHE_EVENT_OPEN_READ;
00372   replace_read_test.expect_event = VC_EVENT_READ_COMPLETE;
00373   replace_read_test.nbytes = 100;
00374   replace_read_test.key = replace_test.key;
00375   replace_read_test.content_salt = 1;
00376 
00377   CACHE_SM(t, large_write_test, { cacheProcessor.open_write(
00378         this, &key, false, CACHE_FRAG_TYPE_NONE, 100,
00379         CACHE_WRITE_OPT_SYNC); } );
00380   large_write_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
00381   large_write_test.expect_event = VC_EVENT_WRITE_COMPLETE;
00382   large_write_test.nbytes = 10000000;
00383   rand_CacheKey(&large_write_test.key, thread->mutex);
00384 
00385   CACHE_SM(t, pread_test, { 
00386       cacheProcessor.open_read(this, &key, false); 
00387     } 
00388     int open_read_callout() {
00389       cvio = cache_vc->do_io_pread(this, nbytes, buffer, 7000000);
00390       return 1;
00391     });
00392   pread_test.expect_initial_event = CACHE_EVENT_OPEN_READ;
00393   pread_test.expect_event = VC_EVENT_READ_COMPLETE;
00394   pread_test.nbytes = 100;
00395   pread_test.key = large_write_test.key;
00396 
00397   r_sequential(
00398     t,
00399     write_test.clone(),
00400     lookup_test.clone(),
00401     r_sequential(t, 10, read_test.clone()),
00402     remove_test.clone(),
00403     lookup_fail_test.clone(),
00404     read_fail_test.clone(),
00405     remove_fail_test.clone(),
00406     replace_write_test.clone(),
00407     replace_test.clone(),
00408     replace_read_test.clone(),
00409     large_write_test.clone(),
00410     pread_test.clone(),
00411     NULL_PTR
00412     )->run(pstatus);
00413   return;
00414 }
00415 
00416 void force_link_CacheTest() {
00417 }
00418 
00419 
00420 
00421 REGRESSION_TEST(cache_disk_replacement_stability)(RegressionTest *t, int level, int *pstatus) {
00422   static int const MAX_VOLS = 26; 
00423   static uint64_t DEFAULT_SKIP = 8192;
00424   static uint64_t DEFAULT_STRIPE_SIZE = 1024ULL * 1024 * 1024 * 911; 
00425   CacheDisk disk; 
00426   CacheHostRecord hr1, hr2;
00427   Vol* sample;
00428   static int const sample_idx = 16;
00429   Vol vols[MAX_VOLS];
00430   Vol* vol_ptrs[MAX_VOLS]; 
00431   char buff[2048];
00432 
00433   
00434   if (REGRESSION_TEST_EXTENDED > level) {
00435     *pstatus = REGRESSION_TEST_PASSED;
00436     return;
00437   }
00438 
00439   *pstatus = REGRESSION_TEST_INPROGRESS;
00440 
00441   disk.num_errors = 0;
00442 
00443   for ( int i = 0 ; i < MAX_VOLS ; ++i ) {
00444     vol_ptrs[i] = vols + i;
00445     vols[i].disk = &disk;
00446     vols[i].len = DEFAULT_STRIPE_SIZE;
00447     snprintf(buff, sizeof(buff), "/dev/sd%c %" PRIu64 ":%" PRIu64,
00448              'a' + i, DEFAULT_SKIP, vols[i].len);
00449     MD5Context().hash_immediate(vols[i].hash_id, buff, strlen(buff));
00450   }
00451 
00452   hr1.vol_hash_table = 0;
00453   hr1.vols = vol_ptrs;
00454   hr1.num_vols = MAX_VOLS;
00455   build_vol_hash_table(&hr1);
00456 
00457   hr2.vol_hash_table = 0;
00458   hr2.vols = vol_ptrs;
00459   hr2.num_vols = MAX_VOLS;
00460 
00461   sample = vols + sample_idx;
00462   sample->len = 1024ULL * 1024 * 1024 * (1024+128); 
00463   snprintf(buff, sizeof(buff), "/dev/sd%c %" PRIu64 ":%" PRIu64,
00464            'a' + sample_idx, DEFAULT_SKIP, sample->len);
00465   MD5Context().hash_immediate(sample->hash_id, buff, strlen(buff));
00466   build_vol_hash_table(&hr2);
00467 
00468   
00469   int to = 0, from = 0;
00470   int then = 0, now = 0;
00471   for ( int i = 0 ; i < VOL_HASH_TABLE_SIZE ; ++i ) {
00472     if (hr1.vol_hash_table[i] == sample_idx) ++then;
00473     if (hr2.vol_hash_table[i] == sample_idx) ++now;
00474     if (hr1.vol_hash_table[i] != hr2.vol_hash_table[i]) {
00475       if (hr1.vol_hash_table[i] == sample_idx)
00476         ++from;
00477       else
00478         ++to;
00479     }
00480   }
00481   rprintf(t, "Cache stability difference - "
00482           "delta = %d of %d : %d to, %d from, originally %d slots, now %d slots (net gain = %d/%d)\n"
00483           , to+from, VOL_HASH_TABLE_SIZE, to, from, then, now, now-then, to-from
00484     );
00485   *pstatus = REGRESSION_TEST_PASSED;
00486 
00487   hr1.vols = 0;
00488   hr2.vols = 0;
00489 }