00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "P_Net.h"
00025
00026 #ifdef ROUNDUP
00027 #undef ROUNDUP
00028 #endif
00029 #define ROUNDUP(x, y) ((((x)+((y)-1))/(y))*(y))
00030
00031 typedef int (NetAccept::*NetAcceptHandler) (int, void *);
00032 volatile int dummy_volatile = 0;
00033 int accept_till_done = 1;
00034
00035 static void
00036 safe_delay(int msec)
00037 {
00038 socketManager.poll(0, 0, msec);
00039 }
00040
00041
00042
00043
00044
00045
00046 static int
00047 send_throttle_message(NetAccept * na)
00048 {
00049 struct pollfd afd;
00050 Connection con[100];
00051 char dummy_read_request[4096];
00052
00053 afd.fd = na->server.fd;
00054 afd.events = POLLIN;
00055
00056 int n = 0;
00057 while (check_net_throttle(ACCEPT, ink_get_hrtime()) && n < THROTTLE_AT_ONCE - 1
00058 && (socketManager.poll(&afd, 1, 0) > 0)) {
00059 int res = 0;
00060 if ((res = na->server.accept(&con[n])) < 0)
00061 return res;
00062 n++;
00063 }
00064 safe_delay(NET_THROTTLE_DELAY / 2);
00065 int i = 0;
00066 for (i = 0; i < n; i++) {
00067 socketManager.read(con[i].fd, dummy_read_request, 4096);
00068 socketManager.write(con[i].fd, unix_netProcessor.throttle_error_message,
00069 strlen(unix_netProcessor.throttle_error_message));
00070 }
00071 safe_delay(NET_THROTTLE_DELAY / 2);
00072 for (i = 0; i < n; i++)
00073 con[i].close();
00074 return 0;
00075 }
00076
00077
00078
00079
00080
00081 int
00082 net_accept(NetAccept * na, void *ep, bool blockable)
00083 {
00084 Event *e = (Event *) ep;
00085 int res = 0;
00086 int count = 0;
00087 int loop = accept_till_done;
00088 UnixNetVConnection *vc = NULL;
00089
00090 if (!blockable)
00091 if (!MUTEX_TAKE_TRY_LOCK_FOR(na->action_->mutex, e->ethread, na->action_->continuation))
00092 return 0;
00093
00094
00095 do {
00096 vc = (UnixNetVConnection *) na->alloc_cache;
00097 if (!vc) {
00098 vc = (UnixNetVConnection *)na->getNetProcessor()->allocate_vc(e->ethread);
00099 NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
00100 vc->id = net_next_connection_number();
00101 na->alloc_cache = vc;
00102 }
00103 if ((res = na->server.accept(&vc->con)) < 0) {
00104 if (res == -EAGAIN || res == -ECONNABORTED || res == -EPIPE)
00105 goto Ldone;
00106 if (na->server.fd != NO_FD && !na->action_->cancelled) {
00107 if (!blockable)
00108 na->action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
00109 else {
00110 MUTEX_LOCK(lock, na->action_->mutex, e->ethread);
00111 na->action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
00112 }
00113 }
00114 count = res;
00115 goto Ldone;
00116 }
00117 count++;
00118 na->alloc_cache = NULL;
00119
00120 vc->submit_time = ink_get_hrtime();
00121 ats_ip_copy(&vc->server_addr, &vc->con.addr);
00122 vc->mutex = new_ProxyMutex();
00123 vc->action_ = *na->action_;
00124 vc->set_is_transparent(na->server.f_inbound_transparent);
00125 vc->closed = 0;
00126 SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);
00127
00128 if (e->ethread->is_event_type(na->etype))
00129 vc->handleEvent(EVENT_NONE, e);
00130 else
00131 eventProcessor.schedule_imm(vc, na->etype);
00132 } while (loop);
00133
00134 Ldone:
00135 if (!blockable)
00136 MUTEX_UNTAKE_LOCK(na->action_->mutex, e->ethread);
00137 return count;
00138 }
00139
00140
00141
00142
00143
00144 void
00145 NetAccept::init_accept_loop(const char *thr_name)
00146 {
00147 size_t stacksize;
00148
00149 REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
00150 SET_CONTINUATION_HANDLER(this, &NetAccept::acceptLoopEvent);
00151 eventProcessor.spawn_thread(this, thr_name, stacksize);
00152 }
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162 void
00163 NetAccept::init_accept(EThread * t)
00164 {
00165 if (!t)
00166 t = eventProcessor.assign_thread(etype);
00167
00168 if (!action_->continuation->mutex) {
00169 action_->continuation->mutex = t->mutex;
00170 action_->mutex = t->mutex;
00171 }
00172 if (do_listen(NON_BLOCKING))
00173 return;
00174 SET_HANDLER((NetAcceptHandler) & NetAccept::acceptEvent);
00175 period = ACCEPT_PERIOD;
00176 t->schedule_every(this, period, etype);
00177 }
00178
00179
00180 void
00181 NetAccept::init_accept_per_thread()
00182 {
00183 int i, n;
00184
00185 if (do_listen(NON_BLOCKING))
00186 return;
00187 if (accept_fn == net_accept)
00188 SET_HANDLER((NetAcceptHandler) & NetAccept::acceptFastEvent);
00189 else
00190 SET_HANDLER((NetAcceptHandler) & NetAccept::acceptEvent);
00191 period = ACCEPT_PERIOD;
00192
00193 NetAccept *a;
00194 n = eventProcessor.n_threads_for_type[ET_NET];
00195 for (i = 0; i < n; i++) {
00196 if (i < n - 1)
00197 a = clone();
00198 else
00199 a = this;
00200 EThread *t = eventProcessor.eventthread[ET_NET][i];
00201 PollDescriptor *pd = get_PollDescriptor(t);
00202 if (a->ep.start(pd, a, EVENTIO_READ) < 0)
00203 Warning("[NetAccept::init_accept_per_thread]:error starting EventIO");
00204 a->mutex = get_NetHandler(t)->mutex;
00205 t->schedule_every(a, period, etype);
00206 }
00207 }
00208
00209 int
00210 NetAccept::do_listen(bool non_blocking, bool transparent)
00211 {
00212 int res = 0;
00213
00214 if (server.fd != NO_FD) {
00215 if ((res = server.setup_fd_for_listen(non_blocking, recv_bufsize, send_bufsize, transparent))) {
00216
00217 Warning("unable to listen on main accept port %d: errno = %d, %s", ntohs(server.accept_addr.port()), errno, strerror(errno));
00218 goto Lretry;
00219 }
00220 } else {
00221 Lretry:
00222 if ((res = server.listen(non_blocking, recv_bufsize, send_bufsize, transparent)))
00223 Warning("unable to listen on port %d: %d %d, %s", ntohs(server.accept_addr.port()), res, errno, strerror(errno));
00224 }
00225 if (callback_on_open && !action_->cancelled) {
00226 if (res)
00227 action_->continuation->handleEvent(NET_EVENT_ACCEPT_FAILED, this);
00228 else
00229 action_->continuation->handleEvent(NET_EVENT_ACCEPT_SUCCEED, this);
00230 mutex = NULL;
00231 }
00232 return res;
00233 }
00234
00235 int
00236 NetAccept::do_blocking_accept(EThread * t)
00237 {
00238 int res = 0;
00239 int loop = accept_till_done;
00240 UnixNetVConnection *vc = NULL;
00241 Connection con;
00242
00243
00244
00245 do {
00246 ink_hrtime now = ink_get_hrtime();
00247
00248
00249
00250 while (!backdoor && check_net_throttle(ACCEPT, now)) {
00251 check_throttle_warning();
00252 if (!unix_netProcessor.throttle_error_message) {
00253 safe_delay(NET_THROTTLE_DELAY);
00254 } else if (send_throttle_message(this) < 0) {
00255 goto Lerror;
00256 }
00257 now = ink_get_hrtime();
00258 }
00259
00260 if ((res = server.accept(&con)) < 0) {
00261 Lerror:
00262 int seriousness = accept_error_seriousness(res);
00263 if (seriousness >= 0) {
00264 if (!seriousness)
00265 check_transient_accept_error(res);
00266 safe_delay(NET_THROTTLE_DELAY);
00267 return 0;
00268 }
00269 if (!action_->cancelled) {
00270 MUTEX_LOCK(lock, action_->mutex, t);
00271 action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
00272 MUTEX_UNTAKE_LOCK(action_->mutex, t);
00273 Warning("accept thread received fatal error: errno = %d", errno);
00274 }
00275 return -1;
00276 }
00277
00278 #if TS_HAS_SO_MARK
00279 if (packet_mark != 0) {
00280 safe_setsockopt(con.fd, SOL_SOCKET, SO_MARK, reinterpret_cast<char *>(&packet_mark), sizeof(uint32_t));
00281 }
00282 #endif
00283
00284 #if TS_HAS_IP_TOS
00285 if (packet_tos != 0) {
00286 safe_setsockopt(con.fd, IPPROTO_IP, IP_TOS, reinterpret_cast<char *>(&packet_tos), sizeof(uint32_t));
00287 }
00288 #endif
00289
00290
00291 vc = (UnixNetVConnection *)this->getNetProcessor()->allocate_vc(NULL);
00292 if (!vc) {
00293 con.close();
00294 return -1;
00295 }
00296 vc->con = con;
00297 vc->from_accept_thread = true;
00298 vc->id = net_next_connection_number();
00299 alloc_cache = NULL;
00300
00301 check_emergency_throttle(con);
00302
00303 NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
00304 vc->submit_time = now;
00305 ats_ip_copy(&vc->server_addr, &vc->con.addr);
00306 vc->set_is_transparent(server.f_inbound_transparent);
00307 vc->mutex = new_ProxyMutex();
00308 vc->action_ = *action_;
00309 SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);
00310
00311 eventProcessor.schedule_imm_signal(vc, getEtype());
00312 } while (loop);
00313
00314 return 1;
00315 }
00316
00317
00318 int
00319 NetAccept::acceptEvent(int event, void *ep)
00320 {
00321 (void) event;
00322 Event *e = (Event *) ep;
00323
00324 ProxyMutex *m = 0;
00325
00326 if (action_->mutex)
00327 m = action_->mutex;
00328 else
00329 m = mutex;
00330 MUTEX_TRY_LOCK(lock, m, e->ethread);
00331 if (lock) {
00332 if (action_->cancelled) {
00333 e->cancel();
00334 NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
00335 delete this;
00336 return EVENT_DONE;
00337 }
00338
00339
00340
00341
00342 int res;
00343 if ((res = accept_fn(this, e, false)) < 0) {
00344 NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
00345
00346 Warning("Accept on port %d failed with error no %d",
00347 ats_ip_port_host_order(&server.addr), res
00348 );
00349 Warning("Traffic Server may be unable to accept more network" "connections on %d",
00350 ats_ip_port_host_order(&server.addr)
00351 );
00352 e->cancel();
00353 delete this;
00354 return EVENT_DONE;
00355 }
00356
00357 }
00358 return EVENT_CONT;
00359 }
00360
00361
00362 int
00363 NetAccept::acceptFastEvent(int event, void *ep)
00364 {
00365 Event *e = (Event *) ep;
00366 (void) event;
00367 (void) e;
00368 int bufsz, res;
00369 Connection con;
00370
00371 PollDescriptor *pd = get_PollDescriptor(e->ethread);
00372 UnixNetVConnection *vc = NULL;
00373 int loop = accept_till_done;
00374
00375 do {
00376 if (!backdoor && check_net_throttle(ACCEPT, ink_get_hrtime())) {
00377 ifd = -1;
00378 return EVENT_CONT;
00379 }
00380
00381 socklen_t sz = sizeof(con.addr);
00382 int fd = socketManager.accept(server.fd, &con.addr.sa, &sz);
00383 con.fd = fd;
00384
00385 if (likely(fd >= 0)) {
00386 Debug("iocore_net", "accepted a new socket: %d", fd);
00387 if (send_bufsize > 0) {
00388 if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize))) {
00389 bufsz = ROUNDUP(send_bufsize, 1024);
00390 while (bufsz > 0) {
00391 if (!socketManager.set_sndbuf_size(fd, bufsz))
00392 break;
00393 bufsz -= 1024;
00394 }
00395 }
00396 }
00397 if (recv_bufsize > 0) {
00398 if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize))) {
00399 bufsz = ROUNDUP(recv_bufsize, 1024);
00400 while (bufsz > 0) {
00401 if (!socketManager.set_rcvbuf_size(fd, bufsz))
00402 break;
00403 bufsz -= 1024;
00404 }
00405 }
00406 }
00407 if (sockopt_flags & 1) {
00408 safe_setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, SOCKOPT_ON, sizeof(int));
00409 Debug("socket", "::acceptFastEvent: setsockopt() TCP_NODELAY on socket");
00410 }
00411 if (sockopt_flags & 2) {
00412 safe_setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, SOCKOPT_ON, sizeof(int));
00413 Debug("socket", "::acceptFastEvent: setsockopt() SO_KEEPALIVE on socket");
00414 }
00415 #if TS_HAS_SO_MARK
00416 if (packet_mark != 0) {
00417 safe_setsockopt(fd, SOL_SOCKET, SO_MARK, reinterpret_cast<char *>(&packet_mark), sizeof(uint32_t));
00418 }
00419 #endif
00420
00421 #if TS_HAS_IP_TOS
00422 if (packet_tos != 0) {
00423 safe_setsockopt(fd, IPPROTO_IP, IP_TOS, reinterpret_cast<char *>(&packet_tos), sizeof(uint32_t));
00424 }
00425 #endif
00426 do {
00427 res = safe_nonblocking(fd);
00428 } while (res < 0 && (errno == EAGAIN || errno == EINTR));
00429
00430 vc = (UnixNetVConnection *)this->getNetProcessor()->allocate_vc(e->ethread);
00431 if (!vc) {
00432 con.close();
00433 goto Ldone;
00434 }
00435
00436 vc->con = con;
00437
00438 } else {
00439 res = fd;
00440 }
00441 if (res < 0) {
00442 res = -errno;
00443 if (res == -EAGAIN || res == -ECONNABORTED
00444 #if defined(linux)
00445 || res == -EPIPE
00446 #endif
00447 ) {
00448 goto Ldone;
00449 } else if (accept_error_seriousness(res) >= 0) {
00450 check_transient_accept_error(res);
00451 goto Ldone;
00452 }
00453 if (!action_->cancelled)
00454 action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
00455 goto Lerror;
00456 }
00457
00458 NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
00459 vc->id = net_next_connection_number();
00460
00461 vc->submit_time = ink_get_hrtime();
00462 ats_ip_copy(&vc->server_addr, &vc->con.addr);
00463 vc->set_is_transparent(server.f_inbound_transparent);
00464 vc->mutex = new_ProxyMutex();
00465 vc->thread = e->ethread;
00466
00467 vc->nh = get_NetHandler(e->ethread);
00468
00469 SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::mainEvent);
00470
00471 if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) {
00472 Warning("[NetAccept::acceptFastEvent]: Error in inserting fd[%d] in kevent\n", vc->con.fd);
00473 close_UnixNetVConnection(vc, e->ethread);
00474 return EVENT_DONE;
00475 }
00476
00477 vc->nh->open_list.enqueue(vc);
00478
00479 #ifdef USE_EDGE_TRIGGER
00480
00481 Debug("iocore_net", "acceptEvent : Setting triggered and adding to the read ready queue");
00482 vc->read.triggered = 1;
00483 vc->nh->read_ready_list.enqueue(vc);
00484 #endif
00485
00486 if (!action_->cancelled)
00487 action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc);
00488 else
00489 close_UnixNetVConnection(vc, e->ethread);
00490 } while (loop);
00491
00492 Ldone:
00493 return EVENT_CONT;
00494
00495 Lerror:
00496 server.close();
00497 e->cancel();
00498 if (vc)
00499 vc->free(e->ethread);
00500 NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
00501 delete this;
00502 return EVENT_DONE;
00503 }
00504
00505
00506 int
00507 NetAccept::acceptLoopEvent(int event, Event * e)
00508 {
00509 (void) event;
00510 (void) e;
00511 EThread *t = this_ethread();
00512
00513 while (1)
00514 do_blocking_accept(t);
00515
00516
00517 NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
00518 delete this;
00519 return EVENT_DONE;
00520 }
00521
00522
00523
00524
00525
00526
00527
00528 NetAccept::NetAccept()
00529 : Continuation(NULL),
00530 period(0),
00531 alloc_cache(0),
00532 ifd(-1),
00533 callback_on_open(false),
00534 backdoor(false),
00535 recv_bufsize(0),
00536 send_bufsize(0),
00537 sockopt_flags(0),
00538 packet_mark(0),
00539 packet_tos(0),
00540 etype(0)
00541 { }
00542
00543
00544
00545
00546
00547
00548 void
00549 NetAccept::cancel()
00550 {
00551 action_->cancel();
00552 server.close();
00553 }
00554
00555 NetAccept *
00556 NetAccept::clone() const
00557 {
00558 NetAccept *na;
00559 na = new NetAccept;
00560 *na = *this;
00561 return na;
00562 }
00563
00564
00565
00566
00567 EventType NetAccept::getEtype() const
00568 {
00569 return etype;
00570 }
00571
00572 NetProcessor *
00573 NetAccept::getNetProcessor() const
00574 {
00575 return &netProcessor;
00576 }