00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074 #include "PluginVC.h"
00075 #include "P_EventSystem.h"
00076 #include "P_Net.h"
00077 #include "Regression.h"
00078
// Delay used when rescheduling after a failed try-lock.
#define PVC_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
// Byte cap used when sizing transfers into the core and output buffers.
#define PVC_DEFAULT_MAX_BYTES 32768
// Transfers below this size use the write(start(), len) path in transfer_bytes().
#define MIN_BLOCK_TRANSFER_BYTES 128

// Sentinel pointer values. Fully parenthesized so the cast binds as one
// operand wherever the macro is expanded (e.g. under sizeof or a comparison).
#define EVENT_PTR_LOCKED ((void *) 0x1)
#define EVENT_PTR_CLOSED ((void *) 0x2)

// Side name for debug output; requires `vc_type` to be in scope at the use site.
#define PVC_TYPE ((vc_type == PLUGIN_VC_ACTIVE) ? "Active" : "Passive")
00087
00088 PluginVC::PluginVC(PluginVCCore *core_obj)
00089 : NetVConnection(),magic(PLUGIN_VC_MAGIC_ALIVE), vc_type(PLUGIN_VC_UNKNOWN), core_obj(core_obj),
00090 other_side(NULL), read_state(), write_state(), need_read_process(false), need_write_process(false),
00091 closed(false), sm_lock_retry_event(NULL), core_lock_retry_event(NULL),
00092 deletable(false), reentrancy_count(0), active_timeout(0), active_event(NULL),
00093 inactive_timeout(0), inactive_timeout_at(0), inactive_event(NULL), plugin_tag(NULL), plugin_id(0)
00094 {
00095 ink_assert(core_obj != NULL);
00096 SET_HANDLER(&PluginVC::main_handler);
00097 }
00098
00099 PluginVC::~PluginVC()
00100 {
00101 mutex = NULL;
00102 }
00103
00104 int
00105 PluginVC::main_handler(int event, void *data)
00106 {
00107
00108 Debug("pvc_event", "[%u] %s: Received event %d", core_obj->id, PVC_TYPE, event);
00109
00110 ink_release_assert(event == EVENT_INTERVAL || event == EVENT_IMMEDIATE);
00111 ink_release_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00112 ink_assert(!deletable);
00113 ink_assert(data != NULL);
00114
00115 Event *call_event = (Event *) data;
00116 EThread *my_ethread = mutex->thread_holding;
00117 ink_release_assert(my_ethread != NULL);
00118
00119 bool read_mutex_held = false;
00120 bool write_mutex_held = false;
00121 Ptr<ProxyMutex> read_side_mutex = read_state.vio.mutex;
00122 Ptr<ProxyMutex> write_side_mutex = write_state.vio.mutex;
00123
00124 if (read_side_mutex) {
00125 read_mutex_held = MUTEX_TAKE_TRY_LOCK(read_side_mutex, my_ethread);
00126
00127 if (!read_mutex_held) {
00128 if (call_event != inactive_event)
00129 call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00130 return 0;
00131 }
00132
00133 if (read_side_mutex.m_ptr != read_state.vio.mutex.m_ptr) {
00134
00135
00136 Mutex_unlock(read_side_mutex, my_ethread);
00137 if (call_event != inactive_event)
00138 call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00139 return 0;
00140 }
00141 }
00142
00143 if (write_side_mutex) {
00144 write_mutex_held = MUTEX_TAKE_TRY_LOCK(write_side_mutex, my_ethread);
00145
00146 if (!write_mutex_held) {
00147 if (read_mutex_held) {
00148 Mutex_unlock(read_side_mutex, my_ethread);
00149 }
00150 if (call_event != inactive_event)
00151 call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00152 return 0;
00153 }
00154
00155 if (write_side_mutex.m_ptr != write_state.vio.mutex.m_ptr) {
00156
00157
00158 Mutex_unlock(write_side_mutex, my_ethread);
00159 if (read_mutex_held) {
00160 Mutex_unlock(read_side_mutex, my_ethread);
00161 }
00162 if (call_event != inactive_event)
00163 call_event->schedule_in(PVC_LOCK_RETRY_TIME);
00164 return 0;
00165 }
00166 }
00167
00168
00169 ink_release_assert(reentrancy_count == 0);
00170
00171 if (closed) {
00172 process_close();
00173
00174 if (read_mutex_held) {
00175 Mutex_unlock(read_side_mutex, my_ethread);
00176 }
00177
00178 if (write_mutex_held) {
00179 Mutex_unlock(write_side_mutex, my_ethread);
00180 }
00181
00182 return 0;
00183 }
00184
00185
00186
00187
00188 reentrancy_count++;
00189
00190 if (call_event == active_event) {
00191 process_timeout(call_event, VC_EVENT_ACTIVE_TIMEOUT, &active_event);
00192 } else if (call_event == inactive_event) {
00193 if (inactive_timeout_at && inactive_timeout_at < ink_get_hrtime()) {
00194 process_timeout(call_event, VC_EVENT_INACTIVITY_TIMEOUT, &inactive_event);
00195 call_event->cancel();
00196 }
00197 } else {
00198 if (call_event == sm_lock_retry_event) {
00199 sm_lock_retry_event = NULL;
00200 } else {
00201 ink_release_assert(call_event == core_lock_retry_event);
00202 core_lock_retry_event = NULL;
00203 }
00204
00205 if (need_read_process) {
00206 process_read_side(false);
00207 }
00208
00209 if (need_write_process && !closed) {
00210 process_write_side(false);
00211 }
00212
00213 }
00214
00215 reentrancy_count--;
00216 if (closed) {
00217 process_close();
00218 }
00219
00220 if (read_mutex_held) {
00221 Mutex_unlock(read_side_mutex, my_ethread);
00222 }
00223
00224 if (write_mutex_held) {
00225 Mutex_unlock(write_side_mutex, my_ethread);
00226 }
00227
00228 return 0;
00229 }
00230
00231 VIO *
00232 PluginVC::do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf)
00233 {
00234
00235 ink_assert(!closed);
00236 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00237
00238 if (buf) {
00239 read_state.vio.buffer.writer_for(buf);
00240 } else {
00241 read_state.vio.buffer.clear();
00242 }
00243
00244
00245
00246 read_state.vio.mutex = c->mutex;
00247 read_state.vio._cont = c;
00248 read_state.vio.nbytes = nbytes;
00249 read_state.vio.ndone = 0;
00250 read_state.vio.vc_server = (VConnection *) this;
00251 read_state.vio.op = VIO::READ;
00252
00253 Debug("pvc", "[%u] %s: do_io_read for %" PRId64" bytes", core_obj->id, PVC_TYPE, nbytes);
00254
00255
00256
00257 need_read_process = true;
00258 setup_event_cb(0, &sm_lock_retry_event);
00259
00260 return &read_state.vio;
00261 }
00262
00263 VIO *
00264 PluginVC::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * abuffer, bool owner)
00265 {
00266
00267 ink_assert(!closed);
00268 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00269
00270 if (abuffer) {
00271 ink_assert(!owner);
00272 write_state.vio.buffer.reader_for(abuffer);
00273 } else {
00274 write_state.vio.buffer.clear();
00275 }
00276
00277
00278
00279 write_state.vio.mutex = c->mutex;
00280 write_state.vio._cont = c;
00281 write_state.vio.nbytes = nbytes;
00282 write_state.vio.ndone = 0;
00283 write_state.vio.vc_server = (VConnection *) this;
00284 write_state.vio.op = VIO::WRITE;
00285
00286 Debug("pvc", "[%u] %s: do_io_write for %" PRId64" bytes", core_obj->id, PVC_TYPE, nbytes);
00287
00288
00289
00290 need_write_process = true;
00291 setup_event_cb(0, &sm_lock_retry_event);
00292
00293 return &write_state.vio;
00294 }
00295
00296 void
00297 PluginVC::reenable(VIO * vio)
00298 {
00299
00300 ink_assert(!closed);
00301 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00302 ink_assert(vio->mutex->thread_holding == this_ethread());
00303
00304 Debug("pvc", "[%u] %s: reenable %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
00305
00306 if (vio->op == VIO::WRITE) {
00307 ink_assert(vio == &write_state.vio);
00308 need_write_process = true;
00309 } else if (vio->op == VIO::READ) {
00310 need_read_process = true;
00311 } else {
00312 ink_release_assert(0);
00313 }
00314 setup_event_cb(0, &sm_lock_retry_event);
00315 }
00316
00317 void
00318 PluginVC::reenable_re(VIO * vio)
00319 {
00320
00321 ink_assert(!closed);
00322 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00323 ink_assert(vio->mutex->thread_holding == this_ethread());
00324
00325 Debug("pvc", "[%u] %s: reenable_re %s", core_obj->id, PVC_TYPE, (vio->op == VIO::WRITE) ? "Write" : "Read");
00326
00327 MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00328 if (!lock) {
00329 if (vio->op == VIO::WRITE) {
00330 need_write_process = true;
00331 } else {
00332 need_read_process = true;
00333 }
00334 setup_event_cb(PVC_LOCK_RETRY_TIME, &sm_lock_retry_event);
00335 return;
00336 }
00337
00338 reentrancy_count++;
00339
00340 if (vio->op == VIO::WRITE) {
00341 ink_assert(vio == &write_state.vio);
00342 process_write_side(false);
00343 } else if (vio->op == VIO::READ) {
00344 ink_assert(vio == &read_state.vio);
00345 process_read_side(false);
00346 } else {
00347 ink_release_assert(0);
00348 }
00349
00350 reentrancy_count--;
00351
00352
00353
00354
00355 if (closed) {
00356 setup_event_cb(0, &sm_lock_retry_event);
00357 }
00358 }
00359
00360 void
00361 PluginVC::do_io_close(int )
00362 {
00363 ink_assert(closed == false);
00364 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00365
00366 Debug("pvc", "[%u] %s: do_io_close", core_obj->id, PVC_TYPE);
00367
00368 if (reentrancy_count > 0) {
00369
00370
00371
00372
00373 closed = true;
00374 return;
00375 }
00376
00377 MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00378
00379 if (!lock) {
00380 setup_event_cb(PVC_LOCK_RETRY_TIME, &sm_lock_retry_event);
00381 closed = true;
00382 return;
00383 } else {
00384 closed = true;
00385 }
00386
00387 process_close();
00388 }
00389
00390 void
00391 PluginVC::do_io_shutdown(ShutdownHowTo_t howto)
00392 {
00393
00394 ink_assert(!closed);
00395 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00396
00397 switch (howto) {
00398 case IO_SHUTDOWN_READ:
00399 read_state.shutdown = true;
00400 break;
00401 case IO_SHUTDOWN_WRITE:
00402 write_state.shutdown = true;
00403 break;
00404 case IO_SHUTDOWN_READWRITE:
00405 read_state.shutdown = true;
00406 write_state.shutdown = true;
00407 break;
00408 }
00409 }
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427 int64_t
00428 PluginVC::transfer_bytes(MIOBuffer * transfer_to, IOBufferReader * transfer_from, int64_t act_on)
00429 {
00430
00431 int64_t total_added = 0;
00432
00433 ink_assert(act_on <= transfer_from->read_avail());
00434
00435 while (act_on > 0) {
00436 int64_t block_read_avail = transfer_from->block_read_avail();
00437 int64_t to_move = MIN(act_on, block_read_avail);
00438 int64_t moved = 0;
00439
00440 if (to_move <= 0) {
00441 break;
00442 }
00443
00444 if (to_move >= MIN_BLOCK_TRANSFER_BYTES) {
00445 moved = transfer_to->write(transfer_from, to_move, 0);
00446 } else {
00447
00448
00449
00450
00451
00452 moved = transfer_to->write(transfer_from->start(), to_move);
00453
00454 if (moved == 0) {
00455
00456 break;
00457 }
00458 }
00459
00460 act_on -= moved;
00461 transfer_from->consume(moved);
00462 total_added += moved;
00463 }
00464
00465 return total_added;
00466 }
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476 void
00477 PluginVC::process_write_side(bool other_side_call)
00478 {
00479
00480 ink_assert(!deletable);
00481 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00482
00483 MIOBuffer *core_buffer = (vc_type == PLUGIN_VC_ACTIVE) ? core_obj->a_to_p_buffer : core_obj->p_to_a_buffer;
00484
00485 need_write_process = false;
00486
00487 if (write_state.vio.op != VIO::WRITE || closed || write_state.shutdown) {
00488 return;
00489 }
00490
00491 EThread *my_ethread = mutex->thread_holding;
00492 ink_assert(my_ethread != NULL);
00493 MUTEX_TRY_LOCK(lock, write_state.vio.mutex, my_ethread);
00494 if (!lock) {
00495 Debug("pvc_event", "[%u] %s: process_write_side lock miss, retrying", core_obj->id, PVC_TYPE);
00496
00497 need_write_process = true;
00498 setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
00499 return;
00500 }
00501
00502 Debug("pvc", "[%u] %s: process_write_side", core_obj->id, PVC_TYPE);
00503 need_write_process = false;
00504
00505
00506
00507 int64_t ntodo = write_state.vio.ntodo();
00508 if (ntodo == 0) {
00509 return;
00510 }
00511
00512 IOBufferReader *reader = write_state.vio.get_reader();
00513 int64_t bytes_avail = reader->read_avail();
00514 int64_t act_on = MIN(bytes_avail, ntodo);
00515
00516 Debug("pvc", "[%u] %s: process_write_side; act_on %" PRId64"", core_obj->id, PVC_TYPE, act_on);
00517
00518 if (other_side->closed || other_side->read_state.shutdown) {
00519 write_state.vio._cont->handleEvent(VC_EVENT_ERROR, &write_state.vio);
00520 return;
00521 }
00522
00523 if (act_on <= 0) {
00524 if (ntodo > 0) {
00525
00526
00527 write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
00528 }
00529 return;
00530 }
00531
00532
00533
00534 int64_t buf_space = PVC_DEFAULT_MAX_BYTES - core_buffer->max_read_avail();
00535 if (buf_space <= 0) {
00536 Debug("pvc", "[%u] %s: process_write_side no buffer space", core_obj->id, PVC_TYPE);
00537 return;
00538 }
00539 act_on = MIN(act_on, buf_space);
00540
00541 int64_t added = transfer_bytes(core_buffer, reader, act_on);
00542 if (added < 0) {
00543
00544
00545
00546 Debug("pvc", "[%u] %s: process_write_side out of buffer space", core_obj->id, PVC_TYPE);
00547 return;
00548 }
00549
00550 write_state.vio.ndone += added;
00551
00552 Debug("pvc", "[%u] %s: process_write_side; added %" PRId64"", core_obj->id, PVC_TYPE, added);
00553
00554 if (write_state.vio.ntodo() == 0) {
00555 write_state.vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &write_state.vio);
00556 } else {
00557 write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio);
00558 }
00559
00560 update_inactive_time();
00561
00562
00563 if (!other_side->closed) {
00564 if (!other_side_call) {
00565 other_side->process_read_side(true);
00566 } else {
00567 other_side->read_state.vio.reenable();
00568 }
00569 }
00570 }
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581 void
00582 PluginVC::process_read_side(bool other_side_call)
00583 {
00584
00585 ink_assert(!deletable);
00586 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00587
00588
00589
00590
00591 IOBufferReader *core_reader;
00592
00593 if (vc_type == PLUGIN_VC_ACTIVE) {
00594
00595 core_reader = core_obj->p_to_a_reader;
00596 } else {
00597 ink_assert(vc_type == PLUGIN_VC_PASSIVE);
00598
00599 core_reader = core_obj->a_to_p_reader;
00600 }
00601
00602 need_read_process = false;
00603
00604 if (read_state.vio.op != VIO::READ || closed || read_state.shutdown) {
00605 return;
00606 }
00607
00608 EThread *my_ethread = mutex->thread_holding;
00609 ink_assert(my_ethread != NULL);
00610 MUTEX_TRY_LOCK(lock, read_state.vio.mutex, my_ethread);
00611 if (!lock) {
00612 Debug("pvc_event", "[%u] %s: process_read_side lock miss, retrying", core_obj->id, PVC_TYPE);
00613
00614 need_read_process = true;
00615 setup_event_cb(PVC_LOCK_RETRY_TIME, &core_lock_retry_event);
00616 return;
00617 }
00618
00619 Debug("pvc", "[%u] %s: process_read_side", core_obj->id, PVC_TYPE);
00620 need_read_process = false;
00621
00622
00623 int64_t ntodo = read_state.vio.ntodo();
00624 if (ntodo == 0) {
00625 return;
00626 }
00627
00628 int64_t bytes_avail = core_reader->read_avail();
00629 int64_t act_on = MIN(bytes_avail, ntodo);
00630
00631 Debug("pvc", "[%u] %s: process_read_side; act_on %" PRId64"", core_obj->id, PVC_TYPE, act_on);
00632
00633 if (act_on <= 0) {
00634 if (other_side->closed || other_side->write_state.shutdown) {
00635 read_state.vio._cont->handleEvent(VC_EVENT_EOS, &read_state.vio);
00636 }
00637 return;
00638 }
00639
00640
00641
00642 MIOBuffer *output_buffer = read_state.vio.get_writer();
00643
00644 int64_t water_mark = output_buffer->water_mark;
00645 water_mark = MAX(water_mark, PVC_DEFAULT_MAX_BYTES);
00646 int64_t buf_space = water_mark - output_buffer->max_read_avail();
00647 if (buf_space <= 0) {
00648 Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE);
00649 return;
00650 }
00651 act_on = MIN(act_on, buf_space);
00652
00653 int64_t added = transfer_bytes(output_buffer, core_reader, act_on);
00654 if (added <= 0) {
00655
00656
00657
00658 Debug("pvc", "[%u] %s: process_read_side out of buffer space", core_obj->id, PVC_TYPE);
00659 return;
00660 }
00661
00662 read_state.vio.ndone += added;
00663
00664 Debug("pvc", "[%u] %s: process_read_side; added %" PRId64"", core_obj->id, PVC_TYPE, added);
00665
00666 if (read_state.vio.ntodo() == 0) {
00667 read_state.vio._cont->handleEvent(VC_EVENT_READ_COMPLETE, &read_state.vio);
00668 } else {
00669 read_state.vio._cont->handleEvent(VC_EVENT_READ_READY, &read_state.vio);
00670 }
00671
00672 update_inactive_time();
00673
00674
00675
00676 if (!other_side->closed) {
00677 if (!other_side_call) {
00678 other_side->process_write_side(true);
00679 } else {
00680 other_side->write_state.vio.reenable();
00681 }
00682 }
00683 }
00684
00685
00686
00687
00688
00689
00690
00691
00692 void
00693 PluginVC::process_close()
00694 {
00695
00696 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00697
00698 Debug("pvc", "[%u] %s: process_close", core_obj->id, PVC_TYPE);
00699
00700 if (!deletable) {
00701 deletable = true;
00702 }
00703
00704 if (sm_lock_retry_event) {
00705 sm_lock_retry_event->cancel();
00706 sm_lock_retry_event = NULL;
00707 }
00708
00709 if (core_lock_retry_event) {
00710 core_lock_retry_event->cancel();
00711 core_lock_retry_event = NULL;
00712 }
00713
00714 if (active_event) {
00715 active_event->cancel();
00716 active_event = NULL;
00717 }
00718
00719 if (inactive_event) {
00720 inactive_event->cancel();
00721 inactive_event = NULL;
00722 inactive_timeout_at = 0;
00723 }
00724
00725
00726
00727
00728 if (!other_side->closed && core_obj->connected) {
00729 other_side->need_write_process = true;
00730 other_side->need_read_process = true;
00731 other_side->setup_event_cb(0, &other_side->core_lock_retry_event);
00732 }
00733
00734 core_obj->attempt_delete();
00735 }
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748 void
00749 PluginVC::process_timeout(Event * e, int event_to_send, Event ** our_eptr)
00750 {
00751
00752 ink_assert(e = *our_eptr);
00753
00754 if (read_state.vio.op == VIO::READ && !read_state.shutdown && read_state.vio.ntodo() > 0) {
00755 MUTEX_TRY_LOCK(lock, read_state.vio.mutex, e->ethread);
00756 if (!lock) {
00757 e->schedule_in(PVC_LOCK_RETRY_TIME);
00758 return;
00759 }
00760 *our_eptr = NULL;
00761 read_state.vio._cont->handleEvent(event_to_send, &read_state.vio);
00762 } else if (write_state.vio.op == VIO::WRITE && !write_state.shutdown && write_state.vio.ntodo() > 0) {
00763 MUTEX_TRY_LOCK(lock, write_state.vio.mutex, e->ethread);
00764 if (!lock) {
00765 e->schedule_in(PVC_LOCK_RETRY_TIME);
00766 return;
00767 }
00768 *our_eptr = NULL;
00769 write_state.vio._cont->handleEvent(event_to_send, &write_state.vio);
00770 } else {
00771 *our_eptr = NULL;
00772 }
00773 }
00774
00775 void
00776 PluginVC::update_inactive_time()
00777 {
00778 if (inactive_event && inactive_timeout) {
00779
00780
00781 inactive_timeout_at = ink_get_hrtime() + inactive_timeout;
00782 }
00783 }
00784
00785
00786
00787
00788
00789
00790
00791 void
00792 PluginVC::setup_event_cb(ink_hrtime in, Event ** e_ptr)
00793 {
00794
00795 ink_assert(magic == PLUGIN_VC_MAGIC_ALIVE);
00796
00797 if (*e_ptr == NULL) {
00798
00799
00800
00801 if (in == 0) {
00802 if(this_ethread()->tt == REGULAR) {
00803 *e_ptr = this_ethread()->schedule_imm_local(this);
00804 }
00805 else
00806 {
00807 *e_ptr = eventProcessor.schedule_imm(this);
00808 }
00809 }
00810 else
00811 {
00812 if(this_ethread()->tt == REGULAR) {
00813 *e_ptr = this_ethread()->schedule_in_local(this,in);
00814 }
00815 else
00816 {
00817 *e_ptr = eventProcessor.schedule_in(this, in);
00818 }
00819 }
00820 }
00821 }
00822
00823 void
00824 PluginVC::set_active_timeout(ink_hrtime timeout_in)
00825 {
00826 active_timeout = timeout_in;
00827
00828
00829
00830 if (active_event) {
00831 ink_assert(!active_event->cancelled);
00832 active_event->cancel();
00833 active_event = NULL;
00834 }
00835
00836 if (active_timeout > 0) {
00837 active_event = eventProcessor.schedule_in(this, active_timeout);
00838 }
00839 }
00840
00841 void
00842 PluginVC::set_inactivity_timeout(ink_hrtime timeout_in)
00843 {
00844 inactive_timeout = timeout_in;
00845 if (inactive_timeout != 0) {
00846 inactive_timeout_at = ink_get_hrtime() + inactive_timeout;
00847 if (inactive_event == NULL) {
00848 inactive_event = eventProcessor.schedule_every(this, HRTIME_SECONDS(1));
00849 }
00850 } else {
00851 inactive_timeout_at = 0;
00852 if (inactive_event) {
00853 inactive_event->cancel();
00854 inactive_event = NULL;
00855 }
00856 }
00857 }
00858
00859 void
00860 PluginVC::cancel_active_timeout()
00861 {
00862 set_active_timeout(0);
00863 }
00864
00865 void
00866 PluginVC::cancel_inactivity_timeout()
00867 {
00868 set_inactivity_timeout(0);
00869 }
00870
00871 ink_hrtime
00872 PluginVC::get_active_timeout()
00873 {
00874 return active_timeout;
00875 }
00876
00877 ink_hrtime
00878 PluginVC::get_inactivity_timeout()
00879 {
00880 return inactive_timeout;
00881 }
00882
00883 SOCKET
00884 PluginVC::get_socket()
00885 {
00886 return 0;
00887 }
00888
00889 void
00890 PluginVC::set_local_addr()
00891 {
00892 if (vc_type == PLUGIN_VC_ACTIVE) {
00893 ats_ip_copy(&local_addr, &core_obj->active_addr_struct);
00894
00895 } else {
00896 ats_ip_copy(&local_addr, &core_obj->passive_addr_struct);
00897
00898 }
00899 }
00900
00901 void
00902 PluginVC::set_remote_addr()
00903 {
00904 if (vc_type == PLUGIN_VC_ACTIVE) {
00905 ats_ip_copy(&remote_addr, &core_obj->passive_addr_struct);
00906 } else {
00907 ats_ip_copy(&remote_addr, &core_obj->active_addr_struct);
00908 }
00909 }
00910
00911 int
00912 PluginVC::set_tcp_init_cwnd(int )
00913 {
00914 return -1;
00915 }
00916
00917 void
00918 PluginVC::apply_options()
00919 {
00920
00921 }
00922
00923 bool
00924 PluginVC::get_data(int id, void *data)
00925 {
00926 if (data == NULL) {
00927 return false;
00928 }
00929 switch (id) {
00930 case PLUGIN_VC_DATA_LOCAL:
00931 if (vc_type == PLUGIN_VC_ACTIVE) {
00932 *(void **) data = core_obj->active_data;
00933 } else {
00934 *(void **) data = core_obj->passive_data;
00935 }
00936 return true;
00937 case PLUGIN_VC_DATA_REMOTE:
00938 if (vc_type == PLUGIN_VC_ACTIVE) {
00939 *(void **) data = core_obj->passive_data;
00940 } else {
00941 *(void **) data = core_obj->active_data;
00942 }
00943 return true;
00944 default:
00945 *(void **) data = NULL;
00946 return false;
00947 }
00948 }
00949
00950 bool
00951 PluginVC::set_data(int id, void *data)
00952 {
00953 switch (id) {
00954 case PLUGIN_VC_DATA_LOCAL:
00955 if (vc_type == PLUGIN_VC_ACTIVE) {
00956 core_obj->active_data = data;
00957 } else {
00958 core_obj->passive_data = data;
00959 }
00960 return true;
00961 case PLUGIN_VC_DATA_REMOTE:
00962 if (vc_type == PLUGIN_VC_ACTIVE) {
00963 core_obj->passive_data = data;
00964 } else {
00965 core_obj->active_data = data;
00966 }
00967 return true;
00968 default:
00969 return false;
00970 }
00971 }
00972
00973
00974
00975 vint32
00976 PluginVCCore::nextid = 0;
00977
00978 PluginVCCore::~PluginVCCore()
00979 {
00980 }
00981
00982 PluginVCCore *
00983 PluginVCCore::alloc()
00984 {
00985 PluginVCCore *pvc = new PluginVCCore;
00986 pvc->init();
00987 return pvc;
00988 }
00989
00990 void
00991 PluginVCCore::init()
00992 {
00993 mutex = new_ProxyMutex();
00994
00995 active_vc.vc_type = PLUGIN_VC_ACTIVE;
00996 active_vc.other_side = &passive_vc;
00997 active_vc.core_obj = this;
00998 active_vc.mutex = mutex;
00999 active_vc.thread = this_ethread();
01000
01001 passive_vc.vc_type = PLUGIN_VC_PASSIVE;
01002 passive_vc.other_side = &active_vc;
01003 passive_vc.core_obj = this;
01004 passive_vc.mutex = mutex;
01005 passive_vc.thread = active_vc.thread;
01006
01007 p_to_a_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
01008 p_to_a_reader = p_to_a_buffer->alloc_reader();
01009
01010 a_to_p_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K);
01011 a_to_p_reader = a_to_p_buffer->alloc_reader();
01012
01013 Debug("pvc", "[%u] Created PluginVCCore at %p, active %p, passive %p", id, this, &active_vc, &passive_vc);
01014 }
01015
01016 void
01017 PluginVCCore::destroy()
01018 {
01019
01020 Debug("pvc", "[%u] Destroying PluginVCCore at %p", id, this);
01021
01022 ink_assert(active_vc.closed == true || !connected);
01023 active_vc.mutex = NULL;
01024 active_vc.read_state.vio.buffer.clear();
01025 active_vc.write_state.vio.buffer.clear();
01026 active_vc.magic = PLUGIN_VC_MAGIC_DEAD;
01027
01028 ink_assert(passive_vc.closed == true || !connected);
01029 passive_vc.mutex = NULL;
01030 passive_vc.read_state.vio.buffer.clear();
01031 passive_vc.write_state.vio.buffer.clear();
01032 passive_vc.magic = PLUGIN_VC_MAGIC_DEAD;
01033
01034 if (p_to_a_buffer) {
01035 free_MIOBuffer(p_to_a_buffer);
01036 p_to_a_buffer = NULL;
01037 }
01038
01039 if (a_to_p_buffer) {
01040 free_MIOBuffer(a_to_p_buffer);
01041 a_to_p_buffer = NULL;
01042 }
01043
01044 this->mutex = NULL;
01045 delete this;
01046 }
01047
01048 void
01049 PluginVCCore::set_accept_cont(Continuation * c)
01050 {
01051 connect_to = c;
01052
01053 }
01054
01055 PluginVC *
01056 PluginVCCore::connect()
01057 {
01058
01059
01060 if (connect_to == NULL) {
01061 return NULL;
01062 }
01063
01064 connected = true;
01065 state_send_accept(EVENT_IMMEDIATE, NULL);
01066
01067 return &active_vc;
01068 }
01069
01070 Action *
01071 PluginVCCore::connect_re(Continuation * c)
01072 {
01073
01074
01075 if (connect_to == NULL) {
01076 return NULL;
01077 }
01078
01079 EThread *my_thread = this_ethread();
01080 MUTEX_TAKE_LOCK(this->mutex, my_thread);
01081
01082 connected = true;
01083 state_send_accept(EVENT_IMMEDIATE, NULL);
01084
01085
01086
01087
01088
01089
01090 c->handleEvent(NET_EVENT_OPEN, &active_vc);
01091 MUTEX_UNTAKE_LOCK(this->mutex, my_thread);
01092
01093 return ACTION_RESULT_DONE;
01094 }
01095
01096 int
01097 PluginVCCore::state_send_accept_failed(int , void * )
01098 {
01099 MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
01100
01101 if (lock) {
01102 connect_to->handleEvent(NET_EVENT_ACCEPT_FAILED, NULL);
01103 destroy();
01104 } else {
01105 SET_HANDLER(&PluginVCCore::state_send_accept_failed);
01106 eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
01107 }
01108
01109 return 0;
01110
01111 }
01112
01113 int
01114 PluginVCCore::state_send_accept(int , void * )
01115 {
01116 MUTEX_TRY_LOCK(lock, connect_to->mutex, this_ethread());
01117
01118 if (lock) {
01119 connect_to->handleEvent(NET_EVENT_ACCEPT, &passive_vc);
01120 } else {
01121 SET_HANDLER(&PluginVCCore::state_send_accept);
01122 eventProcessor.schedule_in(this, PVC_LOCK_RETRY_TIME);
01123 }
01124
01125 return 0;
01126 }
01127
01128
01129
01130
01131
01132
01133 void
01134 PluginVCCore::attempt_delete()
01135 {
01136
01137 if (active_vc.deletable) {
01138 if (passive_vc.deletable) {
01139 destroy();
01140 } else if (!connected) {
01141 state_send_accept_failed(EVENT_IMMEDIATE, NULL);
01142 }
01143 }
01144 }
01145
01146
01147
01148
01149
01150
01151 void
01152 PluginVCCore::kill_no_connect()
01153 {
01154 ink_assert(!connected);
01155 ink_assert(!active_vc.closed);
01156 active_vc.do_io_close();
01157 }
01158
01159 void
01160 PluginVCCore::set_passive_addr(in_addr_t ip, int port)
01161 {
01162 ats_ip4_set(&passive_addr_struct, htonl(ip), htons(port));
01163 }
01164
01165 void
01166 PluginVCCore::set_passive_addr(sockaddr const* ip)
01167 {
01168 passive_addr_struct.assign(ip);
01169 }
01170
01171 void
01172 PluginVCCore::set_active_addr(in_addr_t ip, int port)
01173 {
01174 ats_ip4_set(&active_addr_struct, htonl(ip), htons(port));
01175 }
01176
01177 void
01178 PluginVCCore::set_active_addr(sockaddr const* ip)
01179 {
01180 active_addr_struct.assign(ip);
01181 }
01182
01183 void
01184 PluginVCCore::set_passive_data(void *data)
01185 {
01186 passive_data = data;
01187 }
01188
01189 void
01190 PluginVCCore::set_active_data(void *data)
01191 {
01192 active_data = data;
01193 }
01194
01195 void
01196 PluginVCCore::set_transparent(bool passive_side, bool active_side)
01197 {
01198 passive_vc.set_is_transparent(passive_side);
01199 active_vc.set_is_transparent(active_side);
01200 }
01201
01202 void
01203 PluginVCCore::set_plugin_id(int64_t id)
01204 {
01205 passive_vc.plugin_id = active_vc.plugin_id = id;
01206 }
01207
01208 void
01209 PluginVCCore::set_plugin_tag(char const* tag)
01210 {
01211 passive_vc.plugin_tag = active_vc.plugin_tag = tag;
01212 }
01213
01214
01215
01216
01217
01218
01219
01220 #if TS_HAS_TESTS
01221 class PVCTestDriver:public NetTestDriver
01222 {
01223 public:
01224 PVCTestDriver();
01225 ~PVCTestDriver();
01226
01227 void start_tests(RegressionTest * r_arg, int *pstatus_arg);
01228 void run_next_test();
01229 int main_handler(int event, void *data);
01230
01231 private:
01232 unsigned i;
01233 unsigned completions_received;
01234 };
01235
01236 PVCTestDriver::PVCTestDriver():
01237 NetTestDriver(), i(0), completions_received(0)
01238 {
01239 }
01240
01241 PVCTestDriver::~PVCTestDriver()
01242 {
01243 mutex = NULL;
01244 }
01245
01246 void
01247 PVCTestDriver::start_tests(RegressionTest * r_arg, int *pstatus_arg)
01248 {
01249 mutex = new_ProxyMutex();
01250 MUTEX_TRY_LOCK(lock, mutex, this_ethread());
01251
01252 r = r_arg;
01253 pstatus = pstatus_arg;
01254
01255 run_next_test();
01256
01257 SET_HANDLER(&PVCTestDriver::main_handler);
01258 }
01259
01260 void
01261 PVCTestDriver::run_next_test()
01262 {
01263
01264 unsigned a_index = i * 2;
01265 unsigned p_index = a_index + 1;
01266
01267 if (p_index >= num_netvc_tests) {
01268
01269 if (errors == 0) {
01270 *pstatus = REGRESSION_TEST_PASSED;
01271 } else {
01272 *pstatus = REGRESSION_TEST_FAILED;
01273 }
01274 delete this;
01275 return;
01276 }
01277 completions_received = 0;
01278 i++;
01279
01280 Debug("pvc_test", "Starting test %s", netvc_tests_def[a_index].test_name);
01281
01282 NetVCTest *p = new NetVCTest;
01283 NetVCTest *a = new NetVCTest;
01284 PluginVCCore *core = PluginVCCore::alloc();
01285 core->set_accept_cont(p);
01286
01287 p->init_test(NET_VC_TEST_PASSIVE, this, NULL, r, &netvc_tests_def[p_index], "PluginVC", "pvc_test_detail");
01288 PluginVC *a_vc = core->connect();
01289
01290 a->init_test(NET_VC_TEST_ACTIVE, this, a_vc, r, &netvc_tests_def[a_index], "PluginVC", "pvc_test_detail");
01291 }
01292
01293 int
01294 PVCTestDriver::main_handler(int , void * )
01295 {
01296 completions_received++;
01297
01298 if (completions_received == 2) {
01299 run_next_test();
01300 }
01301
01302 return 0;
01303 }
01304
01305 EXCLUSIVE_REGRESSION_TEST(PVC) (RegressionTest * t, int , int *pstatus)
01306 {
01307 PVCTestDriver *driver = new PVCTestDriver;
01308 driver->start_tests(t, pstatus);
01309 }
01310 #endif