Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <stdio.h>
00032 #include <stdlib.h>
00033 #include <time.h>
00034 #include <string.h>
00035 #include <assert.h>
00036
00037 #include <unistd.h>
00038 #include <sys/socket.h>
00039 #include <netinet/in.h>
00040
/*
 * Debug((fmt, args...)) -- printf-style tracing that is emitted only while
 * debug_on is non-zero.  The macro argument is a complete, parenthesised
 * printf argument list, which is why call sites use doubled parentheses.
 */
int debug_on = 0;
#define Debug(print) \
  do {               \
    if (debug_on)    \
      printf print;  \
  } while (0)

/* Size in bytes of the buffer main() reads each datagram into. */
#define UDP_BUF_SIZE (64 * 1024)
/* Port number used for the UDP listener in main() and for the TCP
   connection made by openTSConn(). */
#define TSPORT 39679

/* Idle time, in seconds, after which a stream is considered stale. */
#define STREAM_TIMEOUT_SECS 6000
/* Legacy alias; nothing in the visible part of this file references it. */
typedef unsigned int uint32;
00049
00050
/*
 * Header found at the start of every response datagram handled by
 * processPacket().
 */
struct prefetch_udp_header
{
  /* Flag bits plus packet sequence number.  processPacket() converts the
     received value with ntohl() before testing the bits below. */
  uint32_t pkt;
  /* Four-word key identifying the stream; StreamHashTable compares all four
     words and hashes on the last one. */
  uint32_t md5[4];
};

/*
 * Bit layout of prefetch_udp_header::pkt after ntohl().
 *
 * The constants are unsigned on purpose: (1 << 31) shifts a one into the sign
 * bit of a signed int, which is not a well-defined positive value, and it
 * would make RESPONSE_FLAG negative wherever that is the result.
 */
#define RESPONSE_FLAG (1u << 31)
#define LAST_PKT_FLAG (1u << 30)
#define PKT_NUM_MASK ((1u << 30) - 1)

/* Number of bytes processPacket() skips before the payload of a response
   datagram (one 32-bit pkt word plus four 32-bit md5 words). */
#define PACKET_HDR_SIZE 20
00063
00064
/* Running totals, bumped by the packet-handling code in this file and
   reported by stufferUdpStatShow(). */
static int number_of_packets_received = 0;
static int number_of_packets_dropped = 0;
static int number_of_connections_to_ts = 0;
static int number_of_timeouts = 0;

/*
 * Print the four counters to stdout, one "label : value" line each.
 *
 * Returns 0 unconditionally.
 */
int
stufferUdpStatShow()
{
  printf("no of packets received\t:\t%d\n", number_of_packets_received);
  printf("no of packets dropped\t:\t%d\n", number_of_packets_dropped);
  printf("no of connections to TS\t:\t%d\n", number_of_connections_to_ts);
  printf("no of timeouts\t\t:\t%d\n", number_of_timeouts);

  return 0;
}
00082
00083 struct Stream
00084 {
00085 time_t last_activity_time;
00086 prefetch_udp_header hdr;
00087 int fd;
00088
00089 Stream *next;
00090 };
00091
00092 class StreamHashTable
00093 {
00094
00095 Stream **array;
00096 int size;
00097 public:
00098
00099 StreamHashTable(int sz)
00100 {
00101 size = sz;
00102 array = new Stream *[size];
00103 memset(array, 0, size * sizeof(Stream *));
00104 }
00105 ~StreamHashTable()
00106 {
00107 delete[]array;
00108 }
00109
00110 int index(prefetch_udp_header * hdr)
00111 {
00112 return hdr->md5[3] % size;
00113 }
00114 Stream **position(prefetch_udp_header * hdr);
00115 Stream **position(Stream * s)
00116 {
00117 return position(&s->hdr);
00118 }
00119 Stream *lookup(prefetch_udp_header * hdr)
00120 {
00121 return *position(hdr);
00122 }
00123 void add(Stream * s);
00124 void remove(Stream * s);
00125
00126 int deleteStaleStreams(time_t now);
00127 };
00128 StreamHashTable *stream_hash_table;
00129
00130 Stream **
00131 StreamHashTable::position(prefetch_udp_header * hdr)
00132 {
00133 Stream **e = &array[index(hdr)];
00134
00135 while (*e) {
00136 prefetch_udp_header *h = &((*e)->hdr);
00137 if (hdr->md5[0] == h->md5[0] && hdr->md5[1] == h->md5[1] && hdr->md5[2] == h->md5[2] && hdr->md5[3] == h->md5[3])
00138 return e;
00139 e = &(*e)->next;
00140 }
00141 return e;
00142 }
00143
00144 void
00145 StreamHashTable::add(Stream * s)
00146 {
00147 Stream **e = position(s);
00148 assert(!*e);
00149 *e = s;
00150 }
00151
00152 void
00153 StreamHashTable::remove(Stream * s)
00154 {
00155 Stream **e = position(s);
00156 assert(s == *e);
00157 *e = s->next;
00158 }
00159
00160 int
00161 StreamHashTable::deleteStaleStreams(time_t now)
00162 {
00163 int nremoved = 0;
00164 for (int i; i < size; i++) {
00165 Stream *&e = array[i];
00166 while (e) {
00167 if (e->last_activity_time < now - STREAM_TIMEOUT_SECS) {
00168 close(e->fd);
00169 number_of_timeouts++;
00170
00171 Stream *temp = e;
00172 e = e->next;
00173 delete temp;
00174 nremoved++;
00175 } else
00176 e = e->next;
00177 }
00178 }
00179 return nremoved;
00180 }
00181
00182 int
00183 openTSConn()
00184 {
00185 int fd = socket(PF_INET, SOCK_STREAM, 0);
00186 if (fd < 0) {
00187
00188 return -1;
00189 }
00190
00191 struct sockaddr_in saddr;
00192 saddr.sin_family = AF_INET;
00193 saddr.sin_port = htons(TSPORT);
00194
00195 saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
00196
00197 if (connect(fd, (sockaddr *) & saddr, sizeof(saddr)) < 0) {
00198 perror("connect(TS)");
00199 close(fd);
00200 return -1;
00201 }
00202
00203 number_of_connections_to_ts++;
00204 return fd;
00205 }
00206
00207 int
00208 processPacket(const char *packet, int pkt_sz)
00209 {
00210 prefetch_udp_header *hdr = (prefetch_udp_header *) packet;
00211 uint32_t flags = ntohl(hdr->pkt);
00212
00213 int close_socket = 1;
00214 int sock_fd = -1;
00215
00216 number_of_packets_received++;
00217
00218 Debug(("Received packet. response_flag : %d last_pkt: %d pkt_no: %d"
00219 " (%d)\n", (flags & RESPONSE_FLAG) ? 1 : 0,
00220 (flags & RESPONSE_FLAG) && (flags & LAST_PKT_FLAG),
00221 (flags & RESPONSE_FLAG) ? flags & PKT_NUM_MASK : 0, ntohl(hdr->pkt)));
00222
00223 if (flags & RESPONSE_FLAG) {
00224 Stream *s = stream_hash_table->lookup(hdr);
00225 uint32_t pkt_no = flags & PKT_NUM_MASK;
00226
00227 if (pkt_no == 0 && !(flags & LAST_PKT_FLAG)) {
00228 if (s || !(s = new Stream)) {
00229 number_of_packets_dropped++;
00230 return -1;
00231 }
00232 s->hdr = *hdr;
00233 s->hdr.pkt = pkt_no;
00234 s->last_activity_time = time(NULL);
00235 s->next = 0;
00236 s->fd = openTSConn();
00237 if (s->fd < 0) {
00238 delete s;
00239 return -1;
00240 } else
00241 sock_fd = s->fd;
00242
00243 close_socket = 0;
00244 stream_hash_table->add(s);
00245
00246 } else if (pkt_no > 0) {
00247 if (!s)
00248 return -1;
00249
00250 s->last_activity_time = time(0);
00251 sock_fd = s->fd;
00252
00253 s->hdr.pkt++;
00254
00255 if (s->hdr.pkt != pkt_no || flags & LAST_PKT_FLAG) {
00256 stream_hash_table->remove(s);
00257 delete s;
00258 } else
00259 close_socket = 0;
00260
00261 if (s->hdr.pkt != pkt_no) {
00262 Debug(("Received an out of order packet dropping the "
00263 "connection expected %d but got %d\n", s->hdr.pkt, pkt_no));
00264 number_of_packets_dropped++;
00265 pkt_sz = 0;
00266 }
00267 }
00268 packet += PACKET_HDR_SIZE;
00269 pkt_sz -= PACKET_HDR_SIZE;
00270 }
00271
00272 if (pkt_sz > 0) {
00273 if (sock_fd < 0) {
00274 sock_fd = openTSConn();
00275 if (sock_fd < 0)
00276 return -1;
00277 }
00278
00279 Debug(("Writing %d bytes on socket %d\n", pkt_sz, sock_fd));
00280 while (pkt_sz > 0) {
00281 int nsent = write(sock_fd, (char *) packet, pkt_sz);
00282 if (nsent < 0)
00283 break;
00284 packet += nsent;
00285 pkt_sz -= nsent;
00286 }
00287 }
00288
00289 if (close_socket && sock_fd >= 0)
00290 close(sock_fd);
00291
00292 return 0;
00293 }
00294
00295 int
00296 main(int argc, char *argv[])
00297 {
00298 int port = TSPORT;
00299
00300 if (argc > 1)
00301 debug_on = 1;
00302
00303 stream_hash_table = new StreamHashTable(257);
00304
00305 char *pkt_buf = (char *)ats_malloc(UDP_BUF_SIZE);
00306 int fd = socket(PF_INET, SOCK_DGRAM, 0);
00307
00308 struct sockaddr_in saddr;
00309 saddr.sin_family = AF_INET;
00310 saddr.sin_port = htons(port);
00311 saddr.sin_addr.s_addr = INADDR_ANY;
00312
00313 if ((bind(fd, (struct sockaddr *) &saddr, sizeof(saddr))) < 0) {
00314 perror("bind(udp_fd)");
00315 ats_free(pkt_buf);
00316 return 0;
00317 }
00318
00319 time_t last_clean_up = time(0);
00320
00321 while (1) {
00322 int pkt_size = read(fd, pkt_buf, UDP_BUF_SIZE);
00323 if (pkt_size < 0)
00324 return 0;
00325
00326 Debug(("Processing udp packet (size = %d)\n", pkt_size));
00327 processPacket(pkt_buf, pkt_size);
00328
00329 time_t now = time(0);
00330 if (now > last_clean_up + STREAM_TIMEOUT_SECS)
00331 stream_hash_table->deleteStaleStreams(now);
00332 }
00333
00334 ats_free(pkt_buf);
00335 }