• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

P_UDPNet.h

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025   P_UDPNet.h
00026   Private header for UDPNetProcessor
00027 
00028 
00029  ****************************************************************************/
00030 
00031 #ifndef __P_UDPNET_H_
00032 #define __P_UDPNET_H_
00033 
00034 extern EventType ET_UDP;
00035 
00036 #include "I_UDPNet.h"
00037 #include "P_UDPPacket.h"
00038 
00039 //added by YTS Team, yamsat
00040 static inline PollCont *get_UDPPollCont(EThread *);
00041 
00042 #include "P_UnixUDPConnection.h"
00043 #include "P_UDPIOEvent.h"
00044 
00045 struct UDPNetHandler;
00046 
00047 struct UDPNetProcessorInternal : public UDPNetProcessor
00048 {
00049   virtual int start(int n_udp_threads, size_t stacksize);
00050   void udp_read_from_net(UDPNetHandler * nh, UDPConnection * uc);
00051   int udp_callback(UDPNetHandler * nh, UDPConnection * uc, EThread * thread);
00052 
00053   off_t pollCont_offset;
00054   off_t udpNetHandler_offset;
00055 };
00056 
00057 extern UDPNetProcessorInternal udpNetInternal;
00058 
00059 
00060 
00061 // 20 ms slots; 2048 slots  => 40 sec. into the future
00062 #define SLOT_TIME_MSEC 20
00063 #define SLOT_TIME HRTIME_MSECONDS(SLOT_TIME_MSEC)
00064 #define N_SLOTS 2048
00065 
00066 class PacketQueue
00067 {
00068  public:
00069  PacketQueue()
00070    : nPackets(0), now_slot(0)
00071     {
00072       lastPullLongTermQ = 0;
00073       init();
00074     }
00075 
00076   virtual ~ PacketQueue()
00077     { }
00078 
00079   int nPackets;
00080   ink_hrtime lastPullLongTermQ;
00081   Queue<UDPPacketInternal> longTermQ;
00082   Queue<UDPPacketInternal> bucket[N_SLOTS];
00083   ink_hrtime delivery_time[N_SLOTS];
00084   int now_slot;
00085 
00086   void init(void)
00087   {
00088     now_slot = 0;
00089     ink_hrtime now = ink_get_hrtime_internal();
00090     int i = now_slot;
00091     int j = 0;
00092     while (j < N_SLOTS) {
00093       delivery_time[i] = now + j * SLOT_TIME;
00094       i = (i + 1) % N_SLOTS;
00095       j++;
00096     }
00097   }
00098 
00099   void addPacket(UDPPacketInternal * e, ink_hrtime now = 0)
00100   {
00101     int before = 0;
00102     int slot;
00103 
00104     if (IsCancelledPacket(e)) {
00105       e->free();
00106       return;
00107     }
00108 
00109     nPackets++;
00110 
00111     ink_assert(delivery_time[now_slot]);
00112 
00113     if (e->delivery_time < now)
00114       e->delivery_time = now;
00115 
00116     ink_hrtime s = e->delivery_time - delivery_time[now_slot];
00117 
00118     if (s < 0) {
00119       before = 1;
00120       s = 0;
00121     }
00122     s = s / SLOT_TIME;
00123     // if s >= N_SLOTS, either we are *REALLY* behind or someone is trying
00124     // queue packets *WAY* too far into the future.
00125     // need a thingy to hold packets in a "long-term" slot; then, pull packets
00126     // from long-term slot whenever you advance.
00127     if (s >= N_SLOTS - 1) {
00128       longTermQ.enqueue(e);
00129       e->in_heap = 0;
00130       e->in_the_priority_queue = 1;
00131       return;
00132     }
00133     slot = (s + now_slot) % N_SLOTS;
00134 
00135     // so that slot+1 is still "in future".
00136     ink_assert((before || delivery_time[slot] <= e->delivery_time) &&
00137                (delivery_time[(slot + 1) % N_SLOTS] >= e->delivery_time));
00138     e->in_the_priority_queue = 1;
00139     e->in_heap = slot;
00140     bucket[slot].enqueue(e);
00141   }
00142 
00143   UDPPacketInternal *firstPacket(ink_hrtime t)
00144   {
00145     if (t > delivery_time[now_slot]) {
00146       return bucket[now_slot].head;
00147     } else {
00148       return NULL;
00149     }
00150   }
00151 
00152   UDPPacketInternal *getFirstPacket()
00153   {
00154     nPackets--;
00155     return dequeue_ready(0);
00156   }
00157 
00158   int size()
00159   {
00160     ink_assert(nPackets >= 0);
00161     return nPackets;
00162   }
00163 
00164   bool IsCancelledPacket(UDPPacketInternal * p)
00165   {
00166     // discard packets that'll never get sent...
00167     return ((p->conn->shouldDestroy()) || (p->conn->GetSendGenerationNumber() != p->reqGenerationNum));
00168   }
00169 
00170   void FreeCancelledPackets(int numSlots)
00171   {
00172     UDPPacketInternal *p;
00173     Queue<UDPPacketInternal> tempQ;
00174     int i, s;
00175 
00176     for (i = 0; i < numSlots; i++) {
00177       s = (now_slot + i) % N_SLOTS;
00178       while (NULL != (p = bucket[s].dequeue())) {
00179         if (IsCancelledPacket(p)) {
00180           p->free();
00181           continue;
00182         }
00183         tempQ.enqueue(p);
00184       }
00185       // remove and flip it over
00186       while (NULL != (p = tempQ.dequeue())) {
00187         bucket[s].enqueue(p);
00188       }
00189     }
00190   }
00191 
00192   void advanceNow(ink_hrtime t)
00193   {
00194     int s = now_slot;
00195     int prev;
00196 
00197     if (ink_hrtime_to_msec(t - lastPullLongTermQ) >= SLOT_TIME_MSEC * ((N_SLOTS - 1) / 2)) {
00198       Queue<UDPPacketInternal> tempQ;
00199       UDPPacketInternal *p;
00200       // pull in all the stuff from long-term slot
00201       lastPullLongTermQ = t;
00202       // this is to handle wierdoness where someone is trying to queue a
00203       // packet to be sent in SLOT_TIME_MSEC * N_SLOTS * (2+)---the packet
00204       // will get back to longTermQ and we'll have an infinite loop.
00205       while ((p = longTermQ.dequeue()) != NULL)
00206         tempQ.enqueue(p);
00207       while ((p = tempQ.dequeue()) != NULL)
00208         addPacket(p);
00209     }
00210 
00211     while (!bucket[s].head && (t > delivery_time[s] + SLOT_TIME)) {
00212       prev = (s + N_SLOTS - 1) % N_SLOTS;
00213       delivery_time[s] = delivery_time[prev] + SLOT_TIME;
00214       s = (s + 1) % N_SLOTS;
00215       prev = (s + N_SLOTS - 1) % N_SLOTS;
00216       ink_assert(delivery_time[prev] > delivery_time[s]);
00217 
00218       if (s == now_slot) {
00219         init();
00220         s = 0;
00221         break;
00222       }
00223     }
00224 
00225     if (s != now_slot)
00226       Debug("udpnet-service", "Advancing by (%d slots): behind by %" PRId64 " ms",
00227             s - now_slot, ink_hrtime_to_msec(t - delivery_time[now_slot]));
00228     now_slot = s;
00229   }
00230 
00231  private:
00232   void remove(UDPPacketInternal * e)
00233   {
00234     nPackets--;
00235     ink_assert(e->in_the_priority_queue);
00236     e->in_the_priority_queue = 0;
00237     bucket[e->in_heap].remove(e);
00238   }
00239 
00240  public:
00241   UDPPacketInternal *dequeue_ready(ink_hrtime t)
00242   {
00243     (void) t;
00244     UDPPacketInternal *e = bucket[now_slot].dequeue();
00245     if (e) {
00246       ink_assert(e->in_the_priority_queue);
00247       e->in_the_priority_queue = 0;
00248     }
00249     advanceNow(t);
00250     return e;
00251   }
00252 
00253   void check_ready(ink_hrtime now)
00254   {
00255     (void) now;
00256   }
00257 
00258   ink_hrtime earliest_timeout()
00259   {
00260     int s = now_slot;
00261     for (int i = 0; i < N_SLOTS; i++) {
00262       if (bucket[s].head) {
00263         return delivery_time[s];
00264       }
00265       s = (s + 1) % N_SLOTS;
00266     }
00267     return HRTIME_FOREVER;
00268   }
00269 
00270  private:
00271   void kill_cancelled_events()
00272   { }
00273 };
00274 
00275 
00276 class UDPQueue
00277 {
00278   PacketQueue pipeInfo;
00279   ink_hrtime last_report;
00280   ink_hrtime last_service;
00281   int packets;
00282   int added;
00283 
00284 
00285 public:
00286   InkAtomicList atomicQueue;
00287 
00288   void service(UDPNetHandler *);
00289 
00290   void SendPackets();
00291   void SendUDPPacket(UDPPacketInternal * p, int32_t pktLen);
00292 
00293   // Interface exported to the outside world
00294   void send(UDPPacket * p);
00295 
00296   UDPQueue();
00297   ~UDPQueue();
00298 };
00299 
00300 
00301 void initialize_thread_for_udp_net(EThread * thread);
00302 
00303 struct UDPNetHandler: public Continuation
00304 {
00305 public:
00306   // to be polled for read
00307   Que(UnixUDPConnection, polling_link) udp_polling;
00308   // to be called back with data
00309   Que(UnixUDPConnection, callback_link) udp_callbacks;
00310   // outgoing packets
00311   InkAtomicList udpAtomicQueue;
00312   UDPQueue udpOutQueue;
00313   // to hold the newly created descriptors before scheduling them on
00314   // the servicing buckets.
00315   // atomically added to by a thread creating a new connection with
00316   // UDPBind
00317   InkAtomicList udpNewConnections;
00318   Event *trigger_event;
00319   ink_hrtime nextCheck;
00320   ink_hrtime lastCheck;
00321 
00322   int startNetEvent(int event, Event * data);
00323   int mainNetEvent(int event, Event * data);
00324 
00325   UDPNetHandler();
00326 };
00327 
00328 struct PollCont;
00329 static inline PollCont *
00330 get_UDPPollCont(EThread * t)
00331 {
00332   return (PollCont *)ETHREAD_GET_PTR(t, udpNetInternal.pollCont_offset);
00333 }
00334 
00335 static inline UDPNetHandler *
00336 get_UDPNetHandler(EThread * t)
00337 {
00338   return (UDPNetHandler *)ETHREAD_GET_PTR(t, udpNetInternal.udpNetHandler_offset);
00339 }
00340 
00341 #endif //__P_UDPNET_H_

Generated by  doxygen 1.7.1