• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

UnixNetProcessor.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 #include "P_Net.h"
00025 
00026 // For Stat Pages
00027 #include "StatPages.h"
00028 
00029 NetProcessor::AcceptOptions const NetProcessor::DEFAULT_ACCEPT_OPTIONS;
00030 
00031 NetProcessor::AcceptOptions&
00032 NetProcessor::AcceptOptions::reset()
00033 {
00034   local_port = 0;
00035   local_ip.invalidate();
00036   accept_threads = -1;
00037   ip_family = AF_INET;
00038   etype = ET_NET;
00039   f_callback_on_open = false;
00040   localhost_only = false;
00041   frequent_accept = true;
00042   backdoor = false;
00043   recv_bufsize = 0;
00044   send_bufsize = 0;
00045   sockopt_flags = 0;
00046   packet_mark = 0;
00047   packet_tos = 0;
00048   f_inbound_transparent = false;
00049   return *this;
00050 }
00051 
00052 
00053 int net_connection_number = 1;
00054 
00055 unsigned int
00056 net_next_connection_number()
00057 {
00058   unsigned int res = 0;
00059   do {
00060     res = (unsigned int)
00061       ink_atomic_increment(&net_connection_number, 1);
00062   } while (!res);
00063   return res;
00064 }
00065 
00066 Action *
00067 NetProcessor::accept(Continuation* cont, AcceptOptions const& opt)
00068 {
00069   Debug("iocore_net_processor", "NetProcessor::accept - port %d,recv_bufsize %d, send_bufsize %d, sockopt 0x%0x",
00070         opt.local_port, opt.recv_bufsize, opt.send_bufsize, opt.sockopt_flags);
00071 
00072   return ((UnixNetProcessor *) this)->accept_internal(cont, NO_FD, opt);
00073 }
00074 
00075 Action *
00076 NetProcessor::main_accept(Continuation *cont, SOCKET fd, AcceptOptions const& opt)
00077 {
00078   UnixNetProcessor* this_unp = static_cast<UnixNetProcessor*>(this);
00079   Debug("iocore_net_processor", "NetProcessor::main_accept - port %d,recv_bufsize %d, send_bufsize %d, sockopt 0x%0x",
00080         opt.local_port, opt.recv_bufsize, opt.send_bufsize, opt.sockopt_flags);
00081   return this_unp->accept_internal(cont, fd, opt);
00082 }
00083 
00084 Action *
00085 UnixNetProcessor::accept_internal(Continuation *cont, int fd, AcceptOptions const& opt)
00086 {
00087   EventType upgraded_etype = opt.etype; // setEtype requires non-const ref.
00088   EThread *thread = this_ethread();
00089   ProxyMutex *mutex = thread->mutex;
00090   int accept_threads = opt.accept_threads; // might be changed.
00091   IpEndpoint accept_ip; // local binding address.
00092   char thr_name[MAX_THREAD_NAME_LENGTH];
00093 
00094   NetAccept *na = createNetAccept();
00095 
00096   // Potentially upgrade to SSL.
00097   upgradeEtype(upgraded_etype);
00098 
00099   // Fill in accept thread from configuration if necessary.
00100   if (opt.accept_threads < 0) {
00101     REC_ReadConfigInteger(accept_threads, "proxy.config.accept_threads");
00102   }
00103 
00104   NET_INCREMENT_DYN_STAT(net_accepts_currently_open_stat);
00105 
00106   // We've handled the config stuff at start up, but there are a few cases
00107   // we must handle at this point.
00108   if (opt.localhost_only) {
00109     accept_ip.setToLoopback(opt.ip_family);
00110   } else if (opt.local_ip.isValid()) {
00111     accept_ip.assign(opt.local_ip);
00112   } else {
00113     accept_ip.setToAnyAddr(opt.ip_family);
00114   }
00115   ink_assert(0 < opt.local_port && opt.local_port < 65536);
00116   accept_ip.port() = htons(opt.local_port);
00117 
00118   na->accept_fn = net_accept; // All callers used this.
00119   na->server.fd = fd;
00120   ats_ip_copy(&na->server.accept_addr, &accept_ip);
00121   na->server.f_inbound_transparent = opt.f_inbound_transparent;
00122   if (opt.f_inbound_transparent) {
00123     Debug( "http_tproxy", "Marking accept server %p on port %d as inbound transparent", na, opt.local_port);
00124   }
00125 
00126   int should_filter_int = 0;
00127   na->server.http_accept_filter = false;
00128   REC_ReadConfigInteger(should_filter_int, "proxy.config.net.defer_accept");
00129   if (should_filter_int > 0 && opt.etype == ET_NET)
00130     na->server.http_accept_filter = true;
00131 
00132   na->action_ = new NetAcceptAction();
00133   *na->action_ = cont;
00134   na->action_->server = &na->server;
00135   na->callback_on_open = opt.f_callback_on_open;
00136   na->recv_bufsize = opt.recv_bufsize;
00137   na->send_bufsize = opt.send_bufsize;
00138   na->sockopt_flags = opt.sockopt_flags;
00139   na->packet_mark = opt.packet_mark;
00140   na->packet_tos = opt.packet_tos;
00141   na->etype = upgraded_etype;
00142   na->backdoor = opt.backdoor;
00143   if (na->callback_on_open)
00144     na->mutex = cont->mutex;
00145   if (opt.frequent_accept) { // true
00146     if (accept_threads > 0)  {
00147       if (0 == na->do_listen(BLOCKING, opt.f_inbound_transparent)) {
00148 
00149         for (int i=1; i < accept_threads; ++i) {
00150           NetAccept * a = na->clone();
00151 
00152           snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ACCEPT %d:%d]", i-1, ats_ip_port_host_order(&accept_ip));
00153           a->init_accept_loop(thr_name);
00154           Debug("iocore_net_accept", "Created accept thread #%d for port %d", i, ats_ip_port_host_order(&accept_ip));
00155         }
00156 
00157         // Start the "template" accept thread last.
00158         Debug("iocore_net_accept", "Created accept thread #%d for port %d", accept_threads, ats_ip_port_host_order(&accept_ip));
00159         snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ACCEPT %d:%d]", accept_threads-1, ats_ip_port_host_order(&accept_ip));
00160         na->init_accept_loop(thr_name);
00161       }
00162     } else {
00163       na->init_accept_per_thread();
00164     }
00165   } else {
00166     na->init_accept();
00167   }
00168 
00169 #ifdef TCP_DEFER_ACCEPT
00170   // set tcp defer accept timeout if it is configured, this will not trigger an accept until there is
00171   // data on the socket ready to be read
00172   if (should_filter_int > 0) {
00173     setsockopt(na->server.fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &should_filter_int, sizeof(int));
00174   }
00175 #endif
00176 #ifdef TCP_INIT_CWND
00177  int tcp_init_cwnd = 0;
00178  REC_ReadConfigInteger(tcp_init_cwnd, "proxy.config.http.server_tcp_init_cwnd");
00179  if(tcp_init_cwnd > 0) {
00180     Debug("net", "Setting initial congestion window to %d", tcp_init_cwnd);
00181     if(setsockopt(na->server.fd, IPPROTO_TCP, TCP_INIT_CWND, &tcp_init_cwnd, sizeof(int)) != 0) {
00182       Error("Cannot set initial congestion window to %d", tcp_init_cwnd);
00183     }
00184  }
00185 #endif
00186   return na->action_;
00187 }
00188 
00189 Action *
00190 UnixNetProcessor::connect_re_internal(
00191   Continuation * cont,
00192   sockaddr const* target,
00193   NetVCOptions * opt
00194 ) {
00195   ProxyMutex *mutex = cont->mutex;
00196   EThread *t = mutex->thread_holding;
00197   UnixNetVConnection *vc = (UnixNetVConnection *)this->allocate_vc(t);
00198 
00199   if (opt)
00200     vc->options = *opt;
00201   else
00202     opt = &vc->options;
00203 
00204   // virtual function used to upgrade etype to ET_SSL for SSLNetProcessor.
00205   upgradeEtype(opt->etype);
00206 
00207   bool using_socks = (socks_conf_stuff->socks_needed && opt->socks_support != NO_SOCKS
00208 #ifdef SOCKS_WITH_TS
00209                       && (opt->socks_version != SOCKS_DEFAULT_VERSION ||
00210                           /* This implies we are tunnelling.
00211                            * we need to connect using socks server even
00212                            * if this ip is in no_socks list.
00213                            */
00214                           !socks_conf_stuff->ip_map.contains(target))
00215 #endif
00216     );
00217   SocksEntry *socksEntry = NULL;
00218 
00219   NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
00220   vc->id = net_next_connection_number();
00221   vc->submit_time = ink_get_hrtime();
00222   vc->setSSLClientConnection(true);
00223   ats_ip_copy(&vc->server_addr, target);
00224   vc->mutex = cont->mutex;
00225   Action *result = &vc->action_;
00226 
00227   if (using_socks) {
00228     char buff[INET6_ADDRPORTSTRLEN];
00229     Debug("Socks", "Using Socks ip: %s\n", ats_ip_nptop(target, buff, sizeof(buff)));
00230     socksEntry = socksAllocator.alloc();
00231     socksEntry->init(cont->mutex, vc, opt->socks_support, opt->socks_version);        /*XXXX remove last two args */
00232     socksEntry->action_ = cont;
00233     cont = socksEntry;
00234     if (!ats_is_ip(&socksEntry->server_addr)) {
00235       socksEntry->lerrno = ESOCK_NO_SOCK_SERVER_CONN;
00236       socksEntry->free();
00237       return ACTION_RESULT_DONE;
00238     }
00239     ats_ip_copy(&vc->server_addr, &socksEntry->server_addr);
00240     result = &socksEntry->action_;
00241     vc->action_ = socksEntry;
00242   } else {
00243     Debug("Socks", "Not Using Socks %d \n", socks_conf_stuff->socks_needed);
00244     vc->action_ = cont;
00245   }
00246 
00247   if (t->is_event_type(opt->etype)) {
00248     MUTEX_TRY_LOCK(lock, cont->mutex, t);
00249     if (lock) {
00250       MUTEX_TRY_LOCK(lock2, get_NetHandler(t)->mutex, t);
00251       if (lock2) {
00252         int ret;
00253         ret = vc->connectUp(t, NO_FD);
00254         if ((using_socks) && (ret == CONNECT_SUCCESS))
00255           return &socksEntry->action_;
00256         else
00257           return ACTION_RESULT_DONE;
00258       }
00259     }
00260   }
00261   eventProcessor.schedule_imm(vc, opt->etype);
00262   if (using_socks) {
00263     return &socksEntry->action_;
00264   } else
00265     return result;
00266 }
00267 
00268 Action *
00269 UnixNetProcessor::connect(Continuation * cont, UnixNetVConnection ** /* avc */, sockaddr const* target,
00270                           NetVCOptions * opt)
00271 {
00272   return connect_re(cont, target, opt);
00273 }
00274 
00275 struct CheckConnect:public Continuation
00276 {
00277   UnixNetVConnection *vc;
00278   Action action_;
00279   MIOBuffer *buf;
00280   IOBufferReader *reader;
00281   int connect_status;
00282   int recursion;
00283   ink_hrtime timeout;
00284 
00285   int handle_connect(int event, Event * e)
00286   {
00287     connect_status = event;
00288     switch (event) {
00289     case NET_EVENT_OPEN:
00290       vc = (UnixNetVConnection *) e;
00291       Debug("iocore_net_connect", "connect Net open");
00292       vc->do_io_write(this, 10, /* some non-zero number just to get the poll going */
00293                       reader);
00294       /* dont wait for more than timeout secs */
00295       vc->set_inactivity_timeout(timeout);
00296       return EVENT_CONT;
00297       break;
00298 
00299       case NET_EVENT_OPEN_FAILED:
00300         Debug("iocore_net_connect", "connect Net open failed");
00301       if (!action_.cancelled)
00302         action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) e);
00303       break;
00304 
00305       case VC_EVENT_WRITE_READY:int sl, ret;
00306       socklen_t sz;
00307       if (!action_.cancelled)
00308       {
00309         sz = sizeof(int);
00310           ret = getsockopt(vc->con.fd, SOL_SOCKET, SO_ERROR, (char *) &sl, &sz);
00311         if (!ret && sl == 0)
00312         {
00313           Debug("iocore_net_connect", "connection established");
00314           /* disable write on vc */
00315           vc->write.enabled = 0;
00316           vc->cancel_inactivity_timeout();
00317           //write_disable(get_NetHandler(this_ethread()), vc);
00318           /* clean up vc fields */
00319           vc->write.vio.nbytes = 0;
00320           vc->write.vio.op = VIO::NONE;
00321           vc->write.vio.buffer.clear();
00322 
00323 
00324           action_.continuation->handleEvent(NET_EVENT_OPEN, vc);
00325           delete this;
00326             return EVENT_DONE;
00327         }
00328       }
00329       vc->do_io_close();
00330       if (!action_.cancelled)
00331         action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_CONNECT_FAILED);
00332       break;
00333     case VC_EVENT_INACTIVITY_TIMEOUT:
00334       Debug("iocore_net_connect", "connect timed out");
00335       vc->do_io_close();
00336       if (!action_.cancelled)
00337         action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_CONNECT_TIMEOUT);
00338       break;
00339     default:
00340       ink_assert(!"unknown connect event");
00341       if (!action_.cancelled)
00342         action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_CONNECT_FAILED);
00343 
00344     }
00345     if (!recursion)
00346       delete this;
00347     return EVENT_DONE;
00348   }
00349 
00350   Action *connect_s(Continuation * cont, sockaddr const* target,
00351                     int _timeout, NetVCOptions * opt)
00352   {
00353     action_ = cont;
00354     timeout = HRTIME_SECONDS(_timeout);
00355     recursion++;
00356     netProcessor.connect_re(this, target, opt);
00357     recursion--;
00358     if (connect_status != NET_EVENT_OPEN_FAILED)
00359       return &action_;
00360     else {
00361       delete this;
00362       return ACTION_RESULT_DONE;
00363     }
00364   }
00365 
00366   CheckConnect(ProxyMutex * m = NULL):Continuation(m), connect_status(-1), recursion(0), timeout(0) {
00367     SET_HANDLER(&CheckConnect::handle_connect);
00368     buf = new_empty_MIOBuffer(1);
00369     reader = buf->alloc_reader();
00370   }
00371 
00372   ~CheckConnect() {
00373     buf->dealloc_all_readers();
00374     buf->clear();
00375     free_MIOBuffer(buf);
00376   }
00377 };
00378 
00379 Action *
00380 NetProcessor::connect_s(Continuation * cont, sockaddr const* target,
00381                         int timeout, NetVCOptions * opt)
00382 {
00383   Debug("iocore_net_connect", "NetProcessor::connect_s called");
00384   CheckConnect *c = new CheckConnect(cont->mutex);
00385   return c->connect_s(cont, target, timeout, opt);
00386 }
00387 
00388 
00389 
00390 struct PollCont;
00391 
00392 // This is a little odd, in that the actual threads are created before calling the processor.
00393 int
00394 UnixNetProcessor::start(int, size_t)
00395 {
00396   EventType etype = ET_NET;
00397 
00398   netHandler_offset = eventProcessor.allocate(sizeof(NetHandler));
00399   pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
00400 
00401   // etype is ET_NET for netProcessor
00402   // and      ET_SSL for sslNetProcessor
00403   upgradeEtype(etype);
00404 
00405   n_netthreads = eventProcessor.n_threads_for_type[etype];
00406   netthreads = eventProcessor.eventthread[etype];
00407   for (int i = 0; i < n_netthreads; ++i) {
00408     initialize_thread_for_net(netthreads[i]);
00409     extern void initialize_thread_for_http_sessions(EThread *thread, int thread_index);
00410     initialize_thread_for_http_sessions(netthreads[i], i);
00411   }
00412 
00413   RecData d;
00414   d.rec_int = 0;
00415   change_net_connections_throttle(NULL, RECD_INT, d, NULL);
00416 
00417   // Socks
00418   if (!netProcessor.socks_conf_stuff) {
00419     socks_conf_stuff = new socks_conf_struct;
00420     loadSocksConfiguration(socks_conf_stuff);
00421     if (!socks_conf_stuff->socks_needed && socks_conf_stuff->accept_enabled) {
00422       Warning("We can not have accept_enabled and socks_needed turned off" " disabling Socks accept\n");
00423       socks_conf_stuff->accept_enabled = 0;
00424     } else {
00425       // this is sslNetprocessor
00426       socks_conf_stuff = netProcessor.socks_conf_stuff;
00427     }
00428   }
00429 
00430   // commented by vijay -  bug 2489945
00431   /*if (use_accept_thread) // 0
00432      { NetAccept * na = createNetAccept();
00433      SET_CONTINUATION_HANDLER(na,&NetAccept::acceptLoopEvent);
00434      accept_thread_event = eventProcessor.spawn_thread(na);
00435      if (!accept_thread_event) delete na;
00436      } */
00437 
00438 
00439 /*
00440  * Stat pages
00441  */
00442   extern Action *register_ShowNet(Continuation * c, HTTPHdr * h);
00443   if (etype == ET_NET)
00444     statPagesManager.register_http("net", register_ShowNet);
00445   return 1;
00446 }
00447 
00448 // Virtual function allows creation of an
00449 // SSLNetAccept or NetAccept transparent to NetProcessor.
00450 NetAccept *
00451 UnixNetProcessor::createNetAccept()
00452 {
00453   return new NetAccept;
00454 }
00455 
00456 NetVConnection *
00457 UnixNetProcessor::allocate_vc(EThread *t)
00458 {
00459   UnixNetVConnection *vc;
00460 
00461   if (t) {
00462     vc = THREAD_ALLOC(netVCAllocator, t);
00463   } else {
00464     if (likely(vc = netVCAllocator.alloc())) {
00465       vc->from_accept_thread = true;
00466     }
00467   }
00468 
00469   return vc;
00470 }
00471 
00472 struct socks_conf_struct *
00473 NetProcessor::socks_conf_stuff = NULL;
00474 int NetProcessor::accept_mss = 0;
00475 
00476 UnixNetProcessor unix_netProcessor;
00477 NetProcessor & netProcessor = unix_netProcessor;

Generated by  doxygen 1.7.1