• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

AIO.cc

Go to the documentation of this file.
00001 /** @file
00002 
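00003   Asynchronous disk I/O: per-descriptor request queues, AIO worker threads, the native AIO disk handler and read/write throughput statistics.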
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /*
00025  * Async Disk IO operations.
00026  */
00027 
00028 #include "P_AIO.h"
00029 
00030 #if AIO_MODE == AIO_MODE_NATIVE
00031 #define AIO_PERIOD                                -HRTIME_MSECONDS(4)
00032 #else
00033 
00034 #define MAX_DISKS_POSSIBLE 100
00035 
00036 // globals
00037 
00038 int ts_config_with_inkdiskio = 0;
00039 /* structure to hold information about each file descriptor */
00040 AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
00041 /* number of unique file descriptors in the aio_reqs array */
00042 volatile int num_filedes = 1;
00043 
00044 // acquire this mutex before inserting a new entry in the aio_reqs array.
00045 // Don't need to acquire this for searching the array
00046 static ink_mutex insert_mutex;
00047 
00048 int thread_is_created = 0;
00049 #endif // AIO_MODE == AIO_MODE_NATIVE
00050 RecInt cache_config_threads_per_disk = 12;
00051 RecInt api_config_threads_per_disk = 12;
00052 
00053 RecRawStatBlock *aio_rsb = NULL;
00054 Continuation *aio_err_callbck = 0;
00055 // AIO Stats
00056 uint64_t aio_num_read = 0;
00057 uint64_t aio_bytes_read = 0;
00058 uint64_t aio_num_write = 0;
00059 uint64_t aio_bytes_written = 0;
00060 
00061 /*
00062  * Stats
00063  */
00064 
00065 static int
00066 aio_stats_cb(const char * /* name ATS_UNUSED */, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
00067 {
00068   (void) data_type;
00069   (void) rsb;
00070   int64_t new_val = 0;
00071   int64_t diff = 0;
00072   int64_t count, sum;
00073   ink_hrtime now = ink_get_hrtime();
00074   // The RecGetGlobalXXX stat functions are cheaper than the
00075   // RecGetXXX functions. The Global ones are expensive
00076   // for increments and decrements. But for AIO stats we
00077   // only do Sets and Gets, so they are cheaper in our case.
00078   RecGetGlobalRawStatSum(aio_rsb, id, &sum);
00079   RecGetGlobalRawStatCount(aio_rsb, id, &count);
00080 
00081   int64_t time_diff = ink_hrtime_to_msec(now - count);
00082   if (time_diff == 0) {
00083     data->rec_float = 0.0;
00084     return 0;
00085   }
00086   switch (id) {
00087   case AIO_STAT_READ_PER_SEC:
00088     new_val = aio_num_read;
00089     break;
00090   case AIO_STAT_WRITE_PER_SEC:
00091     new_val = aio_num_write;
00092     break;
00093   case AIO_STAT_KB_READ_PER_SEC:
00094     new_val = aio_bytes_read >> 10;
00095     break;
00096   case AIO_STAT_KB_WRITE_PER_SEC:
00097     new_val = aio_bytes_written >> 10;
00098     break;
00099   default:
00100     ink_assert(0);
00101   }
00102   diff = new_val - sum;
00103   RecSetGlobalRawStatSum(aio_rsb, id, new_val);
00104   RecSetGlobalRawStatCount(aio_rsb, id, now);
00105   data->rec_float = (float) diff *1000.00 / (float) time_diff;
00106   return 0;
00107 }
00108 
00109 
#ifdef AIO_STATS
/* debugging counter: incremented in aio_insert(), decremented when an AIO
   thread pops a request */
static int num_requests = 0;
/* performance results */
static AIOTestData *data;

/**
  Debug handler: prints one line per aio_reqs entry (elapsed ms, descriptor,
  pending, queued), then the global request counters, and reschedules itself
  50 ms later.

  @return EVENT_DONE.
*/
int
AIOTestData::ink_aio_stats(int event, void *d)
{
  ink_hrtime now = ink_get_hrtime();
  double elapsed_msec = (double) (now - start) / (double) HRTIME_MSECOND;

  // Slot 0 only exists once the API entry has been created; skip it otherwise.
  int slot = 0;
  if (aio_reqs[0] == NULL)
    slot = 1;

  while (slot < num_filedes) {
    printf("%0.2f\t%i\t%i\t%i\n", elapsed_msec, aio_reqs[slot]->filedes, aio_reqs[slot]->pending, aio_reqs[slot]->queued);
    ++slot;
  }
  printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);

  eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
  return EVENT_DONE;
}

#endif // AIO_STATS
00130 
00131 /*
00132  * Common
00133  */
00134 AIOCallback *
00135 new_AIOCallback(void)
00136 {
00137   return new AIOCallbackInternal;
00138 }
00139 
00140 void
00141 ink_aio_set_callback(Continuation *callback)
00142 {
00143   aio_err_callbck = callback;
00144 }
00145 
00146 void
00147 ink_aio_init(ModuleVersion v)
00148 {
00149   ink_release_assert(!checkModuleVersion(v, AIO_MODULE_VERSION));
00150 
00151   aio_rsb = RecAllocateRawStatBlock((int) AIO_STAT_COUNT);
00152   RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.read_per_sec",
00153                      RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_READ_PER_SEC, aio_stats_cb);
00154   RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.write_per_sec",
00155                      RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_WRITE_PER_SEC, aio_stats_cb);
00156   RecRegisterRawStat(aio_rsb, RECT_PROCESS,
00157                      "proxy.process.cache.KB_read_per_sec",
00158                      RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_KB_READ_PER_SEC, aio_stats_cb);
00159   RecRegisterRawStat(aio_rsb, RECT_PROCESS,
00160                      "proxy.process.cache.KB_write_per_sec",
00161                      RECD_FLOAT, RECP_PERSISTENT, (int) AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
00162 #if AIO_MODE != AIO_MODE_NATIVE
00163   memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
00164   ink_mutex_init(&insert_mutex, NULL);
00165 #endif
00166   REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
00167 }
00168 
/**
  Start the AIO module.

  Does nothing unless built with AIO_STATS, in which case it creates the
  AIOTestData collector and schedules it to run 100 ms from now.

  @return always 0.
*/
int
ink_aio_start()
{
#ifdef AIO_STATS
  AIOTestData *collector = new AIOTestData();

  data = collector;
  eventProcessor.schedule_in(collector, HRTIME_MSECONDS(100), ET_CALL);
#endif
  return 0;
}
00178 
00179 #if  AIO_MODE != AIO_MODE_NATIVE
00180 
00181 static void *aio_thread_main(void *arg);
00182 
00183 struct AIOThreadInfo:public Continuation
00184 {
00185 
00186   AIO_Reqs *req;
00187   int sleep_wait;
00188 
00189   int start(int event, Event *e)
00190   {
00191     (void) event;
00192     (void) e;
00193     aio_thread_main(this);
00194     return EVENT_DONE;
00195   }
00196 
00197   AIOThreadInfo(AIO_Reqs *thr_req, int sleep):Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
00198   {
00199     SET_HANDLER(&AIOThreadInfo::start);
00200   }
00201 
00202 };
00203 
00204 /* priority scheduling */
00205 /* Have 2 queues per file descriptor - A queue for http requests and another
00206    for non-http (streaming) request. Each file descriptor has a lock
00207    and condition variable associated with it. A dedicated number of threads
00208    (THREADS_PER_DISK) wait on the condition variable associated with the
00209    file descriptor. The cache threads try to put the request in the
00210    appropriate queue. If they fail to acquire the lock, they put the
00211    request in the atomic list. Requests are served in the order of
00212    highest priority first. If both the queues are empty, the aio threads
00213    check if there is any request on the other disks */
00214 
00215 
00216 /* insert  an entry for file descriptor fildes into aio_reqs */
00217 static AIO_Reqs *
00218 aio_init_fildes(int fildes, int fromAPI = 0)
00219 {
00220   char thr_name[MAX_THREAD_NAME_LENGTH];
00221   int i;
00222   AIO_Reqs *request = (AIO_Reqs *)ats_malloc(sizeof(AIO_Reqs));
00223 
00224   memset(request, 0, sizeof(AIO_Reqs));
00225 
00226   INK_WRITE_MEMORY_BARRIER;
00227 
00228   ink_cond_init(&request->aio_cond);
00229   ink_mutex_init(&request->aio_mutex, NULL);
00230   ink_atomiclist_init(&request->aio_temp_list, "temp_list", (uintptr_t) &((AIOCallback *) 0)->link);
00231 
00232   RecInt thread_num;
00233 
00234   if (fromAPI) {
00235     request->index = 0;
00236     request->filedes = -1;
00237     aio_reqs[0] = request;
00238     thread_is_created = 1;
00239     thread_num = api_config_threads_per_disk;
00240   } else {
00241     request->index = num_filedes;
00242     request->filedes = fildes;
00243     aio_reqs[num_filedes] = request;
00244     thread_num = cache_config_threads_per_disk;
00245   }
00246 
00247   /* create the main thread */
00248   AIOThreadInfo *thr_info;
00249   size_t stacksize;
00250 
00251   REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
00252   for (i = 0; i < thread_num; i++) {
00253     if (i == (thread_num - 1))
00254       thr_info = new AIOThreadInfo(request, 1);
00255     else
00256       thr_info = new AIOThreadInfo(request, 0);
00257     snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:%d]", i, fildes);
00258     ink_assert(eventProcessor.spawn_thread(thr_info, thr_name, stacksize));
00259   }
00260 
00261   /* the num_filedes should be incremented after initializing everything.
00262      This prevents a thread from looking at uninitialized fields */
00263   if (!fromAPI) {
00264     num_filedes++;
00265   }
00266   return request;
00267 }
00268 
00269 /* insert a request into either aio_todo or http_todo queue. aio_todo
00270    list is kept sorted */
00271 static void
00272 aio_insert(AIOCallback *op, AIO_Reqs *req)
00273 {
00274 #ifdef AIO_STATS
00275   num_requests++;
00276   req->queued++;
00277 #endif
00278   if (op->aiocb.aio_reqprio == AIO_LOWEST_PRIORITY)     // http request
00279   {
00280     AIOCallback *cb = (AIOCallback *) req->http_aio_todo.tail;
00281     if (!cb)
00282       req->http_aio_todo.push(op);
00283     else
00284       req->http_aio_todo.insert(op, cb);
00285   } else {
00286 
00287     AIOCallback *cb = (AIOCallback *) req->aio_todo.tail;
00288 
00289     for (; cb; cb = (AIOCallback *) cb->link.prev) {
00290       if (cb->aiocb.aio_reqprio >= op->aiocb.aio_reqprio) {
00291         req->aio_todo.insert(op, cb);
00292         return;
00293       }
00294     }
00295 
00296     /* Either the queue was empty or this request has the highest priority */
00297     req->aio_todo.push(op);
00298   }
00299 }
00300 
00301 /* move the request from the atomic list to the queue */
00302 static void
00303 aio_move(AIO_Reqs *req)
00304 {
00305   AIOCallback *next = NULL, *prev = NULL, *cb = (AIOCallback *) ink_atomiclist_popall(&req->aio_temp_list);
00306   /* flip the list */
00307   if (!cb)
00308     return;
00309   while (cb->link.next) {
00310     next = (AIOCallback *) cb->link.next;
00311     cb->link.next = prev;
00312     prev = cb;
00313     cb = next;
00314   }
00315   /* fix the last pointer */
00316   cb->link.next = prev;
00317   for (; cb; cb = next) {
00318     next = (AIOCallback *) cb->link.next;
00319     cb->link.next = NULL;
00320     cb->link.prev = NULL;
00321     aio_insert(cb, req);
00322   }
00323 }
00324 
00325 /* queue the new request */
00326 static void
00327 aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
00328 {
00329   int thread_ndx = 1;
00330   AIO_Reqs *req = op->aio_req;
00331   op->link.next = NULL;;
00332   op->link.prev = NULL;
00333 #ifdef AIO_STATS
00334   ink_atomic_increment((int *) &data->num_req, 1);
00335 #endif
00336   if (!fromAPI && (!req || req->filedes != op->aiocb.aio_fildes)) {
00337     /* search for the matching file descriptor */
00338     for (; thread_ndx < num_filedes; thread_ndx++) {
00339       if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
00340         /* found the matching file descriptor */
00341         req = aio_reqs[thread_ndx];
00342         break;
00343       }
00344     }
00345     if (!req) {
00346       ink_mutex_acquire(&insert_mutex);
00347       if (thread_ndx == num_filedes) {
00348         /* insert a new entry */
00349         req = aio_init_fildes(op->aiocb.aio_fildes);
00350       } else {
00351         /* a new entry was inserted between the time we checked the
00352            aio_reqs and acquired the mutex. check the aio_reqs array to
00353            make sure the entry inserted does not correspond  to the current
00354            file descriptor */
00355         for (thread_ndx = 1; thread_ndx < num_filedes; thread_ndx++) {
00356           if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
00357             req = aio_reqs[thread_ndx];
00358             break;
00359           }
00360         }
00361         if (!req)
00362           req = aio_init_fildes(op->aiocb.aio_fildes);
00363       }
00364       ink_mutex_release(&insert_mutex);
00365     }
00366     op->aio_req = req;
00367   }
00368   if (fromAPI && (!req || req->filedes != -1)) {
00369     ink_mutex_acquire(&insert_mutex);
00370     if (aio_reqs[0] == NULL) {
00371       req = aio_init_fildes(-1, 1);
00372     } else {
00373       req = aio_reqs[0];
00374     }
00375     ink_mutex_release(&insert_mutex);
00376     op->aio_req = req;
00377   }
00378   ink_atomic_increment(&req->requests_queued, 1);
00379   if (!ink_mutex_try_acquire(&req->aio_mutex)) {
00380 #ifdef AIO_STATS
00381     ink_atomic_increment(&data->num_temp, 1);
00382 #endif
00383     ink_atomiclist_push(&req->aio_temp_list, op);
00384   } else {
00385     /* check if any pending requests on the atomic list */
00386 #ifdef AIO_STATS
00387     ink_atomic_increment(&data->num_queue, 1);
00388 #endif
00389     if (!INK_ATOMICLIST_EMPTY(req->aio_temp_list))
00390       aio_move(req);
00391     /* now put the new request */
00392     aio_insert(op, req);
00393     ink_cond_signal(&req->aio_cond);
00394     ink_mutex_release(&req->aio_mutex);
00395   }
00396 }
00397 
00398 static inline int
00399 cache_op(AIOCallbackInternal *op)
00400 {
00401   bool read = (op->aiocb.aio_lio_opcode == LIO_READ) ? 1 : 0;
00402   for (; op; op = (AIOCallbackInternal *) op->then) {
00403     ink_aiocb_t *a = &op->aiocb;
00404     ssize_t err, res = 0;
00405 
00406     while (a->aio_nbytes - res > 0) {
00407       do {
00408         if (read)
00409           err = pread(a->aio_fildes, ((char *) a->aio_buf) + res, a->aio_nbytes - res, a->aio_offset + res);
00410         else
00411           err = pwrite(a->aio_fildes, ((char *) a->aio_buf) + res, a->aio_nbytes - res, a->aio_offset + res);
00412       } while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM));
00413       if (err <= 0) {
00414         Warning("cache disk operation failed %s %zd %d\n",
00415                 (a->aio_lio_opcode == LIO_READ) ? "READ" : "WRITE", err, errno);
00416         op->aio_result = -errno;
00417         return (err);
00418       }
00419       res += err;
00420     }
00421     op->aio_result = res;
00422     ink_assert(op->aio_result == (int64_t) a->aio_nbytes);
00423   }
00424   return 1;
00425 }
00426 
00427 int
00428 ink_aio_read(AIOCallback *op, int fromAPI)
00429 {
00430   op->aiocb.aio_lio_opcode = LIO_READ;
00431 
00432 #if (AIO_MODE == AIO_MODE_AIO)
00433   ink_assert(this_ethread() == op->thread);
00434   op->thread->aio_ops.enqueue(op);
00435   if (aio_read(&op->aiocb) < 0) {
00436     Warning("failed aio_read: %s\n", strerror(errno));
00437     op->thread->aio_ops.remove(op);
00438     return -1;
00439   }
00440 #elif (AIO_MODE == AIO_MODE_SYNC)
00441   cache_op((AIOCallbackInternal *) op);
00442   op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
00443 #elif (AIO_MODE == AIO_MODE_THREAD)
00444   aio_queue_req((AIOCallbackInternal *) op, fromAPI);
00445 #endif
00446 
00447   return 1;
00448 }
00449 
00450 int
00451 ink_aio_write(AIOCallback *op, int fromAPI)
00452 {
00453   op->aiocb.aio_lio_opcode = LIO_WRITE;
00454 
00455 #if (AIO_MODE == AIO_MODE_AIO)
00456   ink_assert(this_ethread() == op->thread);
00457   op->thread->aio_ops.enqueue(op);
00458   if (aio_write(&op->aiocb) < 0) {
00459     Warning("failed aio_write: %s\n", strerror(errno));
00460     op->thread->aio_ops.remove(op);
00461     return -1;
00462   }
00463 #elif (AIO_MODE == AIO_MODE_SYNC)
00464   cache_op((AIOCallbackInternal *) op);
00465   op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
00466 #elif (AIO_MODE == AIO_MODE_THREAD)
00467   aio_queue_req((AIOCallbackInternal *) op, fromAPI);
00468 #endif
00469 
00470   return 1;
00471 }
00472 
00473 bool
00474 ink_aio_thread_num_set(int thread_num)
00475 {
00476   if (thread_num > 0 && !thread_is_created) {
00477     api_config_threads_per_disk = thread_num;
00478     return true;
00479   }
00480 
00481   return false;
00482 }
00483 
00484 void *
00485 aio_thread_main(void *arg)
00486 {
00487   AIOThreadInfo *thr_info = (AIOThreadInfo *) arg;
00488   AIO_Reqs *my_aio_req = (AIO_Reqs *) thr_info->req;
00489   AIO_Reqs *current_req = NULL;
00490   AIOCallback *op = NULL;
00491   ink_mutex_acquire(&my_aio_req->aio_mutex);
00492   for (;;) {
00493     do {
00494       current_req = my_aio_req;
00495       /* check if any pending requests on the atomic list */
00496       if (!INK_ATOMICLIST_EMPTY(my_aio_req->aio_temp_list))
00497         aio_move(my_aio_req);
00498       if (!(op = my_aio_req->aio_todo.pop()) && !(op = my_aio_req->http_aio_todo.pop()))
00499         break;
00500 #ifdef AIO_STATS
00501       num_requests--;
00502       current_req->queued--;
00503       ink_atomic_increment((int *) &current_req->pending, 1);
00504 #endif
00505       // update the stats;
00506       if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
00507         aio_num_write++;
00508         aio_bytes_written += op->aiocb.aio_nbytes;
00509       } else {
00510         aio_num_read++;
00511         aio_bytes_read += op->aiocb.aio_nbytes;
00512       }
00513       ink_mutex_release(&current_req->aio_mutex);
00514       if (cache_op((AIOCallbackInternal *) op) <= 0) {
00515         if (aio_err_callbck) {
00516           AIOCallback *callback_op = new AIOCallbackInternal();
00517           callback_op->aiocb.aio_fildes = op->aiocb.aio_fildes;
00518           callback_op->mutex = aio_err_callbck->mutex;
00519           callback_op->action = aio_err_callbck;
00520           eventProcessor.schedule_imm(callback_op);
00521         }
00522       }
00523       ink_atomic_increment((int *) &current_req->requests_queued, -1);
00524 #ifdef AIO_STATS
00525       ink_atomic_increment((int *) &current_req->pending, -1);
00526 #endif
00527       op->link.prev = NULL;
00528       op->link.next = NULL;
00529       op->mutex = op->action.mutex;
00530       if (op->thread == AIO_CALLBACK_THREAD_AIO) {
00531         MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
00532         if (!op->action.cancelled)
00533           op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
00534       } else if (op->thread == AIO_CALLBACK_THREAD_ANY)
00535         eventProcessor.schedule_imm_signal(op);
00536       else
00537         op->thread->schedule_imm_signal(op);
00538       ink_mutex_acquire(&my_aio_req->aio_mutex);
00539     } while (1);
00540     timespec timedwait_msec = ink_hrtime_to_timespec(ink_get_hrtime() + HRTIME_MSECONDS(net_config_poll_timeout));
00541     ink_cond_timedwait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex, &timedwait_msec);
00542   }
00543   return 0;
00544 }
00545 #else
00546 int
00547 DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e) {
00548   SET_HANDLER(&DiskHandler::mainAIOEvent);
00549   e->schedule_every(AIO_PERIOD);
00550   trigger_event = e;
00551   return EVENT_CONT;
00552 }
00553 
00554 int
00555 DiskHandler::mainAIOEvent(int event, Event *e) {
00556   AIOCallback *op = NULL;
00557 Lagain:
00558   int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);
00559   //printf("%d\n", ret);
00560   for (int i = 0; i < ret; i++) {
00561     op = (AIOCallback *) events[i].data;
00562     op->aio_result = events[i].res;
00563     ink_assert(op->action.continuation);
00564     complete_list.enqueue(op);
00565     //op->handleEvent(event, e);
00566   }
00567   if (ret == MAX_AIO_EVENTS)
00568     goto Lagain;
00569   if (ret < 0)
00570     perror("io_getevents");
00571 
00572   ink_aiocb_t *cbs[MAX_AIO_EVENTS];
00573   int num = 0;
00574   for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != NULL); ++num) {
00575     cbs[num] = &op->aiocb;
00576     ink_assert(op->action.continuation);
00577   }
00578   if (num > 0) {
00579     int ret;
00580     do {
00581       ret = io_submit(ctx, num, cbs);
00582     } while (ret < 0 && errno == EAGAIN);
00583 
00584     if (ret != num) {
00585       if (ret < 0)
00586         perror("io_submit error");
00587       else {
00588         fprintf(stderr, "could not sumbit IOs");
00589         ink_assert(0);
00590       }
00591     }
00592   }
00593 
00594   while ((op = complete_list.dequeue()) != NULL) {
00595     op->handleEvent(event, e);
00596   }
00597   return EVENT_CONT;
00598 }
00599 
00600 int
00601 ink_aio_read(AIOCallback *op, int /* fromAPI ATS_UNUSED */) {
00602   op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
00603   op->aiocb.aio_lio_opcode = IO_CMD_PREAD;
00604   op->aiocb.data = op;
00605   this_ethread()->diskHandler->ready_list.enqueue(op);
00606 
00607   return 1;
00608 }
00609 
00610 int
00611 ink_aio_write(AIOCallback *op, int /* fromAPI ATS_UNUSED */) {
00612   op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
00613   op->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
00614   op->aiocb.data = op;
00615   this_ethread()->diskHandler->ready_list.enqueue(op);
00616 
00617   return 1;
00618 }
00619 
00620 int
00621 ink_aio_readv(AIOCallback *op, int /* fromAPI ATS_UNUSED */) {
00622   DiskHandler *dh = this_ethread()->diskHandler;
00623   AIOCallback *io = op;
00624   int sz = 0;
00625 
00626   while (io) {
00627     io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
00628     io->aiocb.aio_lio_opcode = IO_CMD_PREAD;
00629     io->aiocb.data = io;
00630     dh->ready_list.enqueue(io);
00631     ++sz;
00632     io = io->then;
00633   }
00634 
00635   if (sz > 1) {
00636     ink_assert(op->action.continuation);
00637     AIOVec *vec = new AIOVec(sz, op);
00638     while (--sz >= 0) {
00639       op->action = vec;
00640       op = op->then;
00641     }
00642   }
00643   return 1;
00644 }
00645 
00646 int
00647 ink_aio_writev(AIOCallback *op, int /* fromAPI ATS_UNUSED */) {
00648   DiskHandler *dh = this_ethread()->diskHandler;
00649   AIOCallback *io = op;
00650   int sz = 0;
00651 
00652   while (io) {
00653     io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
00654     io->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
00655     io->aiocb.data = io;
00656     dh->ready_list.enqueue(io);
00657     ++sz;
00658     io = io->then;
00659   }
00660 
00661   if (sz > 1) {
00662     ink_assert(op->action.continuation);
00663     AIOVec *vec = new AIOVec(sz, op);
00664     while (--sz >= 0) {
00665       op->action = vec;
00666       op = op->then;
00667     }
00668   }
00669   return 1;
00670 }
00671 #endif // AIO_MODE != AIO_MODE_NATIVE

Generated by  doxygen 1.7.1