#include "P_AIO.h"

#if AIO_MODE == AIO_MODE_NATIVE
#define AIO_PERIOD -HRTIME_MSECONDS(4)
#else

#define MAX_DISKS_POSSIBLE 100

int ts_config_with_inkdiskio = 0;

/* One request queue (and thread pool) per disk; slot 0 is reserved for
   requests coming in through the API (filedes == -1). */
AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];

/* number of unique file descriptors in the aio_reqs array */
volatile int num_filedes = 1;

/* protects the aio_reqs array while new disks are inserted */
static ink_mutex insert_mutex;

/* set once the API thread pool (slot 0) has been created */
int thread_is_created = 0;
#endif // AIO_MODE == AIO_MODE_NATIVE

RecInt cache_config_threads_per_disk = 12;
RecInt api_config_threads_per_disk = 12;

RecRawStatBlock *aio_rsb = NULL;
Continuation *aio_err_callbck = NULL;

/* running totals, updated by the AIO worker threads */
uint64_t aio_num_read = 0;
uint64_t aio_bytes_read = 0;
uint64_t aio_num_write = 0;
uint64_t aio_bytes_written = 0;

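/* Stats callback: each AIO stat is kept as a raw counter whose "sum"
   field holds the value at the previous reading and whose "count"
   field is abused to hold the hrtime of that reading. Every read
   computes the delta against the previous value and scales it to a
   per-second float. */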
static int
aio_stats_cb(const char * /* name */, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
  (void) data_type;
  (void) rsb;
  int64_t new_val = 0;
  int64_t diff = 0;
  int64_t count, sum;
  ink_hrtime now = ink_get_hrtime();

  // "sum" is the previous value, "count" the hrtime it was taken at.
  RecGetGlobalRawStatSum(aio_rsb, id, &sum);
  RecGetGlobalRawStatCount(aio_rsb, id, &count);

  int64_t time_diff = ink_hrtime_to_msec(now - count);
  if (time_diff == 0) {
    data->rec_float = 0.0;
    return 0;
  }
  switch (id) {
  case AIO_STAT_READ_PER_SEC:
    new_val = aio_num_read;
    break;
  case AIO_STAT_WRITE_PER_SEC:
    new_val = aio_num_write;
    break;
  case AIO_STAT_KB_READ_PER_SEC:
    new_val = aio_bytes_read >> 10;
    break;
  case AIO_STAT_KB_WRITE_PER_SEC:
    new_val = aio_bytes_written >> 10;
    break;
  default:
    ink_assert(0);
  }
  diff = new_val - sum;
  RecSetGlobalRawStatSum(aio_rsb, id, new_val);
  RecSetGlobalRawStatCount(aio_rsb, id, now);
  data->rec_float = (float) diff * 1000.0 / (float) time_diff;
  return 0;
}

#ifdef AIO_STATS
/* Debug-only bookkeeping: periodically dump per-disk queue depths. */
static int num_requests = 0;
static AIOTestData *data;

int
AIOTestData::ink_aio_stats(int event, void *d)
{
  ink_hrtime now = ink_get_hrtime();
  double time_msec = (double) (now - start) / (double) HRTIME_MSECOND;
  int i = (aio_reqs[0] == NULL) ? 1 : 0;
  for (; i < num_filedes; ++i)
    printf("%0.2f\t%i\t%i\t%i\n", time_msec, aio_reqs[i]->filedes, aio_reqs[i]->pending, aio_reqs[i]->queued);
  printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);
  eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
  return EVENT_DONE;
}
#endif // AIO_STATS

AIOCallback *
new_AIOCallback(void)
{
  return new AIOCallbackInternal;
}

void
ink_aio_set_callback(Continuation *callback)
{
  aio_err_callbck = callback;
}
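
/* Usage sketch (illustrative only, not part of this file): a caller
   with an open file descriptor `fd` and a continuation `cont` that
   handles AIO_EVENT_DONE would typically issue a read along these
   lines, assuming the default fromAPI argument declared in the header:

     AIOCallback *op = new_AIOCallback();
     op->aiocb.aio_fildes = fd;
     op->aiocb.aio_buf    = buf;                      // caller-owned buffer
     op->aiocb.aio_nbytes = nbytes;
     op->aiocb.aio_offset = offset;
     op->thread           = AIO_CALLBACK_THREAD_ANY;  // call back on any ET_CALL thread
     op->action           = cont;
     ink_aio_read(op);

   On completion, cont is handed AIO_EVENT_DONE with the op as data
   and can inspect op->aio_result. */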

void
ink_aio_init(ModuleVersion v)
{
  ink_release_assert(!checkModuleVersion(v, AIO_MODULE_VERSION));

  aio_rsb = RecAllocateRawStatBlock((int) AIO_STAT_COUNT);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.read_per_sec",
                     RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_READ_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.write_per_sec",
                     RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_WRITE_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS,
                     "proxy.process.cache.KB_read_per_sec",
                     RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_KB_READ_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS,
                     "proxy.process.cache.KB_write_per_sec",
                     RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
#if AIO_MODE != AIO_MODE_NATIVE
  memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
  ink_mutex_init(&insert_mutex, NULL);
#endif
  REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
}

int
ink_aio_start()
{
#ifdef AIO_STATS
  data = new AIOTestData();
  eventProcessor.schedule_in(data, HRTIME_MSECONDS(100), ET_CALL);
#endif
  return 0;
}

#if AIO_MODE != AIO_MODE_NATIVE

static void *aio_thread_main(void *arg);

/* Bootstrap continuation: its handler simply runs aio_thread_main()
   on the freshly spawned dedicated thread. */
struct AIOThreadInfo : public Continuation
{
  AIO_Reqs *req;
  int sleep_wait;

  int start(int event, Event *e)
  {
    (void) event;
    (void) e;
    aio_thread_main(this);
    return EVENT_DONE;
  }

  AIOThreadInfo(AIO_Reqs *thr_req, int sleep) : Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
  {
    SET_HANDLER(&AIOThreadInfo::start);
  }
};
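/* Create a new AIO_Reqs queue for `fildes` and spawn
   threads_per_disk worker threads to serve it. Slot 0 of aio_reqs[]
   is reserved for API requests (filedes == -1); disk queues start at
   slot 1. Callers must serialize calls through insert_mutex (see
   aio_queue_req). */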
static AIO_Reqs *
aio_init_fildes(int fildes, int fromAPI = 0)
{
  char thr_name[MAX_THREAD_NAME_LENGTH];
  int i;
  AIO_Reqs *request = (AIO_Reqs *)ats_malloc(sizeof(AIO_Reqs));

  memset(request, 0, sizeof(AIO_Reqs));

  INK_WRITE_MEMORY_BARRIER;

  ink_cond_init(&request->aio_cond);
  ink_mutex_init(&request->aio_mutex, NULL);
  ink_atomiclist_init(&request->aio_temp_list, "temp_list", (uintptr_t) &((AIOCallback *) 0)->link);

  RecInt thread_num;

  if (fromAPI) {
    request->index = 0;
    request->filedes = -1;
    aio_reqs[0] = request;
    thread_is_created = 1;
    thread_num = api_config_threads_per_disk;
  } else {
    request->index = num_filedes;
    request->filedes = fildes;
    aio_reqs[num_filedes] = request;
    thread_num = cache_config_threads_per_disk;
  }

  /* spawn the worker threads for this queue */
  AIOThreadInfo *thr_info;
  size_t stacksize;

  REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
  for (i = 0; i < thread_num; i++) {
    if (i == (thread_num - 1))
      thr_info = new AIOThreadInfo(request, 1);
    else
      thr_info = new AIOThreadInfo(request, 0);
    snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:%d]", i, fildes);
    ink_assert(eventProcessor.spawn_thread(thr_info, thr_name, stacksize));
  }

  /* the queue is fully initialized: bump num_filedes last so searchers
     only ever see a complete entry */
  if (!fromAPI) {
    num_filedes++;
  }
  return request;
}
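/* Queue `op` on `req` in priority order; the caller must hold
   req->aio_mutex. Lowest-priority (HTTP) requests go on a separate
   FIFO so disk (cache) requests are always served first. */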
static void
aio_insert(AIOCallback *op, AIO_Reqs *req)
{
#ifdef AIO_STATS
  num_requests++;
  req->queued++;
#endif
  if (op->aiocb.aio_reqprio == AIO_LOWEST_PRIORITY) {
    /* lowest-priority requests keep FIFO order on their own queue */
    AIOCallback *cb = (AIOCallback *) req->http_aio_todo.tail;
    if (!cb)
      req->http_aio_todo.push(op);
    else
      req->http_aio_todo.insert(op, cb);
  } else {
    /* scan from the tail toward the head and insert after the first
       entry with priority >= ours */
    AIOCallback *cb = (AIOCallback *) req->aio_todo.tail;

    for (; cb; cb = (AIOCallback *) cb->link.prev) {
      if (cb->aiocb.aio_reqprio >= op->aiocb.aio_reqprio) {
        req->aio_todo.insert(op, cb);
        return;
      }
    }

    /* higher priority than everything queued: push at the head */
    req->aio_todo.push(op);
  }
}
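/* Drain the lock-free aio_temp_list (filled by threads that could not
   get aio_mutex) into the sorted queues. popall() hands back the
   entries in LIFO order, so the chain is reversed first to preserve
   arrival order. The caller must hold req->aio_mutex. */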
static void
aio_move(AIO_Reqs *req)
{
  AIOCallback *next = NULL, *prev = NULL, *cb = (AIOCallback *) ink_atomiclist_popall(&req->aio_temp_list);

  if (!cb)
    return;
  /* reverse the list */
  while (cb->link.next) {
    next = (AIOCallback *) cb->link.next;
    cb->link.next = prev;
    prev = cb;
    cb = next;
  }
  cb->link.next = prev;
  /* insert each request into the proper queue */
  for (; cb; cb = next) {
    next = (AIOCallback *) cb->link.next;
    cb->link.next = NULL;
    cb->link.prev = NULL;
    aio_insert(cb, req);
  }
}
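/* Hand a request to the right per-disk queue, creating the queue (and
   its worker threads) on first use. If the queue's mutex is contended
   the request is parked on the lock-free aio_temp_list rather than
   blocking; a worker or a later caller moves it over via aio_move().
   API requests (fromAPI) always use slot 0. */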
static void
aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
{
  int thread_ndx = 1;
  AIO_Reqs *req = op->aio_req;
  op->link.next = NULL;
  op->link.prev = NULL;
#ifdef AIO_STATS
  ink_atomic_increment((int *) &data->num_req, 1);
#endif
  if (!fromAPI && (!req || req->filedes != op->aiocb.aio_fildes)) {
    /* search for an existing queue for this file descriptor */
    for (; thread_ndx < num_filedes; thread_ndx++) {
      if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
        /* found the queue */
        req = aio_reqs[thread_ndx];
        break;
      }
    }
    if (!req) {
      ink_mutex_acquire(&insert_mutex);
      if (thread_ndx == num_filedes) {
        /* no queue yet for this file descriptor: create one */
        req = aio_init_fildes(op->aiocb.aio_fildes);
      } else {
        /* num_filedes changed while we waited for insert_mutex, so
           another thread may have created the queue already: search
           again before creating one */
        for (thread_ndx = 1; thread_ndx < num_filedes; thread_ndx++) {
          if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
            req = aio_reqs[thread_ndx];
            break;
          }
        }
        if (!req)
          req = aio_init_fildes(op->aiocb.aio_fildes);
      }
      ink_mutex_release(&insert_mutex);
    }
    op->aio_req = req;
  }
  if (fromAPI && (!req || req->filedes != -1)) {
    ink_mutex_acquire(&insert_mutex);
    if (aio_reqs[0] == NULL) {
      req = aio_init_fildes(-1, 1);
    } else {
      req = aio_reqs[0];
    }
    ink_mutex_release(&insert_mutex);
    op->aio_req = req;
  }
  ink_atomic_increment(&req->requests_queued, 1);
  if (!ink_mutex_try_acquire(&req->aio_mutex)) {
#ifdef AIO_STATS
    ink_atomic_increment(&data->num_temp, 1);
#endif
    /* queue is busy: park the request on the atomic list */
    ink_atomiclist_push(&req->aio_temp_list, op);
  } else {
    /* first drain any requests parked on the atomic list */
#ifdef AIO_STATS
    ink_atomic_increment(&data->num_queue, 1);
#endif
    if (!INK_ATOMICLIST_EMPTY(req->aio_temp_list))
      aio_move(req);
    /* now insert the request and wake a worker */
    aio_insert(op, req);
    ink_cond_signal(&req->aio_cond);
    ink_mutex_release(&req->aio_mutex);
  }
}

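/* Execute the (possibly `then`-chained) operation synchronously with
   pread/pwrite, retrying on EINTR and transient memory errors and
   continuing after short reads/writes. Returns 1 on success, or the
   failing syscall's return value (<= 0) with op->aio_result set to
   -errno. */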
static inline int
cache_op(AIOCallbackInternal *op)
{
  bool read = (op->aiocb.aio_lio_opcode == LIO_READ);
  for (; op; op = (AIOCallbackInternal *) op->then) {
    ink_aiocb_t *a = &op->aiocb;
    ssize_t err, res = 0;

    while (a->aio_nbytes - res > 0) {
      do {
        if (read)
          err = pread(a->aio_fildes, ((char *) a->aio_buf) + res, a->aio_nbytes - res, a->aio_offset + res);
        else
          err = pwrite(a->aio_fildes, ((char *) a->aio_buf) + res, a->aio_nbytes - res, a->aio_offset + res);
      } while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM));
      if (err <= 0) {
        Warning("cache disk operation failed %s %zd %d\n",
                (a->aio_lio_opcode == LIO_READ) ? "READ" : "WRITE", err, errno);
        op->aio_result = -errno;
        return (err);
      }
      res += err;
    }
    op->aio_result = res;
    ink_assert(op->aio_result == (int64_t) a->aio_nbytes);
  }
  return 1;
}

int
ink_aio_read(AIOCallback *op, int fromAPI)
{
  op->aiocb.aio_lio_opcode = LIO_READ;

#if (AIO_MODE == AIO_MODE_AIO)
  ink_assert(this_ethread() == op->thread);
  op->thread->aio_ops.enqueue(op);
  if (aio_read(&op->aiocb) < 0) {
    Warning("failed aio_read: %s\n", strerror(errno));
    op->thread->aio_ops.remove(op);
    return -1;
  }
#elif (AIO_MODE == AIO_MODE_SYNC)
  cache_op((AIOCallbackInternal *) op);
  op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
#elif (AIO_MODE == AIO_MODE_THREAD)
  aio_queue_req((AIOCallbackInternal *) op, fromAPI);
#endif

  return 1;
}

int
ink_aio_write(AIOCallback *op, int fromAPI)
{
  op->aiocb.aio_lio_opcode = LIO_WRITE;

#if (AIO_MODE == AIO_MODE_AIO)
  ink_assert(this_ethread() == op->thread);
  op->thread->aio_ops.enqueue(op);
  if (aio_write(&op->aiocb) < 0) {
    Warning("failed aio_write: %s\n", strerror(errno));
    op->thread->aio_ops.remove(op);
    return -1;
  }
#elif (AIO_MODE == AIO_MODE_SYNC)
  cache_op((AIOCallbackInternal *) op);
  op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
#elif (AIO_MODE == AIO_MODE_THREAD)
  aio_queue_req((AIOCallbackInternal *) op, fromAPI);
#endif

  return 1;
}

/* Set the number of threads used for API AIO requests; only effective
   before the API thread pool has been created. */
bool
ink_aio_thread_num_set(int thread_num)
{
  if (thread_num > 0 && !thread_is_created) {
    api_config_threads_per_disk = thread_num;
    return true;
  }

  return false;
}

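/* Main loop of an AIO worker thread: drain the sorted queues (disk
   requests first, then HTTP), perform each operation with cache_op(),
   and deliver AIO_EVENT_DONE either inline (AIO_CALLBACK_THREAD_AIO),
   via the event system (AIO_CALLBACK_THREAD_ANY), or on the request's
   designated thread. Sleeps on aio_cond when idle. */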
void *
aio_thread_main(void *arg)
{
  AIOThreadInfo *thr_info = (AIOThreadInfo *) arg;
  AIO_Reqs *my_aio_req = (AIO_Reqs *) thr_info->req;
  AIO_Reqs *current_req = NULL;
  AIOCallback *op = NULL;
  ink_mutex_acquire(&my_aio_req->aio_mutex);
  for (;;) {
    do {
      current_req = my_aio_req;
      /* check if any requests are parked on the atomic list */
      if (!INK_ATOMICLIST_EMPTY(my_aio_req->aio_temp_list))
        aio_move(my_aio_req);
      if (!(op = my_aio_req->aio_todo.pop()) && !(op = my_aio_req->http_aio_todo.pop()))
        break;
#ifdef AIO_STATS
      num_requests--;
      current_req->queued--;
      ink_atomic_increment((int *) &current_req->pending, 1);
#endif
      /* update the stats */
      if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
        aio_num_write++;
        aio_bytes_written += op->aiocb.aio_nbytes;
      } else {
        aio_num_read++;
        aio_bytes_read += op->aiocb.aio_nbytes;
      }
      ink_mutex_release(&current_req->aio_mutex);
      if (cache_op((AIOCallbackInternal *) op) <= 0) {
        if (aio_err_callbck) {
          /* notify the registered error callback with the failing fd */
          AIOCallback *callback_op = new AIOCallbackInternal();
          callback_op->aiocb.aio_fildes = op->aiocb.aio_fildes;
          callback_op->mutex = aio_err_callbck->mutex;
          callback_op->action = aio_err_callbck;
          eventProcessor.schedule_imm(callback_op);
        }
      }
      ink_atomic_increment((int *) &current_req->requests_queued, -1);
#ifdef AIO_STATS
      ink_atomic_increment((int *) &current_req->pending, -1);
#endif
      op->link.prev = NULL;
      op->link.next = NULL;
      op->mutex = op->action.mutex;
      if (op->thread == AIO_CALLBACK_THREAD_AIO) {
        MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
        if (!op->action.cancelled)
          op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
      } else if (op->thread == AIO_CALLBACK_THREAD_ANY)
        eventProcessor.schedule_imm_signal(op);
      else
        op->thread->schedule_imm_signal(op);
      ink_mutex_acquire(&my_aio_req->aio_mutex);
    } while (1);
    timespec timedwait_msec = ink_hrtime_to_timespec(ink_get_hrtime() + HRTIME_MSECONDS(net_config_poll_timeout));
    ink_cond_timedwait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex, &timedwait_msec);
  }
  return 0;
}
#else

int
DiskHandler::startAIOEvent(int /* event */, Event *e) {
  SET_HANDLER(&DiskHandler::mainAIOEvent);
  e->schedule_every(AIO_PERIOD);
  trigger_event = e;
  return EVENT_CONT;
}

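/* Native (Linux libaio) driver, run periodically on each event
   thread: reap completed events with io_getevents(), submit up to
   MAX_AIO_EVENTS queued requests with io_submit(), then fire the
   completion handlers. */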
int
DiskHandler::mainAIOEvent(int event, Event *e) {
  AIOCallback *op = NULL;
Lagain:
  int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);

  for (int i = 0; i < ret; i++) {
    op = (AIOCallback *) events[i].data;
    op->aio_result = events[i].res;
    ink_assert(op->action.continuation);
    complete_list.enqueue(op);
  }
  /* a full batch may mean more completions are waiting */
  if (ret == MAX_AIO_EVENTS)
    goto Lagain;
  if (ret < 0)
    perror("io_getevents");

  ink_aiocb_t *cbs[MAX_AIO_EVENTS];
  int num = 0;
  for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != NULL); ++num) {
    cbs[num] = &op->aiocb;
    ink_assert(op->action.continuation);
  }
  if (num > 0) {
    int submitted;
    do {
      submitted = io_submit(ctx, num, cbs);
    } while (submitted < 0 && errno == EAGAIN);

    if (submitted != num) {
      if (submitted < 0)
        perror("io_submit error");
      else {
        fprintf(stderr, "could not submit IOs");
        ink_assert(0);
      }
    }
  }

  while ((op = complete_list.dequeue()) != NULL) {
    op->handleEvent(event, e);
  }
  return EVENT_CONT;
}

int
ink_aio_read(AIOCallback *op, int /* fromAPI */) {
  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
  op->aiocb.aio_lio_opcode = IO_CMD_PREAD;
  op->aiocb.data = op;
  this_ethread()->diskHandler->ready_list.enqueue(op);

  return 1;
}

int
ink_aio_write(AIOCallback *op, int /* fromAPI */) {
  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
  op->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
  op->aiocb.data = op;
  this_ethread()->diskHandler->ready_list.enqueue(op);

  return 1;
}

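/* Vector variants: ops chained through `then` are queued as one batch.
   When the chain has more than one element, an AIOVec is interposed as
   each op's action, evidently so the original continuation is called
   back once the whole chain has completed rather than per piece. */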
int
ink_aio_readv(AIOCallback *op, int /* fromAPI */) {
  DiskHandler *dh = this_ethread()->diskHandler;
  AIOCallback *io = op;
  int sz = 0;

  while (io) {
    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
    io->aiocb.aio_lio_opcode = IO_CMD_PREAD;
    io->aiocb.data = io;
    dh->ready_list.enqueue(io);
    ++sz;
    io = io->then;
  }

  if (sz > 1) {
    ink_assert(op->action.continuation);
    AIOVec *vec = new AIOVec(sz, op);
    while (--sz >= 0) {
      op->action = vec;
      op = op->then;
    }
  }
  return 1;
}

int
ink_aio_writev(AIOCallback *op, int /* fromAPI */) {
  DiskHandler *dh = this_ethread()->diskHandler;
  AIOCallback *io = op;
  int sz = 0;

  while (io) {
    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
    io->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
    io->aiocb.data = io;
    dh->ready_list.enqueue(io);
    ++sz;
    io = io->then;
  }

  if (sz > 1) {
    ink_assert(op->action.continuation);
    AIOVec *vec = new AIOVec(sz, op);
    while (--sz >= 0) {
      op->action = vec;
      op = op->then;
    }
  }
  return 1;
}
#endif // AIO_MODE != AIO_MODE_NATIVE