• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

UnixUDPNet.cc

Go to the documentation of this file.
/** @file

  Unix implementation of the UDP network processor (UDPNet).

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

/****************************************************************************

  UnixUDPNet.cc
  UDPNet implementation


 ****************************************************************************/
00031 
00032 #include "P_Net.h"
00033 #include "P_UDPNet.h"
00034 
00035 typedef int (UDPNetHandler::*UDPNetContHandler) (int, void *);
00036 
00037 inkcoreapi ClassAllocator<UDPPacketInternal> udpPacketAllocator("udpPacketAllocator");
00038 EventType ET_UDP;
00039 
00040 #if defined(linux) && !defined(DEBUG)
00041 #define NODIAGS
00042 #endif
00043 
00044 //
00045 // Global Data
00046 //
00047 
00048 UDPNetProcessorInternal udpNetInternal;
00049 UDPNetProcessor &udpNet = udpNetInternal;
00050 
00051 int32_t g_udp_periodicCleanupSlots;
00052 int32_t g_udp_periodicFreeCancelledPkts;
00053 int32_t g_udp_numSendRetries;
00054 
00055 #include "P_LibBulkIO.h"
00056 
00057 //
00058 // Public functions
00059 // See header for documentation
00060 //
00061 int G_bwGrapherFd;
00062 sockaddr_in6 G_bwGrapherLoc;
00063 
00064 void
00065 initialize_thread_for_udp_net(EThread * thread)
00066 {
00067   new((ink_dummy_for_new *) get_UDPPollCont(thread)) PollCont(thread->mutex);
00068   new((ink_dummy_for_new *) get_UDPNetHandler(thread)) UDPNetHandler;
00069 
00070   // This variable controls how often we cleanup the cancelled packets.
00071   // If it is set to 0, then cleanup never occurs.
00072   REC_ReadConfigInt32(g_udp_periodicFreeCancelledPkts, "proxy.config.udp.free_cancelled_pkts_sec");
00073 
00074   // This variable controls how many "slots" of the udp calendar queue we cleanup.
00075   // If it is set to 0, then cleanup never occurs.  This value makes sense
00076   // only if the above variable is set.
00077   REC_ReadConfigInt32(g_udp_periodicCleanupSlots, "proxy.config.udp.periodic_cleanup");
00078 
00079   // UDP sends can fail with errno=EAGAIN.  This variable determines the # of
00080   // times the UDP thread retries before giving up.  Set to 0 to keep trying forever.
00081   REC_ReadConfigInt32(g_udp_numSendRetries, "proxy.config.udp.send_retries");
00082   g_udp_numSendRetries = g_udp_numSendRetries < 0 ? 0 : g_udp_numSendRetries;
00083 
00084   thread->schedule_every(get_UDPPollCont(thread), -9);
00085   thread->schedule_imm(get_UDPNetHandler(thread));
00086 }
00087 
00088 int
00089 UDPNetProcessorInternal::start(int n_upd_threads, size_t stacksize)
00090 {
00091   if (n_upd_threads < 1)
00092     return -1;
00093 
00094   ET_UDP = eventProcessor.spawn_event_threads(n_upd_threads, "ET_UDP", stacksize);
00095   if (ET_UDP < 0)               // Probably can't happen, maybe at some point EventType should be unsigned ?
00096     return -1;
00097 
00098   pollCont_offset = eventProcessor.allocate(sizeof(PollCont));
00099   udpNetHandler_offset = eventProcessor.allocate(sizeof(UDPNetHandler));
00100 
00101   for (int i = 0; i < eventProcessor.n_threads_for_type[ET_UDP]; i++)
00102     initialize_thread_for_udp_net(eventProcessor.eventthread[ET_UDP][i]);
00103 
00104   return 0;
00105 }
00106 
00107 void
00108 UDPNetProcessorInternal::udp_read_from_net(UDPNetHandler * nh, UDPConnection * xuc)
00109 {
00110   UnixUDPConnection *uc = (UnixUDPConnection *) xuc;
00111 
00112   // receive packet and queue onto UDPConnection.
00113   // don't call back connection at this time.
00114   int r;
00115   int iters = 0;
00116   do {
00117     sockaddr_in6 fromaddr;
00118     socklen_t fromlen = sizeof(fromaddr);
00119     // XXX: want to be 0 copy.
00120     // XXX: really should read into next contiguous region of an IOBufferData
00121     // which gets referenced by IOBufferBlock.
00122     char buf[65536];
00123     int buflen = sizeof(buf);
00124     r = socketManager.recvfrom(uc->getFd(), buf, buflen, 0, (struct sockaddr *) &fromaddr, &fromlen);
00125     if (r <= 0) {
00126       // error
00127       break;
00128     }
00129     // create packet
00130     UDPPacket *p = new_incoming_UDPPacket(ats_ip_sa_cast(&fromaddr), buf, r);
00131     p->setConnection(uc);
00132     // queue onto the UDPConnection
00133     ink_atomiclist_push(&uc->inQueue, p);
00134     iters++;
00135   } while (r > 0);
00136   if (iters >= 1) {
00137     Debug("udp-read", "read %d at a time", iters);
00138   }
00139   // if not already on to-be-called-back queue, then add it.
00140   if (!uc->onCallbackQueue) {
00141     ink_assert(uc->callback_link.next == NULL);
00142     ink_assert(uc->callback_link.prev == NULL);
00143     uc->AddRef();
00144     nh->udp_callbacks.enqueue(uc);
00145     uc->onCallbackQueue = 1;
00146   }
00147 }
00148 
00149 
00150 int
00151 UDPNetProcessorInternal::udp_callback(UDPNetHandler * nh, UDPConnection * xuc, EThread * thread)
00152 {
00153   (void) nh;
00154   UnixUDPConnection *uc = (UnixUDPConnection *) xuc;
00155 
00156   if (uc->continuation && uc->mutex) {
00157     MUTEX_TRY_LOCK_FOR(lock, uc->mutex, thread, uc->continuation);
00158     if (!lock) {
00159       return 1;
00160     }
00161     uc->AddRef();
00162     uc->callbackHandler(0, 0);
00163     return 0;
00164   } else {
00165     ink_assert(!"doesn't reach here");
00166     if (!uc->callbackAction) {
00167       uc->AddRef();
00168       uc->callbackAction = eventProcessor.schedule_imm(uc);
00169     }
00170     return 0;
00171   }
00172 }
00173 
00174 // cheesy implementation of a asynchronous read and callback for Unix
00175 class UDPReadContinuation:public Continuation
00176 {
00177 public:
00178   UDPReadContinuation(Event * completionToken);
00179   UDPReadContinuation();
00180   ~UDPReadContinuation();
00181   inline void free(void);
00182   inline void init_token(Event * completionToken);
00183   inline void init_read(int fd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr, socklen_t *fromaddrlen);
00184 
00185   void set_timer(int seconds)
00186   {
00187     timeout_interval = HRTIME_SECONDS(seconds);
00188   }
00189 
00190   void cancel();
00191   int readPollEvent(int event, Event * e);
00192 
00193   Action *getAction()
00194   {
00195     return event;
00196   }
00197 
00198   void setupPollDescriptor();
00199 
00200 private:
00201   Event * event;                // the completion event token created
00202   // on behalf of the client
00203   Ptr<IOBufferBlock> readbuf;
00204   int readlen;
00205   struct sockaddr_in6 *fromaddr;
00206   socklen_t *fromaddrlen;
00207   int fd;                       // fd we are reading from
00208   int ifd;                      // poll fd index
00209   ink_hrtime period;            // polling period
00210   ink_hrtime elapsed_time;
00211   ink_hrtime timeout_interval;
00212 };
00213 
00214 ClassAllocator<UDPReadContinuation> udpReadContAllocator("udpReadContAllocator");
00215 
00216 #define UNINITIALIZED_EVENT_PTR (Event *)0xdeadbeef
00217 
00218 UDPReadContinuation::UDPReadContinuation(Event * completionToken)
00219   : Continuation(NULL), event(completionToken), readbuf(NULL), readlen(0), fromaddrlen(0), fd(-1),
00220     ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
00221 {
00222   if (completionToken->continuation)
00223     this->mutex = completionToken->continuation->mutex;
00224   else
00225     this->mutex = new_ProxyMutex();
00226 }
00227 
00228 UDPReadContinuation::UDPReadContinuation()
00229   : Continuation(NULL), event(UNINITIALIZED_EVENT_PTR), readbuf(NULL), readlen(0), fromaddrlen(0), fd(-1),
00230     ifd(-1), period(0), elapsed_time(0), timeout_interval(0)
00231 { }
00232 
00233 inline void
00234 UDPReadContinuation::free(void)
00235 {
00236   ink_assert(event != NULL);
00237   completionUtil::destroy(event);
00238   event = NULL;
00239   readbuf = NULL;
00240   readlen = 0;
00241   fromaddrlen = 0;
00242   fd = -1;
00243   ifd = -1;
00244   period = 0;
00245   elapsed_time = 0;
00246   timeout_interval = 0;
00247   mutex = NULL;
00248   udpReadContAllocator.free(this);
00249 }
00250 
00251 inline void
00252 UDPReadContinuation::init_token(Event * completionToken)
00253 {
00254   if (completionToken->continuation) {
00255     this->mutex = completionToken->continuation->mutex;
00256   } else {
00257     this->mutex = new_ProxyMutex();
00258   }
00259   event = completionToken;
00260 }
00261 
00262 inline void
00263 UDPReadContinuation::init_read(int rfd, IOBufferBlock * buf, int len, struct sockaddr *fromaddr_, socklen_t *fromaddrlen_)
00264 {
00265   ink_assert(rfd >= 0 && buf != NULL && fromaddr_ != NULL && fromaddrlen_ != NULL);
00266   fd = rfd;
00267   readbuf = buf;
00268   readlen = len;
00269   fromaddr = ats_ip6_cast(fromaddr_);
00270   fromaddrlen = fromaddrlen_;
00271   SET_HANDLER(&UDPReadContinuation::readPollEvent);
00272   period = NET_PERIOD;
00273   setupPollDescriptor();
00274   this_ethread()->schedule_every(this, period);
00275 }
00276 
00277 UDPReadContinuation::~UDPReadContinuation()
00278 {
00279   if (event != UNINITIALIZED_EVENT_PTR)
00280   {
00281     ink_assert(event != NULL);
00282     completionUtil::destroy(event);
00283     event = NULL;
00284   }
00285 }
00286 
00287 void
00288 UDPReadContinuation::cancel()
00289 {
00290   // I don't think this actually cancels it correctly right now.
00291   event->cancel();
00292 }
00293 
00294 void
00295 UDPReadContinuation::setupPollDescriptor()
00296 {
00297 #if TS_USE_EPOLL
00298   Pollfd *pfd;
00299   EThread *et = (EThread *) this_thread();
00300   PollCont *pc = get_PollCont(et);
00301   if (pc->nextPollDescriptor == NULL) {
00302     pc->nextPollDescriptor = new PollDescriptor;
00303     pc->nextPollDescriptor->init();
00304   }
00305   pfd = pc->nextPollDescriptor->alloc();
00306   pfd->fd = fd;
00307   ifd = pfd - pc->nextPollDescriptor->pfd;
00308   ink_assert(pc->nextPollDescriptor->nfds > ifd);
00309   pfd->events = POLLIN;
00310   pfd->revents = 0;
00311 #endif
00312 }
00313 
00314 int
00315 UDPReadContinuation::readPollEvent(int event_, Event * e)
00316 {
00317   (void) event_;
00318   (void) e;
00319 
00320   //PollCont *pc = get_PollCont(e->ethread);
00321   Continuation *c;
00322 
00323   if (event->cancelled) {
00324     e->cancel();
00325     free();
00326     //    delete this;
00327     return EVENT_DONE;
00328   }
00329   // See if the request has timed out
00330   if (timeout_interval) {
00331     elapsed_time += -period;
00332     if (elapsed_time >= timeout_interval) {
00333       c = completionUtil::getContinuation(event);
00334       // TODO: Should we deal with the return code?
00335       c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
00336       e->cancel();
00337       free();
00338       //      delete this;
00339       return EVENT_DONE;
00340     }
00341   }
00342   //ink_assert(ifd < 0 || event_ == EVENT_INTERVAL || (event_ == EVENT_POLL && pc->pollDescriptor->nfds > ifd && pc->pollDescriptor->pfd[ifd].fd == fd));
00343   //if (ifd < 0 || event_ == EVENT_INTERVAL || (pc->pollDescriptor->pfd[ifd].revents & POLLIN)) {
00344   //ink_assert(!"incomplete");
00345   c = completionUtil::getContinuation(event);
00346   // do read
00347   socklen_t tmp_fromlen = *fromaddrlen;
00348   int rlen = socketManager.recvfrom(fd, readbuf->end(), readlen, 0, ats_ip_sa_cast(fromaddr), &tmp_fromlen);
00349 
00350   completionUtil::setThread(event, e->ethread);
00351   // call back user with their event
00352   if (rlen > 0) {
00353     // do callback if read is successful
00354     *fromaddrlen = tmp_fromlen;
00355     completionUtil::setInfo(event, fd, readbuf, rlen, errno);
00356     readbuf->fill(rlen);
00357     // TODO: Should we deal with the return code?
00358     c->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
00359     e->cancel();
00360     free();
00361     // delete this;
00362     return EVENT_DONE;
00363   } else if (rlen < 0 && rlen != -EAGAIN) {
00364     // signal error.
00365     *fromaddrlen = tmp_fromlen;
00366     completionUtil::setInfo(event, fd, (IOBufferBlock *) readbuf, rlen, errno);
00367     c = completionUtil::getContinuation(event);
00368     // TODO: Should we deal with the return code?
00369     c->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
00370     e->cancel();
00371     free();
00372     //delete this;
00373     return EVENT_DONE;
00374   } else {
00375     completionUtil::setThread(event, NULL);
00376   }
00377 
00378   if (event->cancelled) {
00379     e->cancel();
00380     free();
00381     //delete this;
00382     return EVENT_DONE;
00383   }
00384   // reestablish poll
00385   setupPollDescriptor();
00386 
00387   return EVENT_CONT;
00388 }
00389 
00390 /* recvfrom:
00391  * Unix:
00392  *   assert(buf->write_avail() >= len);
00393  *   *actual_len = recvfrom(fd,addr,buf->end(),len)
00394  *   if successful then
00395  *      buf->fill(*actual_len);
00396  *          return ACTION_RESULT_DONE
00397  *   else if nothing read
00398  *      *actual_len is 0
00399  *      create "UDP read continuation" C with 'cont's lock
00400  *         set user callback to 'cont'
00401  *      return C's action.
00402  *   else
00403  *      return error;
00404  */
00405 Action *
00406 UDPNetProcessor::recvfrom_re(Continuation * cont,
00407                              void *token,
00408                              int fd,
00409                              struct sockaddr * fromaddr, socklen_t *fromaddrlen,
00410                              IOBufferBlock * buf, int len, bool useReadCont, int timeout)
00411 {
00412   (void) useReadCont;
00413   ink_assert(buf->write_avail() >= len);
00414   int actual;
00415   Event *event = completionUtil::create();
00416 
00417   completionUtil::setContinuation(event, cont);
00418   completionUtil::setHandle(event, token);
00419   actual = socketManager.recvfrom(fd, buf->end(), len, 0, fromaddr, fromaddrlen);
00420 
00421   if (actual > 0) {
00422     completionUtil::setThread(event, this_ethread());
00423     completionUtil::setInfo(event, fd, buf, actual, errno);
00424     buf->fill(actual);
00425     cont->handleEvent(NET_EVENT_DATAGRAM_READ_COMPLETE, event);
00426     completionUtil::destroy(event);
00427     return ACTION_RESULT_DONE;
00428   } else if (actual == 0 || (actual < 0 && actual == -EAGAIN)) {
00429     UDPReadContinuation *c = udpReadContAllocator.alloc();
00430     c->init_token(event);
00431     c->init_read(fd, buf, len, fromaddr, fromaddrlen);
00432     if (timeout) {
00433       c->set_timer(timeout);
00434     }
00435     return event;
00436   } else {
00437     completionUtil::setThread(event, this_ethread());
00438     completionUtil::setInfo(event, fd, buf, actual, errno);
00439     cont->handleEvent(NET_EVENT_DATAGRAM_READ_ERROR, event);
00440     completionUtil::destroy(event);
00441     return ACTION_IO_ERROR;
00442   }
00443 }
00444 
00445 /* sendmsg:
00446  * Unix:
00447  *   *actual_len = sendmsg(fd,msg,default-flags);
00448  *   if successful,
00449  *      return ACTION_RESULT_DONE
00450  *   else
00451  *      return error
00452  */
00453 Action *
00454 UDPNetProcessor::sendmsg_re(Continuation * cont, void *token, int fd, struct msghdr * msg)
00455 {
00456   int actual;
00457   Event *event = completionUtil::create();
00458 
00459   completionUtil::setContinuation(event, cont);
00460   completionUtil::setHandle(event, token);
00461 
00462   actual = socketManager.sendmsg(fd, msg, 0);
00463   if (actual >= 0) {
00464     completionUtil::setThread(event, this_ethread());
00465     completionUtil::setInfo(event, fd, msg, actual, errno);
00466     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, event);
00467     completionUtil::destroy(event);
00468     return ACTION_RESULT_DONE;
00469   } else {
00470     completionUtil::setThread(event, this_ethread());
00471     completionUtil::setInfo(event, fd, msg, actual, errno);
00472     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, event);
00473     completionUtil::destroy(event);
00474     return ACTION_IO_ERROR;
00475   }
00476 }
00477 
00478 /* sendto:
00479  * If this were implemented, it might be implemented like this:
00480  * Unix:
00481  *   call sendto(fd,addr,buf->reader()->start(),len);
00482  *   if successful,
00483  *      buf->consume(len);
00484  *      return ACTION_RESULT_DONE
00485  *   else
00486  *      return error
00487  *
00488  */
00489 Action *
00490 UDPNetProcessor::sendto_re(Continuation * cont, void *token, int fd, struct sockaddr const* toaddr, int toaddrlen,
00491                            IOBufferBlock * buf, int len)
00492 {
00493   (void) token;
00494   ink_assert(buf->read_avail() >= len);
00495   int nbytes_sent = socketManager.sendto(fd, buf->start(), len, 0, toaddr, toaddrlen);
00496 
00497   if (nbytes_sent >= 0) {
00498     ink_assert(nbytes_sent == len);
00499     buf->consume(nbytes_sent);
00500     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_COMPLETE, (void *) -1);
00501     return ACTION_RESULT_DONE;
00502   } else {
00503     cont->handleEvent(NET_EVENT_DATAGRAM_WRITE_ERROR, (void *)(intptr_t)nbytes_sent);
00504     return ACTION_IO_ERROR;
00505   }
00506 }
00507 
00508 
00509 bool
00510 UDPNetProcessor::CreateUDPSocket(int *resfd, sockaddr const* remote_addr, sockaddr* local_addr,
00511                                  int *local_addr_len, Action ** status, int send_bufsize, int recv_bufsize)
00512 {
00513   int res = 0, fd = -1;
00514 
00515   ink_assert(ats_ip_are_compatible(remote_addr, local_addr));
00516 
00517   *resfd = -1;
00518   if ((res = socketManager.socket(remote_addr->sa_family, SOCK_DGRAM, 0)) < 0)
00519     goto HardError;
00520   fd = res;
00521   if ((res = safe_fcntl(fd, F_SETFL, O_NONBLOCK)) < 0)
00522     goto HardError;
00523   if ((res = socketManager.ink_bind(fd, remote_addr, ats_ip_size(remote_addr), IPPROTO_UDP)) < 0) {
00524     char buff[INET6_ADDRPORTSTRLEN];
00525     Debug("udpnet", "ink bind failed on %s", ats_ip_nptop(remote_addr, buff, sizeof(buff)));
00526     goto SoftError;
00527   }
00528 
00529   if (recv_bufsize) {
00530     if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize)))
00531       Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
00532   }
00533   if (send_bufsize) {
00534     if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize)))
00535       Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
00536   }
00537   if ((res = safe_getsockname(fd, local_addr, local_addr_len)) < 0) {
00538     Debug("udpnet", "CreateUdpsocket: getsockname didnt' work");
00539     goto HardError;
00540   }
00541   *resfd = fd;
00542   *status = NULL;
00543   Debug("udpnet", "creating a udp socket port = %d, %d---success",
00544     ats_ip_port_host_order(remote_addr), ats_ip_port_host_order(local_addr)
00545   );
00546   return true;
00547 SoftError:
00548   Debug("udpnet", "creating a udp socket port = %d---soft failure",
00549     ats_ip_port_host_order(local_addr)
00550   );
00551   if (fd != -1)
00552     socketManager.close(fd);
00553   *resfd = -1;
00554   *status = NULL;
00555   return false;
00556 HardError:
00557   Debug("udpnet", "creating a udp socket port = %d---hard failure",
00558     ats_ip_port_host_order(local_addr)
00559   );
00560   if (fd != -1)
00561     socketManager.close(fd);
00562   *resfd = -1;
00563   *status = ACTION_IO_ERROR;
00564   return false;
00565 }
00566 
00567 
00568 Action *
00569 UDPNetProcessor::UDPBind(Continuation * cont, sockaddr const* addr, int send_bufsize, int recv_bufsize)
00570 {
00571   int res = 0;
00572   int fd = -1;
00573   UnixUDPConnection *n = NULL;
00574   IpEndpoint myaddr;
00575   int myaddr_len = sizeof(myaddr);
00576 
00577   if ((res = socketManager.socket(addr->sa_family, SOCK_DGRAM, 0)) < 0)
00578     goto Lerror;
00579   fd = res;
00580   if ((res = fcntl(fd, F_SETFL, O_NONBLOCK) < 0))
00581     goto Lerror;
00582 
00583   // If this is a class D address (i.e. multicast address), use REUSEADDR.
00584   if (ats_is_ip_multicast(addr)) {
00585     int enable_reuseaddr = 1;
00586 
00587     if ((res = safe_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &enable_reuseaddr, sizeof(enable_reuseaddr)) < 0)) {
00588       goto Lerror;
00589     }
00590   }
00591 
00592   if ((res = socketManager.ink_bind(fd, addr, ats_ip_size(addr))) < 0) {
00593     goto Lerror;
00594   }
00595 
00596   if (recv_bufsize) {
00597     if (unlikely(socketManager.set_rcvbuf_size(fd, recv_bufsize)))
00598       Debug("udpnet", "set_dnsbuf_size(%d) failed", recv_bufsize);
00599   }
00600   if (send_bufsize) {
00601     if (unlikely(socketManager.set_sndbuf_size(fd, send_bufsize)))
00602       Debug("udpnet", "set_dnsbuf_size(%d) failed", send_bufsize);
00603   }
00604   if ((res = safe_getsockname(fd, &myaddr.sa, &myaddr_len)) < 0) {
00605     goto Lerror;
00606   }
00607   n = new UnixUDPConnection(fd);
00608 
00609   Debug("udpnet", "UDPNetProcessor::UDPBind: %p fd=%d", n, fd);
00610   n->setBinding(&myaddr.sa);
00611   n->bindToThread(cont);
00612 
00613   cont->handleEvent(NET_EVENT_DATAGRAM_OPEN, n);
00614   return ACTION_RESULT_DONE;
00615 Lerror:
00616   if (fd != NO_FD)
00617     socketManager.close(fd);
00618   cont->handleEvent(NET_EVENT_DATAGRAM_ERROR, NULL);
00619   return ACTION_IO_ERROR;
00620 }
00621 
00622 
00623 // send out all packets that need to be sent out as of time=now
00624 UDPQueue::UDPQueue()
00625   : last_report(0), last_service(0), packets(0), added(0)
00626 { }
00627 
00628 UDPQueue::~UDPQueue()
00629 {
00630 }
00631 
00632 /*
00633  * Driver function that aggregates packets across cont's and sends them
00634  */
00635 void
00636 UDPQueue::service(UDPNetHandler * nh)
00637 {
00638   (void) nh;
00639   ink_hrtime now = ink_get_hrtime_internal();
00640   uint64_t timeSpent = 0;
00641   uint64_t pktSendStartTime;
00642   UDPPacketInternal *p;
00643   ink_hrtime pktSendTime;
00644 
00645   p = (UDPPacketInternal *) ink_atomiclist_popall(&atomicQueue);
00646   if (p) {
00647     UDPPacketInternal *pnext = NULL;
00648     Queue<UDPPacketInternal> stk;
00649 
00650     while (p) {
00651       pnext = p->alink.next;
00652       p->alink.next = NULL;
00653       stk.push(p);
00654       p = pnext;
00655     }
00656 
00657     // walk backwards down list since this is actually an atomic stack.
00658     while (stk.head) {
00659       p = stk.pop();
00660       ink_assert(p->link.prev == NULL);
00661       ink_assert(p->link.next == NULL);
00662       // insert into our queue.
00663       Debug("udp-send", "Adding %p", p);
00664       if (p->conn->lastPktStartTime == 0) {
00665         pktSendStartTime = MAX(now, p->delivery_time);
00666       } else {
00667         pktSendTime = p->delivery_time;
00668         pktSendStartTime = MAX(MAX(now, pktSendTime), p->delivery_time);
00669       }
00670       p->conn->lastPktStartTime = pktSendStartTime;
00671       p->delivery_time = pktSendStartTime;
00672 
00673       pipeInfo.addPacket(p, now);
00674     }
00675   }
00676 
00677   pipeInfo.advanceNow(now);
00678   SendPackets();
00679 
00680   timeSpent = ink_hrtime_to_msec(now - last_report);
00681   if (timeSpent > 10000) {
00682     last_report = now;
00683     added = 0;
00684     packets = 0;
00685   }
00686   last_service = now;
00687 }
00688 
00689 void
00690 UDPQueue::SendPackets()
00691 {
00692   UDPPacketInternal *p;
00693   static ink_hrtime lastCleanupTime = ink_get_hrtime_internal();
00694   ink_hrtime now = ink_get_hrtime_internal();
00695   ink_hrtime send_threshold_time = now + SLOT_TIME;
00696   int32_t bytesThisSlot = INT_MAX, bytesUsed = 0;
00697   int32_t bytesThisPipe, sentOne;
00698   int64_t pktLen;
00699 
00700   bytesThisSlot = INT_MAX;
00701 
00702 sendPackets:
00703   sentOne = false;
00704   bytesThisPipe = (int32_t)bytesThisSlot;
00705 
00706   while ((bytesThisPipe > 0) && (pipeInfo.firstPacket(send_threshold_time))) {
00707     p = pipeInfo.getFirstPacket();
00708     pktLen = p->getPktLength();
00709 
00710     if (p->conn->shouldDestroy())
00711       goto next_pkt;
00712     if (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)
00713       goto next_pkt;
00714 
00715     SendUDPPacket(p, pktLen);
00716     bytesUsed += pktLen;
00717     bytesThisPipe -= pktLen;
00718   next_pkt:
00719     sentOne = true;
00720     p->free();
00721 
00722     if (bytesThisPipe < 0)
00723       break;
00724   }
00725 
00726   bytesThisSlot -= bytesUsed;
00727 
00728   if ((bytesThisSlot > 0) && sentOne) {
00729     // redistribute the slack...
00730     now = ink_get_hrtime_internal();
00731     if (pipeInfo.firstPacket(now) == NULL) {
00732       pipeInfo.advanceNow(now);
00733     }
00734     goto sendPackets;
00735   }
00736 
00737   if ((g_udp_periodicFreeCancelledPkts) && (now - lastCleanupTime > ink_hrtime_from_sec(g_udp_periodicFreeCancelledPkts))) {
00738     pipeInfo.FreeCancelledPackets(g_udp_periodicCleanupSlots);
00739     lastCleanupTime = now;
00740   }
00741 }
00742 
00743 void
00744 UDPQueue::SendUDPPacket(UDPPacketInternal *p, int32_t /* pktLen ATS_UNUSED */)
00745 {
00746   IOBufferBlock *b;
00747   struct msghdr msg;
00748   struct iovec iov[32];
00749   int real_len = 0;
00750   int n, count, iov_len = 0;
00751 
00752   p->conn->lastSentPktStartTime = p->delivery_time;
00753   Debug("udp-send", "Sending %p", p);
00754 
00755 #if !defined(solaris)
00756   msg.msg_control = 0;
00757   msg.msg_controllen = 0;
00758   msg.msg_flags = 0;
00759 #endif
00760   msg.msg_name = (caddr_t) & p->to;
00761   msg.msg_namelen = sizeof(p->to);
00762   iov_len = 0;
00763 
00764   for (b = p->chain; b != NULL; b = b->next) {
00765     iov[iov_len].iov_base = (caddr_t) b->start();
00766     iov[iov_len].iov_len = b->size();
00767     real_len += iov[iov_len].iov_len;
00768     iov_len++;
00769   }
00770   msg.msg_iov = iov;
00771   msg.msg_iovlen = iov_len;
00772 
00773   count = 0;
00774   while (1) {
00775     // stupid Linux problem: sendmsg can return EAGAIN
00776     n =::sendmsg(p->conn->getFd(), &msg, 0);
00777     if ((n >= 0) || ((n < 0) && (errno != EAGAIN)))
00778       // send succeeded or some random error happened.
00779       break;
00780     if (errno == EAGAIN) {
00781       ++count;
00782       if ((g_udp_numSendRetries > 0) && (count >= g_udp_numSendRetries)) {
00783         // tried too many times; give up
00784         Debug("udpnet", "Send failed: too many retries");
00785         break;
00786       }
00787     }
00788   }
00789 }
00790 
00791 
00792 void
00793 UDPQueue::send(UDPPacket * p)
00794 {
00795   // XXX: maybe fastpath for immediate send?
00796   ink_atomiclist_push(&atomicQueue, p);
00797 }
00798 
00799 #undef LINK
00800 
00801 UDPNetHandler::UDPNetHandler()
00802 {
00803   mutex = new_ProxyMutex();
00804   ink_atomiclist_init(&udpOutQueue.atomicQueue, "Outgoing UDP Packet queue", offsetof(UDPPacketInternal, alink.next));
00805   ink_atomiclist_init(&udpNewConnections, "UDP Connection queue", offsetof(UnixUDPConnection, newconn_alink.next));
00806   nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
00807   lastCheck = 0;
00808   SET_HANDLER((UDPNetContHandler) & UDPNetHandler::startNetEvent);
00809 }
00810 
00811 int
00812 UDPNetHandler::startNetEvent(int event, Event * e)
00813 {
00814   (void) event;
00815   SET_HANDLER((UDPNetContHandler) & UDPNetHandler::mainNetEvent);
00816   trigger_event = e;
00817   e->schedule_every(-HRTIME_MSECONDS(9));
00818   return EVENT_CONT;
00819 }
00820 
00821 int
00822 UDPNetHandler::mainNetEvent(int event, Event * e)
00823 {
00824   ink_assert(trigger_event == e && event == EVENT_POLL);
00825   (void) event;
00826   (void) e;
00827 
00828   PollCont *pc = get_UDPPollCont(e->ethread);
00829 
00830   // handle UDP outgoing engine
00831   udpOutQueue.service(this);
00832 
00833   // handle UDP read operations
00834   UnixUDPConnection *uc, *next;
00835   int i;
00836   int nread = 0;
00837 
00838   EventIO *temp_eptr = NULL;
00839   for (i = 0; i < pc->pollDescriptor->result; i++) {
00840     temp_eptr = (EventIO*) get_ev_data(pc->pollDescriptor,i);
00841     if ((get_ev_events(pc->pollDescriptor,i) & EVENTIO_READ)
00842         && temp_eptr->type == EVENTIO_UDP_CONNECTION) {
00843       uc = temp_eptr->data.uc;
00844       ink_assert(uc && uc->mutex && uc->continuation);
00845       ink_assert(uc->refcount >= 1);
00846       if (uc->shouldDestroy()) {
00847         // udp_polling->remove(uc,uc->polling_link);
00848         uc->Release();
00849       } else {
00850         udpNetInternal.udp_read_from_net(this, uc);
00851         nread++;
00852       }
00853     }                           //if EPOLLIN
00854   }                             //end for
00855 
00856   // remove dead UDP connections
00857   ink_hrtime now = ink_get_hrtime_internal();
00858   if (now >= nextCheck) {
00859     for (uc = udp_polling.head; uc; uc = next) {
00860       ink_assert(uc->mutex && uc->continuation);
00861       ink_assert(uc->refcount >= 1);
00862       next = uc->polling_link.next;
00863       if (uc->shouldDestroy()) {
00864         //changed by YTS Team, yamsat
00865         //udp_polling->remove(uc,uc->polling_link);
00866         uc->Release();
00867       }
00868     }
00869     nextCheck = ink_get_hrtime_internal() + HRTIME_MSECONDS(1000);
00870   }
00871   // service UDPConnections with data ready for callback.
00872   Que(UnixUDPConnection, callback_link) q = udp_callbacks;
00873   udp_callbacks.clear();
00874   while ((uc = q.dequeue())) {
00875     ink_assert(uc->mutex && uc->continuation);
00876     if (udpNetInternal.udp_callback(this, uc, trigger_event->ethread)) {        // not successful
00877       // schedule on a thread of its own.
00878       ink_assert(uc->callback_link.next == NULL);
00879       ink_assert(uc->callback_link.prev == NULL);
00880       udp_callbacks.enqueue(uc);
00881     } else {
00882       ink_assert(uc->callback_link.next == NULL);
00883       ink_assert(uc->callback_link.prev == NULL);
00884       uc->onCallbackQueue = 0;
00885       uc->Release();
00886     }
00887   }
00888 
00889   return EVENT_CONT;
00890 }

Generated by  doxygen 1.7.1