00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "P_Net.h"
00025
00026 #define STATE_VIO_OFFSET ((uintptr_t)&((NetState*)0)->vio)
00027 #define STATE_FROM_VIO(_x) ((NetState*)(((char*)(_x)) - STATE_VIO_OFFSET))
00028
00029 #define disable_read(_vc) (_vc)->read.enabled = 0
00030 #define disable_write(_vc) (_vc)->write.enabled = 0
00031 #define enable_read(_vc) (_vc)->read.enabled = 1
00032 #define enable_write(_vc) (_vc)->write.enabled = 1
00033
00034 #ifndef UIO_MAXIOV
00035 #define NET_MAX_IOV 16 // UIO_MAXIOV shall be at least 16 1003.1g (5.4.1.1)
00036 #else
00037 #define NET_MAX_IOV UIO_MAXIOV
00038 #endif
00039
00040
00041 ClassAllocator<UnixNetVConnection> netVCAllocator("netVCAllocator");
00042
00043
00044
00045
00046
00047 static inline void
00048 read_reschedule(NetHandler *nh, UnixNetVConnection *vc)
00049 {
00050 vc->ep.refresh(EVENTIO_READ);
00051 if (vc->read.triggered && vc->read.enabled) {
00052 nh->read_ready_list.in_or_enqueue(vc);
00053 } else
00054 nh->read_ready_list.remove(vc);
00055 }
00056
00057 static inline void
00058 write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
00059 {
00060 vc->ep.refresh(EVENTIO_WRITE);
00061 if (vc->write.triggered && vc->write.enabled) {
00062 nh->write_ready_list.in_or_enqueue(vc);
00063 } else
00064 nh->write_ready_list.remove(vc);
00065 }
00066
00067 void
00068 net_activity(UnixNetVConnection *vc, EThread *thread)
00069 {
00070 (void) thread;
00071 #ifdef INACTIVITY_TIMEOUT
00072 if (vc->inactivity_timeout && vc->inactivity_timeout_in && vc->inactivity_timeout->ethread == thread)
00073 vc->inactivity_timeout->schedule_in(vc->inactivity_timeout_in);
00074 else {
00075 if (vc->inactivity_timeout)
00076 vc->inactivity_timeout->cancel_action();
00077 if (vc->inactivity_timeout_in) {
00078 vc->inactivity_timeout = vc->thread->schedule_in_local(vc, vc->inactivity_timeout_in);
00079 } else
00080 vc->inactivity_timeout = 0;
00081 }
00082 #else
00083 if (vc->inactivity_timeout_in)
00084 vc->next_inactivity_timeout_at = ink_get_hrtime() + vc->inactivity_timeout_in;
00085 else
00086 vc->next_inactivity_timeout_at = 0;
00087 #endif
00088
00089 }
00090
00091
00092
00093
00094 void
00095 close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
00096 {
00097 NetHandler *nh = vc->nh;
00098 vc->cancel_OOB();
00099 vc->ep.stop();
00100 vc->con.close();
00101 #ifdef INACTIVITY_TIMEOUT
00102 if (vc->inactivity_timeout) {
00103 vc->inactivity_timeout->cancel_action(vc);
00104 vc->inactivity_timeout = NULL;
00105 }
00106 #else
00107 vc->next_inactivity_timeout_at = 0;
00108 #endif
00109 vc->inactivity_timeout_in = 0;
00110 if (vc->active_timeout) {
00111 vc->active_timeout->cancel_action(vc);
00112 vc->active_timeout = NULL;
00113 }
00114 vc->active_timeout_in = 0;
00115 nh->open_list.remove(vc);
00116 nh->cop_list.remove(vc);
00117 nh->read_ready_list.remove(vc);
00118 nh->write_ready_list.remove(vc);
00119 if (vc->read.in_enabled_list) {
00120 nh->read_enable_list.remove(vc);
00121 vc->read.in_enabled_list = 0;
00122 }
00123 if (vc->write.in_enabled_list) {
00124 nh->write_enable_list.remove(vc);
00125 vc->write.in_enabled_list = 0;
00126 }
00127 vc->free(t);
00128 }
00129
00130
00131
00132
00133 static inline int
00134 read_signal_and_update(int event, UnixNetVConnection *vc)
00135 {
00136 vc->recursion++;
00137 vc->read.vio._cont->handleEvent(event, &vc->read.vio);
00138 if (!--vc->recursion && vc->closed) {
00139
00140 ink_assert(vc->thread == this_ethread());
00141 close_UnixNetVConnection(vc, vc->thread);
00142 return EVENT_DONE;
00143 } else {
00144 return EVENT_CONT;
00145 }
00146 }
00147
00148 static inline int
00149 write_signal_and_update(int event, UnixNetVConnection *vc)
00150 {
00151 vc->recursion++;
00152 vc->write.vio._cont->handleEvent(event, &vc->write.vio);
00153 if (!--vc->recursion && vc->closed) {
00154
00155 ink_assert(vc->thread == this_ethread());
00156 close_UnixNetVConnection(vc, vc->thread);
00157 return EVENT_DONE;
00158 } else {
00159 return EVENT_CONT;
00160 }
00161 }
00162
00163 static inline int
00164 read_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
00165 {
00166 vc->read.enabled = 0;
00167 if (read_signal_and_update(event, vc) == EVENT_DONE) {
00168 return EVENT_DONE;
00169 } else {
00170 read_reschedule(nh, vc);
00171 return EVENT_CONT;
00172 }
00173 }
00174
00175 static inline int
00176 write_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
00177 {
00178 vc->write.enabled = 0;
00179 if (write_signal_and_update(event, vc) == EVENT_DONE) {
00180 return EVENT_DONE;
00181 } else {
00182 write_reschedule(nh, vc);
00183 return EVENT_CONT;
00184 }
00185 }
00186
00187 static inline int
00188 read_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
00189 {
00190 vc->lerrno = lerrno;
00191 return read_signal_done(VC_EVENT_ERROR, nh, vc);
00192 }
00193
00194 static inline int
00195 write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
00196 {
00197 vc->lerrno = lerrno;
00198 return write_signal_done(VC_EVENT_ERROR, nh, vc);
00199 }
00200
00201
00202
00203
00204
00205 static void
00206 read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
00207 {
00208 NetState *s = &vc->read;
00209 ProxyMutex *mutex = thread->mutex;
00210 MIOBufferAccessor & buf = s->vio.buffer;
00211 int64_t r = 0;
00212
00213 MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);
00214
00215 if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr) {
00216 read_reschedule(nh, vc);
00217 return;
00218 }
00219
00220 if (!s->enabled || s->vio.op != VIO::READ) {
00221 read_disable(nh, vc);
00222 return;
00223 }
00224
00225 ink_assert(buf.writer());
00226
00227
00228 int64_t ntodo = s->vio.ntodo();
00229 if (ntodo <= 0) {
00230 read_disable(nh, vc);
00231 return;
00232 }
00233 int64_t toread = buf.writer()->write_avail();
00234 if (toread > ntodo)
00235 toread = ntodo;
00236
00237
00238 int64_t rattempted = 0, total_read = 0;
00239 int niov = 0;
00240 IOVec tiovec[NET_MAX_IOV];
00241 if (toread) {
00242 IOBufferBlock *b = buf.writer()->first_write_block();
00243 do {
00244 niov = 0;
00245 rattempted = 0;
00246 while (b && niov < NET_MAX_IOV) {
00247 int64_t a = b->write_avail();
00248 if (a > 0) {
00249 tiovec[niov].iov_base = b->_end;
00250 int64_t togo = toread - total_read - rattempted;
00251 if (a > togo)
00252 a = togo;
00253 tiovec[niov].iov_len = a;
00254 rattempted += a;
00255 niov++;
00256 if (a >= togo)
00257 break;
00258 }
00259 b = b->next;
00260 }
00261
00262 if (niov == 1) {
00263 r = socketManager.read(vc->con.fd, tiovec[0].iov_base, tiovec[0].iov_len);
00264 } else {
00265 r = socketManager.readv(vc->con.fd, &tiovec[0], niov);
00266 }
00267 NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_stat, 1);
00268 total_read += rattempted;
00269 } while (rattempted && r == rattempted && total_read < toread);
00270
00271
00272 if (total_read != rattempted) {
00273 if (r <= 0)
00274 r = total_read - rattempted;
00275 else
00276 r = total_read - rattempted + r;
00277 }
00278
00279 if (r <= 0) {
00280
00281 if (r == -EAGAIN || r == -ENOTCONN) {
00282 NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_nodata_stat, 1);
00283 vc->read.triggered = 0;
00284 nh->read_ready_list.remove(vc);
00285 return;
00286 }
00287
00288 if (!r || r == -ECONNRESET) {
00289 vc->read.triggered = 0;
00290 nh->read_ready_list.remove(vc);
00291 read_signal_done(VC_EVENT_EOS, nh, vc);
00292 return;
00293 }
00294 vc->read.triggered = 0;
00295 read_signal_error(nh, vc, (int)-r);
00296 return;
00297 }
00298 NET_SUM_DYN_STAT(net_read_bytes_stat, r);
00299
00300
00301 buf.writer()->fill(r);
00302 #ifdef DEBUG
00303 if (buf.writer()->write_avail() <= 0)
00304 Debug("iocore_net", "read_from_net, read buffer full");
00305 #endif
00306 s->vio.ndone += r;
00307 net_activity(vc, thread);
00308 } else
00309 r = 0;
00310
00311
00312 if (r) {
00313
00314 ink_assert(ntodo >= 0);
00315 if (s->vio.ntodo() <= 0) {
00316 read_signal_done(VC_EVENT_READ_COMPLETE, nh, vc);
00317 Debug("iocore_net", "read_from_net, read finished - signal done");
00318 return;
00319 } else {
00320 if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT)
00321 return;
00322
00323 if (lock.m.m_ptr != s->vio.mutex.m_ptr) {
00324 read_reschedule(nh, vc);
00325 return;
00326 }
00327 }
00328 }
00329
00330 if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
00331 read_disable(nh, vc);
00332 return;
00333 }
00334
00335 read_reschedule(nh, vc);
00336 }
00337
00338
00339
00340
00341
00342
00343 void
00344 write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
00345 {
00346 ProxyMutex *mutex = thread->mutex;
00347
00348 NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_stat, 1);
00349 NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_afterpoll_stat, 1);
00350
00351 write_to_net_io(nh, vc, thread);
00352 }
00353
00354
00355 void
00356 write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
00357 {
00358 NetState *s = &vc->write;
00359 ProxyMutex *mutex = thread->mutex;
00360
00361 MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);
00362
00363 if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr) {
00364 write_reschedule(nh, vc);
00365 return;
00366 }
00367
00368
00369
00370 if (!vc->getSSLHandShakeComplete()) {
00371 int err, ret;
00372
00373 if (vc->getSSLClientConnection())
00374 ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
00375 else
00376 ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);
00377
00378 if (ret == EVENT_ERROR) {
00379 vc->write.triggered = 0;
00380 write_signal_error(nh, vc, err);
00381 } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT || ret == SSL_HANDSHAKE_WANT_CONNECT
00382 || ret == SSL_HANDSHAKE_WANT_WRITE) {
00383 vc->read.triggered = 0;
00384 nh->read_ready_list.remove(vc);
00385 vc->write.triggered = 0;
00386 nh->write_ready_list.remove(vc);
00387 if (!(ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT))
00388 write_reschedule(nh, vc);
00389 } else if (ret == EVENT_DONE) {
00390 vc->write.triggered = 1;
00391 if (vc->write.enabled)
00392 nh->write_ready_list.in_or_enqueue(vc);
00393 } else
00394 write_reschedule(nh, vc);
00395 return;
00396 }
00397
00398 if (!s->enabled || s->vio.op != VIO::WRITE) {
00399 write_disable(nh, vc);
00400 return;
00401 }
00402
00403 int64_t ntodo = s->vio.ntodo();
00404 if (ntodo <= 0) {
00405 write_disable(nh, vc);
00406 return;
00407 }
00408
00409 MIOBufferAccessor & buf = s->vio.buffer;
00410 ink_assert(buf.writer());
00411
00412
00413 int64_t towrite = buf.reader()->read_avail();
00414 if (towrite > ntodo)
00415 towrite = ntodo;
00416 int signalled = 0;
00417
00418
00419 if (towrite != ntodo && buf.writer()->write_avail()) {
00420 if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
00421 return;
00422 }
00423 ntodo = s->vio.ntodo();
00424 if (ntodo <= 0) {
00425 write_disable(nh, vc);
00426 return;
00427 }
00428 signalled = 1;
00429
00430 towrite = buf.reader()->read_avail();
00431 if (towrite > ntodo)
00432 towrite = ntodo;
00433 }
00434
00435 ink_assert(towrite >= 0);
00436 if (towrite <= 0) {
00437 write_disable(nh, vc);
00438 return;
00439 }
00440
00441 int64_t total_wrote = 0, wattempted = 0;
00442 int needs = 0;
00443 int64_t r = vc->load_buffer_and_write(towrite, wattempted, total_wrote, buf, needs);
00444
00445
00446 if (total_wrote != wattempted) {
00447 if (r <= 0)
00448 r = total_wrote - wattempted;
00449 else
00450 r = total_wrote - wattempted + r;
00451 }
00452
00453 if (r <= 0) {
00454 if (r == -EAGAIN || r == -ENOTCONN) {
00455 NET_DEBUG_COUNT_DYN_STAT(net_calls_to_write_nodata_stat, 1);
00456 if((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
00457 vc->write.triggered = 0;
00458 nh->write_ready_list.remove(vc);
00459 write_reschedule(nh, vc);
00460 }
00461 if((needs & EVENTIO_READ) == EVENTIO_READ) {
00462 vc->read.triggered = 0;
00463 nh->read_ready_list.remove(vc);
00464 read_reschedule(nh, vc);
00465 }
00466 return;
00467 }
00468 if (!r || r == -ECONNRESET) {
00469 vc->write.triggered = 0;
00470 write_signal_done(VC_EVENT_EOS, nh, vc);
00471 return;
00472 }
00473 vc->write.triggered = 0;
00474 write_signal_error(nh, vc, (int)-r);
00475 return;
00476 } else {
00477 NET_SUM_DYN_STAT(net_write_bytes_stat, r);
00478
00479
00480 ink_assert(buf.reader()->read_avail() >= r);
00481 buf.reader()->consume(r);
00482 ink_assert(buf.reader()->read_avail() >= 0);
00483 s->vio.ndone += r;
00484
00485 net_activity(vc, thread);
00486
00487 ink_assert(ntodo >= 0);
00488 if (s->vio.ntodo() <= 0) {
00489 write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, vc);
00490 return;
00491 } else if (!signalled) {
00492 if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
00493 return;
00494 }
00495
00496 if (lock.m.m_ptr != s->vio.mutex.m_ptr) {
00497 write_reschedule(nh, vc);
00498 return;
00499 }
00500 }
00501 if (!buf.reader()->read_avail()) {
00502 write_disable(nh, vc);
00503 return;
00504 }
00505
00506 if((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
00507 write_reschedule(nh, vc);
00508 }
00509 if((needs & EVENTIO_READ) == EVENTIO_READ) {
00510 read_reschedule(nh, vc);
00511 }
00512 return;
00513 }
00514 }
00515
00516 bool
00517 UnixNetVConnection::get_data(int id, void *data)
00518 {
00519 union {
00520 TSVIO * vio;
00521 void * data;
00522 } ptr;
00523
00524 ptr.data = data;
00525
00526 switch (id) {
00527 case TS_API_DATA_READ_VIO:
00528 *ptr.vio = (TSVIO)&this->read.vio;
00529 return true;
00530 case TS_API_DATA_WRITE_VIO:
00531 *ptr.vio = (TSVIO)&this->write.vio;
00532 return true;
00533 default:
00534 return false;
00535 }
00536 }
00537
00538 VIO *
00539 UnixNetVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00540 {
00541 ink_assert(!closed);
00542 read.vio.op = VIO::READ;
00543 read.vio.mutex = c->mutex;
00544 read.vio._cont = c;
00545 read.vio.nbytes = nbytes;
00546 read.vio.ndone = 0;
00547 read.vio.vc_server = (VConnection *) this;
00548 if (buf) {
00549 read.vio.buffer.writer_for(buf);
00550 if (!read.enabled)
00551 read.vio.reenable();
00552 } else {
00553 read.vio.buffer.clear();
00554 disable_read(this);
00555 }
00556 return &read.vio;
00557 }
00558
00559 VIO *
00560 UnixNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *reader, bool owner)
00561 {
00562 ink_assert(!closed);
00563 write.vio.op = VIO::WRITE;
00564 write.vio.mutex = c->mutex;
00565 write.vio._cont = c;
00566 write.vio.nbytes = nbytes;
00567 write.vio.ndone = 0;
00568 write.vio.vc_server = (VConnection *) this;
00569 if (reader) {
00570 ink_assert(!owner);
00571 write.vio.buffer.reader_for(reader);
00572 if (nbytes && !write.enabled)
00573 write.vio.reenable();
00574 } else {
00575 disable_write(this);
00576 }
00577 return &write.vio;
00578 }
00579
00580 void
00581 UnixNetVConnection::do_io_close(int alerrno )
00582 {
00583 disable_read(this);
00584 disable_write(this);
00585 read.vio.buffer.clear();
00586 read.vio.nbytes = 0;
00587 read.vio.op = VIO::NONE;
00588 write.vio.buffer.clear();
00589 write.vio.nbytes = 0;
00590 write.vio.op = VIO::NONE;
00591
00592 EThread *t = this_ethread();
00593 bool close_inline = !recursion && nh->mutex->thread_holding == t;
00594
00595 INK_WRITE_MEMORY_BARRIER;
00596 if (alerrno && alerrno != -1)
00597 this->lerrno = alerrno;
00598 if (alerrno == -1)
00599 closed = 1;
00600 else
00601 closed = -1;
00602
00603 if (close_inline)
00604 close_UnixNetVConnection(this, t);
00605 }
00606
00607 void
00608 UnixNetVConnection::do_io_shutdown(ShutdownHowTo_t howto)
00609 {
00610 switch (howto) {
00611 case IO_SHUTDOWN_READ:
00612 socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 0);
00613 disable_read(this);
00614 read.vio.buffer.clear();
00615 read.vio.nbytes = 0;
00616 f.shutdown = NET_VC_SHUTDOWN_READ;
00617 break;
00618 case IO_SHUTDOWN_WRITE:
00619 socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 1);
00620 disable_write(this);
00621 write.vio.buffer.clear();
00622 write.vio.nbytes = 0;
00623 f.shutdown = NET_VC_SHUTDOWN_WRITE;
00624 break;
00625 case IO_SHUTDOWN_READWRITE:
00626 socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 2);
00627 disable_read(this);
00628 disable_write(this);
00629 read.vio.buffer.clear();
00630 read.vio.nbytes = 0;
00631 write.vio.buffer.clear();
00632 write.vio.nbytes = 0;
00633 f.shutdown = NET_VC_SHUTDOWN_READ | NET_VC_SHUTDOWN_WRITE;
00634 break;
00635 default:
00636 ink_assert(!"not reached");
00637 }
00638 }
00639
00640 int
00641 OOB_callback::retry_OOB_send(int , Event * )
00642 {
00643 ink_assert(mutex->thread_holding == this_ethread());
00644
00645 server_vc->oob_ptr = NULL;
00646 server_vc->send_OOB(server_cont, data, length);
00647 delete this;
00648 return EVENT_DONE;
00649 }
00650
00651 void
00652 UnixNetVConnection::cancel_OOB()
00653 {
00654 UnixNetVConnection *u = (UnixNetVConnection *) this;
00655 if (u->oob_ptr) {
00656 if (u->oob_ptr->trigger) {
00657 u->oob_ptr->trigger->cancel_action();
00658 u->oob_ptr->trigger = NULL;
00659 }
00660 delete u->oob_ptr;
00661 u->oob_ptr = NULL;
00662 }
00663 }
00664
00665 Action *
00666 UnixNetVConnection::send_OOB(Continuation *cont, char *buf, int len)
00667 {
00668 UnixNetVConnection *u = (UnixNetVConnection *) this;
00669 ink_assert(len > 0);
00670 ink_assert(buf);
00671 ink_assert(!u->oob_ptr);
00672 int written;
00673 ink_assert(cont->mutex->thread_holding == this_ethread());
00674 written = socketManager.send(u->con.fd, buf, len, MSG_OOB);
00675 if (written == len) {
00676 cont->handleEvent(VC_EVENT_OOB_COMPLETE, NULL);
00677 return ACTION_RESULT_DONE;
00678 } else if (!written) {
00679 cont->handleEvent(VC_EVENT_EOS, NULL);
00680 return ACTION_RESULT_DONE;
00681 }
00682 if (written > 0 && written < len) {
00683 u->oob_ptr = new OOB_callback(mutex, this, cont, buf + written, len - written);
00684 u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
00685 return u->oob_ptr->trigger;
00686 } else {
00687
00688
00689 written = -errno;
00690 ink_assert(written == -EAGAIN || written == -ENOTCONN);
00691 u->oob_ptr = new OOB_callback(mutex, this, cont, buf, len);
00692 u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
00693 return u->oob_ptr->trigger;
00694 }
00695 }
00696
00697
00698
00699
00700
00701 void
00702 UnixNetVConnection::reenable(VIO *vio)
00703 {
00704 if (STATE_FROM_VIO(vio)->enabled)
00705 return;
00706 set_enabled(vio);
00707 if (!thread)
00708 return;
00709 EThread *t = vio->mutex->thread_holding;
00710 ink_assert(t == this_ethread());
00711 ink_assert(!closed);
00712 if (nh->mutex->thread_holding == t) {
00713 if (vio == &read.vio) {
00714 ep.modify(EVENTIO_READ);
00715 ep.refresh(EVENTIO_READ);
00716 if (read.triggered)
00717 nh->read_ready_list.in_or_enqueue(this);
00718 else
00719 nh->read_ready_list.remove(this);
00720 } else {
00721 ep.modify(EVENTIO_WRITE);
00722 ep.refresh(EVENTIO_WRITE);
00723 if (write.triggered)
00724 nh->write_ready_list.in_or_enqueue(this);
00725 else
00726 nh->write_ready_list.remove(this);
00727 }
00728 } else {
00729 MUTEX_TRY_LOCK(lock, nh->mutex, t);
00730 if (!lock) {
00731 if (vio == &read.vio) {
00732 if (!read.in_enabled_list) {
00733 read.in_enabled_list = 1;
00734 nh->read_enable_list.push(this);
00735 }
00736 } else {
00737 if (!write.in_enabled_list) {
00738 write.in_enabled_list = 1;
00739 nh->write_enable_list.push(this);
00740 }
00741 }
00742 if (nh->trigger_event && nh->trigger_event->ethread->signal_hook)
00743 nh->trigger_event->ethread->signal_hook(nh->trigger_event->ethread);
00744 } else {
00745 if (vio == &read.vio) {
00746 ep.modify(EVENTIO_READ);
00747 ep.refresh(EVENTIO_READ);
00748 if (read.triggered)
00749 nh->read_ready_list.in_or_enqueue(this);
00750 else
00751 nh->read_ready_list.remove(this);
00752 } else {
00753 ep.modify(EVENTIO_WRITE);
00754 ep.refresh(EVENTIO_WRITE);
00755 if (write.triggered)
00756 nh->write_ready_list.in_or_enqueue(this);
00757 else
00758 nh->write_ready_list.remove(this);
00759 }
00760 }
00761 }
00762 }
00763
00764 void
00765 UnixNetVConnection::reenable_re(VIO *vio)
00766 {
00767 if (!thread)
00768 return;
00769 EThread *t = vio->mutex->thread_holding;
00770 ink_assert(t == this_ethread());
00771 if (nh->mutex->thread_holding == t) {
00772 set_enabled(vio);
00773 if (vio == &read.vio) {
00774 ep.modify(EVENTIO_READ);
00775 ep.refresh(EVENTIO_READ);
00776 if (read.triggered)
00777 net_read_io(nh, t);
00778 else
00779 nh->read_ready_list.remove(this);
00780 } else {
00781 ep.modify(EVENTIO_WRITE);
00782 ep.refresh(EVENTIO_WRITE);
00783 if (write.triggered)
00784 write_to_net(nh, this, t);
00785 else
00786 nh->write_ready_list.remove(this);
00787 }
00788 } else
00789 reenable(vio);
00790 }
00791
00792
00793 UnixNetVConnection::UnixNetVConnection()
00794 : closed(0), inactivity_timeout_in(0), active_timeout_in(0),
00795 #ifdef INACTIVITY_TIMEOUT
00796 inactivity_timeout(NULL),
00797 #else
00798 next_inactivity_timeout_at(0),
00799 #endif
00800 active_timeout(NULL), nh(NULL),
00801 id(0), flags(0), recursion(0), submit_time(0), oob_ptr(0),
00802 from_accept_thread(false)
00803 {
00804 memset(&local_addr, 0, sizeof local_addr);
00805 memset(&server_addr, 0, sizeof server_addr);
00806 SET_HANDLER((NetVConnHandler) & UnixNetVConnection::startEvent);
00807 }
00808
00809
00810
00811 void
00812 UnixNetVConnection::set_enabled(VIO *vio)
00813 {
00814 ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
00815 ink_assert(!closed);
00816 STATE_FROM_VIO(vio)->enabled = 1;
00817 #ifdef INACTIVITY_TIMEOUT
00818 if (!inactivity_timeout && inactivity_timeout_in) {
00819 if (vio->mutex->thread_holding == thread)
00820 inactivity_timeout = thread->schedule_in_local(this, inactivity_timeout_in);
00821 else
00822 inactivity_timeout = thread->schedule_in(this, inactivity_timeout_in);
00823 }
00824 #else
00825 if (!next_inactivity_timeout_at && inactivity_timeout_in)
00826 next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in;
00827 #endif
00828 }
00829
00830 void
00831 UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
00832 {
00833 read_from_net(nh, this, lthread);
00834 }
00835
00836
00837
00838
00839
00840 int64_t
00841 UnixNetVConnection::load_buffer_and_write(int64_t towrite, int64_t &wattempted, int64_t &total_wrote, MIOBufferAccessor & buf, int &needs)
00842 {
00843 int64_t r = 0;
00844
00845
00846 int64_t offset = buf.reader()->start_offset;
00847 IOBufferBlock *b = buf.reader()->block;
00848
00849 do {
00850 IOVec tiovec[NET_MAX_IOV];
00851 int niov = 0;
00852 int64_t total_wrote_last = total_wrote;
00853 while (b && niov < NET_MAX_IOV) {
00854
00855 int64_t l = b->read_avail();
00856 l -= offset;
00857 if (l <= 0) {
00858 offset = -l;
00859 b = b->next;
00860 continue;
00861 }
00862
00863 int64_t wavail = towrite - total_wrote;
00864 if (l > wavail)
00865 l = wavail;
00866 if (!l)
00867 break;
00868 total_wrote += l;
00869
00870 tiovec[niov].iov_len = l;
00871 tiovec[niov].iov_base = b->start() + offset;
00872 niov++;
00873
00874 offset = 0;
00875 b = b->next;
00876 }
00877 wattempted = total_wrote - total_wrote_last;
00878 if (niov == 1)
00879 r = socketManager.write(con.fd, tiovec[0].iov_base, tiovec[0].iov_len);
00880 else
00881 r = socketManager.writev(con.fd, &tiovec[0], niov);
00882 ProxyMutex *mutex = thread->mutex;
00883 NET_DEBUG_COUNT_DYN_STAT(net_calls_to_write_stat, 1);
00884 } while (r == wattempted && total_wrote < towrite);
00885
00886 needs |= EVENTIO_WRITE;
00887
00888 return (r);
00889 }
00890
00891 void
00892 UnixNetVConnection::readDisable(NetHandler *nh)
00893 {
00894 read_disable(nh, this);
00895 }
00896
00897 void
00898 UnixNetVConnection::readSignalError(NetHandler *nh, int err)
00899 {
00900 read_signal_error(nh, this, err);
00901 }
00902
00903 int
00904 UnixNetVConnection::readSignalDone(int event, NetHandler *nh)
00905 {
00906 return (read_signal_done(event, nh, this));
00907 }
00908
00909
00910 int
00911 UnixNetVConnection::readSignalAndUpdate(int event)
00912 {
00913 return (read_signal_and_update(event, this));
00914 }
00915
00916
00917
00918
00919 void
00920 UnixNetVConnection::readReschedule(NetHandler *nh)
00921 {
00922 read_reschedule(nh, this);
00923 }
00924
00925 void
00926 UnixNetVConnection::writeReschedule(NetHandler *nh)
00927 {
00928 write_reschedule(nh, this);
00929 }
00930
00931 void
00932 UnixNetVConnection::netActivity(EThread *lthread)
00933 {
00934 net_activity(this, lthread);
00935 }
00936
00937 int
00938 UnixNetVConnection::startEvent(int , Event *e)
00939 {
00940 MUTEX_TRY_LOCK(lock, get_NetHandler(e->ethread)->mutex, e->ethread);
00941 if (!lock) {
00942 e->schedule_in(NET_RETRY_DELAY);
00943 return EVENT_CONT;
00944 }
00945 if (!action_.cancelled)
00946 connectUp(e->ethread, NO_FD);
00947 else
00948 free(e->ethread);
00949 return EVENT_DONE;
00950 }
00951
00952 int
00953 UnixNetVConnection::acceptEvent(int event, Event *e)
00954 {
00955 thread = e->ethread;
00956
00957 MUTEX_TRY_LOCK(lock, get_NetHandler(thread)->mutex, e->ethread);
00958 if (!lock) {
00959 if (event == EVENT_NONE) {
00960 thread->schedule_in(this, NET_RETRY_DELAY);
00961 return EVENT_DONE;
00962 } else {
00963 e->schedule_in(NET_RETRY_DELAY);
00964 return EVENT_CONT;
00965 }
00966 }
00967
00968 if (action_.cancelled) {
00969 free(thread);
00970 return EVENT_DONE;
00971 }
00972
00973 SET_HANDLER((NetVConnHandler) & UnixNetVConnection::mainEvent);
00974
00975 nh = get_NetHandler(thread);
00976 PollDescriptor *pd = get_PollDescriptor(thread);
00977 if (ep.start(pd, this, EVENTIO_READ|EVENTIO_WRITE) < 0) {
00978 Debug("iocore_net", "acceptEvent : failed EventIO::start\n");
00979 close_UnixNetVConnection(this, e->ethread);
00980 return EVENT_DONE;
00981 }
00982
00983 nh->open_list.enqueue(this);
00984
00985 if (inactivity_timeout_in) {
00986 UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in);
00987 }
00988
00989 if (active_timeout_in) {
00990 UnixNetVConnection::set_active_timeout(active_timeout_in);
00991 }
00992
00993 action_.continuation->handleEvent(NET_EVENT_ACCEPT, this);
00994 return EVENT_DONE;
00995 }
00996
00997
00998
00999
01000
01001
01002 int
01003 UnixNetVConnection::mainEvent(int event, Event *e)
01004 {
01005 ink_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL);
01006 ink_assert(thread == this_ethread());
01007
01008 MUTEX_TRY_LOCK(hlock, get_NetHandler(thread)->mutex, e->ethread);
01009 MUTEX_TRY_LOCK(rlock, read.vio.mutex ? (ProxyMutex *) read.vio.mutex : (ProxyMutex *) e->ethread->mutex, e->ethread);
01010 MUTEX_TRY_LOCK(wlock, write.vio.mutex ? (ProxyMutex *) write.vio.mutex :
01011 (ProxyMutex *) e->ethread->mutex, e->ethread);
01012 if (!hlock || !rlock || !wlock ||
01013 (read.vio.mutex.m_ptr && rlock.m.m_ptr != read.vio.mutex.m_ptr) ||
01014 (write.vio.mutex.m_ptr && wlock.m.m_ptr != write.vio.mutex.m_ptr)) {
01015 #ifndef INACTIVITY_TIMEOUT
01016 if (e == active_timeout)
01017 #endif
01018 e->schedule_in(NET_RETRY_DELAY);
01019 return EVENT_CONT;
01020 }
01021 if (e->cancelled)
01022 return EVENT_DONE;
01023
01024 int signal_event;
01025 Event **signal_timeout;
01026 Continuation *reader_cont = NULL;
01027 Continuation *writer_cont = NULL;
01028 ink_hrtime next_activity_timeout_at = 0;
01029 ink_hrtime *signal_timeout_at = &next_activity_timeout_at;
01030 Event *t = NULL;
01031 signal_timeout = &t;
01032
01033 #ifdef INACTIVITY_TIMEOUT
01034 if (e == inactivity_timeout) {
01035 signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
01036 signal_timeout = &inactivity_timeout;
01037 }
01038 #else
01039 if (event == EVENT_IMMEDIATE) {
01040
01041
01042
01043 if (!inactivity_timeout_in || next_inactivity_timeout_at > ink_get_hrtime())
01044 return EVENT_CONT;
01045 signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
01046 signal_timeout_at = &next_inactivity_timeout_at;
01047 }
01048 #endif
01049 else {
01050 ink_assert(e == active_timeout);
01051 signal_event = VC_EVENT_ACTIVE_TIMEOUT;
01052 signal_timeout = &active_timeout;
01053 }
01054 *signal_timeout = 0;
01055 *signal_timeout_at = 0;
01056 writer_cont = write.vio._cont;
01057
01058 if (closed) {
01059 close_UnixNetVConnection(this, thread);
01060 return EVENT_DONE;
01061 }
01062
01063 if (read.vio.op == VIO::READ && !(f.shutdown & NET_VC_SHUTDOWN_READ)) {
01064 reader_cont = read.vio._cont;
01065 if (read_signal_and_update(signal_event, this) == EVENT_DONE)
01066 return EVENT_DONE;
01067 }
01068
01069 if (!*signal_timeout &&
01070 !*signal_timeout_at &&
01071 !closed && write.vio.op == VIO::WRITE &&
01072 !(f.shutdown & NET_VC_SHUTDOWN_WRITE) && reader_cont != write.vio._cont && writer_cont == write.vio._cont)
01073 if (write_signal_and_update(signal_event, this) == EVENT_DONE)
01074 return EVENT_DONE;
01075 return EVENT_DONE;
01076 }
01077
01078
01079 int
01080 UnixNetVConnection::connectUp(EThread *t, int fd)
01081 {
01082 int res;
01083
01084 thread = t;
01085 if (check_net_throttle(CONNECT, submit_time)) {
01086 check_throttle_warning();
01087 action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_THROTTLING);
01088 free(t);
01089 return CONNECT_FAILURE;
01090 }
01091
01092
01093 options.ip_family = server_addr.sa.sa_family;
01094
01095
01096
01097
01098 if (is_debug_tag_set("iocore_net")) {
01099 char addrbuf[INET6_ADDRSTRLEN];
01100 Debug("iocore_net", "connectUp:: local_addr=%s:%d [%s]\n",
01101 options.local_ip.isValid()
01102 ? options.local_ip.toString(addrbuf, sizeof(addrbuf))
01103 : "*",
01104 options.local_port,
01105 NetVCOptions::toString(options.addr_binding)
01106 );
01107 }
01108
01109
01110
01111 if (fd == NO_FD) {
01112 res = con.open(options);
01113 if (res != 0) {
01114 goto fail;
01115 }
01116 } else {
01117 int len = sizeof(con.sock_type);
01118
01119 res = safe_getsockopt(fd, SOL_SOCKET, SO_TYPE, (char *)&con.sock_type, &len);
01120 if (res != 0) {
01121 goto fail;
01122 }
01123
01124 safe_nonblocking(fd);
01125 con.fd = fd;
01126 con.is_connected = true;
01127 con.is_bound = true;
01128 }
01129
01130
01131
01132 if (ep.start(get_PollDescriptor(t), this, EVENTIO_READ|EVENTIO_WRITE) < 0) {
01133 lerrno = errno;
01134 Debug("iocore_net", "connectUp : Failed to add to epoll list\n");
01135 action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)0);
01136 free(t);
01137 return CONNECT_FAILURE;
01138 }
01139
01140 if (fd == NO_FD) {
01141 res = con.connect(&server_addr.sa, options);
01142 if (res != 0) {
01143 goto fail;
01144 }
01145 }
01146
01147 check_emergency_throttle(con);
01148
01149
01150
01151 SET_HANDLER(&UnixNetVConnection::mainEvent);
01152
01153 nh = get_NetHandler(t);
01154 nh->open_list.enqueue(this);
01155
01156 ink_assert(!inactivity_timeout_in);
01157 ink_assert(!active_timeout_in);
01158 action_.continuation->handleEvent(NET_EVENT_OPEN, this);
01159 return CONNECT_SUCCESS;
01160
01161 fail:
01162 lerrno = errno;
01163 action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)(intptr_t)res);
01164 free(t);
01165 return CONNECT_FAILURE;
01166 }
01167
01168
01169 void
01170 UnixNetVConnection::free(EThread *t)
01171 {
01172 NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
01173
01174 this->mutex.clear();
01175 action_.mutex.clear();
01176 got_remote_addr = 0;
01177 got_local_addr = 0;
01178 read.vio.mutex.clear();
01179 write.vio.mutex.clear();
01180 flags = 0;
01181 SET_CONTINUATION_HANDLER(this, (NetVConnHandler) & UnixNetVConnection::startEvent);
01182 nh = NULL;
01183 read.triggered = 0;
01184 write.triggered = 0;
01185 options.reset();
01186 closed = 0;
01187 ink_assert(!read.ready_link.prev && !read.ready_link.next);
01188 ink_assert(!read.enable_link.next);
01189 ink_assert(!write.ready_link.prev && !write.ready_link.next);
01190 ink_assert(!write.enable_link.next);
01191 ink_assert(!link.next && !link.prev);
01192 ink_assert(!active_timeout);
01193 ink_assert(con.fd == NO_FD);
01194 ink_assert(t == this_ethread());
01195
01196 if (from_accept_thread) {
01197 netVCAllocator.free(this);
01198 } else {
01199 THREAD_FREE(this, netVCAllocator, t);
01200 }
01201 }
01202
01203 void
01204 UnixNetVConnection::apply_options()
01205 {
01206 con.apply_options(options);
01207 }