00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "P_Net.h"
00025 
00026 #define STATE_VIO_OFFSET ((uintptr_t)&((NetState*)0)->vio)
00027 #define STATE_FROM_VIO(_x) ((NetState*)(((char*)(_x)) - STATE_VIO_OFFSET))
00028 
00029 #define disable_read(_vc) (_vc)->read.enabled = 0
00030 #define disable_write(_vc) (_vc)->write.enabled = 0
00031 #define enable_read(_vc) (_vc)->read.enabled = 1
00032 #define enable_write(_vc) (_vc)->write.enabled = 1
00033 
00034 #ifndef UIO_MAXIOV
00035 #define NET_MAX_IOV 16          // UIO_MAXIOV shall be at least 16 1003.1g (5.4.1.1)
00036 #else
00037 #define NET_MAX_IOV UIO_MAXIOV
00038 #endif
00039 
00040 
00041 ClassAllocator<UnixNetVConnection> netVCAllocator("netVCAllocator");
00042 
00043 
00044 
00045 
00046 
00047 static inline void
00048 read_reschedule(NetHandler *nh, UnixNetVConnection *vc)
00049 {
00050   vc->ep.refresh(EVENTIO_READ);
00051   if (vc->read.triggered && vc->read.enabled) {
00052     nh->read_ready_list.in_or_enqueue(vc);
00053   } else
00054     nh->read_ready_list.remove(vc);
00055 }
00056 
00057 static inline void
00058 write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
00059 {
00060   vc->ep.refresh(EVENTIO_WRITE);
00061   if (vc->write.triggered && vc->write.enabled) {
00062     nh->write_ready_list.in_or_enqueue(vc);
00063   } else
00064     nh->write_ready_list.remove(vc);
00065 }
00066 
00067 void
00068 net_activity(UnixNetVConnection *vc, EThread *thread)
00069 {
00070   (void) thread;
00071 #ifdef INACTIVITY_TIMEOUT
00072   if (vc->inactivity_timeout && vc->inactivity_timeout_in && vc->inactivity_timeout->ethread == thread)
00073     vc->inactivity_timeout->schedule_in(vc->inactivity_timeout_in);
00074   else {
00075     if (vc->inactivity_timeout)
00076       vc->inactivity_timeout->cancel_action();
00077     if (vc->inactivity_timeout_in) {
00078       vc->inactivity_timeout = vc->thread->schedule_in_local(vc, vc->inactivity_timeout_in);
00079     } else
00080       vc->inactivity_timeout = 0;
00081   }
00082 #else
00083   if (vc->inactivity_timeout_in)
00084     vc->next_inactivity_timeout_at = ink_get_hrtime() + vc->inactivity_timeout_in;
00085   else
00086     vc->next_inactivity_timeout_at = 0;
00087 #endif
00088 
00089 }
00090 
00091 
00092 
00093 
00094 void
00095 close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
00096 {
00097   NetHandler *nh = vc->nh;
00098   vc->cancel_OOB();
00099   vc->ep.stop();
00100   vc->con.close();
00101 #ifdef INACTIVITY_TIMEOUT
00102   if (vc->inactivity_timeout) {
00103     vc->inactivity_timeout->cancel_action(vc);
00104     vc->inactivity_timeout = NULL;
00105   }
00106 #else
00107   vc->next_inactivity_timeout_at = 0;
00108 #endif
00109   vc->inactivity_timeout_in = 0;
00110   if (vc->active_timeout) {
00111     vc->active_timeout->cancel_action(vc);
00112     vc->active_timeout = NULL;
00113   }
00114   vc->active_timeout_in = 0;
00115   nh->open_list.remove(vc);
00116   nh->cop_list.remove(vc);
00117   nh->read_ready_list.remove(vc);
00118   nh->write_ready_list.remove(vc);
00119   if (vc->read.in_enabled_list) {
00120     nh->read_enable_list.remove(vc);
00121     vc->read.in_enabled_list = 0;
00122   }
00123   if (vc->write.in_enabled_list) {
00124     nh->write_enable_list.remove(vc);
00125     vc->write.in_enabled_list = 0;
00126   }
00127   vc->free(t);
00128 }
00129 
00130 
00131 
00132 
00133 static inline int
00134 read_signal_and_update(int event, UnixNetVConnection *vc)
00135 {
00136   vc->recursion++;
00137   vc->read.vio._cont->handleEvent(event, &vc->read.vio);
00138   if (!--vc->recursion && vc->closed) {
00139     
00140     ink_assert(vc->thread == this_ethread());
00141     close_UnixNetVConnection(vc, vc->thread);
00142     return EVENT_DONE;
00143   } else {
00144     return EVENT_CONT;
00145   }
00146 }
00147 
00148 static inline int
00149 write_signal_and_update(int event, UnixNetVConnection *vc)
00150 {
00151   vc->recursion++;
00152   vc->write.vio._cont->handleEvent(event, &vc->write.vio);
00153   if (!--vc->recursion && vc->closed) {
00154     
00155     ink_assert(vc->thread == this_ethread());
00156     close_UnixNetVConnection(vc, vc->thread);
00157     return EVENT_DONE;
00158   } else {
00159     return EVENT_CONT;
00160   }
00161 }
00162 
00163 static inline int
00164 read_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
00165 {
00166   vc->read.enabled = 0;
00167   if (read_signal_and_update(event, vc) == EVENT_DONE) {
00168     return EVENT_DONE;
00169   } else {
00170     read_reschedule(nh, vc);
00171     return EVENT_CONT;
00172   }
00173 }
00174 
00175 static inline int
00176 write_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
00177 {
00178   vc->write.enabled = 0;
00179   if (write_signal_and_update(event, vc) == EVENT_DONE) {
00180     return EVENT_DONE;
00181   } else {
00182     write_reschedule(nh, vc);
00183     return EVENT_CONT;
00184   }
00185 }
00186 
00187 static inline int
00188 read_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
00189 {
00190   vc->lerrno = lerrno;
00191   return read_signal_done(VC_EVENT_ERROR, nh, vc);
00192 }
00193 
00194 static inline int
00195 write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
00196 {
00197   vc->lerrno = lerrno;
00198   return write_signal_done(VC_EVENT_ERROR, nh, vc);
00199 }
00200 
00201 
00202 
00203 
00204 
00205 static void
00206 read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
00207 {
00208   NetState *s = &vc->read;
00209   ProxyMutex *mutex = thread->mutex;
00210   MIOBufferAccessor & buf = s->vio.buffer;
00211   int64_t r = 0;
00212 
00213   MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);
00214 
00215   if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr) {
00216     read_reschedule(nh, vc);
00217     return;
00218   }
00219   
00220   if (!s->enabled || s->vio.op != VIO::READ) {
00221     read_disable(nh, vc);
00222     return;
00223   }
00224 
00225   ink_assert(buf.writer());
00226 
00227   
00228   int64_t ntodo = s->vio.ntodo();
00229   if (ntodo <= 0) {
00230     read_disable(nh, vc);
00231     return;
00232   }
00233   int64_t toread = buf.writer()->write_avail();
00234   if (toread > ntodo)
00235     toread = ntodo;
00236 
00237   
00238   int64_t rattempted = 0, total_read = 0;
00239   int niov = 0;
00240   IOVec tiovec[NET_MAX_IOV];
00241   if (toread) {
00242     IOBufferBlock *b = buf.writer()->first_write_block();
00243     do {
00244       niov = 0;
00245       rattempted = 0;
00246       while (b && niov < NET_MAX_IOV) {
00247         int64_t a = b->write_avail();
00248         if (a > 0) {
00249           tiovec[niov].iov_base = b->_end;
00250           int64_t togo = toread - total_read - rattempted;
00251           if (a > togo)
00252             a = togo;
00253           tiovec[niov].iov_len = a;
00254           rattempted += a;
00255           niov++;
00256           if (a >= togo)
00257             break;
00258         }
00259         b = b->next;
00260       }
00261 
00262       if (niov == 1) {
00263         r = socketManager.read(vc->con.fd, tiovec[0].iov_base, tiovec[0].iov_len);
00264       } else {
00265         r = socketManager.readv(vc->con.fd, &tiovec[0], niov);
00266       }
00267       NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_stat, 1);
00268       total_read += rattempted;
00269     } while (rattempted && r == rattempted && total_read < toread);
00270 
00271     
00272     if (total_read != rattempted) {
00273       if (r <= 0)
00274         r = total_read - rattempted;
00275       else
00276         r = total_read - rattempted + r;
00277     }
00278     
00279     if (r <= 0) {
00280 
00281       if (r == -EAGAIN || r == -ENOTCONN) {
00282         NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_nodata_stat, 1);
00283         vc->read.triggered = 0;
00284         nh->read_ready_list.remove(vc);
00285         return;
00286       }
00287 
00288       if (!r || r == -ECONNRESET) {
00289         vc->read.triggered = 0;
00290         nh->read_ready_list.remove(vc);
00291         read_signal_done(VC_EVENT_EOS, nh, vc);
00292         return;
00293       }
00294       vc->read.triggered = 0;
00295       read_signal_error(nh, vc, (int)-r);
00296       return;
00297     }
00298     NET_SUM_DYN_STAT(net_read_bytes_stat, r);
00299 
00300     
00301     buf.writer()->fill(r);
00302 #ifdef DEBUG
00303     if (buf.writer()->write_avail() <= 0)
00304       Debug("iocore_net", "read_from_net, read buffer full");
00305 #endif
00306     s->vio.ndone += r;
00307     net_activity(vc, thread);
00308   } else
00309     r = 0;
00310 
00311   
00312   if (r) {
00313     
00314     ink_assert(ntodo >= 0);
00315     if (s->vio.ntodo() <= 0) {
00316       read_signal_done(VC_EVENT_READ_COMPLETE, nh, vc);
00317       Debug("iocore_net", "read_from_net, read finished - signal done");
00318       return;
00319     } else {
00320       if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT)
00321         return;
00322       
00323       if (lock.m.m_ptr != s->vio.mutex.m_ptr) {
00324         read_reschedule(nh, vc);
00325         return;
00326       }
00327     }
00328   }
00329   
00330   if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
00331     read_disable(nh, vc);
00332     return;
00333   }
00334 
00335   read_reschedule(nh, vc);
00336 }
00337 
00338 
00339 
00340 
00341 
00342 
00343 void
00344 write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
00345 {
00346   ProxyMutex *mutex = thread->mutex;
00347 
00348   NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_stat, 1);
00349   NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_afterpoll_stat, 1);
00350 
00351   write_to_net_io(nh, vc, thread);
00352 }
00353 
00354 
00355 void
00356 write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
00357 {
00358   NetState *s = &vc->write;
00359   ProxyMutex *mutex = thread->mutex;
00360 
00361   MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);
00362 
00363   if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr) {
00364     write_reschedule(nh, vc);
00365     return;
00366   }
00367 
00368   
00369   
00370   if (!vc->getSSLHandShakeComplete()) {
00371     int err, ret;
00372 
00373     if (vc->getSSLClientConnection())
00374       ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
00375     else
00376       ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);
00377 
00378     if (ret == EVENT_ERROR) {
00379       vc->write.triggered = 0;
00380       write_signal_error(nh, vc, err);
00381     } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT || ret == SSL_HANDSHAKE_WANT_CONNECT
00382                || ret == SSL_HANDSHAKE_WANT_WRITE) {
00383       vc->read.triggered = 0;
00384       nh->read_ready_list.remove(vc);
00385       vc->write.triggered = 0;
00386       nh->write_ready_list.remove(vc);
00387       if (!(ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT))
00388         write_reschedule(nh, vc);
00389     } else if (ret == EVENT_DONE) {
00390       vc->write.triggered = 1;
00391       if (vc->write.enabled)
00392         nh->write_ready_list.in_or_enqueue(vc);
00393     } else
00394       write_reschedule(nh, vc);
00395     return;
00396   }
00397   
00398   if (!s->enabled || s->vio.op != VIO::WRITE) {
00399     write_disable(nh, vc);
00400     return;
00401   }
00402   
00403   int64_t ntodo = s->vio.ntodo();
00404   if (ntodo <= 0) {
00405     write_disable(nh, vc);
00406     return;
00407   }
00408 
00409   MIOBufferAccessor & buf = s->vio.buffer;
00410   ink_assert(buf.writer());
00411 
00412   
00413   int64_t towrite = buf.reader()->read_avail();
00414   if (towrite > ntodo)
00415     towrite = ntodo;
00416   int signalled = 0;
00417 
00418   
00419   if (towrite != ntodo && buf.writer()->write_avail()) {
00420     if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
00421       return;
00422     }
00423     ntodo = s->vio.ntodo();
00424     if (ntodo <= 0) {
00425       write_disable(nh, vc);
00426       return;
00427     }
00428     signalled = 1;
00429     
00430     towrite = buf.reader()->read_avail();
00431     if (towrite > ntodo)
00432       towrite = ntodo;
00433   }
00434   
00435   ink_assert(towrite >= 0);
00436   if (towrite <= 0) {
00437     write_disable(nh, vc);
00438     return;
00439   }
00440 
00441   int64_t total_wrote = 0, wattempted = 0;
00442   int needs = 0;
00443   int64_t r = vc->load_buffer_and_write(towrite, wattempted, total_wrote, buf, needs);
00444 
00445   
00446   if (total_wrote != wattempted) {
00447     if (r <= 0)
00448       r = total_wrote - wattempted;
00449     else
00450       r = total_wrote - wattempted + r;
00451   }
00452   
00453   if (r <= 0) {                 
00454     if (r == -EAGAIN || r == -ENOTCONN) {
00455       NET_DEBUG_COUNT_DYN_STAT(net_calls_to_write_nodata_stat, 1);
00456       if((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
00457         vc->write.triggered = 0;
00458         nh->write_ready_list.remove(vc);
00459         write_reschedule(nh, vc);
00460       }
00461       if((needs & EVENTIO_READ) == EVENTIO_READ) {
00462         vc->read.triggered = 0;
00463         nh->read_ready_list.remove(vc);
00464         read_reschedule(nh, vc);
00465       }
00466       return;
00467     }
00468     if (!r || r == -ECONNRESET) {
00469       vc->write.triggered = 0;
00470       write_signal_done(VC_EVENT_EOS, nh, vc);
00471       return;
00472     }
00473     vc->write.triggered = 0;
00474     write_signal_error(nh, vc, (int)-r);
00475     return;
00476   } else {
00477     NET_SUM_DYN_STAT(net_write_bytes_stat, r);
00478 
00479     
00480     ink_assert(buf.reader()->read_avail() >= r);
00481     buf.reader()->consume(r);
00482     ink_assert(buf.reader()->read_avail() >= 0);
00483     s->vio.ndone += r;
00484 
00485     net_activity(vc, thread);
00486     
00487     ink_assert(ntodo >= 0);
00488     if (s->vio.ntodo() <= 0) {
00489       write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, vc);
00490       return;
00491     } else if (!signalled) {
00492       if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
00493         return;
00494       }
00495       
00496       if (lock.m.m_ptr != s->vio.mutex.m_ptr) {
00497         write_reschedule(nh, vc);
00498         return;
00499       }
00500     }
00501     if (!buf.reader()->read_avail()) {
00502       write_disable(nh, vc);
00503       return;
00504     }
00505 
00506     if((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
00507       write_reschedule(nh, vc);
00508     }
00509     if((needs & EVENTIO_READ) == EVENTIO_READ) {
00510       read_reschedule(nh, vc);
00511     }
00512     return;
00513   }
00514 }
00515 
00516 bool
00517 UnixNetVConnection::get_data(int id, void *data)
00518 {
00519   union {
00520     TSVIO * vio;
00521     void * data;
00522   } ptr;
00523 
00524   ptr.data = data;
00525 
00526   switch (id) {
00527   case TS_API_DATA_READ_VIO:
00528     *ptr.vio = (TSVIO)&this->read.vio;
00529     return true;
00530   case TS_API_DATA_WRITE_VIO:
00531     *ptr.vio = (TSVIO)&this->write.vio;
00532     return true;
00533   default:
00534     return false;
00535   }
00536 }
00537 
00538 VIO *
00539 UnixNetVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00540 {
00541   ink_assert(!closed);
00542   read.vio.op = VIO::READ;
00543   read.vio.mutex = c->mutex;
00544   read.vio._cont = c;
00545   read.vio.nbytes = nbytes;
00546   read.vio.ndone = 0;
00547   read.vio.vc_server = (VConnection *) this;
00548   if (buf) {
00549     read.vio.buffer.writer_for(buf);
00550     if (!read.enabled)
00551       read.vio.reenable();
00552   } else {
00553     read.vio.buffer.clear();
00554     disable_read(this);
00555   }
00556   return &read.vio;
00557 }
00558 
00559 VIO *
00560 UnixNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *reader, bool owner)
00561 {
00562   ink_assert(!closed);
00563   write.vio.op = VIO::WRITE;
00564   write.vio.mutex = c->mutex;
00565   write.vio._cont = c;
00566   write.vio.nbytes = nbytes;
00567   write.vio.ndone = 0;
00568   write.vio.vc_server = (VConnection *) this;
00569   if (reader) {
00570     ink_assert(!owner);
00571     write.vio.buffer.reader_for(reader);
00572     if (nbytes && !write.enabled)
00573       write.vio.reenable();
00574   } else {
00575     disable_write(this);
00576   }
00577   return &write.vio;
00578 }
00579 
00580 void
00581 UnixNetVConnection::do_io_close(int alerrno  )
00582 {
00583   disable_read(this);
00584   disable_write(this);
00585   read.vio.buffer.clear();
00586   read.vio.nbytes = 0;
00587   read.vio.op = VIO::NONE;
00588   write.vio.buffer.clear();
00589   write.vio.nbytes = 0;
00590   write.vio.op = VIO::NONE;
00591 
00592   EThread *t = this_ethread();
00593   bool close_inline = !recursion && nh->mutex->thread_holding == t;
00594 
00595   INK_WRITE_MEMORY_BARRIER;
00596   if (alerrno && alerrno != -1)
00597     this->lerrno = alerrno;
00598   if (alerrno == -1)
00599     closed = 1;
00600   else
00601     closed = -1;
00602 
00603   if (close_inline)
00604     close_UnixNetVConnection(this, t);
00605 }
00606 
00607 void
00608 UnixNetVConnection::do_io_shutdown(ShutdownHowTo_t howto)
00609 {
00610   switch (howto) {
00611   case IO_SHUTDOWN_READ:
00612     socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 0);
00613     disable_read(this);
00614     read.vio.buffer.clear();
00615     read.vio.nbytes = 0;
00616     f.shutdown = NET_VC_SHUTDOWN_READ;
00617     break;
00618   case IO_SHUTDOWN_WRITE:
00619     socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 1);
00620     disable_write(this);
00621     write.vio.buffer.clear();
00622     write.vio.nbytes = 0;
00623     f.shutdown = NET_VC_SHUTDOWN_WRITE;
00624     break;
00625   case IO_SHUTDOWN_READWRITE:
00626     socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 2);
00627     disable_read(this);
00628     disable_write(this);
00629     read.vio.buffer.clear();
00630     read.vio.nbytes = 0;
00631     write.vio.buffer.clear();
00632     write.vio.nbytes = 0;
00633     f.shutdown = NET_VC_SHUTDOWN_READ | NET_VC_SHUTDOWN_WRITE;
00634     break;
00635   default:
00636     ink_assert(!"not reached");
00637   }
00638 }
00639 
00640 int
00641 OOB_callback::retry_OOB_send(int , Event * )
00642 {
00643   ink_assert(mutex->thread_holding == this_ethread());
00644   
00645   server_vc->oob_ptr = NULL;
00646   server_vc->send_OOB(server_cont, data, length);
00647   delete this;
00648   return EVENT_DONE;
00649 }
00650 
00651 void
00652 UnixNetVConnection::cancel_OOB()
00653 {
00654   UnixNetVConnection *u = (UnixNetVConnection *) this;
00655   if (u->oob_ptr) {
00656     if (u->oob_ptr->trigger) {
00657       u->oob_ptr->trigger->cancel_action();
00658       u->oob_ptr->trigger = NULL;
00659     }
00660     delete u->oob_ptr;
00661     u->oob_ptr = NULL;
00662   }
00663 }
00664 
00665 Action *
00666 UnixNetVConnection::send_OOB(Continuation *cont, char *buf, int len)
00667 {
00668   UnixNetVConnection *u = (UnixNetVConnection *) this;
00669   ink_assert(len > 0);
00670   ink_assert(buf);
00671   ink_assert(!u->oob_ptr);
00672   int written;
00673   ink_assert(cont->mutex->thread_holding == this_ethread());
00674   written = socketManager.send(u->con.fd, buf, len, MSG_OOB);
00675   if (written == len) {
00676     cont->handleEvent(VC_EVENT_OOB_COMPLETE, NULL);
00677     return ACTION_RESULT_DONE;
00678   } else if (!written) {
00679     cont->handleEvent(VC_EVENT_EOS, NULL);
00680     return ACTION_RESULT_DONE;
00681   }
00682   if (written > 0 && written < len) {
00683     u->oob_ptr = new OOB_callback(mutex, this, cont, buf + written, len - written);
00684     u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
00685     return u->oob_ptr->trigger;
00686   } else {
00687     
00688     
00689     written = -errno;
00690     ink_assert(written == -EAGAIN || written == -ENOTCONN);
00691     u->oob_ptr = new OOB_callback(mutex, this, cont, buf, len);
00692     u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
00693     return u->oob_ptr->trigger;
00694   }
00695 }
00696 
00697 
00698 
00699 
00700 
00701 void
00702 UnixNetVConnection::reenable(VIO *vio)
00703 {
00704   if (STATE_FROM_VIO(vio)->enabled)
00705     return;
00706   set_enabled(vio);
00707   if (!thread)
00708     return;
00709   EThread *t = vio->mutex->thread_holding;
00710   ink_assert(t == this_ethread());
00711   ink_assert(!closed);
00712   if (nh->mutex->thread_holding == t) {
00713     if (vio == &read.vio) {
00714       ep.modify(EVENTIO_READ);
00715       ep.refresh(EVENTIO_READ);
00716       if (read.triggered)
00717         nh->read_ready_list.in_or_enqueue(this);
00718       else
00719         nh->read_ready_list.remove(this);
00720     } else {
00721       ep.modify(EVENTIO_WRITE);
00722       ep.refresh(EVENTIO_WRITE);
00723       if (write.triggered)
00724         nh->write_ready_list.in_or_enqueue(this);
00725       else
00726         nh->write_ready_list.remove(this);
00727     }
00728   } else {
00729     MUTEX_TRY_LOCK(lock, nh->mutex, t);
00730     if (!lock) {
00731       if (vio == &read.vio) {
00732         if (!read.in_enabled_list) {
00733           read.in_enabled_list = 1;
00734           nh->read_enable_list.push(this);
00735         }
00736       } else {
00737         if (!write.in_enabled_list) {
00738           write.in_enabled_list = 1;
00739           nh->write_enable_list.push(this);
00740         }
00741       }
00742       if (nh->trigger_event && nh->trigger_event->ethread->signal_hook)
00743         nh->trigger_event->ethread->signal_hook(nh->trigger_event->ethread);
00744     } else {
00745       if (vio == &read.vio) {
00746         ep.modify(EVENTIO_READ);
00747         ep.refresh(EVENTIO_READ);
00748         if (read.triggered)
00749           nh->read_ready_list.in_or_enqueue(this);
00750         else
00751           nh->read_ready_list.remove(this);
00752       } else {
00753         ep.modify(EVENTIO_WRITE);
00754         ep.refresh(EVENTIO_WRITE);
00755         if (write.triggered)
00756           nh->write_ready_list.in_or_enqueue(this);
00757         else
00758           nh->write_ready_list.remove(this);
00759       }
00760     }
00761   }
00762 }
00763 
00764 void
00765 UnixNetVConnection::reenable_re(VIO *vio)
00766 {
00767   if (!thread)
00768     return;
00769   EThread *t = vio->mutex->thread_holding;
00770   ink_assert(t == this_ethread());
00771   if (nh->mutex->thread_holding == t) {
00772     set_enabled(vio);
00773     if (vio == &read.vio) {
00774       ep.modify(EVENTIO_READ);
00775       ep.refresh(EVENTIO_READ);
00776       if (read.triggered)
00777         net_read_io(nh, t);
00778       else
00779         nh->read_ready_list.remove(this);
00780     } else {
00781       ep.modify(EVENTIO_WRITE);
00782       ep.refresh(EVENTIO_WRITE);
00783       if (write.triggered)
00784         write_to_net(nh, this, t);
00785       else
00786         nh->write_ready_list.remove(this);
00787     }
00788   } else
00789     reenable(vio);
00790 }
00791 
00792 
00793 UnixNetVConnection::UnixNetVConnection()
00794   : closed(0), inactivity_timeout_in(0), active_timeout_in(0),
00795 #ifdef INACTIVITY_TIMEOUT
00796     inactivity_timeout(NULL),
00797 #else
00798     next_inactivity_timeout_at(0),
00799 #endif
00800     active_timeout(NULL), nh(NULL),
00801     id(0), flags(0), recursion(0), submit_time(0), oob_ptr(0),
00802     from_accept_thread(false)
00803 {
00804   memset(&local_addr, 0, sizeof local_addr);
00805   memset(&server_addr, 0, sizeof server_addr);
00806   SET_HANDLER((NetVConnHandler) & UnixNetVConnection::startEvent);
00807 }
00808 
00809 
00810 
00811 void
00812 UnixNetVConnection::set_enabled(VIO *vio)
00813 {
00814   ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
00815   ink_assert(!closed);
00816   STATE_FROM_VIO(vio)->enabled = 1;
00817 #ifdef INACTIVITY_TIMEOUT
00818   if (!inactivity_timeout && inactivity_timeout_in) {
00819     if (vio->mutex->thread_holding == thread)
00820       inactivity_timeout = thread->schedule_in_local(this, inactivity_timeout_in);
00821     else
00822       inactivity_timeout = thread->schedule_in(this, inactivity_timeout_in);
00823   }
00824 #else
00825   if (!next_inactivity_timeout_at && inactivity_timeout_in)
00826     next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in;
00827 #endif
00828 }
00829 
00830 void
00831 UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
00832 {
00833   read_from_net(nh, this, lthread);
00834 }
00835 
00836 
00837 
00838 
00839 
00840 int64_t
00841 UnixNetVConnection::load_buffer_and_write(int64_t towrite, int64_t &wattempted, int64_t &total_wrote, MIOBufferAccessor & buf, int &needs)
00842 {
00843   int64_t r = 0;
00844 
00845   
00846   int64_t offset = buf.reader()->start_offset;
00847   IOBufferBlock *b = buf.reader()->block;
00848 
00849   do {
00850     IOVec tiovec[NET_MAX_IOV];
00851     int niov = 0;
00852     int64_t total_wrote_last = total_wrote;
00853     while (b && niov < NET_MAX_IOV) {
00854       
00855       int64_t l = b->read_avail();
00856       l -= offset;
00857       if (l <= 0) {
00858         offset = -l;
00859         b = b->next;
00860         continue;
00861       }
00862       
00863       int64_t wavail = towrite - total_wrote;
00864       if (l > wavail)
00865         l = wavail;
00866       if (!l)
00867         break;
00868       total_wrote += l;
00869       
00870       tiovec[niov].iov_len = l;
00871       tiovec[niov].iov_base = b->start() + offset;
00872       niov++;
00873       
00874       offset = 0;
00875       b = b->next;
00876     }
00877     wattempted = total_wrote - total_wrote_last;
00878     if (niov == 1)
00879       r = socketManager.write(con.fd, tiovec[0].iov_base, tiovec[0].iov_len);
00880     else
00881       r = socketManager.writev(con.fd, &tiovec[0], niov);
00882     ProxyMutex *mutex = thread->mutex;
00883     NET_DEBUG_COUNT_DYN_STAT(net_calls_to_write_stat, 1);
00884   } while (r == wattempted && total_wrote < towrite);
00885 
00886   needs |= EVENTIO_WRITE;
00887 
00888   return (r);
00889 }
00890 
00891 void
00892 UnixNetVConnection::readDisable(NetHandler *nh)
00893 {
00894   read_disable(nh, this);
00895 }
00896 
00897 void
00898 UnixNetVConnection::readSignalError(NetHandler *nh, int err)
00899 {
00900   read_signal_error(nh, this, err);
00901 }
00902 
00903 int
00904 UnixNetVConnection::readSignalDone(int event, NetHandler *nh)
00905 {
00906   return (read_signal_done(event, nh, this));
00907 }
00908 
00909 
00910 int
00911 UnixNetVConnection::readSignalAndUpdate(int event)
00912 {
00913   return (read_signal_and_update(event, this));
00914 }
00915 
00916 
00917 
00918 
00919 void
00920 UnixNetVConnection::readReschedule(NetHandler *nh)
00921 {
00922   read_reschedule(nh, this);
00923 }
00924 
00925 void
00926 UnixNetVConnection::writeReschedule(NetHandler *nh)
00927 {
00928   write_reschedule(nh, this);
00929 }
00930 
00931 void
00932 UnixNetVConnection::netActivity(EThread *lthread)
00933 {
00934   net_activity(this, lthread);
00935 }
00936 
00937 int
00938 UnixNetVConnection::startEvent(int , Event *e)
00939 {
00940   MUTEX_TRY_LOCK(lock, get_NetHandler(e->ethread)->mutex, e->ethread);
00941   if (!lock) {
00942     e->schedule_in(NET_RETRY_DELAY);
00943     return EVENT_CONT;
00944   }
00945   if (!action_.cancelled)
00946     connectUp(e->ethread, NO_FD);
00947   else
00948     free(e->ethread);
00949   return EVENT_DONE;
00950 }
00951 
00952 int
00953 UnixNetVConnection::acceptEvent(int event, Event *e)
00954 {
00955   thread = e->ethread;
00956 
00957   MUTEX_TRY_LOCK(lock, get_NetHandler(thread)->mutex, e->ethread);
00958   if (!lock) {
00959     if (event == EVENT_NONE) {
00960       thread->schedule_in(this, NET_RETRY_DELAY);
00961       return EVENT_DONE;
00962     } else {
00963       e->schedule_in(NET_RETRY_DELAY);
00964       return EVENT_CONT;
00965     }
00966   }
00967 
00968   if (action_.cancelled) {
00969     free(thread);
00970     return EVENT_DONE;
00971   }
00972 
00973   SET_HANDLER((NetVConnHandler) & UnixNetVConnection::mainEvent);
00974 
00975   nh = get_NetHandler(thread);
00976   PollDescriptor *pd = get_PollDescriptor(thread);
00977   if (ep.start(pd, this, EVENTIO_READ|EVENTIO_WRITE) < 0) {
00978     Debug("iocore_net", "acceptEvent : failed EventIO::start\n");
00979     close_UnixNetVConnection(this, e->ethread);
00980     return EVENT_DONE;
00981   }
00982 
00983   nh->open_list.enqueue(this);
00984 
00985   if (inactivity_timeout_in) {
00986     UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in);
00987   }
00988 
00989   if (active_timeout_in) {
00990     UnixNetVConnection::set_active_timeout(active_timeout_in);
00991   }
00992 
00993   action_.continuation->handleEvent(NET_EVENT_ACCEPT, this);
00994   return EVENT_DONE;
00995 }
00996 
00997 
00998 
00999 
01000 
01001 
01002 int
01003 UnixNetVConnection::mainEvent(int event, Event *e)
01004 {
01005   ink_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL);
01006   ink_assert(thread == this_ethread());
01007 
01008   MUTEX_TRY_LOCK(hlock, get_NetHandler(thread)->mutex, e->ethread);
01009   MUTEX_TRY_LOCK(rlock, read.vio.mutex ? (ProxyMutex *) read.vio.mutex : (ProxyMutex *) e->ethread->mutex, e->ethread);
01010   MUTEX_TRY_LOCK(wlock, write.vio.mutex ? (ProxyMutex *) write.vio.mutex :
01011                  (ProxyMutex *) e->ethread->mutex, e->ethread);
01012   if (!hlock || !rlock || !wlock ||
01013       (read.vio.mutex.m_ptr && rlock.m.m_ptr != read.vio.mutex.m_ptr) ||
01014       (write.vio.mutex.m_ptr && wlock.m.m_ptr != write.vio.mutex.m_ptr)) {
01015 #ifndef INACTIVITY_TIMEOUT
01016     if (e == active_timeout)
01017 #endif
01018       e->schedule_in(NET_RETRY_DELAY);
01019     return EVENT_CONT;
01020   }
01021   if (e->cancelled)
01022     return EVENT_DONE;
01023 
01024   int signal_event;
01025   Event **signal_timeout;
01026   Continuation *reader_cont = NULL;
01027   Continuation *writer_cont = NULL;
01028   ink_hrtime next_activity_timeout_at = 0;
01029   ink_hrtime *signal_timeout_at = &next_activity_timeout_at;
01030   Event *t = NULL;
01031   signal_timeout = &t;
01032 
01033 #ifdef INACTIVITY_TIMEOUT
01034   if (e == inactivity_timeout) {
01035     signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
01036     signal_timeout = &inactivity_timeout;
01037   }
01038 #else
01039   if (event == EVENT_IMMEDIATE) {
01040     
01041     
01042     
01043     if (!inactivity_timeout_in || next_inactivity_timeout_at > ink_get_hrtime())
01044       return EVENT_CONT;
01045     signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
01046     signal_timeout_at = &next_inactivity_timeout_at;
01047   }
01048 #endif
01049   else {
01050     ink_assert(e == active_timeout);
01051     signal_event = VC_EVENT_ACTIVE_TIMEOUT;
01052     signal_timeout = &active_timeout;
01053   }
01054   *signal_timeout = 0;
01055   *signal_timeout_at = 0;
01056   writer_cont = write.vio._cont;
01057 
01058   if (closed) {
01059     close_UnixNetVConnection(this, thread);
01060     return EVENT_DONE;
01061   }
01062 
01063   if (read.vio.op == VIO::READ && !(f.shutdown & NET_VC_SHUTDOWN_READ)) {
01064     reader_cont = read.vio._cont;
01065     if (read_signal_and_update(signal_event, this) == EVENT_DONE)
01066       return EVENT_DONE;
01067   }
01068 
01069   if (!*signal_timeout &&
01070       !*signal_timeout_at &&
01071       !closed && write.vio.op == VIO::WRITE &&
01072       !(f.shutdown & NET_VC_SHUTDOWN_WRITE) && reader_cont != write.vio._cont && writer_cont == write.vio._cont)
01073     if (write_signal_and_update(signal_event, this) == EVENT_DONE)
01074       return EVENT_DONE;
01075   return EVENT_DONE;
01076 }
01077 
01078 
01079 int
01080 UnixNetVConnection::connectUp(EThread *t, int fd)
01081 {
01082   int res;
01083 
01084   thread = t;
01085   if (check_net_throttle(CONNECT, submit_time)) {
01086     check_throttle_warning();
01087     action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_THROTTLING);
01088     free(t);
01089     return CONNECT_FAILURE;
01090   }
01091 
01092   
01093   options.ip_family = server_addr.sa.sa_family;
01094   
01095   
01096   
01097   
01098   if (is_debug_tag_set("iocore_net")) {
01099     char addrbuf[INET6_ADDRSTRLEN];
01100     Debug("iocore_net", "connectUp:: local_addr=%s:%d [%s]\n",
01101       options.local_ip.isValid()
01102       ? options.local_ip.toString(addrbuf, sizeof(addrbuf))
01103       : "*",
01104       options.local_port,
01105       NetVCOptions::toString(options.addr_binding)
01106     );
01107   }
01108 
01109   
01110   
01111   if (fd == NO_FD) {
01112     res = con.open(options);
01113     if (res != 0) {
01114       goto fail;
01115     }
01116   } else {
01117     int len = sizeof(con.sock_type);
01118 
01119     res = safe_getsockopt(fd, SOL_SOCKET, SO_TYPE, (char *)&con.sock_type, &len);
01120     if (res != 0) {
01121       goto fail;
01122     }
01123 
01124     safe_nonblocking(fd);
01125     con.fd = fd;
01126     con.is_connected = true;
01127     con.is_bound = true;
01128   }
01129 
01130   
01131   
01132   if (ep.start(get_PollDescriptor(t), this, EVENTIO_READ|EVENTIO_WRITE) < 0) {
01133     lerrno = errno;
01134     Debug("iocore_net", "connectUp : Failed to add to epoll list\n");
01135     action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)0); 
01136     free(t);
01137     return CONNECT_FAILURE;
01138   }
01139 
01140   if (fd == NO_FD) {
01141     res = con.connect(&server_addr.sa, options);
01142     if (res != 0) {
01143       goto fail;
01144     }
01145   }
01146 
01147   check_emergency_throttle(con);
01148 
01149   
01150 
01151   SET_HANDLER(&UnixNetVConnection::mainEvent);
01152 
01153   nh = get_NetHandler(t);
01154   nh->open_list.enqueue(this);
01155 
01156   ink_assert(!inactivity_timeout_in);
01157   ink_assert(!active_timeout_in);
01158   action_.continuation->handleEvent(NET_EVENT_OPEN, this);
01159   return CONNECT_SUCCESS;
01160 
01161 fail:
01162   lerrno = errno;
01163   action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)(intptr_t)res);
01164   free(t);
01165   return CONNECT_FAILURE;
01166 }
01167 
01168 
01169 void
01170 UnixNetVConnection::free(EThread *t)
01171 {
01172   NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
01173   
01174   this->mutex.clear();
01175   action_.mutex.clear();
01176   got_remote_addr = 0;
01177   got_local_addr = 0;
01178   read.vio.mutex.clear();
01179   write.vio.mutex.clear();
01180   flags = 0;
01181   SET_CONTINUATION_HANDLER(this, (NetVConnHandler) & UnixNetVConnection::startEvent);
01182   nh = NULL;
01183   read.triggered = 0;
01184   write.triggered = 0;
01185   options.reset();
01186   closed = 0;
01187   ink_assert(!read.ready_link.prev && !read.ready_link.next);
01188   ink_assert(!read.enable_link.next);
01189   ink_assert(!write.ready_link.prev && !write.ready_link.next);
01190   ink_assert(!write.enable_link.next);
01191   ink_assert(!link.next && !link.prev);
01192   ink_assert(!active_timeout);
01193   ink_assert(con.fd == NO_FD);
01194   ink_assert(t == this_ethread());
01195 
01196   if (from_accept_thread) {
01197     netVCAllocator.free(this);  
01198   } else {
01199     THREAD_FREE(this, netVCAllocator, t);
01200   }
01201 }
01202 
01203 void
01204 UnixNetVConnection::apply_options()
01205 {
01206   con.apply_options(options);
01207 }