00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "P_Net.h"
00025 
// File-scope state related to connection throttling and file-descriptor limits.
// NOTE(review): none of these variables is read or written in the visible part of this
// file; the per-variable notes below are inferred from the names only — confirm against
// the code that uses them.
ink_hrtime last_throttle_warning;   // presumably the time of the last throttle warning
ink_hrtime last_shedding_warning;   // presumably the time of the last shedding warning
ink_hrtime emergency_throttle_time; // presumably when emergency throttling was last engaged
int net_connections_throttle;       // presumably a connection-count threshold — TODO confirm
int fds_throttle;                   // presumably a file-descriptor threshold — TODO confirm
int fds_limit = 8000;               // initialised to 8000 here; may be overwritten elsewhere
ink_hrtime last_transient_accept_error; // presumably the time of the last transient accept() error

// C-linkage declaration only; the definition is not part of this file's visible text.
extern "C" void fd_reify(struct ev_loop *);
00035 
00036 
00037 #ifndef INACTIVITY_TIMEOUT
00038 
00039 
00040 
00041 class InactivityCop : public Continuation {
00042 public:
00043   InactivityCop(ProxyMutex *m):Continuation(m), default_inactivity_timeout(0) {
00044     SET_HANDLER(&InactivityCop::check_inactivity);
00045     REC_ReadConfigInteger(default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
00046     Debug("inactivity_cop", "default inactivity timeout is set to: %d", default_inactivity_timeout);
00047   }
00048   int check_inactivity(int event, Event *e) {
00049     (void) event;
00050     ink_hrtime now = ink_get_hrtime();
00051     NetHandler *nh = get_NetHandler(this_ethread());
00052     
00053     forl_LL(UnixNetVConnection, vc, nh->open_list) {
00054       if (vc->thread == this_ethread())
00055         nh->cop_list.push(vc);
00056     }
00057     while (UnixNetVConnection *vc = nh->cop_list.pop()) {
00058       
00059       MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread());
00060       if (!lock.lock_acquired) {
00061        NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat);
00062        continue;
00063       }
00064 
00065       if (vc->closed) {
00066         close_UnixNetVConnection(vc, e->ethread);
00067         continue;
00068       }
00069 
00070       
00071       if (vc->next_inactivity_timeout_at == 0 && default_inactivity_timeout > 0) {
00072         Debug("inactivity_cop", "vc: %p inactivity timeout not set, setting a default of %d", vc, default_inactivity_timeout);
00073         vc->set_inactivity_timeout(HRTIME_SECONDS(default_inactivity_timeout));
00074       } else {
00075         Debug("inactivity_cop_verbose", "vc: %p timeout at: %" PRId64 " timeout in: %" PRId64, vc, ink_hrtime_to_sec(vc->next_inactivity_timeout_at),
00076             ink_hrtime_to_sec(vc->inactivity_timeout_in));
00077       }
00078 
00079       if (vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at < now)
00080         vc->handleEvent(EVENT_IMMEDIATE, e);
00081     }
00082     return 0;
00083   }
00084 private:
00085   int default_inactivity_timeout;  
00086 };
00087 #endif
00088 
00089 PollCont::PollCont(ProxyMutex *m, int pt):Continuation(m), net_handler(NULL), nextPollDescriptor(NULL), poll_timeout(pt) {
00090   pollDescriptor = new PollDescriptor;
00091   pollDescriptor->init();
00092   SET_HANDLER(&PollCont::pollEvent);
00093 }
00094 
00095 PollCont::PollCont(ProxyMutex *m, NetHandler *nh, int pt):Continuation(m), net_handler(nh), nextPollDescriptor(NULL), poll_timeout(pt)
00096 {
00097   pollDescriptor = new PollDescriptor;
00098   pollDescriptor->init();
00099   SET_HANDLER(&PollCont::pollEvent);
00100 }
00101 
00102 PollCont::~PollCont() {
00103   delete pollDescriptor;
00104   if (nextPollDescriptor != NULL) {
00105     delete nextPollDescriptor;
00106   }
00107 }
00108 
00109 
00110 
00111 
00112 
00113 int
00114 PollCont::pollEvent(int event, Event *e) {
00115   (void) event;
00116   (void) e;
00117 
00118   if (likely(net_handler)) {
00119     
00120     if (likely
00121         (!net_handler->read_ready_list.empty() || !net_handler->write_ready_list.empty() ||
00122          !net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) {
00123       NetDebug("iocore_net_poll", "rrq: %d, wrq: %d, rel: %d, wel: %d",
00124                net_handler->read_ready_list.empty(),
00125                net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(),
00126                net_handler->write_enable_list.empty());
00127       poll_timeout = 0;         
00128     } else {
00129       poll_timeout = net_config_poll_timeout;
00130     }
00131   }
00132   
00133 #if TS_USE_EPOLL
00134   pollDescriptor->result = epoll_wait(pollDescriptor->epoll_fd,
00135                                       pollDescriptor->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
00136   NetDebug("iocore_net_poll", "[PollCont::pollEvent] epoll_fd: %d, timeout: %d, results: %d", pollDescriptor->epoll_fd, poll_timeout,
00137            pollDescriptor->result);
00138 #elif TS_USE_KQUEUE
00139   struct timespec tv;
00140   tv.tv_sec = poll_timeout / 1000;
00141   tv.tv_nsec = 1000000 * (poll_timeout % 1000);
00142   pollDescriptor->result = kevent(pollDescriptor->kqueue_fd, NULL, 0,
00143                                   pollDescriptor->kq_Triggered_Events,
00144                                   POLL_DESCRIPTOR_SIZE,
00145                                   &tv);
00146   NetDebug("iocore_net_poll", "[PollCont::pollEvent] kueue_fd: %d, timeout: %d, results: %d", pollDescriptor->kqueue_fd, poll_timeout,
00147            pollDescriptor->result);
00148 #elif TS_USE_PORT
00149   int retval;
00150   timespec_t ptimeout;
00151   ptimeout.tv_sec = poll_timeout / 1000;
00152   ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
00153   unsigned nget = 1;
00154   if((retval = port_getn(pollDescriptor->port_fd,
00155                          pollDescriptor->Port_Triggered_Events,
00156                          POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) < 0) {
00157     pollDescriptor->result = 0;
00158     switch(errno) {
00159     case EINTR:
00160     case EAGAIN:
00161     case ETIME:
00162       if (nget > 0) {
00163         pollDescriptor->result = (int)nget;
00164       }
00165       break;
00166     default:
00167       ink_assert(!"unhandled port_getn() case:");
00168       break;
00169     }
00170   } else {
00171     pollDescriptor->result = (int)nget;
00172   }
00173   NetDebug("iocore_net_poll", "[PollCont::pollEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)",
00174            retval,retval < 0 ? strerror(errno) : "ok",
00175            pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events,
00176            POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pollDescriptor->result);
00177 #else
00178 #error port me
00179 #endif
00180   return EVENT_CONT;
00181 }
00182 
00183 static void
00184 net_signal_hook_callback(EThread *thread) {
00185 #if HAVE_EVENTFD
00186   uint64_t counter;
00187   ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
00188 #elif TS_USE_PORT
00189   
00190 #else
00191   char dummy[1024];
00192   ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
00193 #endif
00194 }
00195 
00196 static void
00197 net_signal_hook_function(EThread *thread) {
00198 #if HAVE_EVENTFD
00199   uint64_t counter = 1;
00200   ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
00201 #elif TS_USE_PORT
00202   PollDescriptor *pd = get_PollDescriptor(thread);
00203   ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
00204 #else
00205   char dummy = 1;
00206   ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
00207 #endif
00208 }
00209 
00210 void
00211 initialize_thread_for_net(EThread *thread)
00212 {
00213   new((ink_dummy_for_new *) get_NetHandler(thread)) NetHandler();
00214   new((ink_dummy_for_new *) get_PollCont(thread)) PollCont(thread->mutex, get_NetHandler(thread));
00215   get_NetHandler(thread)->mutex = new_ProxyMutex();
00216   PollCont *pc = get_PollCont(thread);
00217   PollDescriptor *pd = pc->pollDescriptor;
00218 
00219   thread->schedule_imm(get_NetHandler(thread));
00220 
00221 #ifndef INACTIVITY_TIMEOUT
00222   InactivityCop *inactivityCop = new InactivityCop(get_NetHandler(thread)->mutex);
00223   thread->schedule_every(inactivityCop, HRTIME_SECONDS(1));
00224 #endif
00225 
00226   thread->signal_hook = net_signal_hook_function;
00227   thread->ep = (EventIO*)ats_malloc(sizeof(EventIO));
00228   thread->ep->type = EVENTIO_ASYNC_SIGNAL;
00229 #if HAVE_EVENTFD
00230   thread->ep->start(pd, thread->evfd, 0, EVENTIO_READ);
00231 #else
00232   thread->ep->start(pd, thread->evpipe[0], 0, EVENTIO_READ);
00233 #endif
00234 }
00235 
00236 
00237 
00238 NetHandler::NetHandler():Continuation(NULL), trigger_event(0)
00239 {
00240   SET_HANDLER((NetContHandler) & NetHandler::startNetEvent);
00241 }
00242 
00243 
00244 
00245 
00246 
00247 int
00248 NetHandler::startNetEvent(int event, Event *e)
00249 {
00250   (void) event;
00251   SET_HANDLER((NetContHandler) & NetHandler::mainNetEvent);
00252   e->schedule_every(NET_PERIOD);
00253   trigger_event = e;
00254   return EVENT_CONT;
00255 }
00256 
00257 
00258 
00259 
00260 void
00261 NetHandler::process_enabled_list(NetHandler *nh)
00262 {
00263   UnixNetVConnection *vc = NULL;
00264 
00265   SListM(UnixNetVConnection, NetState, read, enable_link) rq(nh->read_enable_list.popall());
00266   while ((vc = rq.pop())) {
00267     vc->ep.modify(EVENTIO_READ);
00268     vc->ep.refresh(EVENTIO_READ);
00269     vc->read.in_enabled_list = 0;
00270     if ((vc->read.enabled && vc->read.triggered) || vc->closed)
00271       nh->read_ready_list.in_or_enqueue(vc);
00272   }
00273 
00274   SListM(UnixNetVConnection, NetState, write, enable_link) wq(nh->write_enable_list.popall());
00275   while ((vc = wq.pop())) {
00276     vc->ep.modify(EVENTIO_WRITE);
00277     vc->ep.refresh(EVENTIO_WRITE);
00278     vc->write.in_enabled_list = 0;
00279     if ((vc->write.enabled && vc->write.triggered) || vc->closed)
00280       nh->write_ready_list.in_or_enqueue(vc);
00281   }
00282 }
00283 
00284 
00285 
00286 
00287 
00288 
00289 
00290 int
00291 NetHandler::mainNetEvent(int event, Event *e)
00292 {
00293   ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
00294   (void) event;
00295   (void) e;
00296   EventIO *epd = NULL;
00297   int poll_timeout;
00298 
00299   NET_INCREMENT_DYN_STAT(net_handler_run_stat);
00300 
00301   process_enabled_list(this);
00302   if (likely(!read_ready_list.empty() || !write_ready_list.empty() || !read_enable_list.empty() || !write_enable_list.empty()))
00303     poll_timeout = 0; 
00304   else
00305     poll_timeout = net_config_poll_timeout;
00306 
00307   PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread);
00308   UnixNetVConnection *vc = NULL;
00309 #if TS_USE_EPOLL
00310   pd->result = epoll_wait(pd->epoll_fd, pd->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
00311   NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] epoll_wait(%d,%d), result=%d", pd->epoll_fd,poll_timeout,pd->result);
00312 #elif TS_USE_KQUEUE
00313   struct timespec tv;
00314   tv.tv_sec = poll_timeout / 1000;
00315   tv.tv_nsec = 1000000 * (poll_timeout % 1000);
00316   pd->result = kevent(pd->kqueue_fd, NULL, 0, pd->kq_Triggered_Events, POLL_DESCRIPTOR_SIZE, &tv);
00317   NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] kevent(%d,%d), result=%d", pd->kqueue_fd,poll_timeout,pd->result);
00318 #elif TS_USE_PORT
00319   int retval;
00320   timespec_t ptimeout;
00321   ptimeout.tv_sec = poll_timeout / 1000;
00322   ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
00323   unsigned nget = 1;
00324   if((retval = port_getn(pd->port_fd, pd->Port_Triggered_Events, POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) < 0) {
00325     pd->result = 0;
00326     switch(errno) {
00327     case EINTR:
00328     case EAGAIN:
00329     case ETIME:
00330       if (nget > 0) {
00331         pd->result = (int)nget;
00332       }
00333       break;
00334     default:
00335       ink_assert(!"unhandled port_getn() case:");
00336       break;
00337     }
00338   } else {
00339     pd->result = (int)nget;
00340   }
00341   NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)",
00342            retval,retval < 0 ? strerror(errno) : "ok",
00343            pd->port_fd, pd->Port_Triggered_Events,
00344            POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pd->result);
00345 
00346 #else
00347 #error port me
00348 #endif
00349 
00350   vc = NULL;
00351   for (int x = 0; x < pd->result; x++) {
00352     epd = (EventIO*) get_ev_data(pd,x);
00353     if (epd->type == EVENTIO_READWRITE_VC) {
00354       vc = epd->data.vc;
00355       if (get_ev_events(pd,x) & (EVENTIO_READ|EVENTIO_ERROR)) {
00356         vc->read.triggered = 1;
00357         if (!read_ready_list.in(vc))
00358           read_ready_list.enqueue(vc);
00359         else if (get_ev_events(pd,x) & EVENTIO_ERROR) {
00360           
00361           Debug("iocore_net_main", "Unhandled epoll event on read: 0x%04x read.enabled=%d closed=%d read.netready_queue=%d",
00362                 get_ev_events(pd,x), vc->read.enabled, vc->closed, read_ready_list.in(vc));
00363         }
00364       }
00365       vc = epd->data.vc;
00366       if (get_ev_events(pd,x) & (EVENTIO_WRITE|EVENTIO_ERROR)) {
00367         vc->write.triggered = 1;
00368         if (!write_ready_list.in(vc))
00369           write_ready_list.enqueue(vc);
00370         else if (get_ev_events(pd,x) & EVENTIO_ERROR) {
00371           
00372           Debug("iocore_net_main",
00373                 "Unhandled epoll event on write: 0x%04x write.enabled=%d closed=%d write.netready_queue=%d",
00374                 get_ev_events(pd,x), vc->write.enabled, vc->closed, write_ready_list.in(vc));
00375         }
00376       } else if (!get_ev_events(pd,x) & EVENTIO_ERROR) {
00377         Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", get_ev_events(pd,x));
00378       }
00379     } else if (epd->type == EVENTIO_DNS_CONNECTION) {
00380       if (epd->data.dnscon != NULL) {
00381         epd->data.dnscon->trigger(); 
00382 #if defined(USE_EDGE_TRIGGER)
00383         epd->refresh(EVENTIO_READ);
00384 #endif
00385       }
00386     } else if (epd->type == EVENTIO_ASYNC_SIGNAL)
00387       net_signal_hook_callback(trigger_event->ethread);
00388     ev_next_event(pd,x);
00389   }
00390 
00391   pd->result = 0;
00392 
00393 #if defined(USE_EDGE_TRIGGER)
00394  
00395   while ((vc = read_ready_list.dequeue())) {
00396     if (vc->closed)
00397       close_UnixNetVConnection(vc, trigger_event->ethread);
00398     else if (vc->read.enabled && vc->read.triggered)
00399       vc->net_read_io(this, trigger_event->ethread);
00400     else if (!vc->read.enabled) {
00401       read_ready_list.remove(vc);
00402 #if defined(solaris)
00403       if (vc->read.triggered && vc->write.enabled) {
00404         vc->ep.modify(-EVENTIO_READ);
00405         vc->ep.refresh(EVENTIO_WRITE);
00406         vc->writeReschedule(this);
00407       }
00408 #endif
00409     }
00410   }
00411   while ((vc = write_ready_list.dequeue())) {
00412     if (vc->closed)
00413       close_UnixNetVConnection(vc, trigger_event->ethread);
00414     else if (vc->write.enabled && vc->write.triggered)
00415       write_to_net(this, vc, trigger_event->ethread);
00416     else if (!vc->write.enabled) {
00417       write_ready_list.remove(vc);
00418 #if defined(solaris)
00419       if (vc->write.triggered && vc->read.enabled) {
00420         vc->ep.modify(-EVENTIO_WRITE);
00421         vc->ep.refresh(EVENTIO_READ);
00422         vc->readReschedule(this);
00423       }
00424 #endif
00425     }
00426   }
00427 #else 
00428   while ((vc = read_ready_list.dequeue())) {
00429     if (vc->closed)
00430       close_UnixNetVConnection(vc, trigger_event->ethread);
00431     else if (vc->read.enabled && vc->read.triggered)
00432       vc->net_read_io(this, trigger_event->ethread);
00433     else if (!vc->read.enabled)
00434       vc->ep.modify(-EVENTIO_READ);
00435   }
00436   while ((vc = write_ready_list.dequeue())) {
00437     if (vc->closed)
00438       close_UnixNetVConnection(vc, trigger_event->ethread);
00439     else if (vc->write.enabled && vc->write.triggered)
00440       write_to_net(this, vc, trigger_event->ethread);
00441     else if (!vc->write.enabled)
00442       vc->ep.modify(-EVENTIO_WRITE);
00443   }
00444 #endif 
00445 
00446   return EVENT_CONT;
00447 }
00448