00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "P_Net.h"
00033 #include "P_UDPNet.h"
00034
// Pointer-to-member type for UDPNetHandler event handlers; used to cast the
// handler methods passed to SET_HANDLER in this file.
typedef int (UDPNetHandler::*UDPNetContHandler) (int, void *);

// Allocator for UDPPacketInternal objects.
inkcoreapi ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator");
// Event type of the UDP threads; assigned in UDPNetProcessorInternal::start().
EventType ET_UDP;
00039
// Define NODIAGS for Linux builds that are not DEBUG builds.
// NOTE(review): NODIAGS is not consumed in the visible part of this file -- confirm where it is used.
#if defined(linux) && !defined(DEBUG)
#define NODIAGS
#endif
00043
00044
00045
00046
00047
// The UDP net processor instance and the public reference through which it is exposed.
UDPNetProcessorInternal udpNetInternal;
UDPNetProcessor &udpNet = udpNetInternal;

// Tunables loaded from the records configuration in initialize_thread_for_udp_net():
//  - g_udp_periodicCleanupSlots      <- proxy.config.udp.periodic_cleanup
//    (passed to pipeInfo.FreeCancelledPackets() in UDPQueue::SendPackets()).
//  - g_udp_periodicFreeCancelledPkts <- proxy.config.udp.free_cancelled_pkts_sec
//    (interval, in seconds, between those cleanups; 0 disables them).
//  - g_udp_numSendRetries            <- proxy.config.udp.send_retries
//    (clamped to >= 0; EAGAIN retry limit in UDPQueue::SendUDPPacket()).
int32_t g_udp_periodicCleanupSlots;
int32_t g_udp_periodicFreeCancelledPkts;
int32_t g_udp_numSendRetries;
00054
00055 #include "P_LibBulkIO.h"
00056
00057
00058
00059
00060
// NOTE(review): neither variable is referenced in the visible part of this file;
// presumably the socket and destination address of a bandwidth grapher -- confirm.
int G_bwGrapherFd;
sockaddr_in6 G_bwGrapherLoc;
00063
00064 void
00065 initialize_thread_for_udp_net(EThread * thread)
00066 {
00067 new((ink_dummy_for_new *) get_UDPPollCont(thread)) PollCont(thread->mutex);
00068 new((ink_dummy_for_new *) get_UDPNetHandler(thread)) UDPNetHandler;
00069
00070
00071
00072 REC_ReadConfigInt32(g_udp_periodicFreeCancelledPkts, "proxy.config.udp.free_cancelled_pkts_sec");
00073
00074
00075
00076
00077 REC_ReadConfigInt32(g_udp_periodicCleanupSlots, "proxy.config.udp.periodic_cleanup");
00078
00079
00080
00081 REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries");
00082 g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries;
00083
00084 thread->schedule_every(get_UDPPollCont(thread), -9);
00085 thread->schedule_imm(get_UDPNetHandler(thread));
00086 }
00087
00088 int
00089 UDPNetProcessorInternal::start(int n_upd_threads, size_t stacksize)
00090 {
00091 if (n_upd_threads < 1)
00092 return -1;
00093
00094 ET_UDP = eventProcessor.spawn_event_threads(n_upd_threads, "ET_UDP", stacksize);
00095 if (ET_UDP < 0)
00096 return -1;
00097
00098 pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
00099 udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler));
00100
00101 for (int i = 0; i < eventProcessor.n_threads_for_type[ET_UDP]; i++)
00102 initialize_thread_for_udp_net(eventProcessor.eventthread[ET_UDP][i]);
00103
00104 return 0;
00105 }
00106
00107 void
00108 UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler * nh, UDPConnection * xuc)
00109 {
00110 UnixUDPConnection *uc = (UnixUDPConnection *) xuc;
00111
00112
00113
00114 int r;
00115 int iters = 0;
00116 do {
00117 sockaddr_in6 fromaddr;
00118 socklen_t fromlen = sizeof(fromaddr);
00119
00120
00121
00122 char buf[65536];
00123 int buflen = sizeof(buf);
00124 r = socketManager.recvfrom(uc->getFd(), buf, buflen, 0, (struct sockaddr *) &fromaddr, &fromlen);
00125 if (r <= 0) {
00126
00127 break;
00128 }
00129
00130 UDPPacket *p = new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), buf, r);
00131 p->setConnection(uc);
00132
00133 ink_atomiclist_push(&uc->inQueue, p);
00134 iters++;
00135 } while (r > 0);
00136 if (iters >= 1) {
00137 Debug("udp-read", "read %d at a time", iters);
00138 }
00139
00140 if (!uc->onCallbackQueue) {
00141 ink_assert(uc->callback_link.next == NULL);
00142 ink_assert(uc->callback_link.prev == NULL);
00143 uc->AddRef();
00144 nh->udp_callbacks.enqueue(uc);
00145 uc->onCallbackQueue = 1;
00146 }
00147 }
00148
00149
00150 int
00151 UDPNetProcessorInternal::udp_callback(UDPNetHandler * nh, UDPConnection * xuc, EThread * thread)
00152 {
00153 (void) nh;
00154 UnixUDPConnection *uc = (UnixUDPConnection *) xuc;
00155
00156 if (uc->continuation && uc->mutex) {
00157 MUTEX_TRY_LOCK_FOR(lock, uc->mutex, thread, uc->continuation);
00158 if (!lock) {
00159 return 1;
00160 }
00161 uc->AddRef();
00162 uc->callbackHandler(0, 0);
00163 return 0;
00164 } else {
00165 ink_assert(!"doesn't reach here");
00166 if (!uc->callbackAction) {
00167 uc->AddRef();
00168 uc->callbackAction = eventProcessor.schedule_imm(uc);
00169 }
00170 return 0;
00171 }
00172 }
00173
00174
00175 class UDPReadContinuation:public Continuation
00176 {
00177 public:
00178 UDPReadContinuation(Event * completionToken);
00179 UDPReadContinuation();
00180 ~UDPReadContinuation();
00181 inline void free(void);
00182 inline void init_token(Event * completionToken);
00183 inline void init_read(int fd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen);
00184
00185 void set_timer(int seconds)
00186 {
00187 timeout_interval = HRTIME_SECONDS(seconds);
00188 }
00189
00190 void cancel();
00191 int readPollEvent(int event, Event * e);
00192
00193 Action *getAction()
00194 {
00195 return event;
00196 }
00197
00198 void setupPollDescriptor();
00199
00200 private:
00201 Event * event;
00202
00203 Ptr<IOBufferBlock> readbuf;
00204 int readlen;
00205 struct sockaddr_in6 *fromaddr;
00206 socklen_t *fromaddrlen;
00207 int fd;
00208 int ifd;
00209 ink_hrtime period;
00210 ink_hrtime elapsed_time;
00211 ink_hrtime timeout_interval;
00212 };
00213
// Allocator for UDPReadContinuation objects; used by recvfrom_re() to obtain
// them and by UDPReadContinuation::free() to return them.
ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator");

// Sentinel stored in UDPReadContinuation::event by the default constructor;
// the destructor skips destroying the event while it still holds this value.
#define UNINITIALIZED_EVENT_PTR (Event *)0xdeadbeef
00217
00218 UDPReadContinuation::UDPReadContinuation(Event * completionToken)
00219 : Continuation(NULL), event(completionToken), readbuf(NULL), readlen(0), fromaddrlen(0), fd(-1),
00220 ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
00221 {
00222 if (completionToken->continuation)
00223 this->mutex = completionToken->continuation->mutex;
00224 else
00225 this->mutex = new_ProxyMutex();
00226 }
00227
00228 UDPReadContinuation::UDPReadContinuation()
00229 : Continuation(NULL), event(UNINITIALIZED_EVENT_PTR), readbuf(NULL), readlen(0), fromaddrlen(0), fd(-1),
00230 ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
00231 { }
00232
00233 inline void
00234 UDPReadContinuation::free(void)
00235 {
00236 ink_assert(event != NULL);
00237 completionUtil::destroy(event);
00238 event = NULL;
00239 readbuf = NULL;
00240 readlen = 0;
00241 fromaddrlen = 0;
00242 fd = -1;
00243 ifd = -1;
00244 period = 0;
00245 elapsed_time = 0;
00246 timeout_interval = 0;
00247 mutex = NULL;
00248 udpReadContAllocator.free(this);
00249 }
00250
00251 inline void
00252 UDPReadContinuation::init_token(Event * completionToken)
00253 {
00254 if (completionToken->continuation) {
00255 this->mutex = completionToken->continuation->mutex;
00256 } else {
00257 this->mutex = new_ProxyMutex();
00258 }
00259 event = completionToken;
00260 }
00261
00262 inline void
00263 UDPReadContinuation::init_read(int rfd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr_, socklen_t *fromaddrlen_)
00264 {
00265 ink_assert(rfd >= 0 && buf != NULL && fromaddr_ != NULL && fromaddrlen_ != NULL);
00266 fd = rfd;
00267 readbuf = buf;
00268 readlen = len;
00269 fromaddr = ats_ip6_cast(fromaddr_);
00270 fromaddrlen = fromaddrlen_;
00271 SET_HANDLER(&UDPReadContinuation::readPollEvent);
00272 period = NET_PERIOD;
00273 setupPollDescriptor();
00274 this_ethread()->schedule_every(this, period);
00275 }
00276
00277 UDPReadContinuation::~UDPReadContinuation()
00278 {
00279 if (event != UNINITIALIZED_EVENT_PTR)
00280 {
00281 ink_assert(event != NULL);
00282 completionUtil::destroy(event);
00283 event = NULL;
00284 }
00285 }
00286
00287 void
00288 UDPReadContinuation::cancel()
00289 {
00290
00291 event->cancel();
00292 }
00293
00294 void
00295 UDPReadContinuation::setupPollDescriptor()
00296 {
00297 #if TS_USE_EPOLL
00298 Pollfd *pfd;
00299 EThread *et = (EThread *) this_thread();
00300 PollCont *pc = get_PollCont(et);
00301 if (pc->nextPollDescriptor == NULL) {
00302 pc->nextPollDescriptor = new PollDescriptor;
00303 pc->nextPollDescriptor->init();
00304 }
00305 pfd = pc->nextPollDescriptor->alloc();
00306 pfd->fd = fd;
00307 ifd = pfd - pc->nextPollDescriptor->pfd;
00308 ink_assert(pc->nextPollDescriptor->nfds > ifd);
00309 pfd->events = POLLIN;
00310 pfd->revents = 0;
00311 #endif
00312 }
00313
00314 int
00315 UDPReadContinuation::readPollEvent(int event_, Event * e)
00316 {
00317 (void) event_;
00318 (void) e;
00319
00320
00321 Continuation *c;
00322
00323 if (event->cancelled) {
00324 e->cancel();
00325 free();
00326
00327 return EVENT_DONE;
00328 }
00329
00330 if (timeout_interval) {
00331 elapsed_time += -period;
00332 if (elapsed_time >= timeout_interval) {
00333 c = completionUtil::getContinuation(event);
00334
00335 c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
00336 e->cancel();
00337 free();
00338
00339 return EVENT_DONE;
00340 }
00341 }
00342
00343
00344
00345 c = completionUtil::getContinuation(event);
00346
00347 socklen_t tmp_fromlen = *fromaddrlen;
00348 int rlen = socketManager.recvfrom(fd, readbuf->end(), readlen, 0, ats_ip_sa_cast(fromaddr), &tmp_fromlen);
00349
00350 completionUtil::setThread(event, e->ethread);
00351
00352 if (rlen > 0) {
00353
00354 *fromaddrlen = tmp_fromlen;
00355 completionUtil::setInfo(event, fd, readbuf, rlen, errno);
00356 readbuf->fill(rlen);
00357
00358 c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
00359 e->cancel();
00360 free();
00361
00362 return EVENT_DONE;
00363 } else if (rlen < 0 && rlen != -EAGAIN) {
00364
00365 *fromaddrlen = tmp_fromlen;
00366 completionUtil::setInfo(event, fd, (IOBufferBlock *) readbuf, rlen, errno);
00367 c = completionUtil::getContinuation(event);
00368
00369 c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
00370 e->cancel();
00371 free();
00372
00373 return EVENT_DONE;
00374 } else {
00375 completionUtil::setThread(event, NULL);
00376 }
00377
00378 if (event->cancelled) {
00379 e->cancel();
00380 free();
00381
00382 return EVENT_DONE;
00383 }
00384
00385 setupPollDescriptor();
00386
00387 return EVENT_CONT;
00388 }
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405 Action *
00406 UDPNetProcessor::recvfrom_re(Continuation * cont,
00407 void *token,
00408 int fd,
00409 struct sockaddr * fromaddr, socklen_t *fromaddrlen,
00410 IOBufferBlock * buf, int len, bool useReadCont, int timeout)
00411 {
00412 (void) useReadCont;
00413 ink_assert(buf->write_avail() >= len);
00414 int actual;
00415 Event *event = completionUtil::create();
00416
00417 completionUtil::setContinuation(event, cont);
00418 completionUtil::setHandle(event, token);
00419 actual = socketManager.recvfrom(fd, buf->end(), len, 0, fromaddr, fromaddrlen);
00420
00421 if (actual > 0) {
00422 completionUtil::setThread(event, this_ethread());
00423 completionUtil::setInfo(event, fd, buf, actual, errno);
00424 buf->fill(actual);
00425 cont->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
00426 completionUtil::destroy(event);
00427 return ACTION_RESULT_DONE;
00428 } else if (actual == 0 || (actual < 0 && actual == -EAGAIN)) {
00429 UDPReadContinuation *c = udpReadContAllocator.alloc();
00430 c->init_token(event);
00431 c->init_read(fd, buf, len, fromaddr, fromaddrlen);
00432 if (timeout) {
00433 c->set_timer(timeout);
00434 }
00435 return event;
00436 } else {
00437 completionUtil::setThread(event, this_ethread());
00438 completionUtil::setInfo(event, fd, buf, actual, errno);
00439 cont->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
00440 completionUtil::destroy(event);
00441 return ACTION_IO_ERROR;
00442 }
00443 }
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453 Action *
00454 UDPNetProcessor::sendmsg_re(Continuation * cont, void *token, int fd, struct msghdr * msg)
00455 {
00456 int actual;
00457 Event *event = completionUtil::create();
00458
00459 completionUtil::setContinuation(event, cont);
00460 completionUtil::setHandle(event, token);
00461
00462 actual = socketManager.sendmsg(fd, msg, 0);
00463 if (actual >= 0) {
00464 completionUtil::setThread(event, this_ethread());
00465 completionUtil::setInfo(event, fd, msg, actual, errno);
00466 cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, event);
00467 completionUtil::destroy(event);
00468 return ACTION_RESULT_DONE;
00469 } else {
00470 completionUtil::setThread(event, this_ethread());
00471 completionUtil::setInfo(event, fd, msg, actual, errno);
00472 cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, event);
00473 completionUtil::destroy(event);
00474 return ACTION_IO_ERROR;
00475 }
00476 }
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489 Action *
00490 UDPNetProcessor::sendto_re(Continuation * cont, void *token, int fd, struct sockaddr const* toaddr, int toaddrlen,
00491 IOBufferBlock * buf, int len)
00492 {
00493 (void) token;
00494 ink_assert(buf->read_avail() >= len);
00495 int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0, toaddr, toaddrlen);
00496
00497 if (nbytes_sent >= 0) {
00498 ink_assert(nbytes_sent == len);
00499 buf->consume(nbytes_sent);
00500 cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, (void *) -1);
00501 return ACTION_RESULT_DONE;
00502 } else {
00503 cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, (void *)(intptr_t)nbytes_sent);
00504 return ACTION_IO_ERROR;
00505 }
00506 }
00507
00508
00509 bool
00510 UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const* remote_addr, sockaddr* local_addr,
00511 int *local_addr_len, Action ** status, int send_bufsize, int recv_bufsize)
00512 {
00513 int res = 0, fd = -1;
00514
00515 ink_assert(ats_ip_are_compatible(remote_addr, local_addr));
00516
00517 *resfd = -1;
00518 if ((res = socketManager.socket(remote_addr->sa_family, SOCK_DGRAM, 0)) < 0)
00519 goto HardError;
00520 fd = res;
00521 if ((res = safe_fcntl(fd, F_SETFL, O_NONBLOCK)) < 0)
00522 goto HardError;
00523 if ((res = socketManager.ink_bind(fd, remote_addr, ats_ip_size(remote_addr), IPPROTO_UDP)) < 0) {
00524 char buff[INET6_ADDRPORTSTRLEN];
00525 Debug("udpnet", "ink bind failed on %s", ats_ip_nptop(remote_addr, buff, sizeof(buff)));
00526 goto SoftError;
00527 }
00528
00529 if (recv_bufsize) {
00530 if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize)))
00531 Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
00532 }
00533 if (send_bufsize) {
00534 if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize)))
00535 Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
00536 }
00537 if ((res = safe_getsockname(fd, local_addr, local_addr_len)) < 0) {
00538 Debug("udpnet", "CreateUdpsocket: getsockname didnt' work");
00539 goto HardError;
00540 }
00541 *resfd = fd;
00542 *status = NULL;
00543 Debug("udpnet", "creating a udp socket port = %d, %d---success",
00544 ats_ip_port_host_order(remote_addr), ats_ip_port_host_order(local_addr)
00545 );
00546 return true;
00547 SoftError:
00548 Debug("udpnet", "creating a udp socket port = %d---soft failure",
00549 ats_ip_port_host_order(local_addr)
00550 );
00551 if (fd != -1)
00552 socketManager.close(fd);
00553 *resfd = -1;
00554 *status = NULL;
00555 return false;
00556 HardError:
00557 Debug("udpnet", "creating a udp socket port = %d---hard failure",
00558 ats_ip_port_host_order(local_addr)
00559 );
00560 if (fd != -1)
00561 socketManager.close(fd);
00562 *resfd = -1;
00563 *status = ACTION_IO_ERROR;
00564 return false;
00565 }
00566
00567
00568 Action *
00569 UDPNetProcessor::UDPBind(Continuation * cont, sockaddr const* addr, int send_bufsize, int recv_bufsize)
00570 {
00571 int res = 0;
00572 int fd = -1;
00573 UnixUDPConnection *n = NULL;
00574 IpEndpoint myaddr;
00575 int myaddr_len = sizeof(myaddr);
00576
00577 if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0)
00578 goto Lerror;
00579 fd = res;
00580 if ((res = fcntl(fd, F_SETFL, O_NONBLOCK) < 0))
00581 goto Lerror;
00582
00583
00584 if (ats_is_ip_multicast(addr)) {
00585 int enable_reuseaddr = 1;
00586
00587 if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable_reuseaddr, sizeof(enable_reuseaddr)) < 0)) {
00588 goto Lerror;
00589 }
00590 }
00591
00592 if ((res = socketManager.ink_bind(fd, addr, ats_ip_size(addr))) < 0) {
00593 goto Lerror;
00594 }
00595
00596 if (recv_bufsize) {
00597 if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize)))
00598 Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
00599 }
00600 if (send_bufsize) {
00601 if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize)))
00602 Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
00603 }
00604 if ((res = safe_getsockname(fd, &myaddr.sa, &myaddr_len)) < 0) {
00605 goto Lerror;
00606 }
00607 n = new UnixUDPConnection(fd);
00608
00609 Debug("udpnet", "UDPNetProcessor::UDPBind: %p fd=%d", n, fd);
00610 n->setBinding(&myaddr.sa);
00611 n->bindToThread(cont);
00612
00613 cont->handleEvent(NET_EVENT_DATAGRAM_OPEN, n);
00614 return ACTION_RESULT_DONE;
00615 Lerror:
00616 if (fd != NO_FD)
00617 socketManager.close(fd);
00618 cont->handleEvent(NET_EVENT_DATAGRAM_ERROR, NULL);
00619 return ACTION_IO_ERROR;
00620 }
00621
00622
00623
00624 UDPQueue::UDPQueue()
00625 : last_report(0), last_service(0), packets(0), added(0)
00626 { }
00627
00628 UDPQueue::~UDPQueue()
00629 {
00630 }
00631
00632
00633
00634
00635 void
00636 UDPQueue::service(UDPNetHandler * nh)
00637 {
00638 (void) nh;
00639 ink_hrtime now = ink_get_hrtime_internal();
00640 uint64_t timeSpent = 0;
00641 uint64_t pktSendStartTime;
00642 UDPPacketInternal *p;
00643 ink_hrtime pktSendTime;
00644
00645 p = (UDPPacketInternal *) ink_atomiclist_popall(&atomicQueue);
00646 if (p) {
00647 UDPPacketInternal *pnext = NULL;
00648 Queue<UDPPacketInternal> stk;
00649
00650 while (p) {
00651 pnext = p->alink.next;
00652 p->alink.next = NULL;
00653 stk.push(p);
00654 p = pnext;
00655 }
00656
00657
00658 while (stk.head) {
00659 p = stk.pop();
00660 ink_assert(p->link.prev == NULL);
00661 ink_assert(p->link.next == NULL);
00662
00663 Debug("udp-send", "Adding %p", p);
00664 if (p->conn->lastPktStartTime == 0) {
00665 pktSendStartTime = MAX(now, p->delivery_time);
00666 } else {
00667 pktSendTime = p->delivery_time;
00668 pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time);
00669 }
00670 p->conn->lastPktStartTime = pktSendStartTime;
00671 p->delivery_time = pktSendStartTime;
00672
00673 pipeInfo.addPacket(p, now);
00674 }
00675 }
00676
00677 pipeInfo.advanceNow(now);
00678 SendPackets();
00679
00680 timeSpent = ink_hrtime_to_msec(now - last_report);
00681 if (timeSpent > 10000) {
00682 last_report = now;
00683 added = 0;
00684 packets = 0;
00685 }
00686 last_service = now;
00687 }
00688
00689 void
00690 UDPQueue::SendPackets()
00691 {
00692 UDPPacketInternal *p;
00693 static ink_hrtime lastCleanupTime = ink_get_hrtime_internal();
00694 ink_hrtime now = ink_get_hrtime_internal();
00695 ink_hrtime send_threshold_time = now + SLOT_TIME;
00696 int32_t bytesThisSlot = INT_MAX, bytesUsed = 0;
00697 int32_t bytesThisPipe, sentOne;
00698 int64_t pktLen;
00699
00700 bytesThisSlot = INT_MAX;
00701
00702 sendPackets:
00703 sentOne = false;
00704 bytesThisPipe = (int32_t)bytesThisSlot;
00705
00706 while ((bytesThisPipe > 0) && (pipeInfo.firstPacket(send_threshold_time))) {
00707 p = pipeInfo.getFirstPacket();
00708 pktLen = p->getPktLength();
00709
00710 if (p->conn->shouldDestroy())
00711 goto next_pkt;
00712 if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
00713 goto next_pkt;
00714
00715 SendUDPPacket(p, pktLen);
00716 bytesUsed += pktLen;
00717 bytesThisPipe -= pktLen;
00718 next_pkt:
00719 sentOne = true;
00720 p->free();
00721
00722 if (bytesThisPipe < 0)
00723 break;
00724 }
00725
00726 bytesThisSlot -= bytesUsed;
00727
00728 if ((bytesThisSlot > 0) && sentOne) {
00729
00730 now = ink_get_hrtime_internal();
00731 if (pipeInfo.firstPacket(now) == NULL) {
00732 pipeInfo.advanceNow(now);
00733 }
00734 goto sendPackets;
00735 }
00736
00737 if ((g_udp_periodicFreeCancelledPkts) && (now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
00738 pipeInfo.FreeCancelledPackets(g_udp_periodicCleanupSlots);
00739 lastCleanupTime = now;
00740 }
00741 }
00742
00743 void
00744 UDPQueue::SendUDPPacket(UDPPacketInternal *p, int32_t )
00745 {
00746 IOBufferBlock *b;
00747 struct msghdr msg;
00748 struct iovec iov[32];
00749 int real_len = 0;
00750 int n, count, iov_len = 0;
00751
00752 p->conn->lastSentPktStartTime = p->delivery_time;
00753 Debug("udp-send", "Sending %p", p);
00754
00755 #if !defined(solaris)
00756 msg.msg_control = 0;
00757 msg.msg_controllen = 0;
00758 msg.msg_flags = 0;
00759 #endif
00760 msg.msg_name = (caddr_t) & p->to;
00761 msg.msg_namelen = sizeof(p->to);
00762 iov_len = 0;
00763
00764 for (b = p->chain; b != NULL; b = b->next) {
00765 iov[iov_len].iov_base = (caddr_t) b->start();
00766 iov[iov_len].iov_len = b->size();
00767 real_len += iov[iov_len].iov_len;
00768 iov_len++;
00769 }
00770 msg.msg_iov = iov;
00771 msg.msg_iovlen = iov_len;
00772
00773 count = 0;
00774 while (1) {
00775
00776 n =::sendmsg(p->conn->getFd(), &msg, 0);
00777 if ((n >= 0) || ((n < 0) && (errno != EAGAIN)))
00778
00779 break;
00780 if (errno == EAGAIN) {
00781 ++count;
00782 if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
00783
00784 Debug("udpnet", "Send failed: too many retries");
00785 break;
00786 }
00787 }
00788 }
00789 }
00790
00791
00792 void
00793 UDPQueue::send(UDPPacket * p)
00794 {
00795
00796 ink_atomiclist_push(&atomicQueue, p);
00797 }
00798
00799 #undef LINK
00800
00801 UDPNetHandler::UDPNetHandler()
00802 {
00803 mutex = new_ProxyMutex();
00804 ink_atomiclist_init(&udpOutQueue.atomicQueue, "Outgoing UDP Packet queue", offsetof(UDPPacketInternal, alink.next));
00805 ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", offsetof(UnixUDPConnection, newconn_alink.next));
00806 nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
00807 lastCheck = 0;
00808 SET_HANDLER((UDPNetContHandler) & UDPNetHandler::startNetEvent);
00809 }
00810
00811 int
00812 UDPNetHandler::startNetEvent(int event, Event * e)
00813 {
00814 (void) event;
00815 SET_HANDLER((UDPNetContHandler) & UDPNetHandler::mainNetEvent);
00816 trigger_event = e;
00817 e->schedule_every(-HRTIME_MSECONDS(9));
00818 return EVENT_CONT;
00819 }
00820
00821 int
00822 UDPNetHandler::mainNetEvent(int event, Event * e)
00823 {
00824 ink_assert(trigger_event == e && event == EVENT_POLL);
00825 (void) event;
00826 (void) e;
00827
00828 PollCont *pc = get_UDPPollCont(e->ethread);
00829
00830
00831 udpOutQueue.service(this);
00832
00833
00834 UnixUDPConnection *uc, *next;
00835 int i;
00836 int nread = 0;
00837
00838 EventIO *temp_eptr = NULL;
00839 for (i = 0; i < pc->pollDescriptor->result; i++) {
00840 temp_eptr = (EventIO*) get_ev_data(pc->pollDescriptor,i);
00841 if ((get_ev_events(pc->pollDescriptor,i) & EVENTIO_READ)
00842 && temp_eptr->type == EVENTIO_UDP_CONNECTION) {
00843 uc = temp_eptr->data.uc;
00844 ink_assert(uc && uc->mutex && uc->continuation);
00845 ink_assert(uc->refcount >= 1);
00846 if (uc->shouldDestroy()) {
00847
00848 uc->Release();
00849 } else {
00850 udpNetInternal.udp_read_from_net(this, uc);
00851 nread++;
00852 }
00853 }
00854 }
00855
00856
00857 ink_hrtime now = ink_get_hrtime_internal();
00858 if (now >= nextCheck) {
00859 for (uc = udp_polling.head; uc; uc = next) {
00860 ink_assert(uc->mutex && uc->continuation);
00861 ink_assert(uc->refcount >= 1);
00862 next = uc->polling_link.next;
00863 if (uc->shouldDestroy()) {
00864
00865
00866 uc->Release();
00867 }
00868 }
00869 nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
00870 }
00871
00872 Que(UnixUDPConnection, callback_link) q = udp_callbacks;
00873 udp_callbacks.clear();
00874 while ((uc = q.dequeue())) {
00875 ink_assert(uc->mutex && uc->continuation);
00876 if (udpNetInternal.udp_callback(this, uc, trigger_event->ethread)) {
00877
00878 ink_assert(uc->callback_link.next == NULL);
00879 ink_assert(uc->callback_link.prev == NULL);
00880 udp_callbacks.enqueue(uc);
00881 } else {
00882 ink_assert(uc->callback_link.next == NULL);
00883 ink_assert(uc->callback_link.prev == NULL);
00884 uc->onCallbackQueue = 0;
00885 uc->Release();
00886 }
00887 }
00888
00889 return EVENT_CONT;
00890 }