Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #ifndef __P_UDPNET_H_
00032 #define __P_UDPNET_H_
00033
00034 extern EventType ET_UDP;
00035
00036 #include "I_UDPNet.h"
00037 #include "P_UDPPacket.h"
00038
00039
00040 static inline PollCont *get_UDPPollCont(EThread *);
00041
00042 #include "P_UnixUDPConnection.h"
00043 #include "P_UDPIOEvent.h"
00044
00045 struct UDPNetHandler;
00046
00047 struct UDPNetProcessorInternal : public UDPNetProcessor
00048 {
00049 virtual int start(int n_udp_threads, size_t stacksize);
00050 void udp_read_from_net(UDPNetHandler * nh, UDPConnection * uc);
00051 int udp_callback(UDPNetHandler * nh, UDPConnection * uc, EThread * thread);
00052
00053 off_t pollCont_offset;
00054 off_t udpNetHandler_offset;
00055 };
00056
00057 extern UDPNetProcessorInternal udpNetInternal;
00058
00059
00060
00061
00062 #define SLOT_TIME_MSEC 20
00063 #define SLOT_TIME HRTIME_MSECONDS(SLOT_TIME_MSEC)
00064 #define N_SLOTS 2048
00065
00066 class PacketQueue
00067 {
00068 public:
00069 PacketQueue()
00070 : nPackets(0), now_slot(0)
00071 {
00072 lastPullLongTermQ = 0;
00073 init();
00074 }
00075
00076 virtual ~ PacketQueue()
00077 { }
00078
00079 int nPackets;
00080 ink_hrtime lastPullLongTermQ;
00081 Queue<UDPPacketInternal> longTermQ;
00082 Queue<UDPPacketInternal> bucket[N_SLOTS];
00083 ink_hrtime delivery_time[N_SLOTS];
00084 int now_slot;
00085
00086 void init(void)
00087 {
00088 now_slot = 0;
00089 ink_hrtime now = ink_get_hrtime_internal();
00090 int i = now_slot;
00091 int j = 0;
00092 while (j < N_SLOTS) {
00093 delivery_time[i] = now + j * SLOT_TIME;
00094 i = (i + 1) % N_SLOTS;
00095 j++;
00096 }
00097 }
00098
00099 void addPacket(UDPPacketInternal * e, ink_hrtime now = 0)
00100 {
00101 int before = 0;
00102 int slot;
00103
00104 if (IsCancelledPacket(e)) {
00105 e->free();
00106 return;
00107 }
00108
00109 nPackets++;
00110
00111 ink_assert(delivery_time[now_slot]);
00112
00113 if (e->delivery_time < now)
00114 e->delivery_time = now;
00115
00116 ink_hrtime s = e->delivery_time - delivery_time[now_slot];
00117
00118 if (s < 0) {
00119 before = 1;
00120 s = 0;
00121 }
00122 s = s / SLOT_TIME;
00123
00124
00125
00126
00127 if (s >= N_SLOTS - 1) {
00128 longTermQ.enqueue(e);
00129 e->in_heap = 0;
00130 e->in_the_priority_queue = 1;
00131 return;
00132 }
00133 slot = (s + now_slot) % N_SLOTS;
00134
00135
00136 ink_assert((before || delivery_time[slot] <= e->delivery_time) &&
00137 (delivery_time[(slot + 1) % N_SLOTS] >= e->delivery_time));
00138 e->in_the_priority_queue = 1;
00139 e->in_heap = slot;
00140 bucket[slot].enqueue(e);
00141 }
00142
00143 UDPPacketInternal *firstPacket(ink_hrtime t)
00144 {
00145 if (t > delivery_time[now_slot]) {
00146 return bucket[now_slot].head;
00147 } else {
00148 return NULL;
00149 }
00150 }
00151
00152 UDPPacketInternal *getFirstPacket()
00153 {
00154 nPackets--;
00155 return dequeue_ready(0);
00156 }
00157
00158 int size()
00159 {
00160 ink_assert(nPackets >= 0);
00161 return nPackets;
00162 }
00163
00164 bool IsCancelledPacket(UDPPacketInternal * p)
00165 {
00166
00167 return ((p->conn->shouldDestroy()) || (p->conn->GetSendGenerationNumber() != p->reqGenerationNum));
00168 }
00169
00170 void FreeCancelledPackets(int numSlots)
00171 {
00172 UDPPacketInternal *p;
00173 Queue<UDPPacketInternal> tempQ;
00174 int i, s;
00175
00176 for (i = 0; i < numSlots; i++) {
00177 s = (now_slot + i) % N_SLOTS;
00178 while (NULL != (p = bucket[s].dequeue())) {
00179 if (IsCancelledPacket(p)) {
00180 p->free();
00181 continue;
00182 }
00183 tempQ.enqueue(p);
00184 }
00185
00186 while (NULL != (p = tempQ.dequeue())) {
00187 bucket[s].enqueue(p);
00188 }
00189 }
00190 }
00191
00192 void advanceNow(ink_hrtime t)
00193 {
00194 int s = now_slot;
00195 int prev;
00196
00197 if (ink_hrtime_to_msec(t - lastPullLongTermQ) >= SLOT_TIME_MSEC * ((N_SLOTS - 1) / 2)) {
00198 Queue<UDPPacketInternal> tempQ;
00199 UDPPacketInternal *p;
00200
00201 lastPullLongTermQ = t;
00202
00203
00204
00205 while ((p = longTermQ.dequeue()) != NULL)
00206 tempQ.enqueue(p);
00207 while ((p = tempQ.dequeue()) != NULL)
00208 addPacket(p);
00209 }
00210
00211 while (!bucket[s].head && (t > delivery_time[s] + SLOT_TIME)) {
00212 prev = (s + N_SLOTS - 1) % N_SLOTS;
00213 delivery_time[s] = delivery_time[prev] + SLOT_TIME;
00214 s = (s + 1) % N_SLOTS;
00215 prev = (s + N_SLOTS - 1) % N_SLOTS;
00216 ink_assert(delivery_time[prev] > delivery_time[s]);
00217
00218 if (s == now_slot) {
00219 init();
00220 s = 0;
00221 break;
00222 }
00223 }
00224
00225 if (s != now_slot)
00226 Debug("udpnet-service", "Advancing by (%d slots): behind by %" PRId64 " ms",
00227 s - now_slot, ink_hrtime_to_msec(t - delivery_time[now_slot]));
00228 now_slot = s;
00229 }
00230
00231 private:
00232 void remove(UDPPacketInternal * e)
00233 {
00234 nPackets--;
00235 ink_assert(e->in_the_priority_queue);
00236 e->in_the_priority_queue = 0;
00237 bucket[e->in_heap].remove(e);
00238 }
00239
00240 public:
00241 UDPPacketInternal *dequeue_ready(ink_hrtime t)
00242 {
00243 (void) t;
00244 UDPPacketInternal *e = bucket[now_slot].dequeue();
00245 if (e) {
00246 ink_assert(e->in_the_priority_queue);
00247 e->in_the_priority_queue = 0;
00248 }
00249 advanceNow(t);
00250 return e;
00251 }
00252
00253 void check_ready(ink_hrtime now)
00254 {
00255 (void) now;
00256 }
00257
00258 ink_hrtime earliest_timeout()
00259 {
00260 int s = now_slot;
00261 for (int i = 0; i < N_SLOTS; i++) {
00262 if (bucket[s].head) {
00263 return delivery_time[s];
00264 }
00265 s = (s + 1) % N_SLOTS;
00266 }
00267 return HRTIME_FOREVER;
00268 }
00269
00270 private:
00271 void kill_cancelled_events()
00272 { }
00273 };
00274
00275
00276 class UDPQueue
00277 {
00278 PacketQueue pipeInfo;
00279 ink_hrtime last_report;
00280 ink_hrtime last_service;
00281 int packets;
00282 int added;
00283
00284
00285 public:
00286 InkAtomicList atomicQueue;
00287
00288 void service(UDPNetHandler *);
00289
00290 void SendPackets();
00291 void SendUDPPacket(UDPPacketInternal * p, int32_t pktLen);
00292
00293
00294 void send(UDPPacket * p);
00295
00296 UDPQueue();
00297 ~UDPQueue();
00298 };
00299
00300
00301 void initialize_thread_for_udp_net(EThread * thread);
00302
00303 struct UDPNetHandler: public Continuation
00304 {
00305 public:
00306
00307 Que(UnixUDPConnection, polling_link) udp_polling;
00308
00309 Que(UnixUDPConnection, callback_link) udp_callbacks;
00310
00311 InkAtomicList udpAtomicQueue;
00312 UDPQueue udpOutQueue;
00313
00314
00315
00316
00317 InkAtomicList udpNewConnections;
00318 Event *trigger_event;
00319 ink_hrtime nextCheck;
00320 ink_hrtime lastCheck;
00321
00322 int startNetEvent(int event, Event * data);
00323 int mainNetEvent(int event, Event * data);
00324
00325 UDPNetHandler();
00326 };
00327
00328 struct PollCont;
00329 static inline PollCont *
00330 get_UDPPollCont(EThread * t)
00331 {
00332 return (PollCont *)ETHREAD_GET_PTR(t, udpNetInternal.pollCont_offset);
00333 }
00334
00335 static inline UDPNetHandler *
00336 get_UDPNetHandler(EThread * t)
00337 {
00338 return (UDPNetHandler *)ETHREAD_GET_PTR(t, udpNetInternal.udpNetHandler_offset);
00339 }
00340
00341 #endif //__P_UDPNET_H_