• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

StufferUdpReceiver.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /*
00025   This is the standalone program to receive the UDP packet from the parent
00026   and stream them to TS on local host
00027 
00028   Right now, if an out of order packet arrives, we just neglect it.
00029 */
00030 
00031 #include <stdio.h>
00032 #include <stdlib.h>
00033 #include <time.h>
00034 #include <string.h>
00035 #include <assert.h>
00036 
00037 #include <unistd.h>
00038 #include <sys/socket.h>
00039 #include <netinet/in.h>
00040 
00041 #define Debug(print) do { if (debug_on) printf print; } while (0)
00042 int debug_on = 0;
00043 
00044 #define UDP_BUF_SIZE (64 * 1024)
00045 #define TSPORT 39679
00046 
00047 #define STREAM_TIMEOUT_SECS 6000
00048 typedef unsigned int uint32;
00049 
00050 /*taken from Prefetch.cc */
00051 struct prefetch_udp_header
00052 {
00053   //uint32 response_flag:1, last_pkt:1, pkt_no:30;
00054   uint32_t pkt;
00055   uint32_t md5[4];
00056 };
00057 
00058 #define RESPONSE_FLAG (1<<31)
00059 #define LAST_PKT_FLAG (1<<30)
00060 #define PKT_NUM_MASK ((1<<30)-1)
00061 
00062 #define PACKET_HDR_SIZE 20
00063 
00064 /* statistics */
00065 static int number_of_packets_received = 0;
00066 static int number_of_packets_dropped = 0;
00067 static int number_of_connections_to_ts = 0;
00068 static int number_of_timeouts = 0;
00069 
00070 /* TODO: this functions should be a signal handler ... */
00071 int
00072 stufferUdpStatShow()
00073 {
00074   printf("no of packets received\t:\t%d\n"
00075          "no of packets dropped\t:\t%d\n"
00076          "no of connections to TS\t:\t%d\n"
00077          "no of timeouts\t\t:\t%d\n",
00078          number_of_packets_received, number_of_packets_dropped, number_of_connections_to_ts, number_of_timeouts);
00079 
00080   return 0;
00081 }
00082 
00083 struct Stream
00084 {
00085   time_t last_activity_time;
00086   prefetch_udp_header hdr;
00087   int fd;                       //tcp connection
00088 
00089   Stream *next;
00090 };
00091 
00092 class StreamHashTable
00093 {
00094 
00095   Stream **array;
00096   int size;
00097 public:
00098 
00099     StreamHashTable(int sz)
00100   {
00101     size = sz;
00102     array = new Stream *[size];
00103     memset(array, 0, size * sizeof(Stream *));
00104   }
00105    ~StreamHashTable()
00106   {
00107     delete[]array;
00108   }
00109 
00110   int index(prefetch_udp_header * hdr)
00111   {
00112     return hdr->md5[3] % size;
00113   }
00114   Stream **position(prefetch_udp_header * hdr);
00115   Stream **position(Stream * s)
00116   {
00117     return position(&s->hdr);
00118   }
00119   Stream *lookup(prefetch_udp_header * hdr)
00120   {
00121     return *position(hdr);
00122   }
00123   void add(Stream * s);
00124   void remove(Stream * s);
00125 
00126   int deleteStaleStreams(time_t now);
00127 };
00128 StreamHashTable *stream_hash_table;
00129 
00130 Stream **
00131 StreamHashTable::position(prefetch_udp_header * hdr)
00132 {
00133   Stream **e = &array[index(hdr)];
00134 
00135   while (*e) {
00136     prefetch_udp_header *h = &((*e)->hdr);
00137     if (hdr->md5[0] == h->md5[0] && hdr->md5[1] == h->md5[1] && hdr->md5[2] == h->md5[2] && hdr->md5[3] == h->md5[3])
00138       return e;
00139     e = &(*e)->next;
00140   }
00141   return e;
00142 }
00143 
00144 void
00145 StreamHashTable::add(Stream * s)
00146 {
00147   Stream **e = position(s);
00148   assert(!*e);
00149   *e = s;
00150 }
00151 
00152 void
00153 StreamHashTable::remove(Stream * s)
00154 {
00155   Stream **e = position(s);
00156   assert(s == *e);
00157   *e = s->next;
00158 }
00159 
00160 int
00161 StreamHashTable::deleteStaleStreams(time_t now)
00162 {
00163   int nremoved = 0;
00164   for (int i; i < size; i++) {
00165     Stream *&e = array[i];
00166     while (e) {
00167       if (e->last_activity_time < now - STREAM_TIMEOUT_SECS) {
00168         close(e->fd);
00169         number_of_timeouts++;
00170 
00171         Stream *temp = e;
00172         e = e->next;
00173         delete temp;
00174         nremoved++;
00175       } else
00176         e = e->next;
00177     }
00178   }
00179   return nremoved;
00180 }
00181 
00182 int
00183 openTSConn()
00184 {
00185   int fd = socket(PF_INET, SOCK_STREAM, 0);
00186   if (fd < 0) {
00187     //perror("socket()");
00188     return -1;
00189   }
00190 
00191   struct sockaddr_in saddr;
00192   saddr.sin_family = AF_INET;
00193   saddr.sin_port = htons(TSPORT);
00194 //#define INADDR_LOOPBACK ((209<<24)|(131<<16)|(52<<8)|48)
00195   saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
00196 
00197   if (connect(fd, (sockaddr *) & saddr, sizeof(saddr)) < 0) {
00198     perror("connect(TS)");
00199     close(fd);
00200     return -1;
00201   }
00202 
00203   number_of_connections_to_ts++;
00204   return fd;
00205 }
00206 
00207 int
00208 processPacket(const char *packet, int pkt_sz)
00209 {
00210   prefetch_udp_header *hdr = (prefetch_udp_header *) packet;
00211   uint32_t flags = ntohl(hdr->pkt);
00212 
00213   int close_socket = 1;
00214   int sock_fd = -1;
00215 
00216   number_of_packets_received++;
00217 
00218   Debug(("Received packet. response_flag : %d last_pkt: %d pkt_no: %d"
00219          " (%d)\n", (flags & RESPONSE_FLAG) ? 1 : 0,
00220          (flags & RESPONSE_FLAG) && (flags & LAST_PKT_FLAG),
00221          (flags & RESPONSE_FLAG) ? flags & PKT_NUM_MASK : 0, ntohl(hdr->pkt)));
00222 
00223   if (flags & RESPONSE_FLAG) {
00224     Stream *s = stream_hash_table->lookup(hdr);
00225     uint32_t pkt_no = flags & PKT_NUM_MASK;
00226 
00227     if (pkt_no == 0 && !(flags & LAST_PKT_FLAG)) {
00228       if (s || !(s = new Stream)) {
00229         number_of_packets_dropped++;
00230         return -1;
00231       }
00232       s->hdr = *hdr;
00233       s->hdr.pkt = pkt_no;
00234       s->last_activity_time = time(NULL);
00235       s->next = 0;
00236       s->fd = openTSConn();
00237       if (s->fd < 0) {
00238         delete s;
00239         return -1;
00240       } else
00241         sock_fd = s->fd;
00242 
00243       close_socket = 0;
00244       stream_hash_table->add(s);
00245 
00246     } else if (pkt_no > 0) {
00247       if (!s)
00248         return -1;
00249 
00250       s->last_activity_time = time(0);
00251       sock_fd = s->fd;
00252 
00253       s->hdr.pkt++;
00254 
00255       if (s->hdr.pkt != pkt_no || flags & LAST_PKT_FLAG) {
00256         stream_hash_table->remove(s);
00257         delete s;
00258       } else
00259         close_socket = 0;
00260 
00261       if (s->hdr.pkt != pkt_no) {
00262         Debug(("Received an out of order packet dropping the "
00263                "connection expected %d but got %d\n", s->hdr.pkt, pkt_no));
00264         number_of_packets_dropped++;
00265         pkt_sz = 0;             // we dont want to send anything.
00266       }
00267     }
00268     packet += PACKET_HDR_SIZE;
00269     pkt_sz -= PACKET_HDR_SIZE;
00270   }
00271 
00272   if (pkt_sz > 0) {
00273     if (sock_fd < 0) {
00274       sock_fd = openTSConn();
00275       if (sock_fd < 0)
00276         return -1;
00277     }
00278 
00279     Debug(("Writing %d bytes on socket %d\n", pkt_sz, sock_fd));
00280     while (pkt_sz > 0) {
00281       int nsent = write(sock_fd, (char *) packet, pkt_sz);
00282       if (nsent < 0)
00283         break;
00284       packet += nsent;
00285       pkt_sz -= nsent;
00286     }
00287   }
00288 
00289   if (close_socket && sock_fd >= 0)
00290     close(sock_fd);
00291 
00292   return 0;
00293 }
00294 
00295 int
00296 main(int argc, char *argv[])
00297 {
00298   int port = TSPORT;
00299 
00300   if (argc > 1)
00301     debug_on = 1;
00302 
00303   stream_hash_table = new StreamHashTable(257);
00304 
00305   char *pkt_buf = (char *)ats_malloc(UDP_BUF_SIZE);
00306   int fd = socket(PF_INET, SOCK_DGRAM, 0);
00307 
00308   struct sockaddr_in saddr;
00309   saddr.sin_family = AF_INET;
00310   saddr.sin_port = htons(port);
00311   saddr.sin_addr.s_addr = INADDR_ANY;
00312 
00313   if ((bind(fd, (struct sockaddr *) &saddr, sizeof(saddr))) < 0) {
00314     perror("bind(udp_fd)");
00315     ats_free(pkt_buf);
00316     return 0;
00317   }
00318 
00319   time_t last_clean_up = time(0);
00320 
00321   while (1) {
00322     int pkt_size = read(fd, pkt_buf, UDP_BUF_SIZE);
00323     if (pkt_size < 0)
00324       return 0;
00325 
00326     Debug(("Processing udp packet (size = %d)\n", pkt_size));
00327     processPacket(pkt_buf, pkt_size);
00328 
00329     time_t now = time(0);
00330     if (now > last_clean_up + STREAM_TIMEOUT_SECS)
00331       stream_hash_table->deleteStaleStreams(now);
00332   }
00333 
00334   ats_free(pkt_buf);
00335 }

Generated by  doxygen 1.7.1