00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 
00051 
00052 
00053 
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 
00062 
00063 
00064 
00065 
00066 
00067 
00068 
00069 
00070 
00071 
00072 
00073 
00074 #include "PluginVC.h"
00075 #include "P_EventSystem.h"
00076 #include "P_Net.h"
00077 #include "Regression.h"
00078 
// Delay used when rescheduling work after a failed try-lock.
#define PVC_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
// Byte budget used when sizing transfers: process_write_side() caps the
// core buffer at this many readable bytes, and process_read_side() uses it
// as the floor for the output buffer's water mark.
#define PVC_DEFAULT_MAX_BYTES 32768
// transfer_bytes() hands blocks of at least this many bytes to the
// reader-based MIOBuffer::write() overload; smaller pieces go through the
// pointer/length overload.
#define MIN_BLOCK_TRANSFER_BYTES 128

// Sentinel pointer values.  They are not referenced by the code visible in
// this file.  The expansions are fully parenthesized so that they behave as
// a single expression wherever they are used (e.g. `sizeof EVENT_PTR_LOCKED`).
#define EVENT_PTR_LOCKED ((void *) 0x1)
#define EVENT_PTR_CLOSED ((void *) 0x2)

// Side name for debug output; expands against the `vc_type` member in scope.
#define PVC_TYPE    ((vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")
00087 
00088 PluginVC::PluginVC(PluginVCCore *core_obj)
00089   : NetVConnection(),magic(PLUGIN_VC_MAGIC_ALIVE), vc_type(PLUGIN_VC_UNKNOWN), core_obj(core_obj),
00090     other_side(NULL), read_state(), write_state(), need_read_process(false), need_write_process(false),
00091     closed(false), sm_lock_retry_event(NULL), core_lock_retry_event(NULL),
00092     deletable(false), reentrancy_count(0), active_timeout(0), active_event(NULL),
00093     inactive_timeout(0), inactive_timeout_at(0), inactive_event(NULL), plugin_tag(NULL), plugin_id(0)
00094 {
00095   ink_assert(core_obj != NULL);
00096   SET_HANDLER(&PluginVC::main_handler);
00097 }
00098 
00099 PluginVC::~PluginVC()
00100 {
00101   mutex = NULL;
00102 }
00103 
00104 int
00105 PluginVC::main_handler(int event, void *data)
00106 {
00107 
00108   Debug("pvc_event", "[%u] %s: Received event %d", core_obj->id, PVC_TYPE, event);
00109 
00110   ink_release_assert(event == EVENT_INTERVAL || event == EVENT_IMMEDIATE);
00111   ink_release_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00112   ink_assert(!deletable);
00113   ink_assert(data != NULL);
00114 
00115   Event *call_event = (Event *) data;
00116   EThread *my_ethread = mutex->thread_holding;
00117   ink_release_assert(my_ethread != NULL);
00118 
00119   bool read_mutex_held = false;
00120   bool write_mutex_held = false;
00121   Ptr<ProxyMutex> read_side_mutex = read_state.vio.mutex;
00122   Ptr<ProxyMutex> write_side_mutex = write_state.vio.mutex;
00123 
00124   if (read_side_mutex) {
00125     read_mutex_held = MUTEX_TAKE_TRY_LOCK(read_side_mutex, my_ethread);
00126 
00127     if (!read_mutex_held) {
00128       if (call_event != inactive_event)
00129         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00130       return 0;
00131     }
00132 
00133     if (read_side_mutex.m_ptr != read_state.vio.mutex.m_ptr) {
00134       
00135       
00136       Mutex_unlock(read_side_mutex, my_ethread);
00137       if (call_event != inactive_event)
00138         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00139       return 0;
00140     }
00141   }
00142 
00143   if (write_side_mutex) {
00144     write_mutex_held = MUTEX_TAKE_TRY_LOCK(write_side_mutex, my_ethread);
00145 
00146     if (!write_mutex_held) {
00147       if (read_mutex_held) {
00148         Mutex_unlock(read_side_mutex, my_ethread);
00149       }
00150       if (call_event != inactive_event)
00151         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00152       return 0;
00153     }
00154 
00155     if (write_side_mutex.m_ptr != write_state.vio.mutex.m_ptr) {
00156       
00157       
00158       Mutex_unlock(write_side_mutex, my_ethread);
00159       if (read_mutex_held) {
00160         Mutex_unlock(read_side_mutex, my_ethread);
00161       }
00162       if (call_event != inactive_event)
00163         call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00164       return 0;
00165     }
00166   }
00167   
00168   
00169   ink_release_assert(reentrancy_count == 0);
00170 
00171   if (closed) {
00172     process_close();
00173 
00174     if (read_mutex_held) {
00175       Mutex_unlock(read_side_mutex, my_ethread);
00176     }
00177 
00178     if (write_mutex_held) {
00179       Mutex_unlock(write_side_mutex, my_ethread);
00180     }
00181 
00182     return 0;
00183   }
00184   
00185   
00186   
00187   
00188   reentrancy_count++;
00189 
00190   if (call_event == active_event) {
00191     process_timeout(call_event, VC_EVENT_ACTIVE_TIMEOUT, &active_event);
00192   } else if (call_event == inactive_event) {
00193     if (inactive_timeout_at && inactive_timeout_at < ink_get_hrtime()) {
00194       process_timeout(call_event, VC_EVENT_INACTIVITY_TIMEOUT, &inactive_event);
00195       call_event->cancel();
00196     }
00197   } else {
00198     if (call_event == sm_lock_retry_event) {
00199       sm_lock_retry_event = NULL;
00200     } else {
00201       ink_release_assert(call_event == core_lock_retry_event);
00202       core_lock_retry_event = NULL;
00203     }
00204 
00205     if (need_read_process) {
00206       process_read_side(false);
00207     }
00208 
00209     if (need_write_process && !closed) {
00210       process_write_side(false);
00211     }
00212 
00213   }
00214 
00215   reentrancy_count--;
00216   if (closed) {
00217     process_close();
00218   }
00219 
00220   if (read_mutex_held) {
00221     Mutex_unlock(read_side_mutex, my_ethread);
00222   }
00223 
00224   if (write_mutex_held) {
00225     Mutex_unlock(write_side_mutex, my_ethread);
00226   }
00227 
00228   return 0;
00229 }
00230 
00231 VIO *
00232 PluginVC::do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf)
00233 {
00234 
00235   ink_assert(!closed);
00236   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00237 
00238   if (buf) {
00239     read_state.vio.buffer.writer_for(buf);
00240   } else {
00241     read_state.vio.buffer.clear();
00242   }
00243 
00244   
00245   
00246   read_state.vio.mutex = c->mutex;
00247   read_state.vio._cont = c;
00248   read_state.vio.nbytes = nbytes;
00249   read_state.vio.ndone = 0;
00250   read_state.vio.vc_server = (VConnection *) this;
00251   read_state.vio.op = VIO::READ;
00252 
00253   Debug("pvc", "[%u] %s: do_io_read for %" PRId64" bytes", core_obj->id, PVC_TYPE, nbytes);
00254 
00255   
00256   
00257   need_read_process = true;
00258   setup_event_cb(0, &sm_lock_retry_event);
00259 
00260   return &read_state.vio;
00261 }
00262 
00263 VIO *
00264 PluginVC::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * abuffer, bool owner)
00265 {
00266 
00267   ink_assert(!closed);
00268   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00269 
00270   if (abuffer) {
00271     ink_assert(!owner);
00272     write_state.vio.buffer.reader_for(abuffer);
00273   } else {
00274     write_state.vio.buffer.clear();
00275   }
00276 
00277   
00278   
00279   write_state.vio.mutex = c->mutex;
00280   write_state.vio._cont = c;
00281   write_state.vio.nbytes = nbytes;
00282   write_state.vio.ndone = 0;
00283   write_state.vio.vc_server = (VConnection *) this;
00284   write_state.vio.op = VIO::WRITE;
00285 
00286   Debug("pvc", "[%u] %s: do_io_write for %" PRId64" bytes", core_obj->id, PVC_TYPE, nbytes);
00287 
00288   
00289   
00290   need_write_process = true;
00291   setup_event_cb(0, &sm_lock_retry_event);
00292 
00293   return &write_state.vio;
00294 }
00295 
00296 void
00297 PluginVC::reenable(VIO * vio)
00298 {
00299 
00300   ink_assert(!closed);
00301   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00302   ink_assert(vio->mutex->thread_holding == this_ethread());
00303 
00304   Debug("pvc", "[%u] %s: reenable %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
00305 
00306   if (vio->op == VIO::WRITE) {
00307     ink_assert(vio == &write_state.vio);
00308     need_write_process = true;
00309   } else if (vio->op == VIO::READ) {
00310     need_read_process = true;
00311   } else {
00312     ink_release_assert(0);
00313   }
00314   setup_event_cb(0, &sm_lock_retry_event);
00315 }
00316 
00317 void
00318 PluginVC::reenable_re(VIO * vio)
00319 {
00320 
00321   ink_assert(!closed);
00322   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00323   ink_assert(vio->mutex->thread_holding == this_ethread());
00324 
00325   Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
00326 
00327   MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00328   if (!lock) {
00329     if (vio->op == VIO::WRITE) {
00330       need_write_process = true;
00331     } else {
00332       need_read_process = true;
00333     }
00334     setup_event_cb(PVC_LOCK_RETRY_TIME, &sm_lock_retry_event);
00335     return;
00336   }
00337 
00338   reentrancy_count++;
00339 
00340   if (vio->op == VIO::WRITE) {
00341     ink_assert(vio == &write_state.vio);
00342     process_write_side(false);
00343   } else if (vio->op == VIO::READ) {
00344     ink_assert(vio == &read_state.vio);
00345     process_read_side(false);
00346   } else {
00347     ink_release_assert(0);
00348   }
00349 
00350   reentrancy_count--;
00351 
00352   
00353   
00354   
00355   if (closed) {
00356     setup_event_cb(0, &sm_lock_retry_event);
00357   }
00358 }
00359 
00360 void
00361 PluginVC::do_io_close(int )
00362 {
00363   ink_assert(closed == false);
00364   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00365 
00366   Debug("pvc", "[%u] %s: do_io_close", core_obj->id, PVC_TYPE);
00367 
00368   if (reentrancy_count > 0) {
00369     
00370     
00371     
00372     
00373     closed = true;
00374     return;
00375   }
00376 
00377   MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00378 
00379   if (!lock) {
00380     setup_event_cb(PVC_LOCK_RETRY_TIME, &sm_lock_retry_event);
00381     closed = true;
00382     return;
00383   } else {
00384     closed = true;
00385   }
00386 
00387   process_close();
00388 }
00389 
00390 void
00391 PluginVC::do_io_shutdown(ShutdownHowTo_t howto)
00392 {
00393 
00394   ink_assert(!closed);
00395   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00396 
00397   switch (howto) {
00398   case IO_SHUTDOWN_READ:
00399     read_state.shutdown = true;
00400     break;
00401   case IO_SHUTDOWN_WRITE:
00402     write_state.shutdown = true;
00403     break;
00404   case IO_SHUTDOWN_READWRITE:
00405     read_state.shutdown = true;
00406     write_state.shutdown = true;
00407     break;
00408   }
00409 }
00410 
00411 
00412 
00413 
00414 
00415 
00416 
00417 
00418 
00419 
00420 
00421 
00422 
00423 
00424 
00425 
00426 
00427 int64_t
00428 PluginVC::transfer_bytes(MIOBuffer * transfer_to, IOBufferReader * transfer_from, int64_t act_on)
00429 {
00430 
00431   int64_t total_added = 0;
00432 
00433   ink_assert(act_on <= transfer_from->read_avail());
00434 
00435   while (act_on > 0) {
00436     int64_t block_read_avail = transfer_from->block_read_avail();
00437     int64_t to_move = MIN(act_on, block_read_avail);
00438     int64_t moved = 0;
00439 
00440     if (to_move <= 0) {
00441       break;
00442     }
00443 
00444     if (to_move >= MIN_BLOCK_TRANSFER_BYTES) {
00445       moved = transfer_to->write(transfer_from, to_move, 0);
00446     } else {
00447       
00448       
00449       
00450       
00451       
00452       moved = transfer_to->write(transfer_from->start(), to_move);
00453 
00454       if (moved == 0) {
00455         
00456         break;
00457       }
00458     }
00459 
00460     act_on -= moved;
00461     transfer_from->consume(moved);
00462     total_added += moved;
00463   }
00464 
00465   return total_added;
00466 }
00467 
00468 
00469 
00470 
00471 
00472 
00473 
00474 
00475 
00476 void
00477 PluginVC::process_write_side(bool other_side_call)
00478 {
00479 
00480   ink_assert(!deletable);
00481   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00482 
00483   MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
00484 
00485   need_write_process = false;
00486 
00487   if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
00488     return;
00489   }
00490   
00491   EThread *my_ethread = mutex->thread_holding;
00492   ink_assert(my_ethread != NULL);
00493   MUTEX_TRY_LOCK(lock, write_state.vio.mutex, my_ethread);
00494   if (!lock) {
00495     Debug("pvc_event", "[%u] %s: process_write_side lock miss, retrying", core_obj->id, PVC_TYPE);
00496 
00497     need_write_process = true;
00498     setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
00499     return;
00500   }
00501 
00502   Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
00503   need_write_process = false;
00504 
00505 
00506   
00507   int64_t ntodo = write_state.vio.ntodo();
00508   if (ntodo == 0) {
00509     return;
00510   }
00511 
00512   IOBufferReader *reader = write_state.vio.get_reader();
00513   int64_t bytes_avail = reader->read_avail();
00514   int64_t act_on = MIN(bytes_avail, ntodo);
00515 
00516   Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64"", core_obj->id, PVC_TYPE, act_on);
00517 
00518   if (other_side->closed || other_side->read_state.shutdown) {
00519     write_state.vio._cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
00520     return;
00521   }
00522 
00523   if (act_on <= 0) {
00524     if (ntodo > 0) {
00525       
00526       
00527       write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
00528     }
00529     return;
00530   }
00531   
00532   
00533   
00534   int64_t buf_space = PVC_DEFAULT_MAX_BYTES - core_buffer->max_read_avail();
00535   if (buf_space <= 0) {
00536     Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
00537     return;
00538   }
00539   act_on = MIN(act_on, buf_space);
00540 
00541   int64_t added = transfer_bytes(core_buffer, reader, act_on);
00542   if (added < 0) {
00543     
00544     
00545     
00546     Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
00547     return;
00548   }
00549 
00550   write_state.vio.ndone += added;
00551 
00552   Debug("pvc", "[%u] %s: process_write_side; added %" PRId64"", core_obj->id, PVC_TYPE, added);
00553 
00554   if (write_state.vio.ntodo() == 0) {
00555     write_state.vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
00556   } else {
00557     write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
00558   }
00559 
00560   update_inactive_time();
00561 
00562   
00563   if (!other_side->closed) {
00564     if (!other_side_call) {
00565       other_side->process_read_side(true);
00566     } else {
00567       other_side->read_state.vio.reenable();
00568     }
00569   }
00570 }
00571 
00572 
00573 
00574 
00575 
00576 
00577 
00578 
00579 
00580 
00581 void
00582 PluginVC::process_read_side(bool other_side_call)
00583 {
00584 
00585   ink_assert(!deletable);
00586   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00587 
00588   
00589   
00590 
00591   IOBufferReader *core_reader;
00592 
00593   if (vc_type == PLUGIN_VC_ACTIVE) {
00594     
00595     core_reader = core_obj->p_to_a_reader;
00596   } else {
00597     ink_assert(vc_type == PLUGIN_VC_PASSIVE);
00598     
00599     core_reader = core_obj->a_to_p_reader;
00600   }
00601 
00602   need_read_process = false;
00603 
00604   if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
00605     return;
00606   }
00607   
00608   EThread *my_ethread = mutex->thread_holding;
00609   ink_assert(my_ethread != NULL);
00610   MUTEX_TRY_LOCK(lock, read_state.vio.mutex, my_ethread);
00611   if (!lock) {
00612     Debug("pvc_event", "[%u] %s: process_read_side lock miss, retrying", core_obj->id, PVC_TYPE);
00613 
00614     need_read_process = true;
00615     setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
00616     return;
00617   }
00618 
00619   Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
00620   need_read_process = false;
00621 
00622   
00623   int64_t ntodo = read_state.vio.ntodo();
00624   if (ntodo == 0) {
00625     return;
00626   }
00627 
00628   int64_t bytes_avail = core_reader->read_avail();
00629   int64_t act_on = MIN(bytes_avail, ntodo);
00630 
00631   Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64"", core_obj->id, PVC_TYPE, act_on);
00632 
00633   if (act_on <= 0) {
00634     if (other_side->closed || other_side->write_state.shutdown) {
00635       read_state.vio._cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
00636     }
00637     return;
00638   }
00639   
00640   
00641   
00642   MIOBuffer *output_buffer = read_state.vio.get_writer();
00643 
00644   int64_t water_mark = output_buffer->water_mark;
00645   water_mark = MAX(water_mark, PVC_DEFAULT_MAX_BYTES);
00646   int64_t buf_space = water_mark - output_buffer->max_read_avail();
00647   if (buf_space <= 0) {
00648     Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
00649     return;
00650   }
00651   act_on = MIN(act_on, buf_space);
00652 
00653   int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
00654   if (added <= 0) {
00655     
00656     
00657     
00658     Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
00659     return;
00660   }
00661 
00662   read_state.vio.ndone += added;
00663 
00664   Debug("pvc", "[%u] %s: process_read_side; added %" PRId64"", core_obj->id, PVC_TYPE, added);
00665 
00666   if (read_state.vio.ntodo() == 0) {
00667     read_state.vio._cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
00668   } else {
00669     read_state.vio._cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
00670   }
00671 
00672   update_inactive_time();
00673 
00674   
00675   
00676   if (!other_side->closed) {
00677     if (!other_side_call) {
00678       other_side->process_write_side(true);
00679     } else {
00680       other_side->write_state.vio.reenable();
00681     }
00682   }
00683 }
00684 
00685 
00686 
00687 
00688 
00689 
00690 
00691 
00692 void
00693 PluginVC::process_close()
00694 {
00695 
00696   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00697 
00698   Debug("pvc", "[%u] %s: process_close", core_obj->id, PVC_TYPE);
00699 
00700   if (!deletable) {
00701     deletable = true;
00702   }
00703 
00704   if (sm_lock_retry_event) {
00705     sm_lock_retry_event->cancel();
00706     sm_lock_retry_event = NULL;
00707   }
00708 
00709   if (core_lock_retry_event) {
00710     core_lock_retry_event->cancel();
00711     core_lock_retry_event = NULL;
00712   }
00713 
00714   if (active_event) {
00715     active_event->cancel();
00716     active_event = NULL;
00717   }
00718 
00719   if (inactive_event) {
00720     inactive_event->cancel();
00721     inactive_event = NULL;
00722     inactive_timeout_at = 0;
00723   }
00724   
00725   
00726   
00727   
00728   if (!other_side->closed && core_obj->connected) {
00729     other_side->need_write_process = true;
00730     other_side->need_read_process = true;
00731     other_side->setup_event_cb(0, &other_side->core_lock_retry_event);
00732   }
00733 
00734   core_obj->attempt_delete();
00735 }
00736 
00737 
00738 
00739 
00740 
00741 
00742 
00743 
00744 
00745 
00746 
00747 
00748 void
00749 PluginVC::process_timeout(Event * e, int event_to_send, Event ** our_eptr)
00750 {
00751 
00752   ink_assert(e = *our_eptr);
00753 
00754   if (read_state.vio.op == VIO::READ && !read_state.shutdown && read_state.vio.ntodo() > 0) {
00755     MUTEX_TRY_LOCK(lock, read_state.vio.mutex, e->ethread);
00756     if (!lock) {
00757       e->schedule_in(PVC_LOCK_RETRY_TIME);
00758       return;
00759     }
00760     *our_eptr = NULL;
00761     read_state.vio._cont->handleEvent(event_to_send, &read_state.vio);
00762   } else if (write_state.vio.op == VIO::WRITE && !write_state.shutdown && write_state.vio.ntodo() > 0) {
00763     MUTEX_TRY_LOCK(lock, write_state.vio.mutex, e->ethread);
00764     if (!lock) {
00765       e->schedule_in(PVC_LOCK_RETRY_TIME);
00766       return;
00767     }
00768     *our_eptr = NULL;
00769     write_state.vio._cont->handleEvent(event_to_send, &write_state.vio);
00770   } else {
00771     *our_eptr = NULL;
00772   }
00773 }
00774 
00775 void
00776 PluginVC::update_inactive_time()
00777 {
00778   if (inactive_event && inactive_timeout) {
00779     
00780     
00781     inactive_timeout_at = ink_get_hrtime() + inactive_timeout;
00782   }
00783 }
00784 
00785 
00786 
00787 
00788 
00789 
00790 
00791 void
00792 PluginVC::setup_event_cb(ink_hrtime in, Event ** e_ptr)
00793 {
00794 
00795   ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00796 
00797   if (*e_ptr == NULL) {
00798 
00799     
00800     
00801     if (in == 0) {
00802       if(this_ethread()->tt == REGULAR) {
00803          *e_ptr = this_ethread()->schedule_imm_local(this);
00804       }
00805       else
00806       {
00807          *e_ptr = eventProcessor.schedule_imm(this);
00808       }
00809     } 
00810     else 
00811     {
00812       if(this_ethread()->tt == REGULAR) {
00813         *e_ptr = this_ethread()->schedule_in_local(this,in);
00814       }
00815       else
00816       {
00817         *e_ptr = eventProcessor.schedule_in(this, in);
00818       }
00819     }
00820   }
00821 }
00822 
00823 void
00824 PluginVC::set_active_timeout(ink_hrtime timeout_in)
00825 {
00826   active_timeout = timeout_in;
00827 
00828   
00829   
00830   if (active_event) {
00831     ink_assert(!active_event->cancelled);
00832     active_event->cancel();
00833     active_event = NULL;
00834   }
00835 
00836   if (active_timeout > 0) {
00837     active_event = eventProcessor.schedule_in(this, active_timeout);
00838   }
00839 }
00840 
00841 void
00842 PluginVC::set_inactivity_timeout(ink_hrtime timeout_in)
00843 {
00844   inactive_timeout = timeout_in;
00845   if (inactive_timeout != 0) {
00846     inactive_timeout_at = ink_get_hrtime() + inactive_timeout;
00847     if (inactive_event == NULL) {
00848       inactive_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(1));
00849     }
00850   } else {
00851     inactive_timeout_at = 0;
00852     if (inactive_event) {
00853       inactive_event->cancel();
00854       inactive_event = NULL;
00855     }
00856   }
00857 }
00858 
00859 void
00860 PluginVC::cancel_active_timeout()
00861 {
00862   set_active_timeout(0);
00863 }
00864 
00865 void
00866 PluginVC::cancel_inactivity_timeout()
00867 {
00868   set_inactivity_timeout(0);
00869 }
00870 
00871 ink_hrtime
00872 PluginVC::get_active_timeout()
00873 {
00874   return active_timeout;
00875 }
00876 
00877 ink_hrtime
00878 PluginVC::get_inactivity_timeout()
00879 {
00880   return inactive_timeout;
00881 }
00882 
00883 SOCKET
00884 PluginVC::get_socket()
00885 {
00886   return 0;
00887 }
00888 
00889 void
00890 PluginVC::set_local_addr()
00891 {
00892   if (vc_type == PLUGIN_VC_ACTIVE) {
00893     ats_ip_copy(&local_addr, &core_obj->active_addr_struct);
00894 
00895   } else {
00896     ats_ip_copy(&local_addr, &core_obj->passive_addr_struct);
00897 
00898   }
00899 }
00900 
00901 void
00902 PluginVC::set_remote_addr()
00903 {
00904   if (vc_type == PLUGIN_VC_ACTIVE) {
00905     ats_ip_copy(&remote_addr, &core_obj->passive_addr_struct);
00906   } else {
00907     ats_ip_copy(&remote_addr, &core_obj->active_addr_struct);
00908   }
00909 }
00910 
00911 int
00912 PluginVC::set_tcp_init_cwnd(int )
00913 {
00914   return -1;
00915 }
00916 
00917 void
00918 PluginVC::apply_options()
00919 {
00920   
00921 }
00922 
00923 bool
00924 PluginVC::get_data(int id, void *data)
00925 {
00926   if (data == NULL) {
00927     return false;
00928   }
00929   switch (id) {
00930   case PLUGIN_VC_DATA_LOCAL:
00931     if (vc_type == PLUGIN_VC_ACTIVE) {
00932       *(void **) data = core_obj->active_data;
00933     } else {
00934       *(void **) data = core_obj->passive_data;
00935     }
00936     return true;
00937   case PLUGIN_VC_DATA_REMOTE:
00938     if (vc_type == PLUGIN_VC_ACTIVE) {
00939       *(void **) data = core_obj->passive_data;
00940     } else {
00941       *(void **) data = core_obj->active_data;
00942     }
00943     return true;
00944   default:
00945     *(void **) data = NULL;
00946     return false;
00947   }
00948 }
00949 
00950 bool
00951 PluginVC::set_data(int id, void *data)
00952 {
00953   switch (id) {
00954   case PLUGIN_VC_DATA_LOCAL:
00955     if (vc_type == PLUGIN_VC_ACTIVE) {
00956       core_obj->active_data = data;
00957     } else {
00958       core_obj->passive_data = data;
00959     }
00960     return true;
00961   case PLUGIN_VC_DATA_REMOTE:
00962     if (vc_type == PLUGIN_VC_ACTIVE) {
00963       core_obj->passive_data = data;
00964     } else {
00965       core_obj->active_data = data;
00966     }
00967     return true;
00968   default:
00969     return false;
00970   }
00971 }
00972 
00973 
00974 
00975 vint32
00976   PluginVCCore::nextid = 0;
00977 
00978 PluginVCCore::~PluginVCCore()
00979 {
00980 }
00981 
00982 PluginVCCore *
00983 PluginVCCore::alloc()
00984 {
00985   PluginVCCore *pvc = new PluginVCCore;
00986   pvc->init();
00987   return pvc;
00988 }
00989 
00990 void
00991 PluginVCCore::init()
00992 {
00993   mutex = new_ProxyMutex();
00994 
00995   active_vc.vc_type = PLUGIN_VC_ACTIVE;
00996   active_vc.other_side = &passive_vc;
00997   active_vc.core_obj = this;
00998   active_vc.mutex = mutex;
00999   active_vc.thread = this_ethread();
01000 
01001   passive_vc.vc_type = PLUGIN_VC_PASSIVE;
01002   passive_vc.other_side = &active_vc;
01003   passive_vc.core_obj = this;
01004   passive_vc.mutex = mutex;
01005   passive_vc.thread = active_vc.thread;
01006 
01007   p_to_a_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
01008   p_to_a_reader = p_to_a_buffer->alloc_reader();
01009 
01010   a_to_p_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
01011   a_to_p_reader = a_to_p_buffer->alloc_reader();
01012 
01013   Debug("pvc", "[%u] Created PluginVCCore at %p, active %p, passive %p", id, this, &active_vc, &passive_vc);
01014 }
01015 
01016 void
01017 PluginVCCore::destroy()
01018 {
01019 
01020   Debug("pvc", "[%u] Destroying PluginVCCore at %p", id, this);
01021 
01022   ink_assert(active_vc.closed == true || !connected);
01023   active_vc.mutex = NULL;
01024   active_vc.read_state.vio.buffer.clear();
01025   active_vc.write_state.vio.buffer.clear();
01026   active_vc.magic = PLUGIN_VC_MAGIC_DEAD;
01027 
01028   ink_assert(passive_vc.closed == true || !connected);
01029   passive_vc.mutex = NULL;
01030   passive_vc.read_state.vio.buffer.clear();
01031   passive_vc.write_state.vio.buffer.clear();
01032   passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
01033 
01034   if (p_to_a_buffer) {
01035     free_MIOBuffer(p_to_a_buffer);
01036     p_to_a_buffer = NULL;
01037   }
01038 
01039   if (a_to_p_buffer) {
01040     free_MIOBuffer(a_to_p_buffer);
01041     a_to_p_buffer = NULL;
01042   }
01043 
01044   this->mutex = NULL;
01045   delete this;
01046 }
01047 
01048 void
01049 PluginVCCore::set_accept_cont(Continuation * c)
01050 {
01051   connect_to = c;
01052   
01053 }
01054 
01055 PluginVC *
01056 PluginVCCore::connect()
01057 {
01058 
01059   
01060   if (connect_to == NULL) {
01061     return NULL;
01062   }
01063 
01064   connected = true;
01065   state_send_accept(EVENT_IMMEDIATE, NULL);
01066 
01067   return &active_vc;
01068 }
01069 
01070 Action *
01071 PluginVCCore::connect_re(Continuation * c)
01072 {
01073 
01074   
01075   if (connect_to == NULL) {
01076     return NULL;
01077   }
01078 
01079   EThread *my_thread = this_ethread();
01080   MUTEX_TAKE_LOCK(this->mutex, my_thread);
01081 
01082   connected = true;
01083   state_send_accept(EVENT_IMMEDIATE, NULL);
01084 
01085   
01086   
01087   
01088   
01089 
01090   c->handleEvent(NET_EVENT_OPEN, &active_vc);
01091   MUTEX_UNTAKE_LOCK(this->mutex, my_thread);
01092 
01093   return ACTION_RESULT_DONE;
01094 }
01095 
01096 int
01097 PluginVCCore::state_send_accept_failed(int , void * )
01098 {
01099   MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
01100 
01101   if (lock) {
01102     connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, NULL);
01103     destroy();
01104   } else {
01105     SET_HANDLER(&PluginVCCore::state_send_accept_failed);
01106     eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
01107   }
01108 
01109   return 0;
01110 
01111 }
01112 
01113 int
01114 PluginVCCore::state_send_accept(int , void * )
01115 {
01116   MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
01117 
01118   if (lock) {
01119     connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
01120   } else {
01121     SET_HANDLER(&PluginVCCore::state_send_accept);
01122     eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
01123   }
01124 
01125   return 0;
01126 }
01127 
01128 
01129 
01130 
01131 
01132 
01133 void
01134 PluginVCCore::attempt_delete()
01135 {
01136 
01137   if (active_vc.deletable) {
01138     if (passive_vc.deletable) {
01139       destroy();
01140     } else if (!connected) {
01141       state_send_accept_failed(EVENT_IMMEDIATE, NULL);
01142     }
01143   }
01144 }
01145 
01146 
01147 
01148 
01149 
01150 
01151 void
01152 PluginVCCore::kill_no_connect()
01153 {
01154   ink_assert(!connected);
01155   ink_assert(!active_vc.closed);
01156   active_vc.do_io_close();
01157 }
01158 
01159 void
01160 PluginVCCore::set_passive_addr(in_addr_t ip, int port)
01161 {
01162   ats_ip4_set(&passive_addr_struct, htonl(ip), htons(port));
01163 }
01164 
01165 void
01166 PluginVCCore::set_passive_addr(sockaddr const* ip)
01167 {
01168   passive_addr_struct.assign(ip);
01169 }
01170 
01171 void
01172 PluginVCCore::set_active_addr(in_addr_t ip, int port)
01173 {
01174   ats_ip4_set(&active_addr_struct, htonl(ip), htons(port));
01175 }
01176 
01177 void
01178 PluginVCCore::set_active_addr(sockaddr const* ip)
01179 {
01180   active_addr_struct.assign(ip);
01181 }
01182 
01183 void
01184 PluginVCCore::set_passive_data(void *data)
01185 {
01186   passive_data = data;
01187 }
01188 
01189 void
01190 PluginVCCore::set_active_data(void *data)
01191 {
01192   active_data = data;
01193 }
01194 
01195 void
01196 PluginVCCore::set_transparent(bool passive_side, bool active_side)
01197 {
01198   passive_vc.set_is_transparent(passive_side);
01199   active_vc.set_is_transparent(active_side);
01200 }
01201 
01202 void
01203 PluginVCCore::set_plugin_id(int64_t id)
01204 {
01205   passive_vc.plugin_id = active_vc.plugin_id = id;
01206 }
01207 
01208 void
01209 PluginVCCore::set_plugin_tag(char const* tag)
01210 {
01211   passive_vc.plugin_tag = active_vc.plugin_tag = tag;
01212 }
01213 
01214 
01215 
01216 
01217 
01218 
01219 
01220 #if TS_HAS_TESTS
01221 class PVCTestDriver:public NetTestDriver
01222 {
01223 public:
01224   PVCTestDriver();
01225   ~PVCTestDriver();
01226 
01227   void start_tests(RegressionTest * r_arg, int *pstatus_arg);
01228   void run_next_test();
01229   int main_handler(int event, void *data);
01230 
01231 private:
01232   unsigned i;
01233   unsigned completions_received;
01234 };
01235 
01236 PVCTestDriver::PVCTestDriver():
01237 NetTestDriver(), i(0), completions_received(0)
01238 {
01239 }
01240 
01241 PVCTestDriver::~PVCTestDriver()
01242 {
01243   mutex = NULL;
01244 }
01245 
01246 void
01247 PVCTestDriver::start_tests(RegressionTest * r_arg, int *pstatus_arg)
01248 {
01249   mutex = new_ProxyMutex();
01250   MUTEX_TRY_LOCK(lock, mutex, this_ethread());
01251 
01252   r = r_arg;
01253   pstatus = pstatus_arg;
01254 
01255   run_next_test();
01256 
01257   SET_HANDLER(&PVCTestDriver::main_handler);
01258 }
01259 
01260 void
01261 PVCTestDriver::run_next_test()
01262 {
01263 
01264   unsigned a_index = i * 2;
01265   unsigned p_index = a_index + 1;
01266 
01267   if (p_index >= num_netvc_tests) {
01268     
01269     if (errors == 0) {
01270       *pstatus = REGRESSION_TEST_PASSED;
01271     } else {
01272       *pstatus = REGRESSION_TEST_FAILED;
01273     }
01274     delete this;
01275     return;
01276   }
01277   completions_received = 0;
01278   i++;
01279 
01280   Debug("pvc_test", "Starting test %s", netvc_tests_def[a_index].test_name);
01281 
01282   NetVCTest *p = new NetVCTest;
01283   NetVCTest *a = new NetVCTest;
01284   PluginVCCore *core = PluginVCCore::alloc();
01285   core->set_accept_cont(p);
01286 
01287   p->init_test(NET_VC_TEST_PASSIVE, this, NULL, r, &netvc_tests_def[p_index], "PluginVC", "pvc_test_detail");
01288   PluginVC *a_vc = core->connect();
01289 
01290   a->init_test(NET_VC_TEST_ACTIVE, this, a_vc, r, &netvc_tests_def[a_index], "PluginVC", "pvc_test_detail");
01291 }
01292 
01293 int
01294 PVCTestDriver::main_handler(int , void * )
01295 {
01296   completions_received++;
01297 
01298   if (completions_received == 2) {
01299     run_next_test();
01300   }
01301 
01302   return 0;
01303 }
01304 
01305 EXCLUSIVE_REGRESSION_TEST(PVC) (RegressionTest * t, int , int *pstatus)
01306 {
01307   PVCTestDriver *driver = new PVCTestDriver;
01308   driver->start_tests(t, pstatus);
01309 }
01310 #endif