Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 #ifndef __P_UDPNET_H_
00032 #define __P_UDPNET_H_
00033 
00034 extern EventType ET_UDP;
00035 
00036 #include "I_UDPNet.h"
00037 #include "P_UDPPacket.h"
00038 
00039 
00040 static inline PollCont *get_UDPPollCont(EThread *);
00041 
00042 #include "P_UnixUDPConnection.h"
00043 #include "P_UDPIOEvent.h"
00044 
00045 struct UDPNetHandler;
00046 
00047 struct UDPNetProcessorInternal : public UDPNetProcessor
00048 {
00049   virtual int start(int n_udp_threads, size_t stacksize);
00050   void udp_read_from_net(UDPNetHandler * nh, UDPConnection * uc);
00051   int udp_callback(UDPNetHandler * nh, UDPConnection * uc, EThread * thread);
00052 
00053   off_t pollCont_offset;
00054   off_t udpNetHandler_offset;
00055 };
00056 
00057 extern UDPNetProcessorInternal udpNetInternal;
00058 
00059 
00060 
00061 
// Geometry of the PacketQueue timing wheel.
//   SLOT_TIME_MSEC  width of one slot, in milliseconds
//   SLOT_TIME       the same width converted with HRTIME_MSECONDS(), so it
//                   can be added to the ink_hrtime values in delivery_time[]
//   N_SLOTS         number of slots on the wheel
#define SLOT_TIME_MSEC 20
#define SLOT_TIME      HRTIME_MSECONDS(SLOT_TIME_MSEC)
#define N_SLOTS        2048
00065 
00066 class PacketQueue
00067 {
00068  public:
00069  PacketQueue()
00070    : nPackets(0), now_slot(0)
00071     {
00072       lastPullLongTermQ = 0;
00073       init();
00074     }
00075 
00076   virtual ~ PacketQueue()
00077     { }
00078 
00079   int nPackets;
00080   ink_hrtime lastPullLongTermQ;
00081   Queue<UDPPacketInternal> longTermQ;
00082   Queue<UDPPacketInternal> bucket[N_SLOTS];
00083   ink_hrtime delivery_time[N_SLOTS];
00084   int now_slot;
00085 
00086   void init(void)
00087   {
00088     now_slot = 0;
00089     ink_hrtime now = ink_get_hrtime_internal();
00090     int i = now_slot;
00091     int j = 0;
00092     while (j < N_SLOTS) {
00093       delivery_time[i] = now + j * SLOT_TIME;
00094       i = (i + 1) % N_SLOTS;
00095       j++;
00096     }
00097   }
00098 
00099   void addPacket(UDPPacketInternal * e, ink_hrtime now = 0)
00100   {
00101     int before = 0;
00102     int slot;
00103 
00104     if (IsCancelledPacket(e)) {
00105       e->free();
00106       return;
00107     }
00108 
00109     nPackets++;
00110 
00111     ink_assert(delivery_time[now_slot]);
00112 
00113     if (e->delivery_time < now)
00114       e->delivery_time = now;
00115 
00116     ink_hrtime s = e->delivery_time - delivery_time[now_slot];
00117 
00118     if (s < 0) {
00119       before = 1;
00120       s = 0;
00121     }
00122     s = s / SLOT_TIME;
00123     
00124     
00125     
00126     
00127     if (s >= N_SLOTS - 1) {
00128       longTermQ.enqueue(e);
00129       e->in_heap = 0;
00130       e->in_the_priority_queue = 1;
00131       return;
00132     }
00133     slot = (s + now_slot) % N_SLOTS;
00134 
00135     
00136     ink_assert((before || delivery_time[slot] <= e->delivery_time) &&
00137                (delivery_time[(slot + 1) % N_SLOTS] >= e->delivery_time));
00138     e->in_the_priority_queue = 1;
00139     e->in_heap = slot;
00140     bucket[slot].enqueue(e);
00141   }
00142 
00143   UDPPacketInternal *firstPacket(ink_hrtime t)
00144   {
00145     if (t > delivery_time[now_slot]) {
00146       return bucket[now_slot].head;
00147     } else {
00148       return NULL;
00149     }
00150   }
00151 
00152   UDPPacketInternal *getFirstPacket()
00153   {
00154     nPackets--;
00155     return dequeue_ready(0);
00156   }
00157 
00158   int size()
00159   {
00160     ink_assert(nPackets >= 0);
00161     return nPackets;
00162   }
00163 
00164   bool IsCancelledPacket(UDPPacketInternal * p)
00165   {
00166     
00167     return ((p->conn->shouldDestroy()) || (p->conn->GetSendGenerationNumber() != p->reqGenerationNum));
00168   }
00169 
00170   void FreeCancelledPackets(int numSlots)
00171   {
00172     UDPPacketInternal *p;
00173     Queue<UDPPacketInternal> tempQ;
00174     int i, s;
00175 
00176     for (i = 0; i < numSlots; i++) {
00177       s = (now_slot + i) % N_SLOTS;
00178       while (NULL != (p = bucket[s].dequeue())) {
00179         if (IsCancelledPacket(p)) {
00180           p->free();
00181           continue;
00182         }
00183         tempQ.enqueue(p);
00184       }
00185       
00186       while (NULL != (p = tempQ.dequeue())) {
00187         bucket[s].enqueue(p);
00188       }
00189     }
00190   }
00191 
00192   void advanceNow(ink_hrtime t)
00193   {
00194     int s = now_slot;
00195     int prev;
00196 
00197     if (ink_hrtime_to_msec(t - lastPullLongTermQ) >= SLOT_TIME_MSEC * ((N_SLOTS - 1) / 2)) {
00198       Queue<UDPPacketInternal> tempQ;
00199       UDPPacketInternal *p;
00200       
00201       lastPullLongTermQ = t;
00202       
00203       
00204       
00205       while ((p = longTermQ.dequeue()) != NULL)
00206         tempQ.enqueue(p);
00207       while ((p = tempQ.dequeue()) != NULL)
00208         addPacket(p);
00209     }
00210 
00211     while (!bucket[s].head && (t > delivery_time[s] + SLOT_TIME)) {
00212       prev = (s + N_SLOTS - 1) % N_SLOTS;
00213       delivery_time[s] = delivery_time[prev] + SLOT_TIME;
00214       s = (s + 1) % N_SLOTS;
00215       prev = (s + N_SLOTS - 1) % N_SLOTS;
00216       ink_assert(delivery_time[prev] > delivery_time[s]);
00217 
00218       if (s == now_slot) {
00219         init();
00220         s = 0;
00221         break;
00222       }
00223     }
00224 
00225     if (s != now_slot)
00226       Debug("udpnet-service", "Advancing by (%d slots): behind by %" PRId64 " ms",
00227             s - now_slot, ink_hrtime_to_msec(t - delivery_time[now_slot]));
00228     now_slot = s;
00229   }
00230 
00231  private:
00232   void remove(UDPPacketInternal * e)
00233   {
00234     nPackets--;
00235     ink_assert(e->in_the_priority_queue);
00236     e->in_the_priority_queue = 0;
00237     bucket[e->in_heap].remove(e);
00238   }
00239 
00240  public:
00241   UDPPacketInternal *dequeue_ready(ink_hrtime t)
00242   {
00243     (void) t;
00244     UDPPacketInternal *e = bucket[now_slot].dequeue();
00245     if (e) {
00246       ink_assert(e->in_the_priority_queue);
00247       e->in_the_priority_queue = 0;
00248     }
00249     advanceNow(t);
00250     return e;
00251   }
00252 
00253   void check_ready(ink_hrtime now)
00254   {
00255     (void) now;
00256   }
00257 
00258   ink_hrtime earliest_timeout()
00259   {
00260     int s = now_slot;
00261     for (int i = 0; i < N_SLOTS; i++) {
00262       if (bucket[s].head) {
00263         return delivery_time[s];
00264       }
00265       s = (s + 1) % N_SLOTS;
00266     }
00267     return HRTIME_FOREVER;
00268   }
00269 
00270  private:
00271   void kill_cancelled_events()
00272   { }
00273 };
00274 
00275 
00276 class UDPQueue
00277 {
00278   PacketQueue pipeInfo;
00279   ink_hrtime last_report;
00280   ink_hrtime last_service;
00281   int packets;
00282   int added;
00283 
00284 
00285 public:
00286   InkAtomicList atomicQueue;
00287 
00288   void service(UDPNetHandler *);
00289 
00290   void SendPackets();
00291   void SendUDPPacket(UDPPacketInternal * p, int32_t pktLen);
00292 
00293   
00294   void send(UDPPacket * p);
00295 
00296   UDPQueue();
00297   ~UDPQueue();
00298 };
00299 
00300 
00301 void initialize_thread_for_udp_net(EThread * thread);
00302 
00303 struct UDPNetHandler: public Continuation
00304 {
00305 public:
00306   
00307   Que(UnixUDPConnection, polling_link) udp_polling;
00308   
00309   Que(UnixUDPConnection, callback_link) udp_callbacks;
00310   
00311   InkAtomicList udpAtomicQueue;
00312   UDPQueue udpOutQueue;
00313   
00314   
00315   
00316   
00317   InkAtomicList udpNewConnections;
00318   Event *trigger_event;
00319   ink_hrtime nextCheck;
00320   ink_hrtime lastCheck;
00321 
00322   int startNetEvent(int event, Event * data);
00323   int mainNetEvent(int event, Event * data);
00324 
00325   UDPNetHandler();
00326 };
00327 
00328 struct PollCont;
00329 static inline PollCont *
00330 get_UDPPollCont(EThread * t)
00331 {
00332   return (PollCont *)ETHREAD_GET_PTR(t, udpNetInternal.pollCont_offset);
00333 }
00334 
00335 static inline UDPNetHandler *
00336 get_UDPNetHandler(EThread * t)
00337 {
00338   return (UDPNetHandler *)ETHREAD_GET_PTR(t, udpNetInternal.udpNetHandler_offset);
00339 }
00340 
00341 #endif //__P_UDPNET_H_