#include "P_Net.h"

#ifdef ROUNDUP
#undef ROUNDUP
#endif
#define ROUNDUP(x, y) ((((x)+((y)-1))/(y))*(y))

typedef int (NetAccept::*NetAcceptHandler) (int, void *);
volatile int dummy_volatile = 0;
int accept_till_done = 1;

static void
safe_delay(int msec)
{
  socketManager.poll(0, 0, msec);
}

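//
// When the net throttle limit has been hit and a throttle error message
// is configured, accept a burst of pending connections, read off their
// (presumed) requests, send them the throttle error message, and close them.
//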
static int
send_throttle_message(NetAccept * na)
{
  struct pollfd afd;
  Connection con[100];
  char dummy_read_request[4096];

  afd.fd = na->server.fd;
  afd.events = POLLIN;

  int n = 0;
  while (check_net_throttle(ACCEPT, ink_get_hrtime()) && n < THROTTLE_AT_ONCE - 1
         && (socketManager.poll(&afd, 1, 0) > 0)) {
    int res = 0;
    if ((res = na->server.accept(&con[n])) < 0)
      return res;
    n++;
  }
  safe_delay(NET_THROTTLE_DELAY / 2);
  int i = 0;
  for (i = 0; i < n; i++) {
    socketManager.read(con[i].fd, dummy_read_request, sizeof(dummy_read_request));
    socketManager.write(con[i].fd, unix_netProcessor.throttle_error_message,
                        strlen(unix_netProcessor.throttle_error_message));
  }
  safe_delay(NET_THROTTLE_DELAY / 2);
  for (i = 0; i < n; i++)
    con[i].close();
  return 0;
}

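//
// Non-blocking accept used by the in-line accept path (accept_fn).
// Drains the listen socket, wrapping each new connection in a
// NetVConnection and running UnixNetVConnection::acceptEvent on it,
// either directly or via schedule_imm. Returns the number of connections
// accepted, or a negative value on a serious accept error.
//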
int
net_accept(NetAccept * na, void *ep, bool blockable)
{
  Event *e = (Event *) ep;
  int res = 0;
  int count = 0;
  int loop = accept_till_done;
  UnixNetVConnection *vc = NULL;

  if (!blockable)
    if (!MUTEX_TAKE_TRY_LOCK_FOR(na->action_->mutex, e->ethread, na->action_->continuation))
      return 0;

  do {
    vc = (UnixNetVConnection *) na->alloc_cache;
    if (!vc) {
      vc = (UnixNetVConnection *) na->getNetProcessor()->allocate_vc(e->ethread);
      NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
      vc->id = net_next_connection_number();
      na->alloc_cache = vc;
    }
    if ((res = na->server.accept(&vc->con)) < 0) {
      if (res == -EAGAIN || res == -ECONNABORTED || res == -EPIPE)
        goto Ldone;
      if (na->server.fd != NO_FD && !na->action_->cancelled) {
        if (!blockable)
          na->action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
        else {
          MUTEX_LOCK(lock, na->action_->mutex, e->ethread);
          na->action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
        }
      }
      count = res;
      goto Ldone;
    }
    count++;
    na->alloc_cache = NULL;

    vc->submit_time = ink_get_hrtime();
    ats_ip_copy(&vc->server_addr, &vc->con.addr);
    vc->mutex = new_ProxyMutex();
    vc->action_ = *na->action_;
    vc->set_is_transparent(na->server.f_inbound_transparent);
    vc->closed = 0;
    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);

    if (e->ethread->is_event_type(na->etype))
      vc->handleEvent(EVENT_NONE, e);
    else
      eventProcessor.schedule_imm(vc, na->etype);
  } while (loop);

Ldone:
  if (!blockable)
    MUTEX_UNTAKE_LOCK(na->action_->mutex, e->ethread);
  return count;
}

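//
// Spawn a dedicated accept thread running acceptLoopEvent, using the
// stack size from proxy.config.thread.default.stacksize.
//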
void
NetAccept::init_accept_loop(const char *thr_name)
{
  size_t stacksize;

  REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
  SET_CONTINUATION_HANDLER(this, &NetAccept::acceptLoopEvent);
  eventProcessor.spawn_thread(this, thr_name, stacksize);
}

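//
// In-line (non-blocking) accept on a regular event thread: bind the
// action to the thread's mutex if it has none, start listening, and
// schedule acceptEvent every ACCEPT_PERIOD.
//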
void
NetAccept::init_accept(EThread * t)
{
  if (!t)
    t = eventProcessor.assign_thread(etype);

  if (!action_->continuation->mutex) {
    action_->continuation->mutex = t->mutex;
    action_->mutex = t->mutex;
  }
  if (do_listen(NON_BLOCKING))
    return;
  SET_HANDLER((NetAcceptHandler) & NetAccept::acceptEvent);
  period = ACCEPT_PERIOD;
  t->schedule_every(this, period, etype);
}

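//
// Accept-per-thread mode: clone this NetAccept onto every ET_NET thread
// (the last thread reuses this object), register the listen fd with each
// thread's poll descriptor, and schedule the periodic accept handler on
// each thread.
//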
void
NetAccept::init_accept_per_thread()
{
  int i, n;

  if (do_listen(NON_BLOCKING))
    return;
  if (accept_fn == net_accept)
    SET_HANDLER((NetAcceptHandler) & NetAccept::acceptFastEvent);
  else
    SET_HANDLER((NetAcceptHandler) & NetAccept::acceptEvent);
  period = ACCEPT_PERIOD;

  NetAccept *a;
  n = eventProcessor.n_threads_for_type[ET_NET];
  for (i = 0; i < n; i++) {
    if (i < n - 1)
      a = clone();
    else
      a = this;
    EThread *t = eventProcessor.eventthread[ET_NET][i];
    PollDescriptor *pd = get_PollDescriptor(t);
    if (a->ep.start(pd, a, EVENTIO_READ) < 0)
      Warning("[NetAccept::init_accept_per_thread]: error starting EventIO");
    a->mutex = get_NetHandler(t)->mutex;
    t->schedule_every(a, period, etype);
  }
}

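//
// Set up the listen socket, retrying with a full listen() if the
// pre-opened fd cannot be configured. If callback_on_open is set, the
// continuation is notified with NET_EVENT_ACCEPT_SUCCEED or
// NET_EVENT_ACCEPT_FAILED.
//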
int
NetAccept::do_listen(bool non_blocking, bool transparent)
{
  int res = 0;

  if (server.fd != NO_FD) {
    if ((res = server.setup_fd_for_listen(non_blocking, recv_bufsize, send_bufsize, transparent))) {
      Warning("unable to listen on main accept port %d: errno = %d, %s", ntohs(server.accept_addr.port()), errno, strerror(errno));
      goto Lretry;
    }
  } else {
  Lretry:
    if ((res = server.listen(non_blocking, recv_bufsize, send_bufsize, transparent)))
      Warning("unable to listen on port %d: %d %d, %s", ntohs(server.accept_addr.port()), res, errno, strerror(errno));
  }
  if (callback_on_open && !action_->cancelled) {
    if (res)
      action_->continuation->handleEvent(NET_EVENT_ACCEPT_FAILED, this);
    else
      action_->continuation->handleEvent(NET_EVENT_ACCEPT_SUCCEED, this);
    mutex = NULL;
  }
  return res;
}

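//
// Body of the blocking accept loop run on a dedicated accept thread.
// Waits out net throttling (or bounces connections with the throttle
// error message), accepts connections, and hands each new NetVConnection
// to an ET_NET thread with schedule_imm_signal. Returns a negative value
// only on a fatal error.
//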
int
NetAccept::do_blocking_accept(EThread * t)
{
  int res = 0;
  int loop = accept_till_done;
  UnixNetVConnection *vc = NULL;
  Connection con;

  do {
    ink_hrtime now = ink_get_hrtime();

    // Wait for the throttle to clear, or bounce the waiting connections
    // with the configured throttle error message.
    while (!backdoor && check_net_throttle(ACCEPT, now)) {
      check_throttle_warning();
      if (!unix_netProcessor.throttle_error_message) {
        safe_delay(NET_THROTTLE_DELAY);
      } else if (send_throttle_message(this) < 0) {
        goto Lerror;
      }
      now = ink_get_hrtime();
    }

    if ((res = server.accept(&con)) < 0) {
    Lerror:
      int seriousness = accept_error_seriousness(res);
      if (seriousness >= 0) {   // not fatal
        if (!seriousness)       // transient: just warn
          check_transient_accept_error(res);
        safe_delay(NET_THROTTLE_DELAY);
        return 0;
      }
      if (!action_->cancelled) {
        MUTEX_LOCK(lock, action_->mutex, t);
        action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
        MUTEX_UNTAKE_LOCK(action_->mutex, t);
        Warning("accept thread received fatal error: errno = %d", errno);
      }
      return -1;
    }

#if TS_HAS_SO_MARK
    if (packet_mark != 0) {
      safe_setsockopt(con.fd, SOL_SOCKET, SO_MARK, reinterpret_cast<char *>(&packet_mark), sizeof(uint32_t));
    }
#endif

#if TS_HAS_IP_TOS
    if (packet_tos != 0) {
      safe_setsockopt(con.fd, IPPROTO_IP, IP_TOS, reinterpret_cast<char *>(&packet_tos), sizeof(uint32_t));
    }
#endif

    vc = (UnixNetVConnection *) this->getNetProcessor()->allocate_vc(NULL);
    if (!vc) {
      con.close();
      return -1;
    }
    vc->con = con;
    vc->from_accept_thread = true;
    vc->id = net_next_connection_number();
    alloc_cache = NULL;

    check_emergency_throttle(con);

    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
    vc->submit_time = now;
    ats_ip_copy(&vc->server_addr, &vc->con.addr);
    vc->set_is_transparent(server.f_inbound_transparent);
    vc->mutex = new_ProxyMutex();
    vc->action_ = *action_;
    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);
    // Signal an ET_NET thread so the new connection is picked up promptly.
    eventProcessor.schedule_imm_signal(vc, getEtype());
  } while (loop);

  return 1;
}

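//
// Periodic handler for the in-line accept path. Tries the action mutex,
// tears the acceptor down if the action was cancelled, and otherwise
// drains the listen socket through accept_fn.
//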
int
NetAccept::acceptEvent(int event, void *ep)
{
  (void) event;
  Event *e = (Event *) ep;
  ProxyMutex *m = 0;

  if (action_->mutex)
    m = action_->mutex;
  else
    m = mutex;
  MUTEX_TRY_LOCK(lock, m, e->ethread);
  if (lock) {
    if (action_->cancelled) {
      e->cancel();
      NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
      delete this;
      return EVENT_DONE;
    }

    int res;
    if ((res = accept_fn(this, e, false)) < 0) {
      NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
      Warning("Accept on port %d failed with error no %d",
              ats_ip_port_host_order(&server.addr), res);
      Warning("Traffic Server may be unable to accept more network connections on %d",
              ats_ip_port_host_order(&server.addr));
      e->cancel();
      delete this;
      return EVENT_DONE;
    }
  }
  return EVENT_CONT;
}

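//
// Fast in-line accept: accepts directly with socketManager.accept(),
// applies socket buffer sizes and per-connection socket options, and
// registers the new NetVConnection with this thread's NetHandler so it
// is serviced without an extra reschedule.
//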
int
NetAccept::acceptFastEvent(int event, void *ep)
{
  Event *e = (Event *) ep;
  (void) event;
  int bufsz, res;
  Connection con;

  PollDescriptor *pd = get_PollDescriptor(e->ethread);
  UnixNetVConnection *vc = NULL;
  int loop = accept_till_done;

  do {
    if (!backdoor && check_net_throttle(ACCEPT, ink_get_hrtime())) {
      ifd = -1;
      return EVENT_CONT;
    }

    socklen_t sz = sizeof(con.addr);
    int fd = socketManager.accept(server.fd, &con.addr.sa, &sz);
    con.fd = fd;

    if (likely(fd >= 0)) {
      Debug("iocore_net", "accepted a new socket: %d", fd);
      if (send_bufsize > 0) {
        if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize))) {
          // Back off in 1KB steps until a send buffer size is accepted.
          bufsz = ROUNDUP(send_bufsize, 1024);
          while (bufsz > 0) {
            if (!socketManager.set_sndbuf_size(fd, bufsz))
              break;
            bufsz -= 1024;
          }
        }
      }
      if (recv_bufsize > 0) {
        if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize))) {
          // Back off in 1KB steps until a receive buffer size is accepted.
          bufsz = ROUNDUP(recv_bufsize, 1024);
          while (bufsz > 0) {
            if (!socketManager.set_rcvbuf_size(fd, bufsz))
              break;
            bufsz -= 1024;
          }
        }
      }
      if (sockopt_flags & 1) {  // disable Nagle
        safe_setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, SOCKOPT_ON, sizeof(int));
        Debug("socket", "::acceptFastEvent: setsockopt() TCP_NODELAY on socket");
      }
      if (sockopt_flags & 2) {
        safe_setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, SOCKOPT_ON, sizeof(int));
        Debug("socket", "::acceptFastEvent: setsockopt() SO_KEEPALIVE on socket");
      }
#if TS_HAS_SO_MARK
      if (packet_mark != 0) {
        safe_setsockopt(fd, SOL_SOCKET, SO_MARK, reinterpret_cast<char *>(&packet_mark), sizeof(uint32_t));
      }
#endif

#if TS_HAS_IP_TOS
      if (packet_tos != 0) {
        safe_setsockopt(fd, IPPROTO_IP, IP_TOS, reinterpret_cast<char *>(&packet_tos), sizeof(uint32_t));
      }
#endif
      do {
        res = safe_nonblocking(fd);
      } while (res < 0 && (errno == EAGAIN || errno == EINTR));

      vc = (UnixNetVConnection *) this->getNetProcessor()->allocate_vc(e->ethread);
      if (!vc) {
        con.close();
        goto Ldone;
      }

      vc->con = con;

    } else {
      res = fd;
    }
    if (res < 0) {
      res = -errno;
      if (res == -EAGAIN || res == -ECONNABORTED
#if defined(linux)
          || res == -EPIPE
#endif
        ) {
        goto Ldone;
      } else if (accept_error_seriousness(res) >= 0) {
        check_transient_accept_error(res);
        goto Ldone;
      }
      if (!action_->cancelled)
        action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
      goto Lerror;
    }

    NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
    vc->id = net_next_connection_number();

    vc->submit_time = ink_get_hrtime();
    ats_ip_copy(&vc->server_addr, &vc->con.addr);
    vc->set_is_transparent(server.f_inbound_transparent);
    vc->mutex = new_ProxyMutex();
    vc->thread = e->ethread;

    vc->nh = get_NetHandler(e->ethread);

    SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::mainEvent);

    if (vc->ep.start(pd, vc, EVENTIO_READ | EVENTIO_WRITE) < 0) {
      Warning("[NetAccept::acceptFastEvent]: Error in inserting fd[%d] in kevent\n", vc->con.fd);
      close_UnixNetVConnection(vc, e->ethread);
      return EVENT_DONE;
    }

    vc->nh->open_list.enqueue(vc);

#ifdef USE_EDGE_TRIGGER
    // Mark the vc as already triggered so it is serviced on the next pass
    // over the read ready list.
    Debug("iocore_net", "acceptEvent : Setting triggered and adding to the read ready queue");
    vc->read.triggered = 1;
    vc->nh->read_ready_list.enqueue(vc);
#endif

    if (!action_->cancelled)
      action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc);
    else
      close_UnixNetVConnection(vc, e->ethread);
  } while (loop);

Ldone:
  return EVENT_CONT;

Lerror:
  server.close();
  e->cancel();
  if (vc)
    vc->free(e->ethread);
  NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
  delete this;
  return EVENT_DONE;
}

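//
// Main routine of a dedicated accept thread: loop on do_blocking_accept()
// until it reports a fatal error, then shut this acceptor down.
//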
int
NetAccept::acceptLoopEvent(int event, Event * e)
{
  (void) event;
  (void) e;
  EThread *t = this_ethread();

  while (do_blocking_accept(t) >= 0)
    ;

  NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
  delete this;
  return EVENT_DONE;
}

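//
// Default constructor: no mutex, no accept period, default socket tunables.
//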
NetAccept::NetAccept()
  : Continuation(NULL),
    period(0),
    alloc_cache(0),
    ifd(-1),
    callback_on_open(false),
    backdoor(false),
    recv_bufsize(0),
    send_bufsize(0),
    sockopt_flags(0),
    packet_mark(0),
    packet_tos(0),
    etype(0)
{ }

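//
// Stop accepting new connections: cancel the pending action and close
// the listen socket.
//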
void
NetAccept::cancel()
{
  action_->cancel();
  server.close();
}

NetAccept *
NetAccept::clone() const
{
  NetAccept *na;
  na = new NetAccept;
  *na = *this;
  return na;
}

EventType
NetAccept::getEtype() const
{
  return etype;
}

NetProcessor *
NetAccept::getNetProcessor() const
{
  return &netProcessor;
}