• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

PluginVC.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   PluginVC and PluginVCCore: bi-directional, in-process data transfer that impersonates a NetVC
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026    PluginVC.cc
00027 
00028    Description: Allows bi-directional transfer for data from one
00029       continuation to another via a mechanism that impersonates a
00030       NetVC.  Should implement all external attributes of NetVConnections.
00031 
00032    Since data is transferred within Traffic Server, this is a two
00033    headed beast.  One NetVC on initiating side (active side) and
00034    one NetVC on the receiving side (passive side).
00035 
00036    The two NetVC subclasses, PluginVC, are part of a PluginVCCore object.  All
00037    three objects share the same mutex.  That mutex is required
00038    for doing operations that affect the shared buffers,
00039    read state from the PluginVC on the other side or deal with deallocation.
00040 
00041    To simplify the code, all data passing through the system goes initially
00042    into a shared buffer.  There are two shared buffers, one for each
00043    direction of the connection.  While it's more efficient to transfer
00044    the data from one buffer to another directly, this creates a lot
00045    of tricky conditions since you must be holding the lock for both
00046    sides, in addition to this VC's lock.  Additionally, issues like
00047    watermarks are very hard to deal with.  Since we try
00048    to move data by IOBufferData references the efficiency penalty shouldn't
00049    be too bad and if it is a big penalty, a brave soul can reimplement
00050    to move the data directly without the intermediate buffer.
00051 
00052    Locking is a difficult issue for this multi-headed beast.  In each
00053    PluginVC, there are two locks. The one we got from our PluginVCCore and
00054    the lock from the state machine using the PluginVC.  The read side
00055    lock & the write side lock must be the same.  The regular net processor has
00056    this constraint as well.  In order to handle scheduling of retry events cleanly,
00057    we have two event pointers, one for each lock.  sm_lock_retry_event can only
00058    be changed while holding the using state machine's lock and
00059    core_lock_retry_event can only be manipulated while holding the PluginVC's
00060    lock.  On entry to PluginVC::main_handler, we obtain all the locks
00061    before looking at the events.  If we can't get all the locks
00062    we reschedule the event for further retries.  Since all the locks are
00063    obtained in the beginning of the handler, we know we are running
00064    exclusively in the later parts of the handler and we will
00065    be free from do_io or reenable calls on the PluginVC.
00066 
00067    The assumption is made (consistent with IO Core spec) that any close,
00068    shutdown, reenable, or do_io_{read,write} operation is done by the caller
00069    while holding the lock for that side of the operation.
00070 
00071 
00072  ****************************************************************************/
00073 
00074 #include "PluginVC.h"
00075 #include "P_EventSystem.h"
00076 #include "P_Net.h"
00077 #include "Regression.h"
00078 
// Delay used when rescheduling an event after a lock miss
#define PVC_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
// Cap on bytes held in a PluginVCCore intermediate buffer; also the
//  minimum effective water mark applied on the read side
#define PVC_DEFAULT_MAX_BYTES 32768
// Transfers smaller than this are copied instead of moved by block
#define MIN_BLOCK_TRANSFER_BYTES 128

// Sentinel event-pointer values.  Parenthesized so the cast stays a
//  single operand wherever the macro is expanded
#define EVENT_PTR_LOCKED ((void*) 0x1)
#define EVENT_PTR_CLOSED ((void*) 0x2)

// Label for Debug() output; expands against the vc_type in scope
#define PVC_TYPE    ((vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")
00087 
00088 PluginVC::PluginVC(PluginVCCore *core_obj)
00089   : NetVConnection(),magic(PLUGIN_VC_MAGIC_ALIVE), vc_type(PLUGIN_VC_UNKNOWN), core_obj(core_obj),
00090     other_side(NULL), read_state(), write_state(), need_read_process(false), need_write_process(false),
00091     closed(false), sm_lock_retry_event(NULL), core_lock_retry_event(NULL),
00092     deletable(false), reentrancy_count(0), active_timeout(0), active_event(NULL),
00093     inactive_timeout(0), inactive_timeout_at(0), inactive_event(NULL), plugin_tag(NULL), plugin_id(0)
00094 {
00095   ink_assert(core_obj != NULL);
00096   SET_HANDLER(&PluginVC::main_handler);
00097 }
00098 
00099 PluginVC::~PluginVC()
00100 {
00101   mutex = NULL;
00102 }
00103 
00104 int
00105 PluginVC::main_handler(int event, void *data)
00106 {
00107 
00108   Debug("pvc_event", "[%u] %s: Received event %d", core_obj->id, PVC_TYPE, event);
00109 
00110   ink_release_assert(event == EVENT_INTERVAL || event == EVENT_IMMEDIATE);
00111   ink_release_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00112   ink_assert(!deletable);
00113   ink_assert(data != NULL);
00114 
00115   Event *call_event = (Event *) data;
00116   EThread *my_ethread = mutex->thread_holding;
00117   ink_release_assert(my_ethread != NULL);
00118 
00119   bool read_mutex_held = false;
00120   bool write_mutex_held = false;
00121   Ptr<ProxyMutex> read_side_mutex = read_state.vio.mutex;
00122   Ptr<ProxyMutex> write_side_mutex = write_state.vio.mutex;
00123 
00124   if (read_side_mutex) {
00125     read_mutex_held = MUTEX_TAKE_TRY_LOCK(read_side_mutex, my_ethread);
00126 
00127     if (!read_mutex_held) {
00128       if (call_event != inactive_event)
00129         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00130       return 0;
00131     }
00132 
00133     if (read_side_mutex.m_ptr != read_state.vio.mutex.m_ptr) {
00134       // It's possible some swapped the mutex on us before
00135       //  we were able to grab it
00136       Mutex_unlock(read_side_mutex, my_ethread);
00137       if (call_event != inactive_event)
00138         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00139       return 0;
00140     }
00141   }
00142 
00143   if (write_side_mutex) {
00144     write_mutex_held = MUTEX_TAKE_TRY_LOCK(write_side_mutex, my_ethread);
00145 
00146     if (!write_mutex_held) {
00147       if (read_mutex_held) {
00148         Mutex_unlock(read_side_mutex, my_ethread);
00149       }
00150       if (call_event != inactive_event)
00151         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00152       return 0;
00153     }
00154 
00155     if (write_side_mutex.m_ptr != write_state.vio.mutex.m_ptr) {
00156       // It's possible some swapped the mutex on us before
00157       //  we were able to grab it
00158       Mutex_unlock(write_side_mutex, my_ethread);
00159       if (read_mutex_held) {
00160         Mutex_unlock(read_side_mutex, my_ethread);
00161       }
00162       if (call_event != inactive_event)
00163         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00164       return 0;
00165     }
00166   }
00167   // We've got all the locks so there should not be any
00168   //   other calls active
00169   ink_release_assert(reentrancy_count == 0);
00170 
00171   if (closed) {
00172     process_close();
00173 
00174     if (read_mutex_held) {
00175       Mutex_unlock(read_side_mutex, my_ethread);
00176     }
00177 
00178     if (write_mutex_held) {
00179       Mutex_unlock(write_side_mutex, my_ethread);
00180     }
00181 
00182     return 0;
00183   }
00184   // We can get closed while we're calling back the
00185   //  continuation.  Set the reentrancy count so we know
00186   //  we could be calling the continuation and that we
00187   //  need to defer close processing
00188   reentrancy_count++;
00189 
00190   if (call_event == active_event) {
00191     process_timeout(call_event, VC_EVENT_ACTIVE_TIMEOUT, &active_event);
00192   } else if (call_event == inactive_event) {
00193     if (inactive_timeout_at && inactive_timeout_at < ink_get_hrtime()) {
00194       process_timeout(call_event, VC_EVENT_INACTIVITY_TIMEOUT, &inactive_event);
00195       call_event->cancel();
00196     }
00197   } else {
00198     if (call_event == sm_lock_retry_event) {
00199       sm_lock_retry_event = NULL;
00200     } else {
00201       ink_release_assert(call_event == core_lock_retry_event);
00202       core_lock_retry_event = NULL;
00203     }
00204 
00205     if (need_read_process) {
00206       process_read_side(false);
00207     }
00208 
00209     if (need_write_process && !closed) {
00210       process_write_side(false);
00211     }
00212 
00213   }
00214 
00215   reentrancy_count--;
00216   if (closed) {
00217     process_close();
00218   }
00219 
00220   if (read_mutex_held) {
00221     Mutex_unlock(read_side_mutex, my_ethread);
00222   }
00223 
00224   if (write_mutex_held) {
00225     Mutex_unlock(write_side_mutex, my_ethread);
00226   }
00227 
00228   return 0;
00229 }
00230 
00231 VIO *
00232 PluginVC::do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf)
00233 {
00234 
00235   ink_assert(!closed);
00236   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00237 
00238   if (buf) {
00239     read_state.vio.buffer.writer_for(buf);
00240   } else {
00241     read_state.vio.buffer.clear();
00242   }
00243 
00244   // Note: we set vio.op last because process_read_side looks at it to
00245   //  tell if the VConnection is active.
00246   read_state.vio.mutex = c->mutex;
00247   read_state.vio._cont = c;
00248   read_state.vio.nbytes = nbytes;
00249   read_state.vio.ndone = 0;
00250   read_state.vio.vc_server = (VConnection *) this;
00251   read_state.vio.op = VIO::READ;
00252 
00253   Debug("pvc", "[%u] %s: do_io_read for %" PRId64" bytes", core_obj->id, PVC_TYPE, nbytes);
00254 
00255   // Since reentrant callbacks are not allowed on from do_io
00256   //   functions schedule ourselves get on a different stack
00257   need_read_process = true;
00258   setup_event_cb(0, &sm_lock_retry_event);
00259 
00260   return &read_state.vio;
00261 }
00262 
00263 VIO *
00264 PluginVC::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * abuffer, bool owner)
00265 {
00266 
00267   ink_assert(!closed);
00268   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00269 
00270   if (abuffer) {
00271     ink_assert(!owner);
00272     write_state.vio.buffer.reader_for(abuffer);
00273   } else {
00274     write_state.vio.buffer.clear();
00275   }
00276 
00277   // Note: we set vio.op last because process_write_side looks at it to
00278   //  tell if the VConnection is active.
00279   write_state.vio.mutex = c->mutex;
00280   write_state.vio._cont = c;
00281   write_state.vio.nbytes = nbytes;
00282   write_state.vio.ndone = 0;
00283   write_state.vio.vc_server = (VConnection *) this;
00284   write_state.vio.op = VIO::WRITE;
00285 
00286   Debug("pvc", "[%u] %s: do_io_write for %" PRId64" bytes", core_obj->id, PVC_TYPE, nbytes);
00287 
00288   // Since reentrant callbacks are not allowed on from do_io
00289   //   functions schedule ourselves get on a different stack
00290   need_write_process = true;
00291   setup_event_cb(0, &sm_lock_retry_event);
00292 
00293   return &write_state.vio;
00294 }
00295 
00296 void
00297 PluginVC::reenable(VIO * vio)
00298 {
00299 
00300   ink_assert(!closed);
00301   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00302   ink_assert(vio->mutex->thread_holding == this_ethread());
00303 
00304   Debug("pvc", "[%u] %s: reenable %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
00305 
00306   if (vio->op == VIO::WRITE) {
00307     ink_assert(vio == &write_state.vio);
00308     need_write_process = true;
00309   } else if (vio->op == VIO::READ) {
00310     need_read_process = true;
00311   } else {
00312     ink_release_assert(0);
00313   }
00314   setup_event_cb(0, &sm_lock_retry_event);
00315 }
00316 
00317 void
00318 PluginVC::reenable_re(VIO * vio)
00319 {
00320 
00321   ink_assert(!closed);
00322   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00323   ink_assert(vio->mutex->thread_holding == this_ethread());
00324 
00325   Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
00326 
00327   MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00328   if (!lock) {
00329     if (vio->op == VIO::WRITE) {
00330       need_write_process = true;
00331     } else {
00332       need_read_process = true;
00333     }
00334     setup_event_cb(PVC_LOCK_RETRY_TIME, &sm_lock_retry_event);
00335     return;
00336   }
00337 
00338   reentrancy_count++;
00339 
00340   if (vio->op == VIO::WRITE) {
00341     ink_assert(vio == &write_state.vio);
00342     process_write_side(false);
00343   } else if (vio->op == VIO::READ) {
00344     ink_assert(vio == &read_state.vio);
00345     process_read_side(false);
00346   } else {
00347     ink_release_assert(0);
00348   }
00349 
00350   reentrancy_count--;
00351 
00352   // To process the close, we need the lock
00353   //   for the PluginVC.  Schedule an event
00354   //   to make sure we get it
00355   if (closed) {
00356     setup_event_cb(0, &sm_lock_retry_event);
00357   }
00358 }
00359 
00360 void
00361 PluginVC::do_io_close(int /* flag ATS_UNUSED */)
00362 {
00363   ink_assert(closed == false);
00364   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00365 
00366   Debug("pvc", "[%u] %s: do_io_close", core_obj->id, PVC_TYPE);
00367 
00368   if (reentrancy_count > 0) {
00369     // Do nothing since dealloacting ourselves
00370     //  now will lead to us running on a dead
00371     //  PluginVC since we are being called
00372     //  reentrantly
00373     closed = true;
00374     return;
00375   }
00376 
00377   MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00378 
00379   if (!lock) {
00380     setup_event_cb(PVC_LOCK_RETRY_TIME, &sm_lock_retry_event);
00381     closed = true;
00382     return;
00383   } else {
00384     closed = true;
00385   }
00386 
00387   process_close();
00388 }
00389 
00390 void
00391 PluginVC::do_io_shutdown(ShutdownHowTo_t howto)
00392 {
00393 
00394   ink_assert(!closed);
00395   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00396 
00397   switch (howto) {
00398   case IO_SHUTDOWN_READ:
00399     read_state.shutdown = true;
00400     break;
00401   case IO_SHUTDOWN_WRITE:
00402     write_state.shutdown = true;
00403     break;
00404   case IO_SHUTDOWN_READWRITE:
00405     read_state.shutdown = true;
00406     write_state.shutdown = true;
00407     break;
00408   }
00409 }
00410 
00411 // int PluginVC::transfer_bytes(MIOBuffer* transfer_to,
00412 //                              IOBufferReader* transfer_from, int act_on)
00413 //
00414 //   Takes care of transfering bytes from a reader to another buffer
00415 //      In the case of large transfers, we move blocks.  In the case
00416 //      of small transfers we copy data so as to not build too many
00417 //      buffer blocks
00418 //
00419 // Args:
00420 //   transfer_to:  buffer to copy to
00421 //   transfer_from:  buffer_copy_from
00422 //   act_on: is the max number of bytes we are to copy.  There must
00423 //          be at least act_on bytes available from transfer_from
00424 //
00425 // Returns number of bytes transfered
00426 //
00427 int64_t
00428 PluginVC::transfer_bytes(MIOBuffer * transfer_to, IOBufferReader * transfer_from, int64_t act_on)
00429 {
00430 
00431   int64_t total_added = 0;
00432 
00433   ink_assert(act_on <= transfer_from->read_avail());
00434 
00435   while (act_on > 0) {
00436     int64_t block_read_avail = transfer_from->block_read_avail();
00437     int64_t to_move = MIN(act_on, block_read_avail);
00438     int64_t moved = 0;
00439 
00440     if (to_move <= 0) {
00441       break;
00442     }
00443 
00444     if (to_move >= MIN_BLOCK_TRANSFER_BYTES) {
00445       moved = transfer_to->write(transfer_from, to_move, 0);
00446     } else {
00447       // We have a really small amount of data.  To make
00448       //  sure we don't get a huge build up of blocks which
00449       //  can lead to stack overflows if the buffer is destroyed
00450       //  before we read from it, we need copy over to the new
00451       //  buffer instead of doing a block transfer
00452       moved = transfer_to->write(transfer_from->start(), to_move);
00453 
00454       if (moved == 0) {
00455         // We are out of buffer space
00456         break;
00457       }
00458     }
00459 
00460     act_on -= moved;
00461     transfer_from->consume(moved);
00462     total_added += moved;
00463   }
00464 
00465   return total_added;
00466 }
00467 
00468 // void PluginVC::process_write_side(bool cb_ok)
00469 //
00470 //   This function may only be called while holding
00471 //      this->mutex & while it is ok to callback the
00472 //      write side continuation
00473 //
00474 //   Does write side processing
00475 //
00476 void
00477 PluginVC::process_write_side(bool other_side_call)
00478 {
00479 
00480   ink_assert(!deletable);
00481   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00482 
00483   MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
00484 
00485   need_write_process = false;
00486 
00487   if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
00488     return;
00489   }
00490   // Acquire the lock of the write side continuation
00491   EThread *my_ethread = mutex->thread_holding;
00492   ink_assert(my_ethread != NULL);
00493   MUTEX_TRY_LOCK(lock, write_state.vio.mutex, my_ethread);
00494   if (!lock) {
00495     Debug("pvc_event", "[%u] %s: process_write_side lock miss, retrying", core_obj->id, PVC_TYPE);
00496 
00497     need_write_process = true;
00498     setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
00499     return;
00500   }
00501 
00502   Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
00503   need_write_process = false;
00504 
00505 
00506   // Check the state of our write buffer as well as ntodo
00507   int64_t ntodo = write_state.vio.ntodo();
00508   if (ntodo == 0) {
00509     return;
00510   }
00511 
00512   IOBufferReader *reader = write_state.vio.get_reader();
00513   int64_t bytes_avail = reader->read_avail();
00514   int64_t act_on = MIN(bytes_avail, ntodo);
00515 
00516   Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64"", core_obj->id, PVC_TYPE, act_on);
00517 
00518   if (other_side->closed || other_side->read_state.shutdown) {
00519     write_state.vio._cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
00520     return;
00521   }
00522 
00523   if (act_on <= 0) {
00524     if (ntodo > 0) {
00525       // Notify the continuation that we are "disabling"
00526       //  ourselves due to to nothing to write
00527       write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
00528     }
00529     return;
00530   }
00531   // Bytes available, try to transfer to the PluginVCCore
00532   //   intermediate buffer
00533   //
00534   int64_t buf_space = PVC_DEFAULT_MAX_BYTES - core_buffer->max_read_avail();
00535   if (buf_space <= 0) {
00536     Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
00537     return;
00538   }
00539   act_on = MIN(act_on, buf_space);
00540 
00541   int64_t added = transfer_bytes(core_buffer, reader, act_on);
00542   if (added < 0) {
00543     // Couldn't actually get the buffer space.  This only
00544     //   happens on small transfers with the above
00545     //   PVC_DEFAULT_MAX_BYTES factor doesn't apply
00546     Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
00547     return;
00548   }
00549 
00550   write_state.vio.ndone += added;
00551 
00552   Debug("pvc", "[%u] %s: process_write_side; added %" PRId64"", core_obj->id, PVC_TYPE, added);
00553 
00554   if (write_state.vio.ntodo() == 0) {
00555     write_state.vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
00556   } else {
00557     write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
00558   }
00559 
00560   update_inactive_time();
00561 
00562   // Wake up the read side on the other side to process these bytes
00563   if (!other_side->closed) {
00564     if (!other_side_call) {
00565       other_side->process_read_side(true);
00566     } else {
00567       other_side->read_state.vio.reenable();
00568     }
00569   }
00570 }
00571 
00572 
00573 // void PluginVC::process_read_side()
00574 //
00575 //   This function may only be called while holding
00576 //      this->mutex & while it is ok to callback the
00577 //      read side continuation
00578 //
00579 //   Does read side processing
00580 //
00581 void
00582 PluginVC::process_read_side(bool other_side_call)
00583 {
00584 
00585   ink_assert(!deletable);
00586   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00587 
00588   // TODO: Never used??
00589   //MIOBuffer *core_buffer;
00590 
00591   IOBufferReader *core_reader;
00592 
00593   if (vc_type == PLUGIN_VC_ACTIVE) {
00594     //core_buffer = core_obj->p_to_a_buffer;
00595     core_reader = core_obj->p_to_a_reader;
00596   } else {
00597     ink_assert(vc_type == PLUGIN_VC_PASSIVE);
00598     //core_buffer = core_obj->a_to_p_buffer;
00599     core_reader = core_obj->a_to_p_reader;
00600   }
00601 
00602   need_read_process = false;
00603 
00604   if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
00605     return;
00606   }
00607   // Acquire the lock of the read side continuation
00608   EThread *my_ethread = mutex->thread_holding;
00609   ink_assert(my_ethread != NULL);
00610   MUTEX_TRY_LOCK(lock, read_state.vio.mutex, my_ethread);
00611   if (!lock) {
00612     Debug("pvc_event", "[%u] %s: process_read_side lock miss, retrying", core_obj->id, PVC_TYPE);
00613 
00614     need_read_process = true;
00615     setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
00616     return;
00617   }
00618 
00619   Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
00620   need_read_process = false;
00621 
00622   // Check the state of our read buffer as well as ntodo
00623   int64_t ntodo = read_state.vio.ntodo();
00624   if (ntodo == 0) {
00625     return;
00626   }
00627 
00628   int64_t bytes_avail = core_reader->read_avail();
00629   int64_t act_on = MIN(bytes_avail, ntodo);
00630 
00631   Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64"", core_obj->id, PVC_TYPE, act_on);
00632 
00633   if (act_on <= 0) {
00634     if (other_side->closed || other_side->write_state.shutdown) {
00635       read_state.vio._cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
00636     }
00637     return;
00638   }
00639   // Bytes available, try to transfer from the PluginVCCore
00640   //   intermediate buffer
00641   //
00642   MIOBuffer *output_buffer = read_state.vio.get_writer();
00643 
00644   int64_t water_mark = output_buffer->water_mark;
00645   water_mark = MAX(water_mark, PVC_DEFAULT_MAX_BYTES);
00646   int64_t buf_space = water_mark - output_buffer->max_read_avail();
00647   if (buf_space <= 0) {
00648     Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
00649     return;
00650   }
00651   act_on = MIN(act_on, buf_space);
00652 
00653   int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
00654   if (added <= 0) {
00655     // Couldn't actually get the buffer space.  This only
00656     //   happens on small transfers with the above
00657     //   PVC_DEFAULT_MAX_BYTES factor doesn't apply
00658     Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
00659     return;
00660   }
00661 
00662   read_state.vio.ndone += added;
00663 
00664   Debug("pvc", "[%u] %s: process_read_side; added %" PRId64"", core_obj->id, PVC_TYPE, added);
00665 
00666   if (read_state.vio.ntodo() == 0) {
00667     read_state.vio._cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
00668   } else {
00669     read_state.vio._cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
00670   }
00671 
00672   update_inactive_time();
00673 
00674   // Wake up the other side so it knows there is space available in
00675   //  intermediate buffer
00676   if (!other_side->closed) {
00677     if (!other_side_call) {
00678       other_side->process_write_side(true);
00679     } else {
00680       other_side->write_state.vio.reenable();
00681     }
00682   }
00683 }
00684 
00685 // void PluginVC::process_read_close()
00686 //
00687 //   This function may only be called while holding
00688 //      this->mutex
00689 //
00690 //   Tries to close the and dealloc the the vc
00691 //
00692 void
00693 PluginVC::process_close()
00694 {
00695 
00696   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00697 
00698   Debug("pvc", "[%u] %s: process_close", core_obj->id, PVC_TYPE);
00699 
00700   if (!deletable) {
00701     deletable = true;
00702   }
00703 
00704   if (sm_lock_retry_event) {
00705     sm_lock_retry_event->cancel();
00706     sm_lock_retry_event = NULL;
00707   }
00708 
00709   if (core_lock_retry_event) {
00710     core_lock_retry_event->cancel();
00711     core_lock_retry_event = NULL;
00712   }
00713 
00714   if (active_event) {
00715     active_event->cancel();
00716     active_event = NULL;
00717   }
00718 
00719   if (inactive_event) {
00720     inactive_event->cancel();
00721     inactive_event = NULL;
00722     inactive_timeout_at = 0;
00723   }
00724   // If the other side of the PluginVC is not closed
00725   //  we need to force it process both living sides
00726   //  of the connection in order that it recognizes
00727   //  the close
00728   if (!other_side->closed && core_obj->connected) {
00729     other_side->need_write_process = true;
00730     other_side->need_read_process = true;
00731     other_side->setup_event_cb(0, &other_side->core_lock_retry_event);
00732   }
00733 
00734   core_obj->attempt_delete();
00735 }
00736 
00737 // void PluginVC::process_timeout(Event* e, int event_to_send, Event** our_eptr)
00738 //
00739 //   Handles sending timeout event to the VConnection.  e is the event we got
00740 //     which indicats the timeout.  event_to_send is the event to the
00741 //     vc user.  Our_eptr is a pointer our event either inactive_event,
00742 //     or active_event.  If we successfully send the timeout to vc user,
00743 //     we clear the pointer, otherwise we reschedule it.
00744 //
00745 //   Because the possibility of reentrant close from vc user, we don't want to
00746 //      touch any state after making the call back
00747 //
00748 void
00749 PluginVC::process_timeout(Event * e, int event_to_send, Event ** our_eptr)
00750 {
00751 
00752   ink_assert(e = *our_eptr);
00753 
00754   if (read_state.vio.op == VIO::READ && !read_state.shutdown && read_state.vio.ntodo() > 0) {
00755     MUTEX_TRY_LOCK(lock, read_state.vio.mutex, e->ethread);
00756     if (!lock) {
00757       e->schedule_in(PVC_LOCK_RETRY_TIME);
00758       return;
00759     }
00760     *our_eptr = NULL;
00761     read_state.vio._cont->handleEvent(event_to_send, &read_state.vio);
00762   } else if (write_state.vio.op == VIO::WRITE && !write_state.shutdown && write_state.vio.ntodo() > 0) {
00763     MUTEX_TRY_LOCK(lock, write_state.vio.mutex, e->ethread);
00764     if (!lock) {
00765       e->schedule_in(PVC_LOCK_RETRY_TIME);
00766       return;
00767     }
00768     *our_eptr = NULL;
00769     write_state.vio._cont->handleEvent(event_to_send, &write_state.vio);
00770   } else {
00771     *our_eptr = NULL;
00772   }
00773 }
00774 
00775 void
00776 PluginVC::update_inactive_time()
00777 {
00778   if (inactive_event && inactive_timeout) {
00779     //inactive_event->cancel();
00780     //inactive_event = eventProcessor.schedule_in(this, inactive_timeout);
00781     inactive_timeout_at = ink_get_hrtime() + inactive_timeout;
00782   }
00783 }
00784 
00785 // void PluginVC::setup_event_cb(ink_hrtime in)
00786 //
00787 //    Setup up the event processor to call us back.
00788 //      We've got two different event pointers to handle
00789 //      locking issues
00790 //
00791 void
00792 PluginVC::setup_event_cb(ink_hrtime in, Event ** e_ptr)
00793 {
00794 
00795   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00796 
00797   if (*e_ptr == NULL) {
00798 
00799     // We locked the pointer so we can now allocate an event
00800     //   to call us back
00801     if (in == 0) {
00802       if(this_ethread()->tt == REGULAR) {
00803          *e_ptr = this_ethread()->schedule_imm_local(this);
00804       }
00805       else
00806       {
00807          *e_ptr = eventProcessor.schedule_imm(this);
00808       }
00809     } 
00810     else 
00811     {
00812       if(this_ethread()->tt == REGULAR) {
00813         *e_ptr = this_ethread()->schedule_in_local(this,in);
00814       }
00815       else
00816       {
00817         *e_ptr = eventProcessor.schedule_in(this, in);
00818       }
00819     }
00820   }
00821 }
00822 
00823 void
00824 PluginVC::set_active_timeout(ink_hrtime timeout_in)
00825 {
00826   active_timeout = timeout_in;
00827 
00828   // FIX - Do we need to handle the case where the timeout is set
00829   //   but no io has been done?
00830   if (active_event) {
00831     ink_assert(!active_event->cancelled);
00832     active_event->cancel();
00833     active_event = NULL;
00834   }
00835 
00836   if (active_timeout > 0) {
00837     active_event = eventProcessor.schedule_in(this, active_timeout);
00838   }
00839 }
00840 
00841 void
00842 PluginVC::set_inactivity_timeout(ink_hrtime timeout_in)
00843 {
00844   inactive_timeout = timeout_in;
00845   if (inactive_timeout != 0) {
00846     inactive_timeout_at = ink_get_hrtime() + inactive_timeout;
00847     if (inactive_event == NULL) {
00848       inactive_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(1));
00849     }
00850   } else {
00851     inactive_timeout_at = 0;
00852     if (inactive_event) {
00853       inactive_event->cancel();
00854       inactive_event = NULL;
00855     }
00856   }
00857 }
00858 
00859 void
00860 PluginVC::cancel_active_timeout()
00861 {
00862   set_active_timeout(0);
00863 }
00864 
00865 void
00866 PluginVC::cancel_inactivity_timeout()
00867 {
00868   set_inactivity_timeout(0);
00869 }
00870 
00871 ink_hrtime
00872 PluginVC::get_active_timeout()
00873 {
00874   return active_timeout;
00875 }
00876 
00877 ink_hrtime
00878 PluginVC::get_inactivity_timeout()
00879 {
00880   return inactive_timeout;
00881 }
00882 
00883 SOCKET
00884 PluginVC::get_socket()
00885 {
00886   return 0;
00887 }
00888 
00889 void
00890 PluginVC::set_local_addr()
00891 {
00892   if (vc_type == PLUGIN_VC_ACTIVE) {
00893     ats_ip_copy(&local_addr, &core_obj->active_addr_struct);
00894 //    local_addr = core_obj->active_addr_struct;
00895   } else {
00896     ats_ip_copy(&local_addr, &core_obj->passive_addr_struct);
00897 //    local_addr = core_obj->passive_addr_struct;
00898   }
00899 }
00900 
00901 void
00902 PluginVC::set_remote_addr()
00903 {
00904   if (vc_type == PLUGIN_VC_ACTIVE) {
00905     ats_ip_copy(&remote_addr, &core_obj->passive_addr_struct);
00906   } else {
00907     ats_ip_copy(&remote_addr, &core_obj->active_addr_struct);
00908   }
00909 }
00910 
00911 int
00912 PluginVC::set_tcp_init_cwnd(int /* init_cwnd ATS_UNUSED */)
00913 {
00914   return -1;
00915 }
00916 
00917 void
00918 PluginVC::apply_options()
00919 {
00920   // do nothing
00921 }
00922 
00923 bool
00924 PluginVC::get_data(int id, void *data)
00925 {
00926   if (data == NULL) {
00927     return false;
00928   }
00929   switch (id) {
00930   case PLUGIN_VC_DATA_LOCAL:
00931     if (vc_type == PLUGIN_VC_ACTIVE) {
00932       *(void **) data = core_obj->active_data;
00933     } else {
00934       *(void **) data = core_obj->passive_data;
00935     }
00936     return true;
00937   case PLUGIN_VC_DATA_REMOTE:
00938     if (vc_type == PLUGIN_VC_ACTIVE) {
00939       *(void **) data = core_obj->passive_data;
00940     } else {
00941       *(void **) data = core_obj->active_data;
00942     }
00943     return true;
00944   default:
00945     *(void **) data = NULL;
00946     return false;
00947   }
00948 }
00949 
00950 bool
00951 PluginVC::set_data(int id, void *data)
00952 {
00953   switch (id) {
00954   case PLUGIN_VC_DATA_LOCAL:
00955     if (vc_type == PLUGIN_VC_ACTIVE) {
00956       core_obj->active_data = data;
00957     } else {
00958       core_obj->passive_data = data;
00959     }
00960     return true;
00961   case PLUGIN_VC_DATA_REMOTE:
00962     if (vc_type == PLUGIN_VC_ACTIVE) {
00963       core_obj->passive_data = data;
00964     } else {
00965       core_obj->active_data = data;
00966     }
00967     return true;
00968   default:
00969     return false;
00970   }
00971 }
00972 
00973 // PluginVCCore
00974 
00975 vint32
00976   PluginVCCore::nextid = 0;
00977 
00978 PluginVCCore::~PluginVCCore()
00979 {
00980 }
00981 
00982 PluginVCCore *
00983 PluginVCCore::alloc()
00984 {
00985   PluginVCCore *pvc = new PluginVCCore;
00986   pvc->init();
00987   return pvc;
00988 }
00989 
00990 void
00991 PluginVCCore::init()
00992 {
00993   mutex = new_ProxyMutex();
00994 
00995   active_vc.vc_type = PLUGIN_VC_ACTIVE;
00996   active_vc.other_side = &passive_vc;
00997   active_vc.core_obj = this;
00998   active_vc.mutex = mutex;
00999   active_vc.thread = this_ethread();
01000 
01001   passive_vc.vc_type = PLUGIN_VC_PASSIVE;
01002   passive_vc.other_side = &active_vc;
01003   passive_vc.core_obj = this;
01004   passive_vc.mutex = mutex;
01005   passive_vc.thread = active_vc.thread;
01006 
01007   p_to_a_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
01008   p_to_a_reader = p_to_a_buffer->alloc_reader();
01009 
01010   a_to_p_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
01011   a_to_p_reader = a_to_p_buffer->alloc_reader();
01012 
01013   Debug("pvc", "[%u] Created PluginVCCore at %p, active %p, passive %p", id, this, &active_vc, &passive_vc);
01014 }
01015 
01016 void
01017 PluginVCCore::destroy()
01018 {
01019 
01020   Debug("pvc", "[%u] Destroying PluginVCCore at %p", id, this);
01021 
01022   ink_assert(active_vc.closed == true || !connected);
01023   active_vc.mutex = NULL;
01024   active_vc.read_state.vio.buffer.clear();
01025   active_vc.write_state.vio.buffer.clear();
01026   active_vc.magic = PLUGIN_VC_MAGIC_DEAD;
01027 
01028   ink_assert(passive_vc.closed == true || !connected);
01029   passive_vc.mutex = NULL;
01030   passive_vc.read_state.vio.buffer.clear();
01031   passive_vc.write_state.vio.buffer.clear();
01032   passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
01033 
01034   if (p_to_a_buffer) {
01035     free_MIOBuffer(p_to_a_buffer);
01036     p_to_a_buffer = NULL;
01037   }
01038 
01039   if (a_to_p_buffer) {
01040     free_MIOBuffer(a_to_p_buffer);
01041     a_to_p_buffer = NULL;
01042   }
01043 
01044   this->mutex = NULL;
01045   delete this;
01046 }
01047 
01048 void
01049 PluginVCCore::set_accept_cont(Continuation * c)
01050 {
01051   connect_to = c;
01052   // FIX ME - must return action
01053 }
01054 
01055 PluginVC *
01056 PluginVCCore::connect()
01057 {
01058 
01059   // Make sure there is another end to connect to
01060   if (connect_to == NULL) {
01061     return NULL;
01062   }
01063 
01064   connected = true;
01065   state_send_accept(EVENT_IMMEDIATE, NULL);
01066 
01067   return &active_vc;
01068 }
01069 
01070 Action *
01071 PluginVCCore::connect_re(Continuation * c)
01072 {
01073 
01074   // Make sure there is another end to connect to
01075   if (connect_to == NULL) {
01076     return NULL;
01077   }
01078 
01079   EThread *my_thread = this_ethread();
01080   MUTEX_TAKE_LOCK(this->mutex, my_thread);
01081 
01082   connected = true;
01083   state_send_accept(EVENT_IMMEDIATE, NULL);
01084 
01085   // We have to take out our mutex because rest of the
01086   //   system expects the VC mutex to held when calling back.
01087   // We can use take lock here instead of try lock because the
01088   //   lock should never already be held.
01089 
01090   c->handleEvent(NET_EVENT_OPEN, &active_vc);
01091   MUTEX_UNTAKE_LOCK(this->mutex, my_thread);
01092 
01093   return ACTION_RESULT_DONE;
01094 }
01095 
01096 int
01097 PluginVCCore::state_send_accept_failed(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
01098 {
01099   MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
01100 
01101   if (lock) {
01102     connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, NULL);
01103     destroy();
01104   } else {
01105     SET_HANDLER(&PluginVCCore::state_send_accept_failed);
01106     eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
01107   }
01108 
01109   return 0;
01110 
01111 }
01112 
01113 int
01114 PluginVCCore::state_send_accept(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
01115 {
01116   MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
01117 
01118   if (lock) {
01119     connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
01120   } else {
01121     SET_HANDLER(&PluginVCCore::state_send_accept);
01122     eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
01123   }
01124 
01125   return 0;
01126 }
01127 
01128 
01129 // void PluginVCCore::attempt_delete()
01130 //
01131 //  Mutex must be held when calling this function
01132 //
01133 void
01134 PluginVCCore::attempt_delete()
01135 {
01136 
01137   if (active_vc.deletable) {
01138     if (passive_vc.deletable) {
01139       destroy();
01140     } else if (!connected) {
01141       state_send_accept_failed(EVENT_IMMEDIATE, NULL);
01142     }
01143   }
01144 }
01145 
01146 // void PluginVCCore::kill_no_connect()
01147 //
01148 //   Called to kill the PluginVCCore when the
01149 //     connect call hasn't been made yet
01150 //
01151 void
01152 PluginVCCore::kill_no_connect()
01153 {
01154   ink_assert(!connected);
01155   ink_assert(!active_vc.closed);
01156   active_vc.do_io_close();
01157 }
01158 
01159 void
01160 PluginVCCore::set_passive_addr(in_addr_t ip, int port)
01161 {
01162   ats_ip4_set(&passive_addr_struct, htonl(ip), htons(port));
01163 }
01164 
01165 void
01166 PluginVCCore::set_passive_addr(sockaddr const* ip)
01167 {
01168   passive_addr_struct.assign(ip);
01169 }
01170 
01171 void
01172 PluginVCCore::set_active_addr(in_addr_t ip, int port)
01173 {
01174   ats_ip4_set(&active_addr_struct, htonl(ip), htons(port));
01175 }
01176 
01177 void
01178 PluginVCCore::set_active_addr(sockaddr const* ip)
01179 {
01180   active_addr_struct.assign(ip);
01181 }
01182 
01183 void
01184 PluginVCCore::set_passive_data(void *data)
01185 {
01186   passive_data = data;
01187 }
01188 
01189 void
01190 PluginVCCore::set_active_data(void *data)
01191 {
01192   active_data = data;
01193 }
01194 
01195 void
01196 PluginVCCore::set_transparent(bool passive_side, bool active_side)
01197 {
01198   passive_vc.set_is_transparent(passive_side);
01199   active_vc.set_is_transparent(active_side);
01200 }
01201 
01202 void
01203 PluginVCCore::set_plugin_id(int64_t id)
01204 {
01205   passive_vc.plugin_id = active_vc.plugin_id = id;
01206 }
01207 
01208 void
01209 PluginVCCore::set_plugin_tag(char const* tag)
01210 {
01211   passive_vc.plugin_tag = active_vc.plugin_tag = tag;
01212 }
01213 
01214 /*************************************************************
01215  *
01216  *   REGRESSION TEST STUFF
01217  *
01218  **************************************************************/
01219 
01220 #if TS_HAS_TESTS
01221 class PVCTestDriver:public NetTestDriver
01222 {
01223 public:
01224   PVCTestDriver();
01225   ~PVCTestDriver();
01226 
01227   void start_tests(RegressionTest * r_arg, int *pstatus_arg);
01228   void run_next_test();
01229   int main_handler(int event, void *data);
01230 
01231 private:
01232   unsigned i;
01233   unsigned completions_received;
01234 };
01235 
01236 PVCTestDriver::PVCTestDriver():
01237 NetTestDriver(), i(0), completions_received(0)
01238 {
01239 }
01240 
01241 PVCTestDriver::~PVCTestDriver()
01242 {
01243   mutex = NULL;
01244 }
01245 
01246 void
01247 PVCTestDriver::start_tests(RegressionTest * r_arg, int *pstatus_arg)
01248 {
01249   mutex = new_ProxyMutex();
01250   MUTEX_TRY_LOCK(lock, mutex, this_ethread());
01251 
01252   r = r_arg;
01253   pstatus = pstatus_arg;
01254 
01255   run_next_test();
01256 
01257   SET_HANDLER(&PVCTestDriver::main_handler);
01258 }
01259 
01260 void
01261 PVCTestDriver::run_next_test()
01262 {
01263 
01264   unsigned a_index = i * 2;
01265   unsigned p_index = a_index + 1;
01266 
01267   if (p_index >= num_netvc_tests) {
01268     // We are done - // FIX - PASS or FAIL?
01269     if (errors == 0) {
01270       *pstatus = REGRESSION_TEST_PASSED;
01271     } else {
01272       *pstatus = REGRESSION_TEST_FAILED;
01273     }
01274     delete this;
01275     return;
01276   }
01277   completions_received = 0;
01278   i++;
01279 
01280   Debug("pvc_test", "Starting test %s", netvc_tests_def[a_index].test_name);
01281 
01282   NetVCTest *p = new NetVCTest;
01283   NetVCTest *a = new NetVCTest;
01284   PluginVCCore *core = PluginVCCore::alloc();
01285   core->set_accept_cont(p);
01286 
01287   p->init_test(NET_VC_TEST_PASSIVE, this, NULL, r, &netvc_tests_def[p_index], "PluginVC", "pvc_test_detail");
01288   PluginVC *a_vc = core->connect();
01289 
01290   a->init_test(NET_VC_TEST_ACTIVE, this, a_vc, r, &netvc_tests_def[a_index], "PluginVC", "pvc_test_detail");
01291 }
01292 
01293 int
01294 PVCTestDriver::main_handler(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
01295 {
01296   completions_received++;
01297 
01298   if (completions_received == 2) {
01299     run_next_test();
01300   }
01301 
01302   return 0;
01303 }
01304 
01305 EXCLUSIVE_REGRESSION_TEST(PVC) (RegressionTest * t, int /* atype ATS_UNUSED */, int *pstatus)
01306 {
01307   PVCTestDriver *driver = new PVCTestDriver;
01308   driver->start_tests(t, pstatus);
01309 }
01310 #endif

Generated by  doxygen 1.7.1