• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

UnixNetAccept.cc

Go to the documentation of this file.
00001 /** @file
00002 
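00003   Unix implementation of NetAccept: accepts inbound connections on a listening socket and dispatches them as UnixNetVConnections.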
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 #include "P_Net.h"
00025 
00026 #ifdef ROUNDUP
00027 #undef ROUNDUP
00028 #endif
00029 #define ROUNDUP(x, y) ((((x)+((y)-1))/(y))*(y))
00030 
00031 typedef int (NetAccept::*NetAcceptHandler) (int, void *);
00032 volatile int dummy_volatile = 0;
00033 int accept_till_done = 1;
00034 
00035 static void
00036 safe_delay(int msec)
00037 {
00038   socketManager.poll(0, 0, msec);
00039 }
00040 
00041 
00042 //
00043 // Send the throttling message to up to THROTTLE_AT_ONCE connections,
00044 // delaying to let some of the current connections complete
00045 //
00046 static int
00047 send_throttle_message(NetAccept * na)
00048 {
00049   struct pollfd afd;
00050   Connection con[100];
00051   char dummy_read_request[4096];
00052 
00053   afd.fd = na->server.fd;
00054   afd.events = POLLIN;
00055 
00056   int n = 0;
00057   while (check_net_throttle(ACCEPT, ink_get_hrtime()) && n < THROTTLE_AT_ONCE - 1
00058          && (socketManager.poll(&afd, 1, 0) > 0)) {
00059     int res = 0;
00060     if ((res = na->server.accept(&con[n])) < 0)
00061       return res;
00062     n++;
00063   }
00064   safe_delay(NET_THROTTLE_DELAY / 2);
00065   int i = 0;
00066   for (i = 0; i < n; i++) {
00067     socketManager.read(con[i].fd, dummy_read_request, 4096);
00068     socketManager.write(con[i].fd, unix_netProcessor.throttle_error_message,
00069                         strlen(unix_netProcessor.throttle_error_message));
00070   }
00071   safe_delay(NET_THROTTLE_DELAY / 2);
00072   for (i = 0; i < n; i++)
00073     con[i].close();
00074   return 0;
00075 }
00076 
00077 
00078 //
00079 // General case network connection accept code
00080 //
00081 int
00082 net_accept(NetAccept * na, void *ep, bool blockable)
00083 {
00084   Event *e = (Event *) ep;
00085   int res = 0;
00086   int count = 0;
00087   int loop = accept_till_done;
00088   UnixNetVConnection *vc = NULL;
00089 
00090   if (!blockable)
00091     if (!MUTEX_TAKE_TRY_LOCK_FOR(na->action_->mutex, e->ethread, na->action_->continuation))
00092       return 0;
00093   //do-while for accepting all the connections
00094   //added by YTS Team, yamsat
00095   do {
00096     vc = (UnixNetVConnection *) na->alloc_cache;
00097     if (!vc) {
00098       vc = (UnixNetVConnection *)na->getNetProcessor()->allocate_vc(e->ethread);
00099       NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
00100       vc->id = net_next_connection_number();
00101       na->alloc_cache = vc;
00102     }
00103     if ((res = na->server.accept(&vc->con)) < 0) {
00104       if (res == -EAGAIN || res == -ECONNABORTED || res == -EPIPE)
00105         goto Ldone;
00106       if (na->server.fd != NO_FD && !na->action_->cancelled) {
00107         if (!blockable)
00108           na->action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
00109         else {
00110           MUTEX_LOCK(lock, na->action_->mutex, e->ethread);
00111           na->action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
00112         }
00113       }
00114       count = res;
00115       goto Ldone;
00116     }
00117     count++;
00118     na->alloc_cache = NULL;
00119 
00120     vc->submit_time = ink_get_hrtime();
00121     ats_ip_copy(&vc->server_addr, &vc->con.addr);
00122     vc->mutex = new_ProxyMutex();
00123     vc->action_ = *na->action_;
00124     vc->set_is_transparent(na->server.f_inbound_transparent);
00125     vc->closed  = 0;
00126     SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);
00127 
00128     if (e->ethread->is_event_type(na->etype))
00129       vc->handleEvent(EVENT_NONE, e);
00130     else
00131       eventProcessor.schedule_imm(vc, na->etype);
00132   } while (loop);
00133 
00134 Ldone:
00135   if (!blockable)
00136     MUTEX_UNTAKE_LOCK(na->action_->mutex, e->ethread);
00137   return count;
00138 }
00139 
00140 //
00141 // Initialize the NetAccept for execution in its own thread.
00142 // This should be done for low latency, high connection rate sockets.
00143 //
00144 void
00145 NetAccept::init_accept_loop(const char *thr_name)
00146 {
00147   size_t stacksize;
00148 
00149   REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
00150   SET_CONTINUATION_HANDLER(this, &NetAccept::acceptLoopEvent);
00151   eventProcessor.spawn_thread(this, thr_name, stacksize);
00152 }
00153 
00154 
00155 //
00156 // Initialize the NetAccept for execution in a etype thread.
00157 // This should be done for low connection rate sockets.
00158 // (Management, Cluster, etc.)  Also, since it adapts to the
00159 // number of connections arriving, it should be reasonable to
00160 // use it for high connection rates as well.
00161 //
00162 void
00163 NetAccept::init_accept(EThread * t)
00164 {
00165   if (!t)
00166     t = eventProcessor.assign_thread(etype);
00167 
00168   if (!action_->continuation->mutex) {
00169     action_->continuation->mutex = t->mutex;
00170     action_->mutex = t->mutex;
00171   }
00172   if (do_listen(NON_BLOCKING))
00173     return;
00174   SET_HANDLER((NetAcceptHandler) & NetAccept::acceptEvent);
00175   period = ACCEPT_PERIOD;
00176   t->schedule_every(this, period, etype);
00177 }
00178 
00179 
00180 void
00181 NetAccept::init_accept_per_thread()
00182 {
00183   int i, n;
00184 
00185   if (do_listen(NON_BLOCKING))
00186     return;
00187   if (accept_fn == net_accept)
00188     SET_HANDLER((NetAcceptHandler) & NetAccept::acceptFastEvent);
00189   else
00190     SET_HANDLER((NetAcceptHandler) & NetAccept::acceptEvent);
00191   period = ACCEPT_PERIOD;
00192 
00193   NetAccept *a;
00194   n = eventProcessor.n_threads_for_type[ET_NET];
00195   for (i = 0; i < n; i++) {
00196     if (i < n - 1)
00197       a = clone();
00198     else
00199       a = this;
00200     EThread *t = eventProcessor.eventthread[ET_NET][i];
00201     PollDescriptor *pd = get_PollDescriptor(t);
00202     if (a->ep.start(pd, a, EVENTIO_READ) < 0)
00203       Warning("[NetAccept::init_accept_per_thread]:error starting EventIO");
00204     a->mutex = get_NetHandler(t)->mutex;
00205     t->schedule_every(a, period, etype);
00206   }
00207 }
00208 
00209 int
00210 NetAccept::do_listen(bool non_blocking, bool transparent)
00211 {
00212   int res = 0;
00213 
00214   if (server.fd != NO_FD) {
00215     if ((res = server.setup_fd_for_listen(non_blocking, recv_bufsize, send_bufsize, transparent))) {
00216 
00217       Warning("unable to listen on main accept port %d: errno = %d, %s", ntohs(server.accept_addr.port()), errno, strerror(errno));
00218       goto Lretry;
00219     }
00220   } else {
00221   Lretry:
00222     if ((res = server.listen(non_blocking, recv_bufsize, send_bufsize, transparent)))
00223       Warning("unable to listen on port %d: %d %d, %s", ntohs(server.accept_addr.port()), res, errno, strerror(errno));
00224   }
00225   if (callback_on_open && !action_->cancelled) {
00226     if (res)
00227       action_->continuation->handleEvent(NET_EVENT_ACCEPT_FAILED, this);
00228     else
00229       action_->continuation->handleEvent(NET_EVENT_ACCEPT_SUCCEED, this);
00230     mutex = NULL;
00231   }
00232   return res;
00233 }
00234 
00235 int
00236 NetAccept::do_blocking_accept(EThread * t)
00237 {
00238   int res = 0;
00239   int loop = accept_till_done;
00240   UnixNetVConnection *vc = NULL;
00241   Connection con;
00242 
00243   //do-while for accepting all the connections
00244   //added by YTS Team, yamsat
00245   do {
00246     ink_hrtime now = ink_get_hrtime();
00247 
00248     // Throttle accepts
00249 
00250     while (!backdoor && check_net_throttle(ACCEPT, now)) {
00251       check_throttle_warning();
00252       if (!unix_netProcessor.throttle_error_message) {
00253         safe_delay(NET_THROTTLE_DELAY);
00254       } else if (send_throttle_message(this) < 0) {
00255         goto Lerror;
00256       }
00257       now = ink_get_hrtime();
00258     }
00259 
00260     if ((res = server.accept(&con)) < 0) {
00261     Lerror:
00262       int seriousness = accept_error_seriousness(res);
00263       if (seriousness >= 0) {   // not so bad
00264         if (!seriousness)       // bad enough to warn about
00265           check_transient_accept_error(res);
00266         safe_delay(NET_THROTTLE_DELAY);
00267         return 0;
00268       }
00269       if (!action_->cancelled) {
00270         MUTEX_LOCK(lock, action_->mutex, t);
00271         action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
00272         MUTEX_UNTAKE_LOCK(action_->mutex, t);
00273         Warning("accept thread received fatal error: errno = %d", errno);
00274       }
00275       return -1;
00276     }
00277 
00278 #if TS_HAS_SO_MARK
00279       if (packet_mark != 0) {
00280         safe_setsockopt(con.fd, SOL_SOCKET, SO_MARK, reinterpret_cast<char *>(&packet_mark), sizeof(uint32_t));
00281       }
00282 #endif
00283 
00284 #if TS_HAS_IP_TOS
00285       if (packet_tos != 0) {
00286         safe_setsockopt(con.fd, IPPROTO_IP, IP_TOS, reinterpret_cast<char *>(&packet_tos), sizeof(uint32_t));
00287       }
00288 #endif
00289 
00290     // Use 'NULL' to Bypass thread allocator
00291     vc = (UnixNetVConnection *)this->getNetProcessor()->allocate_vc(NULL);
00292     if (!vc) {
00293       con.close();
00294       return -1;
00295     }
00296     vc->con = con;
00297     vc->from_accept_thread = true;
00298     vc->id = net_next_connection_number();
00299     alloc_cache = NULL;
00300 
00301     check_emergency_throttle(con);
00302 
00303     NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
00304     vc->submit_time = now;
00305     ats_ip_copy(&vc->server_addr, &vc->con.addr);
00306     vc->set_is_transparent(server.f_inbound_transparent);
00307     vc->mutex = new_ProxyMutex();
00308     vc->action_ = *action_;
00309     SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::acceptEvent);
00310     //eventProcessor.schedule_imm(vc, getEtype());
00311     eventProcessor.schedule_imm_signal(vc, getEtype());
00312   } while (loop);
00313 
00314   return 1;
00315 }
00316 
00317 
00318 int
00319 NetAccept::acceptEvent(int event, void *ep)
00320 {
00321   (void) event;
00322   Event *e = (Event *) ep;
00323   //PollDescriptor *pd = get_PollDescriptor(e->ethread);
00324   ProxyMutex *m = 0;
00325 
00326   if (action_->mutex)
00327     m = action_->mutex;
00328   else
00329     m = mutex;
00330   MUTEX_TRY_LOCK(lock, m, e->ethread);
00331   if (lock) {
00332     if (action_->cancelled) {
00333       e->cancel();
00334       NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
00335       delete this;
00336       return EVENT_DONE;
00337     }
00338 
00339     //ink_assert(ifd < 0 || event == EVENT_INTERVAL || (pd->nfds > ifd && pd->pfd[ifd].fd == server.fd));
00340     //if (ifd < 0 || event == EVENT_INTERVAL || (pd->pfd[ifd].revents & (POLLIN | POLLERR | POLLHUP | POLLNVAL))) {
00341     //ink_assert(!"incomplete");
00342       int res;
00343       if ((res = accept_fn(this, e, false)) < 0) {
00344         NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
00345         /* INKqa11179 */
00346         Warning("Accept on port %d failed with error no %d",
00347           ats_ip_port_host_order(&server.addr), res
00348         );
00349         Warning("Traffic Server may be unable to accept more network" "connections on %d",
00350           ats_ip_port_host_order(&server.addr)
00351         );
00352         e->cancel();
00353         delete this;
00354         return EVENT_DONE;
00355       }
00356       //}
00357   }
00358   return EVENT_CONT;
00359 }
00360 
00361 
00362 int
00363 NetAccept::acceptFastEvent(int event, void *ep)
00364 {
00365   Event *e = (Event *) ep;
00366   (void) event;
00367   (void) e;
00368   int bufsz, res;
00369   Connection con;
00370 
00371   PollDescriptor *pd = get_PollDescriptor(e->ethread);
00372   UnixNetVConnection *vc = NULL;
00373   int loop = accept_till_done;
00374 
00375   do {
00376     if (!backdoor && check_net_throttle(ACCEPT, ink_get_hrtime())) {
00377       ifd = -1;
00378       return EVENT_CONT;
00379     }
00380 
00381     socklen_t sz = sizeof(con.addr);
00382     int fd = socketManager.accept(server.fd, &con.addr.sa, &sz);
00383     con.fd = fd;
00384 
00385     if (likely(fd >= 0)) {
00386       Debug("iocore_net", "accepted a new socket: %d", fd);
00387       if (send_bufsize > 0) {
00388         if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize))) {
00389           bufsz = ROUNDUP(send_bufsize, 1024);
00390           while (bufsz > 0) {
00391             if (!socketManager.set_sndbuf_size(fd, bufsz))
00392               break;
00393             bufsz -= 1024;
00394           }
00395         }
00396       }
00397       if (recv_bufsize > 0) {
00398         if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize))) {
00399           bufsz = ROUNDUP(recv_bufsize, 1024);
00400           while (bufsz > 0) {
00401             if (!socketManager.set_rcvbuf_size(fd, bufsz))
00402               break;
00403             bufsz -= 1024;
00404           }
00405         }
00406       }
00407       if (sockopt_flags & 1) {  // we have to disable Nagle
00408         safe_setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, SOCKOPT_ON, sizeof(int));
00409         Debug("socket", "::acceptFastEvent: setsockopt() TCP_NODELAY on socket");
00410       }
00411       if (sockopt_flags & 2) {
00412         safe_setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, SOCKOPT_ON, sizeof(int));
00413         Debug("socket", "::acceptFastEvent: setsockopt() SO_KEEPALIVE on socket");
00414       }
00415 #if TS_HAS_SO_MARK
00416       if (packet_mark != 0) {
00417         safe_setsockopt(fd, SOL_SOCKET, SO_MARK, reinterpret_cast<char *>(&packet_mark), sizeof(uint32_t));
00418       }
00419 #endif
00420 
00421 #if TS_HAS_IP_TOS
00422       if (packet_tos != 0) {
00423         safe_setsockopt(fd, IPPROTO_IP, IP_TOS, reinterpret_cast<char *>(&packet_tos), sizeof(uint32_t));
00424       }
00425 #endif
00426       do {
00427         res = safe_nonblocking(fd);
00428       } while (res < 0 && (errno == EAGAIN || errno == EINTR));
00429 
00430       vc = (UnixNetVConnection *)this->getNetProcessor()->allocate_vc(e->ethread);
00431       if (!vc) {
00432         con.close();
00433         goto Ldone;
00434       }
00435 
00436       vc->con = con;
00437 
00438     } else {
00439       res = fd;
00440     }
00441     if (res < 0) {
00442       res = -errno;
00443       if (res == -EAGAIN || res == -ECONNABORTED
00444 #if defined(linux)
00445           || res == -EPIPE
00446 #endif
00447         ) {
00448         goto Ldone;
00449       } else if (accept_error_seriousness(res) >= 0) {
00450         check_transient_accept_error(res);
00451         goto Ldone;
00452       }
00453       if (!action_->cancelled)
00454         action_->continuation->handleEvent(EVENT_ERROR, (void *)(intptr_t)res);
00455       goto Lerror;
00456     }
00457 
00458     NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
00459     vc->id = net_next_connection_number();
00460 
00461     vc->submit_time = ink_get_hrtime();
00462     ats_ip_copy(&vc->server_addr, &vc->con.addr);
00463     vc->set_is_transparent(server.f_inbound_transparent);
00464     vc->mutex = new_ProxyMutex();
00465     vc->thread = e->ethread;
00466 
00467     vc->nh = get_NetHandler(e->ethread);
00468 
00469     SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::mainEvent);
00470 
00471     if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) {
00472       Warning("[NetAccept::acceptFastEvent]: Error in inserting fd[%d] in kevent\n", vc->con.fd);
00473       close_UnixNetVConnection(vc, e->ethread);
00474       return EVENT_DONE;
00475     }
00476 
00477     vc->nh->open_list.enqueue(vc);
00478 
00479 #ifdef USE_EDGE_TRIGGER
00480     // Set the vc as triggered and place it in the read ready queue in case there is already data on the socket.
00481     Debug("iocore_net", "acceptEvent : Setting triggered and adding to the read ready queue");
00482     vc->read.triggered = 1;
00483     vc->nh->read_ready_list.enqueue(vc);
00484 #endif
00485 
00486     if (!action_->cancelled)
00487       action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc);
00488     else
00489       close_UnixNetVConnection(vc, e->ethread);
00490   } while (loop);
00491 
00492 Ldone:
00493   return EVENT_CONT;
00494 
00495 Lerror:
00496   server.close();
00497   e->cancel();
00498   if (vc)
00499     vc->free(e->ethread);
00500   NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
00501   delete this;
00502   return EVENT_DONE;
00503 }
00504 
00505 
00506 int
00507 NetAccept::acceptLoopEvent(int event, Event * e)
00508 {
00509   (void) event;
00510   (void) e;
00511   EThread *t = this_ethread();
00512 
00513   while (1)
00514     do_blocking_accept(t);
00515 
00516   // Don't think this ever happens ...
00517   NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat);
00518   delete this;
00519   return EVENT_DONE;
00520 }
00521 
00522 
00523 //
00524 // Accept Event handler
00525 //
00526 //
00527 
00528 NetAccept::NetAccept()
00529   : Continuation(NULL),
00530     period(0),
00531     alloc_cache(0),
00532     ifd(-1),
00533     callback_on_open(false),
00534     backdoor(false),
00535     recv_bufsize(0),
00536     send_bufsize(0),
00537     sockopt_flags(0),
00538     packet_mark(0),
00539     packet_tos(0),
00540     etype(0)
00541 { }
00542 
00543 
00544 //
00545 // Stop listening.  When the next poll takes place, an error will result.
00546 // THIS ONLY WORKS WITH POLLING STYLE ACCEPTS!
00547 //
00548 void
00549 NetAccept::cancel()
00550 {
00551   action_->cancel();
00552   server.close();
00553 }
00554 
00555 NetAccept *
00556 NetAccept::clone() const
00557 {
00558   NetAccept *na;
00559   na = new NetAccept;
00560   *na = *this;
00561   return na;
00562 }
00563 
00564 // Virtual function allows the correct
00565 // etype to be used in NetAccept functions (ET_SSL
00566 // or ET_NET).
00567 EventType NetAccept::getEtype() const
00568 {
00569   return etype;
00570 }
00571 
00572 NetProcessor *
00573 NetAccept::getNetProcessor() const
00574 {
00575   return &netProcessor;
00576 }

Generated by  doxygen 1.7.1