UnixNet.cc

/** @file

  Core of the per-thread network event loop: PollCont (the poller),
  NetHandler (ready/enable list processing), and the InactivityCop that
  enforces inactivity timeouts on UnixNetVConnections.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

#include "P_Net.h"

ink_hrtime last_throttle_warning;
ink_hrtime last_shedding_warning;
ink_hrtime emergency_throttle_time;
int net_connections_throttle;
int fds_throttle;
int fds_limit = 8000;
ink_hrtime last_transient_accept_error;

extern "C" void fd_reify(struct ev_loop *);


#ifndef INACTIVITY_TIMEOUT
// INKqa10496
// One InactivityCop runs on each net thread once per second; it walks that
// thread's list of NetVCs and fires any expired inactivity timeouts.
class InactivityCop : public Continuation {
public:
  InactivityCop(ProxyMutex *m):Continuation(m), default_inactivity_timeout(0) {
    SET_HANDLER(&InactivityCop::check_inactivity);
    REC_ReadConfigInteger(default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
    Debug("inactivity_cop", "default inactivity timeout is set to: %d", default_inactivity_timeout);
  }
  int check_inactivity(int event, Event *e) {
    (void) event;
    ink_hrtime now = ink_get_hrtime();
    NetHandler *nh = get_NetHandler(this_ethread());
    // Copy the list and use pop() to catch any closes caused by callbacks.
    forl_LL(UnixNetVConnection, vc, nh->open_list) {
      if (vc->thread == this_ethread())
        nh->cop_list.push(vc);
    }
    while (UnixNetVConnection *vc = nh->cop_list.pop()) {
      // If we cannot get the lock, don't stop; just keep cleaning.
      MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread());
      if (!lock.lock_acquired) {
        NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat);
        continue;
      }

      if (vc->closed) {
        close_UnixNetVConnection(vc, e->ethread);
        continue;
      }

      // Set a default inactivity timeout if one is not set.
      if (vc->next_inactivity_timeout_at == 0 && default_inactivity_timeout > 0) {
        Debug("inactivity_cop", "vc: %p inactivity timeout not set, setting a default of %d", vc, default_inactivity_timeout);
        vc->set_inactivity_timeout(HRTIME_SECONDS(default_inactivity_timeout));
      } else {
        Debug("inactivity_cop_verbose", "vc: %p timeout at: %" PRId64 " timeout in: %" PRId64, vc,
              ink_hrtime_to_sec(vc->next_inactivity_timeout_at), ink_hrtime_to_sec(vc->inactivity_timeout_in));
      }

      if (vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at < now)
        vc->handleEvent(EVENT_IMMEDIATE, e);
    }
    return 0;
  }
private:
  int default_inactivity_timeout;  // used only when a NetVC arrives without an inactivity timeout set
};
#endif
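
// A minimal usage sketch: the cop is a plain Continuation, created with the
// thread's NetHandler mutex and scheduled once a second (this is what
// initialize_thread_for_net() below does for each net thread):
//
//   InactivityCop *cop = new InactivityCop(get_NetHandler(thread)->mutex);
//   thread->schedule_every(cop, HRTIME_SECONDS(1));  // fires check_inactivity() every second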

PollCont::PollCont(ProxyMutex *m, int pt):Continuation(m), net_handler(NULL), nextPollDescriptor(NULL), poll_timeout(pt) {
  pollDescriptor = new PollDescriptor;
  pollDescriptor->init();
  SET_HANDLER(&PollCont::pollEvent);
}

PollCont::PollCont(ProxyMutex *m, NetHandler *nh, int pt):Continuation(m), net_handler(nh), nextPollDescriptor(NULL), poll_timeout(pt)
{
  pollDescriptor = new PollDescriptor;
  pollDescriptor->init();
  SET_HANDLER(&PollCont::pollEvent);
}

PollCont::~PollCont() {
  delete pollDescriptor;
  if (nextPollDescriptor != NULL) {
    delete nextPollDescriptor;
  }
}

//
// PollCont continuation which does the epoll_wait
// and stores the resultant events in ePoll_Triggered_Events
//
int
PollCont::pollEvent(int event, Event *e) {
  (void) event;
  (void) e;

  if (likely(net_handler)) {
    /* checking to see whether there are connections on the ready lists (either read or write) that need processing [ebalsa] */
    if (likely
        (!net_handler->read_ready_list.empty() || !net_handler->write_ready_list.empty() ||
         !net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) {
      NetDebug("iocore_net_poll", "rrq: %d, wrq: %d, rel: %d, wel: %d",
               net_handler->read_ready_list.empty(),
               net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(),
               net_handler->write_enable_list.empty());
      poll_timeout = 0;         // poll immediately returns -- we have triggered stuff to process right now
    } else {
      poll_timeout = net_config_poll_timeout;
    }
  }
  // wait for fds to trigger, or don't wait if timeout is 0
#if TS_USE_EPOLL
  pollDescriptor->result = epoll_wait(pollDescriptor->epoll_fd,
                                      pollDescriptor->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
  NetDebug("iocore_net_poll", "[PollCont::pollEvent] epoll_fd: %d, timeout: %d, results: %d", pollDescriptor->epoll_fd, poll_timeout,
           pollDescriptor->result);
#elif TS_USE_KQUEUE
  struct timespec tv;
  tv.tv_sec = poll_timeout / 1000;
  tv.tv_nsec = 1000000 * (poll_timeout % 1000);
  pollDescriptor->result = kevent(pollDescriptor->kqueue_fd, NULL, 0,
                                  pollDescriptor->kq_Triggered_Events,
                                  POLL_DESCRIPTOR_SIZE,
                                  &tv);
  NetDebug("iocore_net_poll", "[PollCont::pollEvent] kqueue_fd: %d, timeout: %d, results: %d", pollDescriptor->kqueue_fd, poll_timeout,
           pollDescriptor->result);
#elif TS_USE_PORT
  int retval;
  timespec_t ptimeout;
  ptimeout.tv_sec = poll_timeout / 1000;
  ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
  unsigned nget = 1;
  if((retval = port_getn(pollDescriptor->port_fd,
                         pollDescriptor->Port_Triggered_Events,
                         POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) < 0) {
    pollDescriptor->result = 0;
    switch(errno) {
    case EINTR:
    case EAGAIN:
    case ETIME:
      if (nget > 0) {
        pollDescriptor->result = (int)nget;
      }
      break;
    default:
      ink_assert(!"unhandled port_getn() case:");
      break;
    }
  } else {
    pollDescriptor->result = (int)nget;
  }
  NetDebug("iocore_net_poll", "[PollCont::pollEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)",
           retval, retval < 0 ? strerror(errno) : "ok",
           pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events,
           POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pollDescriptor->result);
#else
#error port me
#endif
  return EVENT_CONT;
}
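
// Consumption sketch (hedged; NetHandler::mainNetEvent() below is the real
// consumer): after pollEvent() returns, the triggered events sit on the
// PollDescriptor and are walked with the get_ev_* accessors:
//
//   for (int x = 0; x < pollDescriptor->result; x++) {
//     EventIO *epd = (EventIO *) get_ev_data(pollDescriptor, x);
//     if (get_ev_events(pollDescriptor, x) & (EVENTIO_READ | EVENTIO_ERROR))
//       /* mark the associated vc read-triggered and queue it */;
//     ev_next_event(pollDescriptor, x);
//   }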

static void
net_signal_hook_callback(EThread *thread) {
#if HAVE_EVENTFD
  uint64_t counter;
  ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
#elif TS_USE_PORT
  /* Nothing to drain or do */
#else
  char dummy[1024];
  ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
#endif
}

static void
net_signal_hook_function(EThread *thread) {
#if HAVE_EVENTFD
  uint64_t counter = 1;
  ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
#elif TS_USE_PORT
  PollDescriptor *pd = get_PollDescriptor(thread);
  ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
#else
  char dummy = 1;
  ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
#endif
}
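
// Wakeup sketch (hedged): these two hooks implement cross-thread wakeup.
// Another thread invokes the signal hook installed by
// initialize_thread_for_net() below, which writes to the net thread's
// eventfd / event port / pipe; the blocked epoll_wait()/kevent()/port_getn()
// then returns an EVENTIO_ASYNC_SIGNAL event and mainNetEvent() drains it
// via net_signal_hook_callback().
//
//   EThread *net_thread = /* some ET_NET thread to wake */;
//   net_thread->signal_hook(net_thread);  // -> net_signal_hook_function()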

void
initialize_thread_for_net(EThread *thread)
{
  new((ink_dummy_for_new *) get_NetHandler(thread)) NetHandler();
  new((ink_dummy_for_new *) get_PollCont(thread)) PollCont(thread->mutex, get_NetHandler(thread));
  get_NetHandler(thread)->mutex = new_ProxyMutex();
  PollCont *pc = get_PollCont(thread);
  PollDescriptor *pd = pc->pollDescriptor;

  thread->schedule_imm(get_NetHandler(thread));

#ifndef INACTIVITY_TIMEOUT
  InactivityCop *inactivityCop = new InactivityCop(get_NetHandler(thread)->mutex);
  thread->schedule_every(inactivityCop, HRTIME_SECONDS(1));
#endif

  thread->signal_hook = net_signal_hook_function;
  thread->ep = (EventIO*)ats_malloc(sizeof(EventIO));
  thread->ep->type = EVENTIO_ASYNC_SIGNAL;
#if HAVE_EVENTFD
  thread->ep->start(pd, thread->evfd, 0, EVENTIO_READ);
#else
  thread->ep->start(pd, thread->evpipe[0], 0, EVENTIO_READ);
#endif
}

// NetHandler method definitions

NetHandler::NetHandler():Continuation(NULL), trigger_event(0)
{
  SET_HANDLER((NetContHandler) & NetHandler::startNetEvent);
}

//
// Initialization here, in the thread in which we will be executing
// from now on.
//
int
NetHandler::startNetEvent(int event, Event *e)
{
  (void) event;
  SET_HANDLER((NetContHandler) & NetHandler::mainNetEvent);
  e->schedule_every(NET_PERIOD);
  trigger_event = e;
  return EVENT_CONT;
}

//
// Move VCs enabled on a different thread to the ready lists.
//
void
NetHandler::process_enabled_list(NetHandler *nh)
{
  UnixNetVConnection *vc = NULL;

  SListM(UnixNetVConnection, NetState, read, enable_link) rq(nh->read_enable_list.popall());
  while ((vc = rq.pop())) {
    vc->ep.modify(EVENTIO_READ);
    vc->ep.refresh(EVENTIO_READ);
    vc->read.in_enabled_list = 0;
    if ((vc->read.enabled && vc->read.triggered) || vc->closed)
      nh->read_ready_list.in_or_enqueue(vc);
  }

  SListM(UnixNetVConnection, NetState, write, enable_link) wq(nh->write_enable_list.popall());
  while ((vc = wq.pop())) {
    vc->ep.modify(EVENTIO_WRITE);
    vc->ep.refresh(EVENTIO_WRITE);
    vc->write.in_enabled_list = 0;
    if ((vc->write.enabled && vc->write.triggered) || vc->closed)
      nh->write_ready_list.in_or_enqueue(vc);
  }
}
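
// Producer-side sketch (hedged; the real push lives in UnixNetVConnection's
// reenable path, not in this file): a VC enabled from a thread that does not
// hold this NetHandler's mutex is pushed onto the atomic enable list and is
// picked up here on the next mainNetEvent() iteration:
//
//   if (!vc->read.in_enabled_list) {
//     vc->read.in_enabled_list = 1;
//     vc->nh->read_enable_list.push(vc);  // lock-free SList push from another thread
//   }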


//
// The main event for NetHandler
// This is called every NET_PERIOD, and handles all IO operations scheduled
// for this period.
//
int
NetHandler::mainNetEvent(int event, Event *e)
{
  ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
  (void) event;
  (void) e;
  EventIO *epd = NULL;
  int poll_timeout;

  NET_INCREMENT_DYN_STAT(net_handler_run_stat);

  process_enabled_list(this);
  if (likely(!read_ready_list.empty() || !write_ready_list.empty() || !read_enable_list.empty() || !write_enable_list.empty()))
    poll_timeout = 0; // poll immediately returns -- we have triggered stuff to process right now
  else
    poll_timeout = net_config_poll_timeout;

  PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread);
  UnixNetVConnection *vc = NULL;
#if TS_USE_EPOLL
  pd->result = epoll_wait(pd->epoll_fd, pd->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
  NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] epoll_wait(%d,%d), result=%d", pd->epoll_fd, poll_timeout, pd->result);
#elif TS_USE_KQUEUE
  struct timespec tv;
  tv.tv_sec = poll_timeout / 1000;
  tv.tv_nsec = 1000000 * (poll_timeout % 1000);
  pd->result = kevent(pd->kqueue_fd, NULL, 0, pd->kq_Triggered_Events, POLL_DESCRIPTOR_SIZE, &tv);
  NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] kevent(%d,%d), result=%d", pd->kqueue_fd, poll_timeout, pd->result);
#elif TS_USE_PORT
  int retval;
  timespec_t ptimeout;
  ptimeout.tv_sec = poll_timeout / 1000;
  ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
  unsigned nget = 1;
  if((retval = port_getn(pd->port_fd, pd->Port_Triggered_Events, POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) < 0) {
    pd->result = 0;
    switch(errno) {
    case EINTR:
    case EAGAIN:
    case ETIME:
      if (nget > 0) {
        pd->result = (int)nget;
      }
      break;
    default:
      ink_assert(!"unhandled port_getn() case:");
      break;
    }
  } else {
    pd->result = (int)nget;
  }
  NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)",
           retval, retval < 0 ? strerror(errno) : "ok",
           pd->port_fd, pd->Port_Triggered_Events,
           POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pd->result);

#else
#error port me
#endif

  vc = NULL;
  for (int x = 0; x < pd->result; x++) {
    epd = (EventIO*) get_ev_data(pd,x);
    if (epd->type == EVENTIO_READWRITE_VC) {
      vc = epd->data.vc;
      if (get_ev_events(pd,x) & (EVENTIO_READ|EVENTIO_ERROR)) {
        vc->read.triggered = 1;
        if (!read_ready_list.in(vc))
          read_ready_list.enqueue(vc);
        else if (get_ev_events(pd,x) & EVENTIO_ERROR) {
          // check for unhandled epoll events that should be handled
          Debug("iocore_net_main", "Unhandled epoll event on read: 0x%04x read.enabled=%d closed=%d read.netready_queue=%d",
                get_ev_events(pd,x), vc->read.enabled, vc->closed, read_ready_list.in(vc));
        }
      }
      vc = epd->data.vc;
      if (get_ev_events(pd,x) & (EVENTIO_WRITE|EVENTIO_ERROR)) {
        vc->write.triggered = 1;
        if (!write_ready_list.in(vc))
          write_ready_list.enqueue(vc);
        else if (get_ev_events(pd,x) & EVENTIO_ERROR) {
          // check for unhandled epoll events that should be handled
          Debug("iocore_net_main",
                "Unhandled epoll event on write: 0x%04x write.enabled=%d closed=%d write.netready_queue=%d",
                get_ev_events(pd,x), vc->write.enabled, vc->closed, write_ready_list.in(vc));
        }
      } else if (!(get_ev_events(pd,x) & EVENTIO_ERROR)) {
        Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", get_ev_events(pd,x));
      }
    } else if (epd->type == EVENTIO_DNS_CONNECTION) {
      if (epd->data.dnscon != NULL) {
        epd->data.dnscon->trigger(); // Make sure the DNSHandler for this con knows we triggered
#if defined(USE_EDGE_TRIGGER)
        epd->refresh(EVENTIO_READ);
#endif
      }
    } else if (epd->type == EVENTIO_ASYNC_SIGNAL)
      net_signal_hook_callback(trigger_event->ethread);
    ev_next_event(pd,x);
  }

  pd->result = 0;

#if defined(USE_EDGE_TRIGGER)
  while ((vc = read_ready_list.dequeue())) {
    if (vc->closed)
      close_UnixNetVConnection(vc, trigger_event->ethread);
    else if (vc->read.enabled && vc->read.triggered)
      vc->net_read_io(this, trigger_event->ethread);
    else if (!vc->read.enabled) {
      read_ready_list.remove(vc);
#if defined(solaris)
      if (vc->read.triggered && vc->write.enabled) {
        vc->ep.modify(-EVENTIO_READ);
        vc->ep.refresh(EVENTIO_WRITE);
        vc->writeReschedule(this);
      }
#endif
    }
  }
  while ((vc = write_ready_list.dequeue())) {
    if (vc->closed)
      close_UnixNetVConnection(vc, trigger_event->ethread);
    else if (vc->write.enabled && vc->write.triggered)
      write_to_net(this, vc, trigger_event->ethread);
    else if (!vc->write.enabled) {
      write_ready_list.remove(vc);
#if defined(solaris)
      if (vc->write.triggered && vc->read.enabled) {
        vc->ep.modify(-EVENTIO_WRITE);
        vc->ep.refresh(EVENTIO_READ);
        vc->readReschedule(this);
      }
#endif
    }
  }
#else /* !USE_EDGE_TRIGGER */
  while ((vc = read_ready_list.dequeue())) {
    if (vc->closed)
      close_UnixNetVConnection(vc, trigger_event->ethread);
    else if (vc->read.enabled && vc->read.triggered)
      vc->net_read_io(this, trigger_event->ethread);
    else if (!vc->read.enabled)
      vc->ep.modify(-EVENTIO_READ);
  }
  while ((vc = write_ready_list.dequeue())) {
    if (vc->closed)
      close_UnixNetVConnection(vc, trigger_event->ethread);
    else if (vc->write.enabled && vc->write.triggered)
      write_to_net(this, vc, trigger_event->ethread);
    else if (!vc->write.enabled)
      vc->ep.modify(-EVENTIO_WRITE);
  }
#endif /* !USE_EDGE_TRIGGER */

  return EVENT_CONT;
}
