#include "P_AIO.h"

#if AIO_MODE == AIO_MODE_NATIVE
#define AIO_PERIOD -HRTIME_MSECONDS(4)
#else
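
/* Threaded-AIO design note: requests are grouped into one AIO_Reqs queue
   per file descriptor (one per disk), each drained by a pool of blocking
   I/O threads spawned in aio_init_fildes().  Slot 0 of aio_reqs[] is
   reserved for requests arriving through the API (filedes == -1). */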
#define MAX_DISKS_POSSIBLE 100

int ts_config_with_inkdiskio = 0;

/* request queues, one per file descriptor */
AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
/* number of unique file descriptors in the aio_reqs array */
volatile int num_filedes = 1;

/* insert_mutex serializes growth of aio_reqs[] / num_filedes */
static ink_mutex insert_mutex;

/* set once the API thread pool (aio_reqs[0]) has been created */
int thread_is_created = 0;
#endif // AIO_MODE == AIO_MODE_NATIVE

RecInt cache_config_threads_per_disk = 12;
RecInt api_config_threads_per_disk = 12;

RecRawStatBlock *aio_rsb = NULL;
/* continuation to notify when a disk operation fails */
Continuation *aio_err_callbck = NULL;

/* running totals, sampled by aio_stats_cb to derive per-second rates */
uint64_t aio_num_read = 0;
uint64_t aio_bytes_read = 0;
uint64_t aio_num_write = 0;
uint64_t aio_bytes_written = 0;
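
/* aio_stats_cb: turn the running totals above into per-second rates.
   Each stat's "sum" slot stores the total seen at the previous sample and
   its "count" slot stores the time of that sample, so the rate is simply
   (new_total - sum) / (now - count). */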
static int
aio_stats_cb(const char *, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
  (void) data_type;
  (void) rsb;
  int64_t new_val = 0;
  int64_t diff = 0;
  int64_t count, sum;
  ink_hrtime now = ink_get_hrtime();

  RecGetGlobalRawStatSum(aio_rsb, id, &sum);
  RecGetGlobalRawStatCount(aio_rsb, id, &count);

  int64_t time_diff = ink_hrtime_to_msec(now - count);
  if (time_diff == 0) {
    data->rec_float = 0.0;
    return 0;
  }
  switch (id) {
  case AIO_STAT_READ_PER_SEC:
    new_val = aio_num_read;
    break;
  case AIO_STAT_WRITE_PER_SEC:
    new_val = aio_num_write;
    break;
  case AIO_STAT_KB_READ_PER_SEC:
    new_val = aio_bytes_read >> 10;
    break;
  case AIO_STAT_KB_WRITE_PER_SEC:
    new_val = aio_bytes_written >> 10;
    break;
  default:
    ink_assert(0);
  }
  diff = new_val - sum;
  RecSetGlobalRawStatSum(aio_rsb, id, new_val);
  RecSetGlobalRawStatCount(aio_rsb, id, now);
  data->rec_float = (float) diff * 1000.0 / (float) time_diff;
  return 0;
}

#ifdef AIO_STATS
static int num_requests = 0;
static AIOTestData *data;

int
AIOTestData::ink_aio_stats(int event, void *d)
{
  (void) event;
  (void) d;
  ink_hrtime now = ink_get_hrtime();
  double time_msec = (double) (now - start) / (double) HRTIME_MSECOND;
  // skip slot 0 when no API queue has been created
  int i = (aio_reqs[0] == NULL) ? 1 : 0;
  for (; i < num_filedes; ++i)
    printf("%0.2f\t%i\t%i\t%i\n", time_msec, aio_reqs[i]->filedes, aio_reqs[i]->pending, aio_reqs[i]->queued);
  printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);
  eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
  return EVENT_DONE;
}

#endif // AIO_STATS

AIOCallback *
new_AIOCallback(void)
{
  return new AIOCallbackInternal;
}

void
ink_aio_set_callback(Continuation *callback)
{
  aio_err_callbck = callback;
}

void
ink_aio_init(ModuleVersion v)
{
  ink_release_assert(!checkModuleVersion(v, AIO_MODULE_VERSION));

  aio_rsb = RecAllocateRawStatBlock((int) AIO_STAT_COUNT);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.read_per_sec",
                     RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_READ_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.write_per_sec",
                     RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_WRITE_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.KB_read_per_sec",
                     RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_KB_READ_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.KB_write_per_sec",
                     RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
#if AIO_MODE != AIO_MODE_NATIVE
  memset(aio_reqs, 0, sizeof(aio_reqs));
  ink_mutex_init(&insert_mutex, NULL);
#endif
  REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
}

int
ink_aio_start()
{
#ifdef AIO_STATS
  data = new AIOTestData();
  eventProcessor.schedule_in(data, HRTIME_MSECONDS(100), ET_CALL);
#endif
  return 0;
}

#if AIO_MODE != AIO_MODE_NATIVE

static void *aio_thread_main(void *arg);

struct AIOThreadInfo : public Continuation {
  AIO_Reqs *req;
  int sleep_wait;

  int start(int event, Event *e)
  {
    (void) event;
    (void) e;
    aio_thread_main(this);
    return EVENT_DONE;
  }

  AIOThreadInfo(AIO_Reqs *thr_req, int sleep) : Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
  {
    SET_HANDLER(&AIOThreadInfo::start);
  }
};
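
/* aio_init_fildes: allocate an AIO_Reqs queue for a new file descriptor
   and spawn its pool of blocking I/O threads ("[ET_AIO]" threads).  With
   fromAPI set, the queue goes into the reserved slot 0 (filedes == -1)
   and the thread count comes from api_config_threads_per_disk instead of
   cache_config_threads_per_disk.  Callers must hold insert_mutex. */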
static AIO_Reqs *
aio_init_fildes(int fildes, int fromAPI = 0)
{
  char thr_name[MAX_THREAD_NAME_LENGTH];
  int i;
  AIO_Reqs *request = (AIO_Reqs *) ats_malloc(sizeof(AIO_Reqs));

  memset(request, 0, sizeof(AIO_Reqs));

  INK_WRITE_MEMORY_BARRIER;

  ink_cond_init(&request->aio_cond);
  ink_mutex_init(&request->aio_mutex, NULL);
  ink_atomiclist_init(&request->aio_temp_list, "temp_list", (uintptr_t) &((AIOCallback *) 0)->link);

  RecInt thread_num;

  if (fromAPI) {
    request->index = 0;
    request->filedes = -1;
    aio_reqs[0] = request;
    thread_is_created = 1;
    thread_num = api_config_threads_per_disk;
  } else {
    request->index = num_filedes;
    request->filedes = fildes;
    aio_reqs[num_filedes] = request;
    thread_num = cache_config_threads_per_disk;
  }

  AIOThreadInfo *thr_info;
  size_t stacksize;

  REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
  for (i = 0; i < thread_num; i++) {
    if (i == (thread_num - 1))
      thr_info = new AIOThreadInfo(request, 1);
    else
      thr_info = new AIOThreadInfo(request, 0);
    snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:%d]", i, fildes);
    ink_assert(eventProcessor.spawn_thread(thr_info, thr_name, stacksize));
  }

  /* the entry must be fully initialized before num_filedes is bumped,
     since readers scan aio_reqs[] without taking insert_mutex */
  if (!fromAPI) {
    num_filedes++;
  }
  return request;
}
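
/* aio_insert: enqueue op on req, ordered by aio_reqprio.  Requests at
   AIO_LOWEST_PRIORITY go to the separate http_aio_todo list; everything
   else is inserted into aio_todo behind any queued request whose
   aio_reqprio is greater than or equal to its own.  Caller must hold
   req->aio_mutex. */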
static void
aio_insert(AIOCallback *op, AIO_Reqs *req)
{
#ifdef AIO_STATS
  num_requests++;
  req->queued++;
#endif
  if (op->aiocb.aio_reqprio == AIO_LOWEST_PRIORITY) {
    AIOCallback *cb = (AIOCallback *) req->http_aio_todo.tail;
    if (!cb)
      req->http_aio_todo.push(op);
    else
      req->http_aio_todo.insert(op, cb);
  } else {
    AIOCallback *cb = (AIOCallback *) req->aio_todo.tail;

    for (; cb; cb = (AIOCallback *) cb->link.prev) {
      if (cb->aiocb.aio_reqprio >= op->aiocb.aio_reqprio) {
        req->aio_todo.insert(op, cb);
        return;
      }
    }

    /* no queued request has priority >= op's: op goes to the head */
    req->aio_todo.push(op);
  }
}
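
/* aio_move: drain the lock-free aio_temp_list (filled by threads that
   could not get aio_mutex) and merge its entries into the sorted queues.
   The popped chain comes back newest-first, so it is reversed before
   inserting to preserve arrival order. */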
static void
aio_move(AIO_Reqs *req)
{
  AIOCallback *next = NULL, *prev = NULL, *cb = (AIOCallback *) ink_atomiclist_popall(&req->aio_temp_list);

  if (!cb)
    return;
  while (cb->link.next) {
    next = (AIOCallback *) cb->link.next;
    cb->link.next = prev;
    prev = cb;
    cb = next;
  }
  cb->link.next = prev;

  for (; cb; cb = next) {
    next = (AIOCallback *) cb->link.next;
    cb->link.next = NULL;
    cb->link.prev = NULL;
    aio_insert(cb, req);
  }
}
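
/* aio_queue_req: route op to the AIO_Reqs queue for its file descriptor,
   creating the queue (and its threads) on first use.  If the queue's
   mutex is contended, the op is pushed onto the lock-free temp list
   instead of blocking; a worker thread merges it in via aio_move(). */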
static void
aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
{
  int thread_ndx = 1;
  AIO_Reqs *req = op->aio_req;

  op->link.next = NULL;
  op->link.prev = NULL;
#ifdef AIO_STATS
  ink_atomic_increment((int *) &data->num_req, 1);
#endif
  if (!fromAPI && (!req || req->filedes != op->aiocb.aio_fildes)) {
    /* search for the matching file descriptor */
    for (; thread_ndx < num_filedes; thread_ndx++) {
      if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
        req = aio_reqs[thread_ndx];
        break;
      }
    }
    if (!req) {
      ink_mutex_acquire(&insert_mutex);
      if (thread_ndx == num_filedes) {
        /* no other thread created the queue while we waited for the lock */
        req = aio_init_fildes(op->aiocb.aio_fildes);
      } else {
        /* num_filedes changed while we waited: another thread may have
           created the queue for this file descriptor, so search again
           under the lock */
        for (thread_ndx = 1; thread_ndx < num_filedes; thread_ndx++) {
          if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
            req = aio_reqs[thread_ndx];
            break;
          }
        }
        if (!req)
          req = aio_init_fildes(op->aiocb.aio_fildes);
      }
      ink_mutex_release(&insert_mutex);
    }
    op->aio_req = req;
  }
  if (fromAPI && (!req || req->filedes != -1)) {
    ink_mutex_acquire(&insert_mutex);
    if (aio_reqs[0] == NULL) {
      req = aio_init_fildes(-1, 1);
    } else {
      req = aio_reqs[0];
    }
    ink_mutex_release(&insert_mutex);
    op->aio_req = req;
  }
  ink_atomic_increment(&req->requests_queued, 1);
  if (!ink_mutex_try_acquire(&req->aio_mutex)) {
#ifdef AIO_STATS
    ink_atomic_increment(&data->num_temp, 1);
#endif
    /* queue is busy: hand the op to the lock-free temp list */
    ink_atomiclist_push(&req->aio_temp_list, op);
  } else {
#ifdef AIO_STATS
    ink_atomic_increment(&data->num_queue, 1);
#endif
    if (!INK_ATOMICLIST_EMPTY(req->aio_temp_list))
      aio_move(req);
    aio_insert(op, req);
    ink_cond_signal(&req->aio_cond);
    ink_mutex_release(&req->aio_mutex);
  }
}
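
/* cache_op: perform the (possibly chained) operation synchronously with
   pread/pwrite, retrying on EINTR/ENOBUFS/ENOMEM and continuing after
   short reads/writes until aio_nbytes bytes have transferred.  Returns
   the failing result (<= 0) on error, 1 on success. */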
static inline int
cache_op(AIOCallbackInternal *op)
{
  bool read = (op->aiocb.aio_lio_opcode == LIO_READ);
  for (; op; op = (AIOCallbackInternal *) op->then) {
    ink_aiocb_t *a = &op->aiocb;
    ssize_t err, res = 0;

    while (a->aio_nbytes - res > 0) {
      do {
        if (read)
          err = pread(a->aio_fildes, ((char *) a->aio_buf) + res, a->aio_nbytes - res, a->aio_offset + res);
        else
          err = pwrite(a->aio_fildes, ((char *) a->aio_buf) + res, a->aio_nbytes - res, a->aio_offset + res);
      } while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM));
      if (err <= 0) {
        Warning("cache disk operation failed %s %zd %d\n",
                (a->aio_lio_opcode == LIO_READ) ? "READ" : "WRITE", err, errno);
        op->aio_result = -errno;
        return (err);
      }
      res += err;
    }
    op->aio_result = res;
    ink_assert(op->aio_result == (int64_t) a->aio_nbytes);
  }
  return 1;
}
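
/* ink_aio_read / ink_aio_write: cache entry points.  Depending on
   AIO_MODE the op is handed to POSIX aio, executed synchronously in
   place, or queued to an AIO thread via aio_queue_req(). */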
int
ink_aio_read(AIOCallback *op, int fromAPI)
{
  op->aiocb.aio_lio_opcode = LIO_READ;

#if (AIO_MODE == AIO_MODE_AIO)
  ink_assert(this_ethread() == op->thread);
  op->thread->aio_ops.enqueue(op);
  if (aio_read(&op->aiocb) < 0) {
    Warning("failed aio_read: %s\n", strerror(errno));
    op->thread->aio_ops.remove(op);
    return -1;
  }
#elif (AIO_MODE == AIO_MODE_SYNC)
  cache_op((AIOCallbackInternal *) op);
  op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
#elif (AIO_MODE == AIO_MODE_THREAD)
  aio_queue_req((AIOCallbackInternal *) op, fromAPI);
#endif

  return 1;
}

int
ink_aio_write(AIOCallback *op, int fromAPI)
{
  op->aiocb.aio_lio_opcode = LIO_WRITE;

#if (AIO_MODE == AIO_MODE_AIO)
  ink_assert(this_ethread() == op->thread);
  op->thread->aio_ops.enqueue(op);
  if (aio_write(&op->aiocb) < 0) {
    Warning("failed aio_write: %s\n", strerror(errno));
    op->thread->aio_ops.remove(op);
    return -1;
  }
#elif (AIO_MODE == AIO_MODE_SYNC)
  cache_op((AIOCallbackInternal *) op);
  op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
#elif (AIO_MODE == AIO_MODE_THREAD)
  aio_queue_req((AIOCallbackInternal *) op, fromAPI);
#endif

  return 1;
}

bool
ink_aio_thread_num_set(int thread_num)
{
  /* the API thread count can only be changed before the threads exist */
  if (thread_num > 0 && !thread_is_created) {
    api_config_threads_per_disk = thread_num;
    return true;
  }

  return false;
}
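
/* aio_thread_main: body of each [ET_AIO] thread.  Loops pulling the next
   op from its queues (aio_todo, then http_aio_todo), merging the temp
   list first, performs the op with cache_op(), signals failures to
   aio_err_callbck, and completes the op back to its continuation, either
   inline or by rescheduling it on an event thread.  Sleeps on aio_cond
   when idle. */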
void *
aio_thread_main(void *arg)
{
  AIOThreadInfo *thr_info = (AIOThreadInfo *) arg;
  AIO_Reqs *my_aio_req = (AIO_Reqs *) thr_info->req;
  AIO_Reqs *current_req = NULL;
  AIOCallback *op = NULL;

  ink_mutex_acquire(&my_aio_req->aio_mutex);
  for (;;) {
    do {
      current_req = my_aio_req;
      /* check for any requests on the atomic list */
      if (!INK_ATOMICLIST_EMPTY(my_aio_req->aio_temp_list))
        aio_move(my_aio_req);
      if (!(op = my_aio_req->aio_todo.pop()) && !(op = my_aio_req->http_aio_todo.pop()))
        break;
#ifdef AIO_STATS
      num_requests--;
      current_req->queued--;
      ink_atomic_increment((int *) &current_req->pending, 1);
#endif
      /* update the stat totals */
      if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
        aio_num_write++;
        aio_bytes_written += op->aiocb.aio_nbytes;
      } else {
        aio_num_read++;
        aio_bytes_read += op->aiocb.aio_nbytes;
      }
      ink_mutex_release(&current_req->aio_mutex);
      /* perform the operation */
      if (cache_op((AIOCallbackInternal *) op) <= 0) {
        if (aio_err_callbck) {
          AIOCallback *callback_op = new AIOCallbackInternal();
          callback_op->aiocb.aio_fildes = op->aiocb.aio_fildes;
          callback_op->mutex = aio_err_callbck->mutex;
          callback_op->action = aio_err_callbck;
          eventProcessor.schedule_imm(callback_op);
        }
      }
      ink_atomic_increment((int *) &current_req->requests_queued, -1);
#ifdef AIO_STATS
      ink_atomic_increment((int *) &current_req->pending, -1);
#endif
      /* complete the operation back to the caller */
      op->link.prev = NULL;
      op->link.next = NULL;
      op->mutex = op->action.mutex;
      if (op->thread == AIO_CALLBACK_THREAD_AIO) {
        MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
        if (!op->action.cancelled)
          op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
      } else if (op->thread == AIO_CALLBACK_THREAD_ANY)
        eventProcessor.schedule_imm_signal(op);
      else
        op->thread->schedule_imm_signal(op);
      ink_mutex_acquire(&my_aio_req->aio_mutex);
    } while (1);
    timespec timedwait_msec = ink_hrtime_to_timespec(ink_get_hrtime() + HRTIME_MSECONDS(net_config_poll_timeout));
    ink_cond_timedwait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex, &timedwait_msec);
  }
  return 0;
}
#else

/* Native (Linux libaio) mode: each event thread's DiskHandler batches
   requests on its ready_list, submits them with io_submit(), and reaps
   completions with io_getevents() on a periodic trigger event scheduled
   every AIO_PERIOD. */

int
DiskHandler::startAIOEvent(int, Event *e)
{
  SET_HANDLER(&DiskHandler::mainAIOEvent);
  e->schedule_every(AIO_PERIOD);
  trigger_event = e;
  return EVENT_CONT;
}
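
/* mainAIOEvent: runs every AIO_PERIOD.  First reaps all completed
   operations with io_getevents() onto complete_list (looping while the
   batch comes back full), then submits up to MAX_AIO_EVENTS queued ops
   from ready_list with io_submit(), and finally dispatches each
   completed op back to its continuation. */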
int
DiskHandler::mainAIOEvent(int event, Event *e)
{
  AIOCallback *op = NULL;
Lagain:
  int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);

  for (int i = 0; i < ret; i++) {
    op = (AIOCallback *) events[i].data;
    op->aio_result = events[i].res;
    ink_assert(op->action.continuation);
    complete_list.enqueue(op);
  }
  if (ret == MAX_AIO_EVENTS)
    goto Lagain;
  if (ret < 0)
    perror("io_getevents");

  ink_aiocb_t *cbs[MAX_AIO_EVENTS];
  int num = 0;
  for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != NULL); ++num) {
    cbs[num] = &op->aiocb;
    ink_assert(op->action.continuation);
  }
  if (num > 0) {
    int ret;
    do {
      ret = io_submit(ctx, num, cbs);
    } while (ret < 0 && errno == EAGAIN);
    if (ret != num) {
      if (ret < 0)
        perror("io_submit error");
      else {
        fprintf(stderr, "could not submit IOs");
        ink_assert(0);
      }
    }
  }

  while ((op = complete_list.dequeue()) != NULL) {
    op->handleEvent(event, e);
  }
  return EVENT_CONT;
}

int
ink_aio_read(AIOCallback *op, int)
{
  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
  op->aiocb.aio_lio_opcode = IO_CMD_PREAD;
  op->aiocb.data = op;
  this_ethread()->diskHandler->ready_list.enqueue(op);

  return 1;
}

int
ink_aio_write(AIOCallback *op, int)
{
  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
  op->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
  op->aiocb.data = op;
  this_ethread()->diskHandler->ready_list.enqueue(op);

  return 1;
}
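
/* ink_aio_readv / ink_aio_writev: queue a chain of AIOCallbacks (linked
   through "then") as individual native ops.  When the chain has more
   than one entry, an AIOVec is interposed as each op's action, so the
   individual completions are aggregated before the original continuation
   is signalled. */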
int
ink_aio_readv(AIOCallback *op, int)
{
  DiskHandler *dh = this_ethread()->diskHandler;
  AIOCallback *io = op;
  int sz = 0;

  while (io) {
    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
    io->aiocb.aio_lio_opcode = IO_CMD_PREAD;
    io->aiocb.data = io;
    dh->ready_list.enqueue(io);
    ++sz;
    io = io->then;
  }

  if (sz > 1) {
    ink_assert(op->action.continuation);
    AIOVec *vec = new AIOVec(sz, op);
    /* point every op in the chain at the aggregating AIOVec */
    while (--sz >= 0) {
      op->action = vec;
      op = op->then;
    }
  }
  return 1;
}

int
ink_aio_writev(AIOCallback *op, int)
{
  DiskHandler *dh = this_ethread()->diskHandler;
  AIOCallback *io = op;
  int sz = 0;

  while (io) {
    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
    io->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
    io->aiocb.data = io;
    dh->ready_list.enqueue(io);
    ++sz;
    io = io->then;
  }

  if (sz > 1) {
    ink_assert(op->action.continuation);
    AIOVec *vec = new AIOVec(sz, op);
    /* point every op in the chain at the aggregating AIOVec */
    while (--sz >= 0) {
      op->action = vec;
      op = op->then;
    }
  }
  return 1;
}
#endif // AIO_MODE != AIO_MODE_NATIVE