00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "P_Net.h"
00025
// File-scope state for the net subsystem.
// NOTE(review): only the definitions are visible here; the code that reads and
// writes these variables is elsewhere, so the per-variable notes below are
// inferred from the names and should be confirmed against the callers.
ink_hrtime last_throttle_warning;      // presumably the time of the last throttle warning -- TODO confirm
ink_hrtime last_shedding_warning;      // presumably the time of the last load-shedding warning -- TODO confirm
ink_hrtime emergency_throttle_time;    // presumably a timestamp tied to emergency throttling -- TODO confirm
int net_connections_throttle;          // presumably a connection-count threshold -- TODO confirm
int fds_throttle;                      // presumably a file-descriptor threshold -- TODO confirm
int fds_limit = 8000;                  // starts at 8000; whether it is overwritten later is not visible here
ink_hrtime last_transient_accept_error; // presumably the time of the last transient accept() error -- TODO confirm

// C-linkage declaration only; neither a definition nor a call is visible in
// this part of the file.
extern "C" void fd_reify(struct ev_loop *);
00035
00036
00037 #ifndef INACTIVITY_TIMEOUT
00038
00039
00040
00041 class InactivityCop : public Continuation {
00042 public:
00043 InactivityCop(ProxyMutex *m):Continuation(m), default_inactivity_timeout(0) {
00044 SET_HANDLER(&InactivityCop::check_inactivity);
00045 REC_ReadConfigInteger(default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
00046 Debug("inactivity_cop", "default inactivity timeout is set to: %d", default_inactivity_timeout);
00047 }
00048 int check_inactivity(int event, Event *e) {
00049 (void) event;
00050 ink_hrtime now = ink_get_hrtime();
00051 NetHandler *nh = get_NetHandler(this_ethread());
00052
00053 forl_LL(UnixNetVConnection, vc, nh->open_list) {
00054 if (vc->thread == this_ethread())
00055 nh->cop_list.push(vc);
00056 }
00057 while (UnixNetVConnection *vc = nh->cop_list.pop()) {
00058
00059 MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread());
00060 if (!lock.lock_acquired) {
00061 NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat);
00062 continue;
00063 }
00064
00065 if (vc->closed) {
00066 close_UnixNetVConnection(vc, e->ethread);
00067 continue;
00068 }
00069
00070
00071 if (vc->next_inactivity_timeout_at == 0 && default_inactivity_timeout > 0) {
00072 Debug("inactivity_cop", "vc: %p inactivity timeout not set, setting a default of %d", vc, default_inactivity_timeout);
00073 vc->set_inactivity_timeout(HRTIME_SECONDS(default_inactivity_timeout));
00074 } else {
00075 Debug("inactivity_cop_verbose", "vc: %p timeout at: %" PRId64 " timeout in: %" PRId64, vc, ink_hrtime_to_sec(vc->next_inactivity_timeout_at),
00076 ink_hrtime_to_sec(vc->inactivity_timeout_in));
00077 }
00078
00079 if (vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at < now)
00080 vc->handleEvent(EVENT_IMMEDIATE, e);
00081 }
00082 return 0;
00083 }
00084 private:
00085 int default_inactivity_timeout;
00086 };
00087 #endif
00088
00089 PollCont::PollCont(ProxyMutex *m, int pt):Continuation(m), net_handler(NULL), nextPollDescriptor(NULL), poll_timeout(pt) {
00090 pollDescriptor = new PollDescriptor;
00091 pollDescriptor->init();
00092 SET_HANDLER(&PollCont::pollEvent);
00093 }
00094
00095 PollCont::PollCont(ProxyMutex *m, NetHandler *nh, int pt):Continuation(m), net_handler(nh), nextPollDescriptor(NULL), poll_timeout(pt)
00096 {
00097 pollDescriptor = new PollDescriptor;
00098 pollDescriptor->init();
00099 SET_HANDLER(&PollCont::pollEvent);
00100 }
00101
00102 PollCont::~PollCont() {
00103 delete pollDescriptor;
00104 if (nextPollDescriptor != NULL) {
00105 delete nextPollDescriptor;
00106 }
00107 }
00108
00109
00110
00111
00112
00113 int
00114 PollCont::pollEvent(int event, Event *e) {
00115 (void) event;
00116 (void) e;
00117
00118 if (likely(net_handler)) {
00119
00120 if (likely
00121 (!net_handler->read_ready_list.empty() || !net_handler->write_ready_list.empty() ||
00122 !net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) {
00123 NetDebug("iocore_net_poll", "rrq: %d, wrq: %d, rel: %d, wel: %d",
00124 net_handler->read_ready_list.empty(),
00125 net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(),
00126 net_handler->write_enable_list.empty());
00127 poll_timeout = 0;
00128 } else {
00129 poll_timeout = net_config_poll_timeout;
00130 }
00131 }
00132
00133 #if TS_USE_EPOLL
00134 pollDescriptor->result = epoll_wait(pollDescriptor->epoll_fd,
00135 pollDescriptor->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
00136 NetDebug("iocore_net_poll", "[PollCont::pollEvent] epoll_fd: %d, timeout: %d, results: %d", pollDescriptor->epoll_fd, poll_timeout,
00137 pollDescriptor->result);
00138 #elif TS_USE_KQUEUE
00139 struct timespec tv;
00140 tv.tv_sec = poll_timeout / 1000;
00141 tv.tv_nsec = 1000000 * (poll_timeout % 1000);
00142 pollDescriptor->result = kevent(pollDescriptor->kqueue_fd, NULL, 0,
00143 pollDescriptor->kq_Triggered_Events,
00144 POLL_DESCRIPTOR_SIZE,
00145 &tv);
00146 NetDebug("iocore_net_poll", "[PollCont::pollEvent] kueue_fd: %d, timeout: %d, results: %d", pollDescriptor->kqueue_fd, poll_timeout,
00147 pollDescriptor->result);
00148 #elif TS_USE_PORT
00149 int retval;
00150 timespec_t ptimeout;
00151 ptimeout.tv_sec = poll_timeout / 1000;
00152 ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
00153 unsigned nget = 1;
00154 if((retval = port_getn(pollDescriptor->port_fd,
00155 pollDescriptor->Port_Triggered_Events,
00156 POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) < 0) {
00157 pollDescriptor->result = 0;
00158 switch(errno) {
00159 case EINTR:
00160 case EAGAIN:
00161 case ETIME:
00162 if (nget > 0) {
00163 pollDescriptor->result = (int)nget;
00164 }
00165 break;
00166 default:
00167 ink_assert(!"unhandled port_getn() case:");
00168 break;
00169 }
00170 } else {
00171 pollDescriptor->result = (int)nget;
00172 }
00173 NetDebug("iocore_net_poll", "[PollCont::pollEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)",
00174 retval,retval < 0 ? strerror(errno) : "ok",
00175 pollDescriptor->port_fd, pollDescriptor->Port_Triggered_Events,
00176 POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pollDescriptor->result);
00177 #else
00178 #error port me
00179 #endif
00180 return EVENT_CONT;
00181 }
00182
00183 static void
00184 net_signal_hook_callback(EThread *thread) {
00185 #if HAVE_EVENTFD
00186 uint64_t counter;
00187 ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t)));
00188 #elif TS_USE_PORT
00189
00190 #else
00191 char dummy[1024];
00192 ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024));
00193 #endif
00194 }
00195
00196 static void
00197 net_signal_hook_function(EThread *thread) {
00198 #if HAVE_EVENTFD
00199 uint64_t counter = 1;
00200 ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
00201 #elif TS_USE_PORT
00202 PollDescriptor *pd = get_PollDescriptor(thread);
00203 ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
00204 #else
00205 char dummy = 1;
00206 ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
00207 #endif
00208 }
00209
00210 void
00211 initialize_thread_for_net(EThread *thread)
00212 {
00213 new((ink_dummy_for_new *) get_NetHandler(thread)) NetHandler();
00214 new((ink_dummy_for_new *) get_PollCont(thread)) PollCont(thread->mutex, get_NetHandler(thread));
00215 get_NetHandler(thread)->mutex = new_ProxyMutex();
00216 PollCont *pc = get_PollCont(thread);
00217 PollDescriptor *pd = pc->pollDescriptor;
00218
00219 thread->schedule_imm(get_NetHandler(thread));
00220
00221 #ifndef INACTIVITY_TIMEOUT
00222 InactivityCop *inactivityCop = new InactivityCop(get_NetHandler(thread)->mutex);
00223 thread->schedule_every(inactivityCop, HRTIME_SECONDS(1));
00224 #endif
00225
00226 thread->signal_hook = net_signal_hook_function;
00227 thread->ep = (EventIO*)ats_malloc(sizeof(EventIO));
00228 thread->ep->type = EVENTIO_ASYNC_SIGNAL;
00229 #if HAVE_EVENTFD
00230 thread->ep->start(pd, thread->evfd, 0, EVENTIO_READ);
00231 #else
00232 thread->ep->start(pd, thread->evpipe[0], 0, EVENTIO_READ);
00233 #endif
00234 }
00235
00236
00237
00238 NetHandler::NetHandler():Continuation(NULL), trigger_event(0)
00239 {
00240 SET_HANDLER((NetContHandler) & NetHandler::startNetEvent);
00241 }
00242
00243
00244
00245
00246
00247 int
00248 NetHandler::startNetEvent(int event, Event *e)
00249 {
00250 (void) event;
00251 SET_HANDLER((NetContHandler) & NetHandler::mainNetEvent);
00252 e->schedule_every(NET_PERIOD);
00253 trigger_event = e;
00254 return EVENT_CONT;
00255 }
00256
00257
00258
00259
00260 void
00261 NetHandler::process_enabled_list(NetHandler *nh)
00262 {
00263 UnixNetVConnection *vc = NULL;
00264
00265 SListM(UnixNetVConnection, NetState, read, enable_link) rq(nh->read_enable_list.popall());
00266 while ((vc = rq.pop())) {
00267 vc->ep.modify(EVENTIO_READ);
00268 vc->ep.refresh(EVENTIO_READ);
00269 vc->read.in_enabled_list = 0;
00270 if ((vc->read.enabled && vc->read.triggered) || vc->closed)
00271 nh->read_ready_list.in_or_enqueue(vc);
00272 }
00273
00274 SListM(UnixNetVConnection, NetState, write, enable_link) wq(nh->write_enable_list.popall());
00275 while ((vc = wq.pop())) {
00276 vc->ep.modify(EVENTIO_WRITE);
00277 vc->ep.refresh(EVENTIO_WRITE);
00278 vc->write.in_enabled_list = 0;
00279 if ((vc->write.enabled && vc->write.triggered) || vc->closed)
00280 nh->write_ready_list.in_or_enqueue(vc);
00281 }
00282 }
00283
00284
00285
00286
00287
00288
00289
00290 int
00291 NetHandler::mainNetEvent(int event, Event *e)
00292 {
00293 ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
00294 (void) event;
00295 (void) e;
00296 EventIO *epd = NULL;
00297 int poll_timeout;
00298
00299 NET_INCREMENT_DYN_STAT(net_handler_run_stat);
00300
00301 process_enabled_list(this);
00302 if (likely(!read_ready_list.empty() || !write_ready_list.empty() || !read_enable_list.empty() || !write_enable_list.empty()))
00303 poll_timeout = 0;
00304 else
00305 poll_timeout = net_config_poll_timeout;
00306
00307 PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread);
00308 UnixNetVConnection *vc = NULL;
00309 #if TS_USE_EPOLL
00310 pd->result = epoll_wait(pd->epoll_fd, pd->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout);
00311 NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] epoll_wait(%d,%d), result=%d", pd->epoll_fd,poll_timeout,pd->result);
00312 #elif TS_USE_KQUEUE
00313 struct timespec tv;
00314 tv.tv_sec = poll_timeout / 1000;
00315 tv.tv_nsec = 1000000 * (poll_timeout % 1000);
00316 pd->result = kevent(pd->kqueue_fd, NULL, 0, pd->kq_Triggered_Events, POLL_DESCRIPTOR_SIZE, &tv);
00317 NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] kevent(%d,%d), result=%d", pd->kqueue_fd,poll_timeout,pd->result);
00318 #elif TS_USE_PORT
00319 int retval;
00320 timespec_t ptimeout;
00321 ptimeout.tv_sec = poll_timeout / 1000;
00322 ptimeout.tv_nsec = 1000000 * (poll_timeout % 1000);
00323 unsigned nget = 1;
00324 if((retval = port_getn(pd->port_fd, pd->Port_Triggered_Events, POLL_DESCRIPTOR_SIZE, &nget, &ptimeout)) < 0) {
00325 pd->result = 0;
00326 switch(errno) {
00327 case EINTR:
00328 case EAGAIN:
00329 case ETIME:
00330 if (nget > 0) {
00331 pd->result = (int)nget;
00332 }
00333 break;
00334 default:
00335 ink_assert(!"unhandled port_getn() case:");
00336 break;
00337 }
00338 } else {
00339 pd->result = (int)nget;
00340 }
00341 NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] %d[%s]=port_getn(%d,%p,%d,%d,%d),results(%d)",
00342 retval,retval < 0 ? strerror(errno) : "ok",
00343 pd->port_fd, pd->Port_Triggered_Events,
00344 POLL_DESCRIPTOR_SIZE, nget, poll_timeout, pd->result);
00345
00346 #else
00347 #error port me
00348 #endif
00349
00350 vc = NULL;
00351 for (int x = 0; x < pd->result; x++) {
00352 epd = (EventIO*) get_ev_data(pd,x);
00353 if (epd->type == EVENTIO_READWRITE_VC) {
00354 vc = epd->data.vc;
00355 if (get_ev_events(pd,x) & (EVENTIO_READ|EVENTIO_ERROR)) {
00356 vc->read.triggered = 1;
00357 if (!read_ready_list.in(vc))
00358 read_ready_list.enqueue(vc);
00359 else if (get_ev_events(pd,x) & EVENTIO_ERROR) {
00360
00361 Debug("iocore_net_main", "Unhandled epoll event on read: 0x%04x read.enabled=%d closed=%d read.netready_queue=%d",
00362 get_ev_events(pd,x), vc->read.enabled, vc->closed, read_ready_list.in(vc));
00363 }
00364 }
00365 vc = epd->data.vc;
00366 if (get_ev_events(pd,x) & (EVENTIO_WRITE|EVENTIO_ERROR)) {
00367 vc->write.triggered = 1;
00368 if (!write_ready_list.in(vc))
00369 write_ready_list.enqueue(vc);
00370 else if (get_ev_events(pd,x) & EVENTIO_ERROR) {
00371
00372 Debug("iocore_net_main",
00373 "Unhandled epoll event on write: 0x%04x write.enabled=%d closed=%d write.netready_queue=%d",
00374 get_ev_events(pd,x), vc->write.enabled, vc->closed, write_ready_list.in(vc));
00375 }
00376 } else if (!get_ev_events(pd,x) & EVENTIO_ERROR) {
00377 Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", get_ev_events(pd,x));
00378 }
00379 } else if (epd->type == EVENTIO_DNS_CONNECTION) {
00380 if (epd->data.dnscon != NULL) {
00381 epd->data.dnscon->trigger();
00382 #if defined(USE_EDGE_TRIGGER)
00383 epd->refresh(EVENTIO_READ);
00384 #endif
00385 }
00386 } else if (epd->type == EVENTIO_ASYNC_SIGNAL)
00387 net_signal_hook_callback(trigger_event->ethread);
00388 ev_next_event(pd,x);
00389 }
00390
00391 pd->result = 0;
00392
00393 #if defined(USE_EDGE_TRIGGER)
00394
00395 while ((vc = read_ready_list.dequeue())) {
00396 if (vc->closed)
00397 close_UnixNetVConnection(vc, trigger_event->ethread);
00398 else if (vc->read.enabled && vc->read.triggered)
00399 vc->net_read_io(this, trigger_event->ethread);
00400 else if (!vc->read.enabled) {
00401 read_ready_list.remove(vc);
00402 #if defined(solaris)
00403 if (vc->read.triggered && vc->write.enabled) {
00404 vc->ep.modify(-EVENTIO_READ);
00405 vc->ep.refresh(EVENTIO_WRITE);
00406 vc->writeReschedule(this);
00407 }
00408 #endif
00409 }
00410 }
00411 while ((vc = write_ready_list.dequeue())) {
00412 if (vc->closed)
00413 close_UnixNetVConnection(vc, trigger_event->ethread);
00414 else if (vc->write.enabled && vc->write.triggered)
00415 write_to_net(this, vc, trigger_event->ethread);
00416 else if (!vc->write.enabled) {
00417 write_ready_list.remove(vc);
00418 #if defined(solaris)
00419 if (vc->write.triggered && vc->read.enabled) {
00420 vc->ep.modify(-EVENTIO_WRITE);
00421 vc->ep.refresh(EVENTIO_READ);
00422 vc->readReschedule(this);
00423 }
00424 #endif
00425 }
00426 }
00427 #else
00428 while ((vc = read_ready_list.dequeue())) {
00429 if (vc->closed)
00430 close_UnixNetVConnection(vc, trigger_event->ethread);
00431 else if (vc->read.enabled && vc->read.triggered)
00432 vc->net_read_io(this, trigger_event->ethread);
00433 else if (!vc->read.enabled)
00434 vc->ep.modify(-EVENTIO_READ);
00435 }
00436 while ((vc = write_ready_list.dequeue())) {
00437 if (vc->closed)
00438 close_UnixNetVConnection(vc, trigger_event->ethread);
00439 else if (vc->write.enabled && vc->write.triggered)
00440 write_to_net(this, vc, trigger_event->ethread);
00441 else if (!vc->write.enabled)
00442 vc->ep.modify(-EVENTIO_WRITE);
00443 }
00444 #endif
00445
00446 return EVENT_CONT;
00447 }
00448