00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "P_Net.h"
00025
00026
00027 #include "StatPages.h"
00028
00029 NetProcessor::AcceptOptions const NetProcessor::DEFAULT_ACCEPT_OPTIONS;
00030
00031 NetProcessor::AcceptOptions&
00032 NetProcessor::AcceptOptions::reset()
00033 {
00034 local_port = 0;
00035 local_ip.invalidate();
00036 accept_threads = -1;
00037 ip_family = AF_INET;
00038 etype = ET_NET;
00039 f_callback_on_open = false;
00040 localhost_only = false;
00041 frequent_accept = true;
00042 backdoor = false;
00043 recv_bufsize = 0;
00044 send_bufsize = 0;
00045 sockopt_flags = 0;
00046 packet_mark = 0;
00047 packet_tos = 0;
00048 f_inbound_transparent = false;
00049 return *this;
00050 }
00051
00052
00053 int net_connection_number = 1;
00054
00055 unsigned int
00056 net_next_connection_number()
00057 {
00058 unsigned int res = 0;
00059 do {
00060 res = (unsigned int)
00061 ink_atomic_increment(&net_connection_number, 1);
00062 } while (!res);
00063 return res;
00064 }
00065
00066 Action *
00067 NetProcessor::accept(Continuation* cont, AcceptOptions const& opt)
00068 {
00069 Debug("iocore_net_processor", "NetProcessor::accept - port %d,recv_bufsize %d, send_bufsize %d, sockopt 0x%0x",
00070 opt.local_port, opt.recv_bufsize, opt.send_bufsize, opt.sockopt_flags);
00071
00072 return ((UnixNetProcessor *) this)->accept_internal(cont, NO_FD, opt);
00073 }
00074
00075 Action *
00076 NetProcessor::main_accept(Continuation *cont, SOCKET fd, AcceptOptions const& opt)
00077 {
00078 UnixNetProcessor* this_unp = static_cast<UnixNetProcessor*>(this);
00079 Debug("iocore_net_processor", "NetProcessor::main_accept - port %d,recv_bufsize %d, send_bufsize %d, sockopt 0x%0x",
00080 opt.local_port, opt.recv_bufsize, opt.send_bufsize, opt.sockopt_flags);
00081 return this_unp->accept_internal(cont, fd, opt);
00082 }
00083
00084 Action *
00085 UnixNetProcessor::accept_internal(Continuation *cont, int fd, AcceptOptions const& opt)
00086 {
00087 EventType upgraded_etype = opt.etype;
00088 EThread *thread = this_ethread();
00089 ProxyMutex *mutex = thread->mutex;
00090 int accept_threads = opt.accept_threads;
00091 IpEndpoint accept_ip;
00092 char thr_name[MAX_THREAD_NAME_LENGTH];
00093
00094 NetAccept *na = createNetAccept();
00095
00096
00097 upgradeEtype(upgraded_etype);
00098
00099
00100 if (opt.accept_threads < 0) {
00101 REC_ReadConfigInteger(accept_threads, "proxy.config.accept_threads");
00102 }
00103
00104 NET_INCREMENT_DYN_STAT(net_accepts_currently_open_stat);
00105
00106
00107
00108 if (opt.localhost_only) {
00109 accept_ip.setToLoopback(opt.ip_family);
00110 } else if (opt.local_ip.isValid()) {
00111 accept_ip.assign(opt.local_ip);
00112 } else {
00113 accept_ip.setToAnyAddr(opt.ip_family);
00114 }
00115 ink_assert(0 < opt.local_port && opt.local_port < 65536);
00116 accept_ip.port() = htons(opt.local_port);
00117
00118 na->accept_fn = net_accept;
00119 na->server.fd = fd;
00120 ats_ip_copy(&na->server.accept_addr, &accept_ip);
00121 na->server.f_inbound_transparent = opt.f_inbound_transparent;
00122 if (opt.f_inbound_transparent) {
00123 Debug( "http_tproxy", "Marking accept server %p on port %d as inbound transparent", na, opt.local_port);
00124 }
00125
00126 int should_filter_int = 0;
00127 na->server.http_accept_filter = false;
00128 REC_ReadConfigInteger(should_filter_int, "proxy.config.net.defer_accept");
00129 if (should_filter_int > 0 && opt.etype == ET_NET)
00130 na->server.http_accept_filter = true;
00131
00132 na->action_ = new NetAcceptAction();
00133 *na->action_ = cont;
00134 na->action_->server = &na->server;
00135 na->callback_on_open = opt.f_callback_on_open;
00136 na->recv_bufsize = opt.recv_bufsize;
00137 na->send_bufsize = opt.send_bufsize;
00138 na->sockopt_flags = opt.sockopt_flags;
00139 na->packet_mark = opt.packet_mark;
00140 na->packet_tos = opt.packet_tos;
00141 na->etype = upgraded_etype;
00142 na->backdoor = opt.backdoor;
00143 if (na->callback_on_open)
00144 na->mutex = cont->mutex;
00145 if (opt.frequent_accept) {
00146 if (accept_threads > 0) {
00147 if (0 == na->do_listen(BLOCKING, opt.f_inbound_transparent)) {
00148
00149 for (int i=1; i < accept_threads; ++i) {
00150 NetAccept * a = na->clone();
00151
00152 snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ACCEPT %d:%d]", i-1, ats_ip_port_host_order(&accept_ip));
00153 a->init_accept_loop(thr_name);
00154 Debug("iocore_net_accept", "Created accept thread #%d for port %d", i, ats_ip_port_host_order(&accept_ip));
00155 }
00156
00157
00158 Debug("iocore_net_accept", "Created accept thread #%d for port %d", accept_threads, ats_ip_port_host_order(&accept_ip));
00159 snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ACCEPT %d:%d]", accept_threads-1, ats_ip_port_host_order(&accept_ip));
00160 na->init_accept_loop(thr_name);
00161 }
00162 } else {
00163 na->init_accept_per_thread();
00164 }
00165 } else {
00166 na->init_accept();
00167 }
00168
00169 #ifdef TCP_DEFER_ACCEPT
00170
00171
00172 if (should_filter_int > 0) {
00173 setsockopt(na->server.fd, IPPROTO_TCP, TCP_DEFER_ACCEPT, &should_filter_int, sizeof(int));
00174 }
00175 #endif
00176 #ifdef TCP_INIT_CWND
00177 int tcp_init_cwnd = 0;
00178 REC_ReadConfigInteger(tcp_init_cwnd, "proxy.config.http.server_tcp_init_cwnd");
00179 if(tcp_init_cwnd > 0) {
00180 Debug("net", "Setting initial congestion window to %d", tcp_init_cwnd);
00181 if(setsockopt(na->server.fd, IPPROTO_TCP, TCP_INIT_CWND, &tcp_init_cwnd, sizeof(int)) != 0) {
00182 Error("Cannot set initial congestion window to %d", tcp_init_cwnd);
00183 }
00184 }
00185 #endif
00186 return na->action_;
00187 }
00188
00189 Action *
00190 UnixNetProcessor::connect_re_internal(
00191 Continuation * cont,
00192 sockaddr const* target,
00193 NetVCOptions * opt
00194 ) {
00195 ProxyMutex *mutex = cont->mutex;
00196 EThread *t = mutex->thread_holding;
00197 UnixNetVConnection *vc = (UnixNetVConnection *)this->allocate_vc(t);
00198
00199 if (opt)
00200 vc->options = *opt;
00201 else
00202 opt = &vc->options;
00203
00204
00205 upgradeEtype(opt->etype);
00206
00207 bool using_socks = (socks_conf_stuff->socks_needed && opt->socks_support != NO_SOCKS
00208 #ifdef SOCKS_WITH_TS
00209 && (opt->socks_version != SOCKS_DEFAULT_VERSION ||
00210
00211
00212
00213
00214 !socks_conf_stuff->ip_map.contains(target))
00215 #endif
00216 );
00217 SocksEntry *socksEntry = NULL;
00218
00219 NET_SUM_GLOBAL_DYN_STAT(net_connections_currently_open_stat, 1);
00220 vc->id = net_next_connection_number();
00221 vc->submit_time = ink_get_hrtime();
00222 vc->setSSLClientConnection(true);
00223 ats_ip_copy(&vc->server_addr, target);
00224 vc->mutex = cont->mutex;
00225 Action *result = &vc->action_;
00226
00227 if (using_socks) {
00228 char buff[INET6_ADDRPORTSTRLEN];
00229 Debug("Socks", "Using Socks ip: %s\n", ats_ip_nptop(target, buff, sizeof(buff)));
00230 socksEntry = socksAllocator.alloc();
00231 socksEntry->init(cont->mutex, vc, opt->socks_support, opt->socks_version);
00232 socksEntry->action_ = cont;
00233 cont = socksEntry;
00234 if (!ats_is_ip(&socksEntry->server_addr)) {
00235 socksEntry->lerrno = ESOCK_NO_SOCK_SERVER_CONN;
00236 socksEntry->free();
00237 return ACTION_RESULT_DONE;
00238 }
00239 ats_ip_copy(&vc->server_addr, &socksEntry->server_addr);
00240 result = &socksEntry->action_;
00241 vc->action_ = socksEntry;
00242 } else {
00243 Debug("Socks", "Not Using Socks %d \n", socks_conf_stuff->socks_needed);
00244 vc->action_ = cont;
00245 }
00246
00247 if (t->is_event_type(opt->etype)) {
00248 MUTEX_TRY_LOCK(lock, cont->mutex, t);
00249 if (lock) {
00250 MUTEX_TRY_LOCK(lock2, get_NetHandler(t)->mutex, t);
00251 if (lock2) {
00252 int ret;
00253 ret = vc->connectUp(t, NO_FD);
00254 if ((using_socks) && (ret == CONNECT_SUCCESS))
00255 return &socksEntry->action_;
00256 else
00257 return ACTION_RESULT_DONE;
00258 }
00259 }
00260 }
00261 eventProcessor.schedule_imm(vc, opt->etype);
00262 if (using_socks) {
00263 return &socksEntry->action_;
00264 } else
00265 return result;
00266 }
00267
00268 Action *
00269 UnixNetProcessor::connect(Continuation * cont, UnixNetVConnection ** , sockaddr const* target,
00270 NetVCOptions * opt)
00271 {
00272 return connect_re(cont, target, opt);
00273 }
00274
00275 struct CheckConnect:public Continuation
00276 {
00277 UnixNetVConnection *vc;
00278 Action action_;
00279 MIOBuffer *buf;
00280 IOBufferReader *reader;
00281 int connect_status;
00282 int recursion;
00283 ink_hrtime timeout;
00284
00285 int handle_connect(int event, Event * e)
00286 {
00287 connect_status = event;
00288 switch (event) {
00289 case NET_EVENT_OPEN:
00290 vc = (UnixNetVConnection *) e;
00291 Debug("iocore_net_connect", "connect Net open");
00292 vc->do_io_write(this, 10,
00293 reader);
00294
00295 vc->set_inactivity_timeout(timeout);
00296 return EVENT_CONT;
00297 break;
00298
00299 case NET_EVENT_OPEN_FAILED:
00300 Debug("iocore_net_connect", "connect Net open failed");
00301 if (!action_.cancelled)
00302 action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) e);
00303 break;
00304
00305 case VC_EVENT_WRITE_READY:int sl, ret;
00306 socklen_t sz;
00307 if (!action_.cancelled)
00308 {
00309 sz = sizeof(int);
00310 ret = getsockopt(vc->con.fd, SOL_SOCKET, SO_ERROR, (char *) &sl, &sz);
00311 if (!ret && sl == 0)
00312 {
00313 Debug("iocore_net_connect", "connection established");
00314
00315 vc->write.enabled = 0;
00316 vc->cancel_inactivity_timeout();
00317
00318
00319 vc->write.vio.nbytes = 0;
00320 vc->write.vio.op = VIO::NONE;
00321 vc->write.vio.buffer.clear();
00322
00323
00324 action_.continuation->handleEvent(NET_EVENT_OPEN, vc);
00325 delete this;
00326 return EVENT_DONE;
00327 }
00328 }
00329 vc->do_io_close();
00330 if (!action_.cancelled)
00331 action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_CONNECT_FAILED);
00332 break;
00333 case VC_EVENT_INACTIVITY_TIMEOUT:
00334 Debug("iocore_net_connect", "connect timed out");
00335 vc->do_io_close();
00336 if (!action_.cancelled)
00337 action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_CONNECT_TIMEOUT);
00338 break;
00339 default:
00340 ink_assert(!"unknown connect event");
00341 if (!action_.cancelled)
00342 action_.continuation->handleEvent(NET_EVENT_OPEN_FAILED, (void *) -ENET_CONNECT_FAILED);
00343
00344 }
00345 if (!recursion)
00346 delete this;
00347 return EVENT_DONE;
00348 }
00349
00350 Action *connect_s(Continuation * cont, sockaddr const* target,
00351 int _timeout, NetVCOptions * opt)
00352 {
00353 action_ = cont;
00354 timeout = HRTIME_SECONDS(_timeout);
00355 recursion++;
00356 netProcessor.connect_re(this, target, opt);
00357 recursion--;
00358 if (connect_status != NET_EVENT_OPEN_FAILED)
00359 return &action_;
00360 else {
00361 delete this;
00362 return ACTION_RESULT_DONE;
00363 }
00364 }
00365
00366 CheckConnect(ProxyMutex * m = NULL):Continuation(m), connect_status(-1), recursion(0), timeout(0) {
00367 SET_HANDLER(&CheckConnect::handle_connect);
00368 buf = new_empty_MIOBuffer(1);
00369 reader = buf->alloc_reader();
00370 }
00371
00372 ~CheckConnect() {
00373 buf->dealloc_all_readers();
00374 buf->clear();
00375 free_MIOBuffer(buf);
00376 }
00377 };
00378
00379 Action *
00380 NetProcessor::connect_s(Continuation * cont, sockaddr const* target,
00381 int timeout, NetVCOptions * opt)
00382 {
00383 Debug("iocore_net_connect", "NetProcessor::connect_s called");
00384 CheckConnect *c = new CheckConnect(cont->mutex);
00385 return c->connect_s(cont, target, timeout, opt);
00386 }
00387
00388
00389
00390 struct PollCont;
00391
00392
00393 int
00394 UnixNetProcessor::start(int, size_t)
00395 {
00396 EventType etype = ET_NET;
00397
00398 netHandler_offset = eventProcessor.allocate(sizeof(NetHandler));
00399 pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
00400
00401
00402
00403 upgradeEtype(etype);
00404
00405 n_netthreads = eventProcessor.n_threads_for_type[etype];
00406 netthreads = eventProcessor.eventthread[etype];
00407 for (int i = 0; i < n_netthreads; ++i) {
00408 initialize_thread_for_net(netthreads[i]);
00409 extern void initialize_thread_for_http_sessions(EThread *thread, int thread_index);
00410 initialize_thread_for_http_sessions(netthreads[i], i);
00411 }
00412
00413 RecData d;
00414 d.rec_int = 0;
00415 change_net_connections_throttle(NULL, RECD_INT, d, NULL);
00416
00417
00418 if (!netProcessor.socks_conf_stuff) {
00419 socks_conf_stuff = new socks_conf_struct;
00420 loadSocksConfiguration(socks_conf_stuff);
00421 if (!socks_conf_stuff->socks_needed && socks_conf_stuff->accept_enabled) {
00422 Warning("We can not have accept_enabled and socks_needed turned off" " disabling Socks accept\n");
00423 socks_conf_stuff->accept_enabled = 0;
00424 } else {
00425
00426 socks_conf_stuff = netProcessor.socks_conf_stuff;
00427 }
00428 }
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442 extern Action *register_ShowNet(Continuation * c, HTTPHdr * h);
00443 if (etype == ET_NET)
00444 statPagesManager.register_http("net", register_ShowNet);
00445 return 1;
00446 }
00447
00448
00449
00450 NetAccept *
00451 UnixNetProcessor::createNetAccept()
00452 {
00453 return new NetAccept;
00454 }
00455
00456 NetVConnection *
00457 UnixNetProcessor::allocate_vc(EThread *t)
00458 {
00459 UnixNetVConnection *vc;
00460
00461 if (t) {
00462 vc = THREAD_ALLOC(netVCAllocator, t);
00463 } else {
00464 if (likely(vc = netVCAllocator.alloc())) {
00465 vc->from_accept_thread = true;
00466 }
00467 }
00468
00469 return vc;
00470 }
00471
00472 struct socks_conf_struct *
00473 NetProcessor::socks_conf_stuff = NULL;
00474 int NetProcessor::accept_mss = 0;
00475
00476 UnixNetProcessor unix_netProcessor;
00477 NetProcessor & netProcessor = unix_netProcessor;