• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

UnixNetVConnection.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003     A brief file description
00004 
00005     @section license License
00006 
00007     Licensed to the Apache Software Foundation (ASF) under one
00008     or more contributor license agreements.  See the NOTICE file
00009     distributed with this work for additional information
00010     regarding copyright ownership.  The ASF licenses this file
00011     to you under the Apache License, Version 2.0 (the
00012     "License"); you may not use this file except in compliance
00013     with the License.  You may obtain a copy of the License at
00014 
00015     http://www.apache.org/licenses/LICENSE-2.0
00016 
00017     Unless required by applicable law or agreed to in writing, software
00018     distributed under the License is distributed on an "AS IS" BASIS,
00019     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020     See the License for the specific language governing permissions and
00021     limitations under the License.
00022 */
00023 
00024 #include "P_Net.h"
00025 
// STATE_VIO_OFFSET is the byte offset of the `vio` member inside NetState;
// STATE_FROM_VIO subtracts that offset from a VIO's address to get back to
// the NetState that embeds it.
#define STATE_VIO_OFFSET ((uintptr_t)&((NetState*)0)->vio)
#define STATE_FROM_VIO(_x) ((NetState*)(((char*)(_x)) - STATE_VIO_OFFSET))

// Set or clear the `enabled` flag of a connection's read/write state.
// Every expansion is fully parenthesized so that it remains a single
// expression when it appears next to other operators (e.g. `!disable_read(vc)`).
#define disable_read(_vc) ((_vc)->read.enabled = 0)
#define disable_write(_vc) ((_vc)->write.enabled = 0)
#define enable_read(_vc) ((_vc)->read.enabled = 1)
#define enable_write(_vc) ((_vc)->write.enabled = 1)

// Maximum number of iovec entries handed to a single readv()/writev() call.
#ifndef UIO_MAXIOV
#define NET_MAX_IOV 16          // UIO_MAXIOV shall be at least 16 1003.1g (5.4.1.1)
#else
#define NET_MAX_IOV UIO_MAXIOV
#endif
00039 
// Global
// Allocator instance for UnixNetVConnection objects, constructed with the
// name "netVCAllocator".
ClassAllocator<UnixNetVConnection> netVCAllocator("netVCAllocator");
00042 
00043 //
00044 // Reschedule a UnixNetVConnection by moving it
00045 // onto or off of the ready_list
00046 //
00047 static inline void
00048 read_reschedule(NetHandler *nh, UnixNetVConnection *vc)
00049 {
00050   vc->ep.refresh(EVENTIO_READ);
00051   if (vc->read.triggered && vc->read.enabled) {
00052     nh->read_ready_list.in_or_enqueue(vc);
00053   } else
00054     nh->read_ready_list.remove(vc);
00055 }
00056 
00057 static inline void
00058 write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
00059 {
00060   vc->ep.refresh(EVENTIO_WRITE);
00061   if (vc->write.triggered && vc->write.enabled) {
00062     nh->write_ready_list.in_or_enqueue(vc);
00063   } else
00064     nh->write_ready_list.remove(vc);
00065 }
00066 
00067 void
00068 net_activity(UnixNetVConnection *vc, EThread *thread)
00069 {
00070   (void) thread;
00071 #ifdef INACTIVITY_TIMEOUT
00072   if (vc->inactivity_timeout && vc->inactivity_timeout_in && vc->inactivity_timeout->ethread == thread)
00073     vc->inactivity_timeout->schedule_in(vc->inactivity_timeout_in);
00074   else {
00075     if (vc->inactivity_timeout)
00076       vc->inactivity_timeout->cancel_action();
00077     if (vc->inactivity_timeout_in) {
00078       vc->inactivity_timeout = vc->thread->schedule_in_local(vc, vc->inactivity_timeout_in);
00079     } else
00080       vc->inactivity_timeout = 0;
00081   }
00082 #else
00083   if (vc->inactivity_timeout_in)
00084     vc->next_inactivity_timeout_at = ink_get_hrtime() + vc->inactivity_timeout_in;
00085   else
00086     vc->next_inactivity_timeout_at = 0;
00087 #endif
00088 
00089 }
00090 
//
// Function used to close a UnixNetVConnection and free the vc
//
// Teardown order, as implemented below: cancel any pending OOB send, stop the
// event-poll registration, close the underlying connection, clear both
// timeouts, unlink the VC from every NetHandler list it may be on, and
// finally hand the VC to free() with thread `t`.
//
void
close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
{
  NetHandler *nh = vc->nh;
  vc->cancel_OOB();
  vc->ep.stop();
  vc->con.close();
#ifdef INACTIVITY_TIMEOUT
  // Event-based inactivity timeout: cancel the pending event.
  if (vc->inactivity_timeout) {
    vc->inactivity_timeout->cancel_action(vc);
    vc->inactivity_timeout = NULL;
  }
#else
  // Deadline-based inactivity timeout: a zero deadline means "none".
  vc->next_inactivity_timeout_at = 0;
#endif
  vc->inactivity_timeout_in = 0;
  if (vc->active_timeout) {
    vc->active_timeout->cancel_action(vc);
    vc->active_timeout = NULL;
  }
  vc->active_timeout_in = 0;
  // Unlink from all handler lists before the VC is freed.
  nh->open_list.remove(vc);
  nh->cop_list.remove(vc);
  nh->read_ready_list.remove(vc);
  nh->write_ready_list.remove(vc);
  // The enable lists are only touched when the VC is flagged as being on them.
  if (vc->read.in_enabled_list) {
    nh->read_enable_list.remove(vc);
    vc->read.in_enabled_list = 0;
  }
  if (vc->write.in_enabled_list) {
    nh->write_enable_list.remove(vc);
    vc->write.in_enabled_list = 0;
  }
  vc->free(t);
}
00129 
00130 //
00131 // Signal an event
00132 //
00133 static inline int
00134 read_signal_and_update(int event, UnixNetVConnection *vc)
00135 {
00136   vc->recursion++;
00137   vc->read.vio._cont->handleEvent(event, &vc->read.vio);
00138   if (!--vc->recursion && vc->closed) {
00139     /* BZ  31932 */
00140     ink_assert(vc->thread == this_ethread());
00141     close_UnixNetVConnection(vc, vc->thread);
00142     return EVENT_DONE;
00143   } else {
00144     return EVENT_CONT;
00145   }
00146 }
00147 
00148 static inline int
00149 write_signal_and_update(int event, UnixNetVConnection *vc)
00150 {
00151   vc->recursion++;
00152   vc->write.vio._cont->handleEvent(event, &vc->write.vio);
00153   if (!--vc->recursion && vc->closed) {
00154     /* BZ  31932 */
00155     ink_assert(vc->thread == this_ethread());
00156     close_UnixNetVConnection(vc, vc->thread);
00157     return EVENT_DONE;
00158   } else {
00159     return EVENT_CONT;
00160   }
00161 }
00162 
00163 static inline int
00164 read_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
00165 {
00166   vc->read.enabled = 0;
00167   if (read_signal_and_update(event, vc) == EVENT_DONE) {
00168     return EVENT_DONE;
00169   } else {
00170     read_reschedule(nh, vc);
00171     return EVENT_CONT;
00172   }
00173 }
00174 
00175 static inline int
00176 write_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
00177 {
00178   vc->write.enabled = 0;
00179   if (write_signal_and_update(event, vc) == EVENT_DONE) {
00180     return EVENT_DONE;
00181   } else {
00182     write_reschedule(nh, vc);
00183     return EVENT_CONT;
00184   }
00185 }
00186 
00187 static inline int
00188 read_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
00189 {
00190   vc->lerrno = lerrno;
00191   return read_signal_done(VC_EVENT_ERROR, nh, vc);
00192 }
00193 
00194 static inline int
00195 write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
00196 {
00197   vc->lerrno = lerrno;
00198   return write_signal_done(VC_EVENT_ERROR, nh, vc);
00199 }
00200 
// Read the data for a UnixNetVConnection.
// Rescheduling the UnixNetVConnection by moving the VC
// onto or off of the ready_list.
// Had to wrap this function with net_read_io for SSL.
//
// Steps, as implemented below:
//   1. Try to take the read VIO's mutex; if that fails (or the VIO's mutex is
//      not the one that was locked) reschedule and return.
//   2. Call read_disable() and return when the read side is not enabled, the
//      VIO is not a READ operation, or nothing is left to do.
//   3. Gather up to NET_MAX_IOV writable blocks of the VIO buffer per system
//      call and issue read()/readv() until a call comes back short or
//      `toread` bytes have been attempted.
//   4. Map the result: -EAGAIN/-ENOTCONN clears the trigger and returns;
//      0 or -ECONNRESET signals VC_EVENT_EOS; any other error signals
//      VC_EVENT_ERROR with the positive errno.
//   5. On success, fill the buffer, advance ndone, record activity and signal
//      VC_EVENT_READ_COMPLETE or VC_EVENT_READ_READY to the continuation.
static void
read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
{
  NetState *s = &vc->read;
  // NOTE(review): `mutex` is not referenced directly below; presumably the
  // NET_*_DYN_STAT macros pick it up by name -- confirm against their definition.
  ProxyMutex *mutex = thread->mutex;
  MIOBufferAccessor & buf = s->vio.buffer;
  int64_t r = 0;

  MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);

  // Could not lock, or the VIO's mutex is not the one we hold: try again later.
  if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr) {
    read_reschedule(nh, vc);
    return;
  }
  // if it is not enabled.
  if (!s->enabled || s->vio.op != VIO::READ) {
    read_disable(nh, vc);
    return;
  }

  ink_assert(buf.writer());

  // if there is nothing to do, disable connection
  int64_t ntodo = s->vio.ntodo();
  if (ntodo <= 0) {
    read_disable(nh, vc);
    return;
  }
  // Never read more than the buffer can take or than the VIO still wants.
  int64_t toread = buf.writer()->write_avail();
  if (toread > ntodo)
    toread = ntodo;

  // read data
  // rattempted: bytes requested by the current system call.
  // total_read: bytes requested by all system calls so far (not bytes received).
  int64_t rattempted = 0, total_read = 0;
  int niov = 0;
  IOVec tiovec[NET_MAX_IOV];
  if (toread) {
    IOBufferBlock *b = buf.writer()->first_write_block();
    do {
      niov = 0;
      rattempted = 0;
      // Build one iovec entry per block that still has free space, capped at
      // NET_MAX_IOV entries and at the number of bytes still to be requested.
      while (b && niov < NET_MAX_IOV) {
        int64_t a = b->write_avail();
        if (a > 0) {
          tiovec[niov].iov_base = b->_end;
          int64_t togo = toread - total_read - rattempted;
          if (a > togo)
            a = togo;
          tiovec[niov].iov_len = a;
          rattempted += a;
          niov++;
          if (a >= togo)
            break;
        }
        b = b->next;
      }

      // A single entry uses plain read(); otherwise a vectored read.
      if (niov == 1) {
        r = socketManager.read(vc->con.fd, tiovec[0].iov_base, tiovec[0].iov_len);
      } else {
        r = socketManager.readv(vc->con.fd, &tiovec[0], niov);
      }
      NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_stat, 1);
      total_read += rattempted;
      // Keep going only while every call returned exactly what was requested.
    } while (rattempted && r == rattempted && total_read < toread);

    // if we have already moved some bytes successfully, summarize in r
    // (total_read - rattempted is what the earlier, fully satisfied calls
    // delivered; a positive r from the last call is added on top, while a
    // failing last call is dropped in favour of the bytes already read.)
    if (total_read != rattempted) {
      if (r <= 0)
        r = total_read - rattempted;
      else
        r = total_read - rattempted + r;
    }
    // check for errors
    if (r <= 0) {

      // Nothing available right now: clear the trigger and leave the ready list.
      if (r == -EAGAIN || r == -ENOTCONN) {
        NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_nodata_stat, 1);
        vc->read.triggered = 0;
        nh->read_ready_list.remove(vc);
        return;
      }

      // Zero bytes or a connection reset is reported as end-of-stream.
      if (!r || r == -ECONNRESET) {
        vc->read.triggered = 0;
        nh->read_ready_list.remove(vc);
        read_signal_done(VC_EVENT_EOS, nh, vc);
        return;
      }
      // Any other negative value is passed on as an error (positive errno).
      vc->read.triggered = 0;
      read_signal_error(nh, vc, (int)-r);
      return;
    }
    NET_SUM_DYN_STAT(net_read_bytes_stat, r);

    // Add data to buffer and signal continuation.
    buf.writer()->fill(r);
#ifdef DEBUG
    if (buf.writer()->write_avail() <= 0)
      Debug("iocore_net", "read_from_net, read buffer full");
#endif
    s->vio.ndone += r;
    net_activity(vc, thread);
  } else
    r = 0;

  // Signal read ready, check if user is not done
  if (r) {
    // If there are no more bytes to read, signal read complete
    ink_assert(ntodo >= 0);
    if (s->vio.ntodo() <= 0) {
      read_signal_done(VC_EVENT_READ_COMPLETE, nh, vc);
      Debug("iocore_net", "read_from_net, read finished - signal done");
      return;
    } else {
      // EVENT_DONE here means the VC was closed inside the callback.
      if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT)
        return;
      // change of lock... don't look at shared variables!
      if (lock.m.m_ptr != s->vio.mutex.m_ptr) {
        read_reschedule(nh, vc);
        return;
      }
    }
  }
  // If there is no more room, or nothing to do, disable the connection
  if (s->vio.ntodo() <= 0 || !s->enabled || !buf.writer()->write_avail()) {
    read_disable(nh, vc);
    return;
  }

  read_reschedule(nh, vc);
}
00337 
00338 
00339 //
00340 // Write the data for a UnixNetVConnection.
00341 // Rescheduling the UnixNetVConnection when necessary.
00342 //
00343 void
00344 write_to_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
00345 {
00346   ProxyMutex *mutex = thread->mutex;
00347 
00348   NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_stat, 1);
00349   NET_DEBUG_COUNT_DYN_STAT(net_calls_to_writetonet_afterpoll_stat, 1);
00350 
00351   write_to_net_io(nh, vc, thread);
00352 }
00353 
00354 
// Perform the write side of a UnixNetVConnection.
//
// Steps, as implemented below:
//   1. Try to take the write VIO's mutex; on failure reschedule and return.
//   2. If the SSL handshake is not complete, drive it via sslStartHandShake()
//      and return after updating the trigger flags / ready lists.
//   3. Call write_disable() and return when the write side is not enabled,
//      the VIO is not a WRITE operation, or nothing is left to do.
//   4. Optionally signal VC_EVENT_WRITE_READY first so the user can fill the
//      buffer, then write via load_buffer_and_write().
//   5. Map the result: -EAGAIN/-ENOTCONN clears triggers according to
//      `needs`; 0 or -ECONNRESET signals VC_EVENT_EOS; other errors signal
//      VC_EVENT_ERROR; success consumes the bytes and signals
//      VC_EVENT_WRITE_COMPLETE or VC_EVENT_WRITE_READY.
void
write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
{
  NetState *s = &vc->write;
  // NOTE(review): `mutex` is not referenced directly below; presumably the
  // NET_*_DYN_STAT macros pick it up by name -- confirm against their definition.
  ProxyMutex *mutex = thread->mutex;

  MUTEX_TRY_LOCK_FOR(lock, s->vio.mutex, thread, s->vio._cont);

  // Could not lock, or the VIO's mutex is not the one we hold: try again later.
  if (!lock || lock.m.m_ptr != s->vio.mutex.m_ptr) {
    write_reschedule(nh, vc);
    return;
  }

  // This function will always return true unless
  // vc is an SSLNetVConnection.
  if (!vc->getSSLHandShakeComplete()) {
    int err, ret;

    if (vc->getSSLClientConnection())
      ret = vc->sslStartHandShake(SSL_EVENT_CLIENT, err);
    else
      ret = vc->sslStartHandShake(SSL_EVENT_SERVER, err);

    if (ret == EVENT_ERROR) {
      vc->write.triggered = 0;
      write_signal_error(nh, vc, err);
    } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT || ret == SSL_HANDSHAKE_WANT_CONNECT
               || ret == SSL_HANDSHAKE_WANT_WRITE) {
      // Handshake needs more I/O: clear both triggers and leave both ready
      // lists; only the WANT_CONNECT / WANT_WRITE cases reschedule the write.
      vc->read.triggered = 0;
      nh->read_ready_list.remove(vc);
      vc->write.triggered = 0;
      nh->write_ready_list.remove(vc);
      if (!(ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT))
        write_reschedule(nh, vc);
    } else if (ret == EVENT_DONE) {
      // Handshake finished: mark the write side triggered again.
      vc->write.triggered = 1;
      if (vc->write.enabled)
        nh->write_ready_list.in_or_enqueue(vc);
    } else
      write_reschedule(nh, vc);
    return;
  }
  // If it is not enabled, add to WaitList.
  if (!s->enabled || s->vio.op != VIO::WRITE) {
    write_disable(nh, vc);
    return;
  }
  // If there is nothing to do, disable
  int64_t ntodo = s->vio.ntodo();
  if (ntodo <= 0) {
    write_disable(nh, vc);
    return;
  }

  MIOBufferAccessor & buf = s->vio.buffer;
  ink_assert(buf.writer());

  // Calculate amount to write
  int64_t towrite = buf.reader()->read_avail();
  if (towrite > ntodo)
    towrite = ntodo;
  // Set once WRITE_READY has been delivered, so it is not sent twice per call.
  int signalled = 0;

  // signal write ready to allow user to fill the buffer
  if (towrite != ntodo && buf.writer()->write_avail()) {
    // EVENT_DONE here means the VC was closed inside the callback.
    if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
      return;
    }
    ntodo = s->vio.ntodo();
    if (ntodo <= 0) {
      write_disable(nh, vc);
      return;
    }
    signalled = 1;
    // Recalculate amount to write
    towrite = buf.reader()->read_avail();
    if (towrite > ntodo)
      towrite = ntodo;
  }
  // if there is nothing to do, disable
  ink_assert(towrite >= 0);
  if (towrite <= 0) {
    write_disable(nh, vc);
    return;
  }

  // wattempted: bytes handed to the last write call.
  // total_wrote: bytes handed to all write calls so far.
  // needs: EVENTIO_* bits reported back by load_buffer_and_write().
  int64_t total_wrote = 0, wattempted = 0;
  int needs = 0;
  int64_t r = vc->load_buffer_and_write(towrite, wattempted, total_wrote, buf, needs);

  // if we have already moved some bytes successfully, summarize in r
  // (total_wrote - wattempted is what the earlier, fully satisfied calls
  // wrote; a positive r from the last call is added on top.)
  if (total_wrote != wattempted) {
    if (r <= 0)
      r = total_wrote - wattempted;
    else
      r = total_wrote - wattempted + r;
  }
  // check for errors
  if (r <= 0) {                 // if the socket was not ready, add to WaitList
    if (r == -EAGAIN || r == -ENOTCONN) {
      NET_DEBUG_COUNT_DYN_STAT(net_calls_to_write_nodata_stat, 1);
      // Clear the trigger and reschedule whichever direction(s) `needs` names.
      if((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
        vc->write.triggered = 0;
        nh->write_ready_list.remove(vc);
        write_reschedule(nh, vc);
      }
      if((needs & EVENTIO_READ) == EVENTIO_READ) {
        vc->read.triggered = 0;
        nh->read_ready_list.remove(vc);
        read_reschedule(nh, vc);
      }
      return;
    }
    // Zero bytes or a connection reset is reported as end-of-stream.
    if (!r || r == -ECONNRESET) {
      vc->write.triggered = 0;
      write_signal_done(VC_EVENT_EOS, nh, vc);
      return;
    }
    // Any other negative value is passed on as an error (positive errno).
    vc->write.triggered = 0;
    write_signal_error(nh, vc, (int)-r);
    return;
  } else {
    NET_SUM_DYN_STAT(net_write_bytes_stat, r);

    // Remove data from the buffer and signal continuation.
    ink_assert(buf.reader()->read_avail() >= r);
    buf.reader()->consume(r);
    ink_assert(buf.reader()->read_avail() >= 0);
    s->vio.ndone += r;

    net_activity(vc, thread);
    // If there are no more bytes to write, signal write complete,
    ink_assert(ntodo >= 0);
    if (s->vio.ntodo() <= 0) {
      write_signal_done(VC_EVENT_WRITE_COMPLETE, nh, vc);
      return;
    } else if (!signalled) {
      if (write_signal_and_update(VC_EVENT_WRITE_READY, vc) != EVENT_CONT) {
        return;
      }
      // change of lock... don't look at shared variables!
      if (lock.m.m_ptr != s->vio.mutex.m_ptr) {
        write_reschedule(nh, vc);
        return;
      }
    }
    // Nothing left in the buffer to send: disable until re-enabled.
    if (!buf.reader()->read_avail()) {
      write_disable(nh, vc);
      return;
    }

    if((needs & EVENTIO_WRITE) == EVENTIO_WRITE) {
      write_reschedule(nh, vc);
    }
    if((needs & EVENTIO_READ) == EVENTIO_READ) {
      read_reschedule(nh, vc);
    }
    return;
  }
}
00515 
00516 bool
00517 UnixNetVConnection::get_data(int id, void *data)
00518 {
00519   union {
00520     TSVIO * vio;
00521     void * data;
00522   } ptr;
00523 
00524   ptr.data = data;
00525 
00526   switch (id) {
00527   case TS_API_DATA_READ_VIO:
00528     *ptr.vio = (TSVIO)&this->read.vio;
00529     return true;
00530   case TS_API_DATA_WRITE_VIO:
00531     *ptr.vio = (TSVIO)&this->write.vio;
00532     return true;
00533   default:
00534     return false;
00535   }
00536 }
00537 
00538 VIO *
00539 UnixNetVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00540 {
00541   ink_assert(!closed);
00542   read.vio.op = VIO::READ;
00543   read.vio.mutex = c->mutex;
00544   read.vio._cont = c;
00545   read.vio.nbytes = nbytes;
00546   read.vio.ndone = 0;
00547   read.vio.vc_server = (VConnection *) this;
00548   if (buf) {
00549     read.vio.buffer.writer_for(buf);
00550     if (!read.enabled)
00551       read.vio.reenable();
00552   } else {
00553     read.vio.buffer.clear();
00554     disable_read(this);
00555   }
00556   return &read.vio;
00557 }
00558 
00559 VIO *
00560 UnixNetVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *reader, bool owner)
00561 {
00562   ink_assert(!closed);
00563   write.vio.op = VIO::WRITE;
00564   write.vio.mutex = c->mutex;
00565   write.vio._cont = c;
00566   write.vio.nbytes = nbytes;
00567   write.vio.ndone = 0;
00568   write.vio.vc_server = (VConnection *) this;
00569   if (reader) {
00570     ink_assert(!owner);
00571     write.vio.buffer.reader_for(reader);
00572     if (nbytes && !write.enabled)
00573       write.vio.reenable();
00574   } else {
00575     disable_write(this);
00576   }
00577   return &write.vio;
00578 }
00579 
00580 void
00581 UnixNetVConnection::do_io_close(int alerrno /* = -1 */ )
00582 {
00583   disable_read(this);
00584   disable_write(this);
00585   read.vio.buffer.clear();
00586   read.vio.nbytes = 0;
00587   read.vio.op = VIO::NONE;
00588   write.vio.buffer.clear();
00589   write.vio.nbytes = 0;
00590   write.vio.op = VIO::NONE;
00591 
00592   EThread *t = this_ethread();
00593   bool close_inline = !recursion && nh->mutex->thread_holding == t;
00594 
00595   INK_WRITE_MEMORY_BARRIER;
00596   if (alerrno && alerrno != -1)
00597     this->lerrno = alerrno;
00598   if (alerrno == -1)
00599     closed = 1;
00600   else
00601     closed = -1;
00602 
00603   if (close_inline)
00604     close_UnixNetVConnection(this, t);
00605 }
00606 
00607 void
00608 UnixNetVConnection::do_io_shutdown(ShutdownHowTo_t howto)
00609 {
00610   switch (howto) {
00611   case IO_SHUTDOWN_READ:
00612     socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 0);
00613     disable_read(this);
00614     read.vio.buffer.clear();
00615     read.vio.nbytes = 0;
00616     f.shutdown = NET_VC_SHUTDOWN_READ;
00617     break;
00618   case IO_SHUTDOWN_WRITE:
00619     socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 1);
00620     disable_write(this);
00621     write.vio.buffer.clear();
00622     write.vio.nbytes = 0;
00623     f.shutdown = NET_VC_SHUTDOWN_WRITE;
00624     break;
00625   case IO_SHUTDOWN_READWRITE:
00626     socketManager.shutdown(((UnixNetVConnection *) this)->con.fd, 2);
00627     disable_read(this);
00628     disable_write(this);
00629     read.vio.buffer.clear();
00630     read.vio.nbytes = 0;
00631     write.vio.buffer.clear();
00632     write.vio.nbytes = 0;
00633     f.shutdown = NET_VC_SHUTDOWN_READ | NET_VC_SHUTDOWN_WRITE;
00634     break;
00635   default:
00636     ink_assert(!"not reached");
00637   }
00638 }
00639 
00640 int
00641 OOB_callback::retry_OOB_send(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00642 {
00643   ink_assert(mutex->thread_holding == this_ethread());
00644   // the NetVC and the OOB_callback share a mutex
00645   server_vc->oob_ptr = NULL;
00646   server_vc->send_OOB(server_cont, data, length);
00647   delete this;
00648   return EVENT_DONE;
00649 }
00650 
00651 void
00652 UnixNetVConnection::cancel_OOB()
00653 {
00654   UnixNetVConnection *u = (UnixNetVConnection *) this;
00655   if (u->oob_ptr) {
00656     if (u->oob_ptr->trigger) {
00657       u->oob_ptr->trigger->cancel_action();
00658       u->oob_ptr->trigger = NULL;
00659     }
00660     delete u->oob_ptr;
00661     u->oob_ptr = NULL;
00662   }
00663 }
00664 
00665 Action *
00666 UnixNetVConnection::send_OOB(Continuation *cont, char *buf, int len)
00667 {
00668   UnixNetVConnection *u = (UnixNetVConnection *) this;
00669   ink_assert(len > 0);
00670   ink_assert(buf);
00671   ink_assert(!u->oob_ptr);
00672   int written;
00673   ink_assert(cont->mutex->thread_holding == this_ethread());
00674   written = socketManager.send(u->con.fd, buf, len, MSG_OOB);
00675   if (written == len) {
00676     cont->handleEvent(VC_EVENT_OOB_COMPLETE, NULL);
00677     return ACTION_RESULT_DONE;
00678   } else if (!written) {
00679     cont->handleEvent(VC_EVENT_EOS, NULL);
00680     return ACTION_RESULT_DONE;
00681   }
00682   if (written > 0 && written < len) {
00683     u->oob_ptr = new OOB_callback(mutex, this, cont, buf + written, len - written);
00684     u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
00685     return u->oob_ptr->trigger;
00686   } else {
00687     // should be a rare case : taking a new continuation should not be
00688     // expensive for this
00689     written = -errno;
00690     ink_assert(written == -EAGAIN || written == -ENOTCONN);
00691     u->oob_ptr = new OOB_callback(mutex, this, cont, buf, len);
00692     u->oob_ptr->trigger = mutex->thread_holding->schedule_in_local(u->oob_ptr, HRTIME_MSECONDS(10));
00693     return u->oob_ptr->trigger;
00694   }
00695 }
00696 
00697 //
00698 // Function used to reenable the VC for reading or
00699 // writing.
00700 //
00701 void
00702 UnixNetVConnection::reenable(VIO *vio)
00703 {
00704   if (STATE_FROM_VIO(vio)->enabled)
00705     return;
00706   set_enabled(vio);
00707   if (!thread)
00708     return;
00709   EThread *t = vio->mutex->thread_holding;
00710   ink_assert(t == this_ethread());
00711   ink_assert(!closed);
00712   if (nh->mutex->thread_holding == t) {
00713     if (vio == &read.vio) {
00714       ep.modify(EVENTIO_READ);
00715       ep.refresh(EVENTIO_READ);
00716       if (read.triggered)
00717         nh->read_ready_list.in_or_enqueue(this);
00718       else
00719         nh->read_ready_list.remove(this);
00720     } else {
00721       ep.modify(EVENTIO_WRITE);
00722       ep.refresh(EVENTIO_WRITE);
00723       if (write.triggered)
00724         nh->write_ready_list.in_or_enqueue(this);
00725       else
00726         nh->write_ready_list.remove(this);
00727     }
00728   } else {
00729     MUTEX_TRY_LOCK(lock, nh->mutex, t);
00730     if (!lock) {
00731       if (vio == &read.vio) {
00732         if (!read.in_enabled_list) {
00733           read.in_enabled_list = 1;
00734           nh->read_enable_list.push(this);
00735         }
00736       } else {
00737         if (!write.in_enabled_list) {
00738           write.in_enabled_list = 1;
00739           nh->write_enable_list.push(this);
00740         }
00741       }
00742       if (nh->trigger_event && nh->trigger_event->ethread->signal_hook)
00743         nh->trigger_event->ethread->signal_hook(nh->trigger_event->ethread);
00744     } else {
00745       if (vio == &read.vio) {
00746         ep.modify(EVENTIO_READ);
00747         ep.refresh(EVENTIO_READ);
00748         if (read.triggered)
00749           nh->read_ready_list.in_or_enqueue(this);
00750         else
00751           nh->read_ready_list.remove(this);
00752       } else {
00753         ep.modify(EVENTIO_WRITE);
00754         ep.refresh(EVENTIO_WRITE);
00755         if (write.triggered)
00756           nh->write_ready_list.in_or_enqueue(this);
00757         else
00758           nh->write_ready_list.remove(this);
00759       }
00760     }
00761   }
00762 }
00763 
00764 void
00765 UnixNetVConnection::reenable_re(VIO *vio)
00766 {
00767   if (!thread)
00768     return;
00769   EThread *t = vio->mutex->thread_holding;
00770   ink_assert(t == this_ethread());
00771   if (nh->mutex->thread_holding == t) {
00772     set_enabled(vio);
00773     if (vio == &read.vio) {
00774       ep.modify(EVENTIO_READ);
00775       ep.refresh(EVENTIO_READ);
00776       if (read.triggered)
00777         net_read_io(nh, t);
00778       else
00779         nh->read_ready_list.remove(this);
00780     } else {
00781       ep.modify(EVENTIO_WRITE);
00782       ep.refresh(EVENTIO_WRITE);
00783       if (write.triggered)
00784         write_to_net(nh, this, t);
00785       else
00786         nh->write_ready_list.remove(this);
00787     }
00788   } else
00789     reenable(vio);
00790 }
00791 
00792 
// Construct an idle VC: not closed, no timeouts configured or pending, no
// NetHandler attached, all counters and flags zeroed, both address structures
// zero-filled, and startEvent installed as the initial event handler.
UnixNetVConnection::UnixNetVConnection()
  : closed(0), inactivity_timeout_in(0), active_timeout_in(0),
#ifdef INACTIVITY_TIMEOUT
    // Event-based inactivity timeout.
    inactivity_timeout(NULL),
#else
    // Deadline-based inactivity timeout; 0 means no deadline.
    next_inactivity_timeout_at(0),
#endif
    active_timeout(NULL), nh(NULL),
    id(0), flags(0), recursion(0), submit_time(0), oob_ptr(0),
    from_accept_thread(false)
{
  memset(&local_addr, 0, sizeof local_addr);
  memset(&server_addr, 0, sizeof server_addr);
  SET_HANDLER((NetVConnHandler) & UnixNetVConnection::startEvent);
}
00808 
00809 // Private methods
00810 
00811 void
00812 UnixNetVConnection::set_enabled(VIO *vio)
00813 {
00814   ink_assert(vio->mutex->thread_holding == this_ethread() && thread);
00815   ink_assert(!closed);
00816   STATE_FROM_VIO(vio)->enabled = 1;
00817 #ifdef INACTIVITY_TIMEOUT
00818   if (!inactivity_timeout && inactivity_timeout_in) {
00819     if (vio->mutex->thread_holding == thread)
00820       inactivity_timeout = thread->schedule_in_local(this, inactivity_timeout_in);
00821     else
00822       inactivity_timeout = thread->schedule_in(this, inactivity_timeout_in);
00823   }
00824 #else
00825   if (!next_inactivity_timeout_at && inactivity_timeout_in)
00826     next_inactivity_timeout_at = ink_get_hrtime() + inactivity_timeout_in;
00827 #endif
00828 }
00829 
00830 void
00831 UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
00832 {
00833   read_from_net(nh, this, lthread);
00834 }
00835 
// This code was pulled out of write_to_net so
// I could overwrite it for the SSL implementation
// (SSL read does not support overlapped i/o)
// without duplicating all the code in write_to_net.
//
// Gathers readable blocks of `buf` into iovecs (at most NET_MAX_IOV per
// system call) and writes them to the connection's fd, repeating while each
// call writes everything it was given and fewer than `towrite` bytes have
// been handed over.
//
// towrite      maximum number of bytes to hand to the socket.
// wattempted   out: number of bytes handed to the last write call.
// total_wrote  in/out: running count of bytes handed to write calls (bytes
//              attempted, not necessarily bytes accepted by the socket).
// buf          accessor whose reader supplies the data.
// needs        in/out: EVENTIO_WRITE is OR-ed in before returning.
// returns      the result of the last write()/writev() call.
int64_t
UnixNetVConnection::load_buffer_and_write(int64_t towrite, int64_t &wattempted, int64_t &total_wrote, MIOBufferAccessor & buf, int &needs)
{
  int64_t r = 0;

  // XXX Rather than dealing with the block directly, we should use the IOBufferReader API.
  int64_t offset = buf.reader()->start_offset;
  IOBufferBlock *b = buf.reader()->block;

  do {
    IOVec tiovec[NET_MAX_IOV];
    int niov = 0;
    int64_t total_wrote_last = total_wrote;
    while (b && niov < NET_MAX_IOV) {
      // check if we have done this block
      int64_t l = b->read_avail();
      l -= offset;
      if (l <= 0) {
        // Block lies entirely before the start offset: carry the remainder
        // of the offset over to the next block.
        offset = -l;
        b = b->next;
        continue;
      }
      // check if to amount to write exceeds that in this buffer
      int64_t wavail = towrite - total_wrote;
      if (l > wavail)
        l = wavail;
      if (!l)
        break;
      total_wrote += l;
      // build an iov entry
      tiovec[niov].iov_len = l;
      tiovec[niov].iov_base = b->start() + offset;
      niov++;
      // on to the next block
      offset = 0;
      b = b->next;
    }
    // Bytes queued for this particular system call.
    wattempted = total_wrote - total_wrote_last;
    if (niov == 1)
      r = socketManager.write(con.fd, tiovec[0].iov_base, tiovec[0].iov_len);
    else
      r = socketManager.writev(con.fd, &tiovec[0], niov);
    // NOTE(review): `mutex` is presumably picked up by name by the stat
    // macro below -- confirm against its definition.
    ProxyMutex *mutex = thread->mutex;
    NET_DEBUG_COUNT_DYN_STAT(net_calls_to_write_stat, 1);
    // Continue only after a complete write with more data still allowed.
  } while (r == wattempted && total_wrote < towrite);

  needs |= EVENTIO_WRITE;

  return (r);
}
00890 
00891 void
00892 UnixNetVConnection::readDisable(NetHandler *nh)
00893 {
00894   read_disable(nh, this);
00895 }
00896 
00897 void
00898 UnixNetVConnection::readSignalError(NetHandler *nh, int err)
00899 {
00900   read_signal_error(nh, this, err);
00901 }
00902 
00903 int
00904 UnixNetVConnection::readSignalDone(int event, NetHandler *nh)
00905 {
00906   return (read_signal_done(event, nh, this));
00907 }
00908 
00909 
00910 int
00911 UnixNetVConnection::readSignalAndUpdate(int event)
00912 {
00913   return (read_signal_and_update(event, this));
00914 }
00915 
00916 // Interface so SSL inherited class can call some static in-line functions
00917 // without affecting regular net stuff or copying a bunch of code into
00918 // the header files.
00919 void
00920 UnixNetVConnection::readReschedule(NetHandler *nh)
00921 {
00922   read_reschedule(nh, this);
00923 }
00924 
00925 void
00926 UnixNetVConnection::writeReschedule(NetHandler *nh)
00927 {
00928   write_reschedule(nh, this);
00929 }
00930 
00931 void
00932 UnixNetVConnection::netActivity(EThread *lthread)
00933 {
00934   net_activity(this, lthread);
00935 }
00936 
00937 int
00938 UnixNetVConnection::startEvent(int /* event ATS_UNUSED */, Event *e)
00939 {
00940   MUTEX_TRY_LOCK(lock, get_NetHandler(e->ethread)->mutex, e->ethread);
00941   if (!lock) {
00942     e->schedule_in(NET_RETRY_DELAY);
00943     return EVENT_CONT;
00944   }
00945   if (!action_.cancelled)
00946     connectUp(e->ethread, NO_FD);
00947   else
00948     free(e->ethread);
00949   return EVENT_DONE;
00950 }
00951 
00952 int
00953 UnixNetVConnection::acceptEvent(int event, Event *e)
00954 {
00955   thread = e->ethread;
00956 
00957   MUTEX_TRY_LOCK(lock, get_NetHandler(thread)->mutex, e->ethread);
00958   if (!lock) {
00959     if (event == EVENT_NONE) {
00960       thread->schedule_in(this, NET_RETRY_DELAY);
00961       return EVENT_DONE;
00962     } else {
00963       e->schedule_in(NET_RETRY_DELAY);
00964       return EVENT_CONT;
00965     }
00966   }
00967 
00968   if (action_.cancelled) {
00969     free(thread);
00970     return EVENT_DONE;
00971   }
00972 
00973   SET_HANDLER((NetVConnHandler) & UnixNetVConnection::mainEvent);
00974 
00975   nh = get_NetHandler(thread);
00976   PollDescriptor *pd = get_PollDescriptor(thread);
00977   if (ep.start(pd, this, EVENTIO_READ|EVENTIO_WRITE) < 0) {
00978     Debug("iocore_net", "acceptEvent : failed EventIO::start\n");
00979     close_UnixNetVConnection(this, e->ethread);
00980     return EVENT_DONE;
00981   }
00982 
00983   nh->open_list.enqueue(this);
00984 
00985   if (inactivity_timeout_in) {
00986     UnixNetVConnection::set_inactivity_timeout(inactivity_timeout_in);
00987   }
00988 
00989   if (active_timeout_in) {
00990     UnixNetVConnection::set_active_timeout(active_timeout_in);
00991   }
00992 
00993   action_.continuation->handleEvent(NET_EVENT_ACCEPT, this);
00994   return EVENT_DONE;
00995 }
00996 
00997 //
00998 // The main event for UnixNetVConnections.
00999 // This is called by the Event subsystem to initialize the UnixNetVConnection
01000 // and for active and inactivity timeouts.
01001 //
01002 int
01003 UnixNetVConnection::mainEvent(int event, Event *e)
01004 {
01005   ink_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL);
01006   ink_assert(thread == this_ethread());
01007 
01008   MUTEX_TRY_LOCK(hlock, get_NetHandler(thread)->mutex, e->ethread);
01009   MUTEX_TRY_LOCK(rlock, read.vio.mutex ? (ProxyMutex *) read.vio.mutex : (ProxyMutex *) e->ethread->mutex, e->ethread);
01010   MUTEX_TRY_LOCK(wlock, write.vio.mutex ? (ProxyMutex *) write.vio.mutex :
01011                  (ProxyMutex *) e->ethread->mutex, e->ethread);
01012   if (!hlock || !rlock || !wlock ||
01013       (read.vio.mutex.m_ptr && rlock.m.m_ptr != read.vio.mutex.m_ptr) ||
01014       (write.vio.mutex.m_ptr && wlock.m.m_ptr != write.vio.mutex.m_ptr)) {
01015 #ifndef INACTIVITY_TIMEOUT
01016     if (e == active_timeout)
01017 #endif
01018       e->schedule_in(NET_RETRY_DELAY);
01019     return EVENT_CONT;
01020   }
01021   if (e->cancelled)
01022     return EVENT_DONE;
01023 
01024   int signal_event;
01025   Event **signal_timeout;
01026   Continuation *reader_cont = NULL;
01027   Continuation *writer_cont = NULL;
01028   ink_hrtime next_activity_timeout_at = 0;
01029   ink_hrtime *signal_timeout_at = &next_activity_timeout_at;
01030   Event *t = NULL;
01031   signal_timeout = &t;
01032 
01033 #ifdef INACTIVITY_TIMEOUT
01034   if (e == inactivity_timeout) {
01035     signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
01036     signal_timeout = &inactivity_timeout;
01037   }
01038 #else
01039   if (event == EVENT_IMMEDIATE) {
01040     /* BZ 49408 */
01041     //ink_assert(inactivity_timeout_in);
01042     //ink_assert(next_inactivity_timeout_at < ink_get_hrtime());
01043     if (!inactivity_timeout_in || next_inactivity_timeout_at > ink_get_hrtime())
01044       return EVENT_CONT;
01045     signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
01046     signal_timeout_at = &next_inactivity_timeout_at;
01047   }
01048 #endif
01049   else {
01050     ink_assert(e == active_timeout);
01051     signal_event = VC_EVENT_ACTIVE_TIMEOUT;
01052     signal_timeout = &active_timeout;
01053   }
01054   *signal_timeout = 0;
01055   *signal_timeout_at = 0;
01056   writer_cont = write.vio._cont;
01057 
01058   if (closed) {
01059     close_UnixNetVConnection(this, thread);
01060     return EVENT_DONE;
01061   }
01062 
01063   if (read.vio.op == VIO::READ && !(f.shutdown & NET_VC_SHUTDOWN_READ)) {
01064     reader_cont = read.vio._cont;
01065     if (read_signal_and_update(signal_event, this) == EVENT_DONE)
01066       return EVENT_DONE;
01067   }
01068 
01069   if (!*signal_timeout &&
01070       !*signal_timeout_at &&
01071       !closed && write.vio.op == VIO::WRITE &&
01072       !(f.shutdown & NET_VC_SHUTDOWN_WRITE) && reader_cont != write.vio._cont && writer_cont == write.vio._cont)
01073     if (write_signal_and_update(signal_event, this) == EVENT_DONE)
01074       return EVENT_DONE;
01075   return EVENT_DONE;
01076 }
01077 
01078 
01079 int
01080 UnixNetVConnection::connectUp(EThread *t, int fd)
01081 {
01082   int res;
01083 
01084   thread = t;
01085   if (check_net_throttle(CONNECT, submit_time)) {
01086     check_throttle_warning();
01087     action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_THROTTLING);
01088     free(t);
01089     return CONNECT_FAILURE;
01090   }
01091 
01092   // Force family to agree with remote (server) address.
01093   options.ip_family = server_addr.sa.sa_family;
01094   
01095   //
01096   // Initialize this UnixNetVConnection
01097   //
01098   if (is_debug_tag_set("iocore_net")) {
01099     char addrbuf[INET6_ADDRSTRLEN];
01100     Debug("iocore_net", "connectUp:: local_addr=%s:%d [%s]\n",
01101       options.local_ip.isValid()
01102       ? options.local_ip.toString(addrbuf, sizeof(addrbuf))
01103       : "*",
01104       options.local_port,
01105       NetVCOptions::toString(options.addr_binding)
01106     );
01107   }
01108 
01109   // If this is getting called from the TS API, then we are wiring up a file descriptor
01110   // provided by the caller. In that case, we know that the socket is already connected.
01111   if (fd == NO_FD) {
01112     res = con.open(options);
01113     if (res != 0) {
01114       goto fail;
01115     }
01116   } else {
01117     int len = sizeof(con.sock_type);
01118 
01119     res = safe_getsockopt(fd, SOL_SOCKET, SO_TYPE, (char *)&con.sock_type, &len);
01120     if (res != 0) {
01121       goto fail;
01122     }
01123 
01124     safe_nonblocking(fd);
01125     con.fd = fd;
01126     con.is_connected = true;
01127     con.is_bound = true;
01128   }
01129 
01130   // Must connect after EventIO::Start() to avoid a race condition
01131   // when edge triggering is used.
01132   if (ep.start(get_PollDescriptor(t), this, EVENTIO_READ|EVENTIO_WRITE) < 0) {
01133     lerrno = errno;
01134     Debug("iocore_net", "connectUp : Failed to add to epoll list\n");
01135     action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)0); // 0 == res
01136     free(t);
01137     return CONNECT_FAILURE;
01138   }
01139 
01140   if (fd == NO_FD) {
01141     res = con.connect(&server_addr.sa, options);
01142     if (res != 0) {
01143       goto fail;
01144     }
01145   }
01146 
01147   check_emergency_throttle(con);
01148 
01149   // start up next round immediately
01150 
01151   SET_HANDLER(&UnixNetVConnection::mainEvent);
01152 
01153   nh = get_NetHandler(t);
01154   nh->open_list.enqueue(this);
01155 
01156   ink_assert(!inactivity_timeout_in);
01157   ink_assert(!active_timeout_in);
01158   action_.continuation->handleEvent(NET_EVENT_OPEN, this);
01159   return CONNECT_SUCCESS;
01160 
01161 fail:
01162   lerrno = errno;
01163   action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *)(intptr_t)res);
01164   free(t);
01165   return CONNECT_FAILURE;
01166 }
01167 
01168 
01169 void
01170 UnixNetVConnection::free(EThread *t)
01171 {
01172   NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, -1);
01173   // clear variables for reuse
01174   this->mutex.clear();
01175   action_.mutex.clear();
01176   got_remote_addr = 0;
01177   got_local_addr = 0;
01178   read.vio.mutex.clear();
01179   write.vio.mutex.clear();
01180   flags = 0;
01181   SET_CONTINUATION_HANDLER(this, (NetVConnHandler) & UnixNetVConnection::startEvent);
01182   nh = NULL;
01183   read.triggered = 0;
01184   write.triggered = 0;
01185   options.reset();
01186   closed = 0;
01187   ink_assert(!read.ready_link.prev && !read.ready_link.next);
01188   ink_assert(!read.enable_link.next);
01189   ink_assert(!write.ready_link.prev && !write.ready_link.next);
01190   ink_assert(!write.enable_link.next);
01191   ink_assert(!link.next && !link.prev);
01192   ink_assert(!active_timeout);
01193   ink_assert(con.fd == NO_FD);
01194   ink_assert(t == this_ethread());
01195 
01196   if (from_accept_thread) {
01197     netVCAllocator.free(this);  
01198   } else {
01199     THREAD_FREE(this, netVCAllocator, t);
01200   }
01201 }
01202 
01203 void
01204 UnixNetVConnection::apply_options()
01205 {
01206   con.apply_options(options);
01207 }

Generated by  doxygen 1.7.1