Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 #include <stdio.h>
00032 #include <stdlib.h>
00033 #include <time.h>
00034 #include <string.h>
00035 #include <assert.h>
00036 
00037 #include <unistd.h>
00038 #include <sys/socket.h>
00039 #include <netinet/in.h>
00040 
// Emit a debug message when debug_on is non-zero.  The argument must be a
// complete, parenthesized printf() argument list, for example
//   Debug(("value = %d\n", value));
#define Debug(print) do { if (debug_on) printf print; } while (0)

// Non-zero enables Debug() output.
int debug_on = 0;

// Size in bytes of the buffer that receives a single UDP datagram.
#define UDP_BUF_SIZE (64 * 1024)

// Port number used both for the UDP listener and for the TCP connection
// opened on the loopback address.
#define TSPORT 39679

// Seconds without activity after which a stream is considered stale.
#define STREAM_TIMEOUT_SECS 6000

typedef unsigned int uint32;
00049 
00050 
// Header found at the start of a response packet.  Its five 32-bit words
// occupy PACKET_HDR_SIZE (20) bytes.
struct prefetch_udp_header
{
  // Flag bits and packet number; decoded with ntohl() by the packet handler.
  uint32_t pkt;
  // Four 32-bit words that identify the stream a packet belongs to.
  uint32_t md5[4];
};
00057 
// Bit layout of prefetch_udp_header::pkt after ntohl().
// The constants are unsigned: shifting a signed 1 into bit 31 overflows int,
// and the values are only ever combined with an unsigned 32-bit word.
#define RESPONSE_FLAG (1u << 31)        // packet carries a stream header
#define LAST_PKT_FLAG (1u << 30)        // final packet of its stream
#define PKT_NUM_MASK ((1u << 30) - 1)   // low 30 bits: packet number

// Number of header bytes skipped before the payload of a response packet.
#define PACKET_HDR_SIZE 20
00063 
00064 
// Running statistics, reported by stufferUdpStatShow().

// Datagrams handed to the packet handler.
static int number_of_packets_received = 0;
// Packets discarded by the packet handler.
static int number_of_packets_dropped = 0;
// Successful TCP connections opened.
static int number_of_connections_to_ts = 0;
// Streams closed because they went stale.
static int number_of_timeouts = 0;
00069 
00070 
00071 int
00072 stufferUdpStatShow()
00073 {
00074   printf("no of packets received\t:\t%d\n"
00075          "no of packets dropped\t:\t%d\n"
00076          "no of connections to TS\t:\t%d\n"
00077          "no of timeouts\t\t:\t%d\n",
00078          number_of_packets_received, number_of_packets_dropped, number_of_connections_to_ts, number_of_timeouts);
00079 
00080   return 0;
00081 }
00082 
00083 struct Stream
00084 {
00085   time_t last_activity_time;
00086   prefetch_udp_header hdr;
00087   int fd;                       
00088 
00089   Stream *next;
00090 };
00091 
00092 class StreamHashTable
00093 {
00094 
00095   Stream **array;
00096   int size;
00097 public:
00098 
00099     StreamHashTable(int sz)
00100   {
00101     size = sz;
00102     array = new Stream *[size];
00103     memset(array, 0, size * sizeof(Stream *));
00104   }
00105    ~StreamHashTable()
00106   {
00107     delete[]array;
00108   }
00109 
00110   int index(prefetch_udp_header * hdr)
00111   {
00112     return hdr->md5[3] % size;
00113   }
00114   Stream **position(prefetch_udp_header * hdr);
00115   Stream **position(Stream * s)
00116   {
00117     return position(&s->hdr);
00118   }
00119   Stream *lookup(prefetch_udp_header * hdr)
00120   {
00121     return *position(hdr);
00122   }
00123   void add(Stream * s);
00124   void remove(Stream * s);
00125 
00126   int deleteStaleStreams(time_t now);
00127 };
00128 StreamHashTable *stream_hash_table;
00129 
00130 Stream **
00131 StreamHashTable::position(prefetch_udp_header * hdr)
00132 {
00133   Stream **e = &array[index(hdr)];
00134 
00135   while (*e) {
00136     prefetch_udp_header *h = &((*e)->hdr);
00137     if (hdr->md5[0] == h->md5[0] && hdr->md5[1] == h->md5[1] && hdr->md5[2] == h->md5[2] && hdr->md5[3] == h->md5[3])
00138       return e;
00139     e = &(*e)->next;
00140   }
00141   return e;
00142 }
00143 
00144 void
00145 StreamHashTable::add(Stream * s)
00146 {
00147   Stream **e = position(s);
00148   assert(!*e);
00149   *e = s;
00150 }
00151 
00152 void
00153 StreamHashTable::remove(Stream * s)
00154 {
00155   Stream **e = position(s);
00156   assert(s == *e);
00157   *e = s->next;
00158 }
00159 
00160 int
00161 StreamHashTable::deleteStaleStreams(time_t now)
00162 {
00163   int nremoved = 0;
00164   for (int i; i < size; i++) {
00165     Stream *&e = array[i];
00166     while (e) {
00167       if (e->last_activity_time < now - STREAM_TIMEOUT_SECS) {
00168         close(e->fd);
00169         number_of_timeouts++;
00170 
00171         Stream *temp = e;
00172         e = e->next;
00173         delete temp;
00174         nremoved++;
00175       } else
00176         e = e->next;
00177     }
00178   }
00179   return nremoved;
00180 }
00181 
00182 int
00183 openTSConn()
00184 {
00185   int fd = socket(PF_INET, SOCK_STREAM, 0);
00186   if (fd < 0) {
00187     
00188     return -1;
00189   }
00190 
00191   struct sockaddr_in saddr;
00192   saddr.sin_family = AF_INET;
00193   saddr.sin_port = htons(TSPORT);
00194 
00195   saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
00196 
00197   if (connect(fd, (sockaddr *) & saddr, sizeof(saddr)) < 0) {
00198     perror("connect(TS)");
00199     close(fd);
00200     return -1;
00201   }
00202 
00203   number_of_connections_to_ts++;
00204   return fd;
00205 }
00206 
00207 int
00208 processPacket(const char *packet, int pkt_sz)
00209 {
00210   prefetch_udp_header *hdr = (prefetch_udp_header *) packet;
00211   uint32_t flags = ntohl(hdr->pkt);
00212 
00213   int close_socket = 1;
00214   int sock_fd = -1;
00215 
00216   number_of_packets_received++;
00217 
00218   Debug(("Received packet. response_flag : %d last_pkt: %d pkt_no: %d"
00219          " (%d)\n", (flags & RESPONSE_FLAG) ? 1 : 0,
00220          (flags & RESPONSE_FLAG) && (flags & LAST_PKT_FLAG),
00221          (flags & RESPONSE_FLAG) ? flags & PKT_NUM_MASK : 0, ntohl(hdr->pkt)));
00222 
00223   if (flags & RESPONSE_FLAG) {
00224     Stream *s = stream_hash_table->lookup(hdr);
00225     uint32_t pkt_no = flags & PKT_NUM_MASK;
00226 
00227     if (pkt_no == 0 && !(flags & LAST_PKT_FLAG)) {
00228       if (s || !(s = new Stream)) {
00229         number_of_packets_dropped++;
00230         return -1;
00231       }
00232       s->hdr = *hdr;
00233       s->hdr.pkt = pkt_no;
00234       s->last_activity_time = time(NULL);
00235       s->next = 0;
00236       s->fd = openTSConn();
00237       if (s->fd < 0) {
00238         delete s;
00239         return -1;
00240       } else
00241         sock_fd = s->fd;
00242 
00243       close_socket = 0;
00244       stream_hash_table->add(s);
00245 
00246     } else if (pkt_no > 0) {
00247       if (!s)
00248         return -1;
00249 
00250       s->last_activity_time = time(0);
00251       sock_fd = s->fd;
00252 
00253       s->hdr.pkt++;
00254 
00255       if (s->hdr.pkt != pkt_no || flags & LAST_PKT_FLAG) {
00256         stream_hash_table->remove(s);
00257         delete s;
00258       } else
00259         close_socket = 0;
00260 
00261       if (s->hdr.pkt != pkt_no) {
00262         Debug(("Received an out of order packet dropping the "
00263                "connection expected %d but got %d\n", s->hdr.pkt, pkt_no));
00264         number_of_packets_dropped++;
00265         pkt_sz = 0;             
00266       }
00267     }
00268     packet += PACKET_HDR_SIZE;
00269     pkt_sz -= PACKET_HDR_SIZE;
00270   }
00271 
00272   if (pkt_sz > 0) {
00273     if (sock_fd < 0) {
00274       sock_fd = openTSConn();
00275       if (sock_fd < 0)
00276         return -1;
00277     }
00278 
00279     Debug(("Writing %d bytes on socket %d\n", pkt_sz, sock_fd));
00280     while (pkt_sz > 0) {
00281       int nsent = write(sock_fd, (char *) packet, pkt_sz);
00282       if (nsent < 0)
00283         break;
00284       packet += nsent;
00285       pkt_sz -= nsent;
00286     }
00287   }
00288 
00289   if (close_socket && sock_fd >= 0)
00290     close(sock_fd);
00291 
00292   return 0;
00293 }
00294 
00295 int
00296 main(int argc, char *argv[])
00297 {
00298   int port = TSPORT;
00299 
00300   if (argc > 1)
00301     debug_on = 1;
00302 
00303   stream_hash_table = new StreamHashTable(257);
00304 
00305   char *pkt_buf = (char *)ats_malloc(UDP_BUF_SIZE);
00306   int fd = socket(PF_INET, SOCK_DGRAM, 0);
00307 
00308   struct sockaddr_in saddr;
00309   saddr.sin_family = AF_INET;
00310   saddr.sin_port = htons(port);
00311   saddr.sin_addr.s_addr = INADDR_ANY;
00312 
00313   if ((bind(fd, (struct sockaddr *) &saddr, sizeof(saddr))) < 0) {
00314     perror("bind(udp_fd)");
00315     ats_free(pkt_buf);
00316     return 0;
00317   }
00318 
00319   time_t last_clean_up = time(0);
00320 
00321   while (1) {
00322     int pkt_size = read(fd, pkt_buf, UDP_BUF_SIZE);
00323     if (pkt_size < 0)
00324       return 0;
00325 
00326     Debug(("Processing udp packet (size = %d)\n", pkt_size));
00327     processPacket(pkt_buf, pkt_size);
00328 
00329     time_t now = time(0);
00330     if (now > last_clean_up + STREAM_TIMEOUT_SECS)
00331       stream_hash_table->deleteStaleStreams(now);
00332   }
00333 
00334   ats_free(pkt_buf);
00335 }