00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #ifndef __P_UNIXNET_H__
00025 #define __P_UNIXNET_H__
00026
00027 #include "libts.h"
00028
// Request edge-triggered notification for each polling backend. These are
// tested with #ifdef below, so defining them at all enables the mode.
#define USE_EDGE_TRIGGER_EPOLL 1
#define USE_EDGE_TRIGGER_KQUEUE 1
#define USE_EDGE_TRIGGER_PORT 1


// Tags stored in EventIO::type; the typed EventIO::start() overloads set
// the matching tag, and EventIO::close() dispatches on it.
#define EVENTIO_NETACCEPT 1
#define EVENTIO_READWRITE_VC 2
#define EVENTIO_DNS_CONNECTION 3
#define EVENTIO_UDP_CONNECTION 4
#define EVENTIO_ASYNC_SIGNAL 5

// epoll backend: EVENTIO_READ / EVENTIO_WRITE carry EPOLLET when the
// edge-triggered mode is selected.
#if TS_USE_EPOLL
#ifdef USE_EDGE_TRIGGER_EPOLL
#define USE_EDGE_TRIGGER 1
#define EVENTIO_READ (EPOLLIN|EPOLLET)
#define EVENTIO_WRITE (EPOLLOUT|EPOLLET)
#else
#define EVENTIO_READ EPOLLIN
#define EVENTIO_WRITE EPOLLOUT
#endif
#define EVENTIO_ERROR (EPOLLERR|EPOLLPRI|EPOLLHUP)
#endif

// kqueue backend: edge triggering is expressed through the EV_CLEAR flag,
// which is OR-ed into the kevent flags via INK_EV_EDGE_TRIGGER.
#if TS_USE_KQUEUE
#ifdef USE_EDGE_TRIGGER_KQUEUE
#define USE_EDGE_TRIGGER 1
#define INK_EV_EDGE_TRIGGER EV_CLEAR
#else
#define INK_EV_EDGE_TRIGGER 0
#endif
#define EVENTIO_READ INK_EVP_IN
#define EVENTIO_WRITE INK_EVP_OUT
#define EVENTIO_ERROR (0x010|0x002|0x020) // ERR PRI HUP
#endif
// Event-port backend: uses the poll(2) style event bits.
#if TS_USE_PORT
#ifdef USE_EDGE_TRIGGER_PORT
#define USE_EDGE_TRIGGER 1
#endif
#define EVENTIO_READ POLLIN
#define EVENTIO_WRITE POLLOUT
#define EVENTIO_ERROR (POLLERR|POLLPRI|POLLHUP)
#endif
00071
// An "event loop" handle is simply a pointer to the poll descriptor that
// holds the backend's descriptor (epoll_fd / kqueue_fd / port_fd).
struct PollDescriptor;
typedef PollDescriptor *EventLoop;

// Forward declarations for the object kinds an EventIO can refer to.
class UnixNetVConnection;
class UnixUDPConnection;
struct DNSConnection;
struct NetAccept;
00079 struct EventIO
00080 {
00081 int fd;
00082 #if TS_USE_KQUEUE || TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER) || TS_USE_PORT
00083 int events;
00084 #endif
00085 EventLoop event_loop;
00086 int type;
00087 union
00088 {
00089 Continuation *c;
00090 UnixNetVConnection *vc;
00091 DNSConnection *dnscon;
00092 NetAccept *na;
00093 UnixUDPConnection *uc;
00094 } data;
00095 int start(EventLoop l, DNSConnection *vc, int events);
00096 int start(EventLoop l, NetAccept *vc, int events);
00097 int start(EventLoop l, UnixNetVConnection *vc, int events);
00098 int start(EventLoop l, UnixUDPConnection *vc, int events);
00099 int start(EventLoop l, int fd, Continuation *c, int events);
00100
00101
00102 int modify(int events);
00103
00104 int refresh(int events);
00105 int stop();
00106 int close();
00107 EventIO() {
00108 type = 0;
00109 data.c = 0;
00110 }
00111 };
00112
00113 #include "P_UnixNetProcessor.h"
00114 #include "P_UnixNetVConnection.h"
00115 #include "P_NetAccept.h"
00116 #include "P_DNSConnection.h"
00117 #include "P_UnixUDPConnection.h"
00118 #include "P_UnixPollDescriptor.h"
00119
class UnixNetVConnection;
class NetHandler;
// Pointer-to-member type for NetHandler methods taking (int, void *).
typedef int (NetHandler::*NetContHandler) (int, void *);
typedef unsigned int uint32;

// Globals defined elsewhere; the inline helpers in this header read and
// update them (warning timestamps, throttle limits, emergency deadline).
extern ink_hrtime last_throttle_warning;
extern ink_hrtime last_shedding_warning;
extern ink_hrtime emergency_throttle_time;
extern int net_connections_throttle;
extern int fds_throttle;
extern int fds_limit;
extern ink_hrtime last_transient_accept_error;
extern int http_accept_port_number;
00133
00134
00135
00136
00137
00138
00139
00140
00141
// Number of descriptors kept back from fds_limit when the connection
// throttle is computed in change_net_connections_throttle().
#define THROTTLE_FD_HEADROOM (128 + 64) // CACHE_DB_FDS + 64

// Minimum spacing between "transient accept error" warnings.
#define TRANSIENT_ACCEPT_ERROR_MESSAGE_EVERY HRTIME_HOURS(24)
#define NET_RETRY_DELAY HRTIME_MSECONDS(10)


#define THROTTLE_AT_ONCE 5
// Distances below fds_limit at which check_emergency_throttle() starts
// emergency throttling and, closer still, closes the new connection.
#define EMERGENCY_THROTTLE 16
#define HYPER_EMERGENCY_THROTTLE 6

// Multipliers applied to the open-connection count in
// net_connections_to_throttle(), depending on the ThrottleType.
#define NET_THROTTLE_ACCEPT_HEADROOM 1.1 // 10%
#define NET_THROTTLE_CONNECT_HEADROOM 1.0 // 0%
// Minimum spacing between throttle / shedding warnings.
#define NET_THROTTLE_MESSAGE_EVERY HRTIME_MINUTES(10)
// NOTE(review): both periods are negative; the meaning of a negative period
// is defined by the event scheduler, not visible here -- confirm before changing.
#define NET_PERIOD -HRTIME_MSECONDS(5)
#define ACCEPT_PERIOD -HRTIME_MSECONDS(4)
#define NET_THROTTLE_DELAY 50

// Expands to four comma-separated expressions: the first four bytes of x,
// in memory order (intended as arguments for a "%d.%d.%d.%d"-style format).
#define PRINT_IP(x) ((uint8_t*)&(x))[0],((uint8_t*)&(x))[1], ((uint8_t*)&(x))[2],((uint8_t*)&(x))[3]



// Defined elsewhere; presumably hands out a fresh connection id -- verify at the definition.
unsigned int net_next_connection_number();
00164
/**
  Continuation that carries the polling state: the poll descriptors, an
  optional NetHandler and the poll timeout. pollEvent() is its event
  handler; its implementation is not part of this header.
*/
struct PollCont:public Continuation
{
  NetHandler *net_handler;             // set by the constructor taking a NetHandler
  PollDescriptor *pollDescriptor;      // returned by get_PollDescriptor()
  PollDescriptor *nextPollDescriptor;
  int poll_timeout;                    // defaults to net_config_poll_timeout

  PollCont(ProxyMutex * m, int pt = net_config_poll_timeout);
  PollCont(ProxyMutex * m, NetHandler * nh, int pt = net_config_poll_timeout);
  ~PollCont();
  int pollEvent(int event, Event * e);
};
00177
00178
00179
00180
00181
00182
00183
00184
/**
  Continuation holding the lists of UnixNetVConnections that the network
  event handlers operate on. read_disable() / write_disable() remove
  connections from the ready lists declared here.
*/
class NetHandler:public Continuation
{
public:
  Event *trigger_event;
  // Connections linked through their read / write NetState ready_link.
  QueM(UnixNetVConnection, NetState, read, ready_link) read_ready_list;
  QueM(UnixNetVConnection, NetState, write, ready_link) write_ready_list;
  Que(UnixNetVConnection, link) open_list;
  DList(UnixNetVConnection, cop_link) cop_list;
  // Connections linked through their read / write NetState enable_link.
  ASLLM(UnixNetVConnection, NetState, read, enable_link) read_enable_list;
  ASLLM(UnixNetVConnection, NetState, write, enable_link) write_enable_list;

  time_t sec;
  int cycles;

  // Event handlers (definitions live outside this header).
  int startNetEvent(int event, Event * data);
  int mainNetEvent(int event, Event * data);
  int mainNetEventExt(int event, Event * data);
  void process_enabled_list(NetHandler *);

  NetHandler();
};
00206
00207 static inline NetHandler *
00208 get_NetHandler(EThread * t)
00209 {
00210 return (NetHandler *) ETHREAD_GET_PTR(t, unix_netProcessor.netHandler_offset);
00211 }
00212 static inline PollCont *
00213 get_PollCont(EThread * t)
00214 {
00215 return (PollCont *) ETHREAD_GET_PTR(t, unix_netProcessor.pollCont_offset);
00216 }
00217 static inline PollDescriptor *
00218 get_PollDescriptor(EThread * t)
00219 {
00220 PollCont *p = get_PollCont(t);
00221 return p->pollDescriptor;
00222 }
00223
00224
/** Which operation a throttle check is made for; selects the headroom factor. */
enum ThrottleType
{
  ACCEPT = 0,
  CONNECT = 1
};
00227
00228 TS_INLINE int
00229 net_connections_to_throttle(ThrottleType t)
00230 {
00231
00232 double headroom = t == ACCEPT ? NET_THROTTLE_ACCEPT_HEADROOM : NET_THROTTLE_CONNECT_HEADROOM;
00233 int64_t sval = 0;
00234
00235 NET_READ_GLOBAL_DYN_SUM(net_connections_currently_open_stat, sval);
00236 int currently_open = (int) sval;
00237
00238 if (currently_open < 0)
00239 currently_open = 0;
00240 return (int) (currently_open * headroom);
00241 }
00242
00243 TS_INLINE void
00244 check_shedding_warning()
00245 {
00246 ink_hrtime t = ink_get_hrtime();
00247 if (t - last_shedding_warning > NET_THROTTLE_MESSAGE_EVERY) {
00248 last_shedding_warning = t;
00249 RecSignalWarning(REC_SIGNAL_SYSTEM_ERROR, "number of connections reaching shedding limit");
00250 }
00251 }
00252
00253 TS_INLINE int
00254 emergency_throttle(ink_hrtime now)
00255 {
00256 return emergency_throttle_time > now;
00257 }
00258
00259 TS_INLINE int
00260 check_net_throttle(ThrottleType t, ink_hrtime now)
00261 {
00262 int connections = net_connections_to_throttle(t);
00263
00264 if (connections >= net_connections_throttle)
00265 return true;
00266
00267 if (emergency_throttle(now))
00268 return true;
00269
00270 return false;
00271 }
00272
00273 TS_INLINE void
00274 check_throttle_warning()
00275 {
00276 ink_hrtime t = ink_get_hrtime();
00277 if (t - last_throttle_warning > NET_THROTTLE_MESSAGE_EVERY) {
00278 last_throttle_warning = t;
00279 RecSignalWarning(REC_SIGNAL_SYSTEM_ERROR, "too many connections, throttling");
00280
00281 }
00282 }
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295 TS_INLINE int
00296 check_emergency_throttle(Connection & con)
00297 {
00298 int fd = con.fd;
00299 int emergency = fds_limit - EMERGENCY_THROTTLE;
00300 if (fd > emergency) {
00301 int over = fd - emergency;
00302 emergency_throttle_time = ink_get_hrtime() + (over * over) * HRTIME_SECOND;
00303 RecSignalWarning(REC_SIGNAL_SYSTEM_ERROR, "too many open file descriptors, emergency throttling");
00304 int hyper_emergency = fds_limit - HYPER_EMERGENCY_THROTTLE;
00305 if (fd > hyper_emergency)
00306 con.close();
00307 return true;
00308 }
00309 return false;
00310 }
00311
00312
00313 TS_INLINE int
00314 change_net_connections_throttle(const char *token, RecDataT data_type, RecData value, void *data)
00315 {
00316 (void) token;
00317 (void) data_type;
00318 (void) value;
00319 (void) data;
00320 int throttle = fds_limit - THROTTLE_FD_HEADROOM;
00321 if (fds_throttle < 0)
00322 net_connections_throttle = throttle;
00323 else {
00324 net_connections_throttle = fds_throttle;
00325 if (net_connections_throttle > throttle)
00326 net_connections_throttle = throttle;
00327 }
00328 return 0;
00329 }
00330
00331
00332
00333
00334
/**
  Classify the result of a failed accept.

  @param res negated errno value (the cases below match -EAGAIN, -EMFILE, ...).
  @return  1 for EAGAIN / ECONNABORTED / ECONNRESET / EPIPE,
           0 for resource exhaustion (EMFILE, ENOMEM, ENOSR, ENOBUFS, ENFILE)
             and for EINTR,
          -1 for everything else.
*/
TS_INLINE int
accept_error_seriousness(int res)
{
  switch (res) {
  case -EAGAIN:
  case -ECONNABORTED:
  case -ECONNRESET:
  case -EPIPE:
    return 1;
  case -EMFILE:
  case -ENOMEM:
#if defined(ENOSR) && !defined(freebsd)
  case -ENOSR:
#endif
    // Running out of descriptors/memory means the throttle did not kick in early enough.
    ink_assert(!"throttling misconfigured: set too high");
    // fall through: when the assert is compiled out these are treated like ENOBUFS / ENFILE
#ifdef ENOBUFS
  case -ENOBUFS:
#endif
#ifdef ENFILE
  case -ENFILE:
#endif
    return 0;
  case -EINTR:
    ink_assert(!"should be handled at a lower level");
    return 0;
#if defined(EPROTO) && !defined(freebsd)
  case -EPROTO:
#endif
  // The labels below are listed explicitly but share the default result.
  case -EOPNOTSUPP:
  case -ENOTSOCK:
  case -ENODEV:
  case -EBADF:
  default:
    return -1;
  }
}
00371
00372 TS_INLINE void
00373 check_transient_accept_error(int res)
00374 {
00375 ink_hrtime t = ink_get_hrtime();
00376 if (!last_transient_accept_error || t - last_transient_accept_error > TRANSIENT_ACCEPT_ERROR_MESSAGE_EVERY) {
00377 last_transient_accept_error = t;
00378 Warning("accept thread received transient error: errno = %d", -res);
00379 #if defined(linux)
00380 if (res == -ENOBUFS || res == -ENFILE)
00381 Warning("errno : %d consider a memory upgrade", -res);
00382 #endif
00383 }
00384 }
00385
00386
00387
00388
00389 static inline void
00390 read_disable(NetHandler * nh, UnixNetVConnection * vc)
00391 {
00392 #ifdef INACTIVITY_TIMEOUT
00393 if (vc->inactivity_timeout) {
00394 if (!vc->write.enabled) {
00395 vc->inactivity_timeout->cancel_action();
00396 vc->inactivity_timeout = NULL;
00397 }
00398 }
00399 #else
00400 if (!vc->write.enabled)
00401 vc->next_inactivity_timeout_at = 0;
00402 #endif
00403 vc->read.enabled = 0;
00404 nh->read_ready_list.remove(vc);
00405 vc->ep.modify(-EVENTIO_READ);
00406 }
00407
00408 static inline void
00409 write_disable(NetHandler * nh, UnixNetVConnection * vc)
00410 {
00411 #ifdef INACTIVITY_TIMEOUT
00412 if (vc->inactivity_timeout) {
00413 if (!vc->read.enabled) {
00414 vc->inactivity_timeout->cancel_action();
00415 vc->inactivity_timeout = NULL;
00416 }
00417 }
00418 #else
00419 if (!vc->read.enabled)
00420 vc->next_inactivity_timeout_at = 0;
00421 #endif
00422 vc->write.enabled = 0;
00423 nh->write_ready_list.remove(vc);
00424 vc->ep.modify(-EVENTIO_WRITE);
00425 }
00426
00427 TS_INLINE int EventIO::start(EventLoop l, DNSConnection *vc, int events) {
00428 type = EVENTIO_DNS_CONNECTION;
00429 return start(l, vc->fd, (Continuation*)vc, events);
00430 }
00431 TS_INLINE int EventIO::start(EventLoop l, NetAccept *vc, int events) {
00432 type = EVENTIO_NETACCEPT;
00433 return start(l, vc->server.fd, (Continuation*)vc, events);
00434 }
00435 TS_INLINE int EventIO::start(EventLoop l, UnixNetVConnection *vc, int events) {
00436 type = EVENTIO_READWRITE_VC;
00437 return start(l, vc->con.fd, (Continuation*)vc, events);
00438 }
00439 TS_INLINE int EventIO::start(EventLoop l, UnixUDPConnection *vc, int events) {
00440 type = EVENTIO_UDP_CONNECTION;
00441 return start(l, vc->fd, (Continuation*)vc, events);
00442 }
00443 TS_INLINE int EventIO::close() {
00444 stop();
00445 switch (type) {
00446 default: ink_assert(!"case");
00447 case EVENTIO_DNS_CONNECTION: return data.dnscon->close(); break;
00448 case EVENTIO_NETACCEPT: return data.na->server.close(); break;
00449 case EVENTIO_READWRITE_VC: return data.vc->con.close(); break;
00450 }
00451 return -1;
00452 }
00453
00454 TS_INLINE int EventIO::start(EventLoop l, int afd, Continuation *c, int e) {
00455 data.c = c;
00456 fd = afd;
00457 event_loop = l;
00458 #if TS_USE_EPOLL
00459 struct epoll_event ev;
00460 memset(&ev, 0, sizeof(ev));
00461 ev.events = e;
00462 ev.data.ptr = this;
00463 #ifndef USE_EDGE_TRIGGER
00464 events = e;
00465 #endif
00466 return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
00467 #endif
00468 #if TS_USE_KQUEUE
00469 events = e;
00470 struct kevent ev[2];
00471 int n = 0;
00472 if (e & EVENTIO_READ)
00473 EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
00474 if (e & EVENTIO_WRITE)
00475 EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
00476 return kevent(l->kqueue_fd, &ev[0], n, NULL, 0, NULL);
00477 #endif
00478 #if TS_USE_PORT
00479 events = e;
00480 int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
00481 Debug("iocore_eventio", "[EventIO::start] e(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)", e, events, retval, retval<0? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
00482 return retval;
00483 #endif
00484 }
00485
/**
  Add or remove requested events for the registered descriptor.

  @param e a positive mask adds those events; a negated mask (e.g.
  -EVENTIO_READ) removes them.

  On epoll and kqueue the work is only compiled in for level-triggered
  builds; with USE_EDGE_TRIGGER defined on those backends the function
  ignores @a e and returns 0. On event ports the descriptor is
  re-associated with the new mask.

  @return the result of the backend call, or 0 when nothing was done.
*/
TS_INLINE int EventIO::modify(int e) {
#if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER)
  struct epoll_event ev;
  memset(&ev, 0, sizeof(ev));
  int new_events = events, old_events = events;
  if (e < 0)
    new_events &= ~(-e);
  else
    new_events |= e;
  events = new_events;
  ev.events = new_events;
  ev.data.ptr = this;
  // Empty mask -> remove; previously empty mask -> add; otherwise modify in place.
  if (!new_events)
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
  else if (!old_events)
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
  else
    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_MOD, fd, &ev);
#endif
#if TS_USE_KQUEUE && !defined(USE_EDGE_TRIGGER)
  int n = 0;
  struct kevent ev[2];
  int ee = events;
  if (e < 0) {
    // Removal: one EV_DELETE per filter named in the negated mask.
    ee &= ~(-e);
    if ((-e) & EVENTIO_READ)
      EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this);
    if ((-e) & EVENTIO_WRITE)
      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this);
  } else {
    // Addition: one EV_ADD per filter named in the mask.
    ee |= e;
    if (e & EVENTIO_READ)
      EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
    if (e & EVENTIO_WRITE)
      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
  }
  events = ee;
  if (n)
    return kevent(event_loop->kqueue_fd, &ev[0], n, NULL, 0, NULL);
  else
    return 0;
#endif
#if TS_USE_PORT
  int n = 0;
  int ne = e;
  if (e < 0) {
    // Removal only matters if at least one of the events is currently requested.
    if (((-e) & events)) {
      ne = ~(-e) & events;
      if ((-e) & EVENTIO_READ)
        n++;
      if ((-e) & EVENTIO_WRITE)
        n++;
    }
  } else {
    // Addition only matters if none of the events is already requested.
    if (!(e & events)) {
      ne = events | e;
      if (e & EVENTIO_READ)
        n++;
      if (e & EVENTIO_WRITE)
        n++;
    }
  }
  // Re-associate only for a non-empty resulting mask; when a removal leaves
  // the mask empty, neither `events` nor the association is touched here.
  if (n && ne && event_loop) {
    events = ne;
    int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
    Debug("iocore_eventio", "[EventIO::modify] e(%d), ne(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)", e, ne, events, retval, retval<0? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
    return retval;
  }
  return 0;
#else
  (void)e;
  return 0;
#endif
}
00560
/**
  Re-arm events that are already part of the requested mask.

  @param e events to refresh; only bits that are also set in @c events
  have an effect.

  Compiled in for edge-triggered kqueue (re-issues EV_ADD for the filters)
  and for event ports (re-associates the descriptor). On other
  configurations @a e is ignored and 0 is returned.

  @return the result of the backend call, or 0 when nothing was done.
*/
TS_INLINE int EventIO::refresh(int e) {
#if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER)
  // Restrict to events that were actually requested.
  e = e & events;
  struct kevent ev[2];
  int n = 0;
  if (e & EVENTIO_READ)
    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
  if (e & EVENTIO_WRITE)
    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
  if (n)
    return kevent(event_loop->kqueue_fd, &ev[0], n, NULL, 0, NULL);
  else
    return 0;
#endif
#if TS_USE_PORT
  int n = 0;
  int ne = e;
  if ((e & events)) {
    ne = events | e;
    if (e & EVENTIO_READ)
      n++;
    if (e & EVENTIO_WRITE)
      n++;
    // Re-associate only while still attached to an event loop.
    if (n && ne && event_loop) {
      events = ne;
      int retval = port_associate(event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
      Debug("iocore_eventio", "[EventIO::refresh] e(%d), ne(%d), events(%d), %d[%s]=port_associate(%d,%d,%d,%d,%p)",
            e, ne, events, retval, retval<0? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd, events, this);
      return retval;
    }
  }
  return 0;
#else
  (void)e;
  return 0;
#endif
}
00598
00599
00600 TS_INLINE int EventIO::stop() {
00601 if (event_loop) {
00602 #if TS_USE_EPOLL
00603 struct epoll_event ev;
00604 memset(&ev, 0, sizeof(struct epoll_event));
00605 ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
00606 return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
00607 #endif
00608 #if TS_USE_PORT
00609 int retval = port_dissociate(event_loop->port_fd, PORT_SOURCE_FD, fd);
00610 Debug("iocore_eventio", "[EventIO::stop] %d[%s]=port_dissociate(%d,%d,%d)", retval, retval<0? strerror(errno) : "ok", event_loop->port_fd, PORT_SOURCE_FD, fd);
00611 return retval;
00612 #endif
00613 event_loop = 0;
00614 }
00615 return 0;
00616 }
00617
00618 #endif