00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 #include <limits.h>
00024 #include "Net.h"
00025 #include "Disk.h"
00026 #include "Main.h"
00027 #include "HostDB.h"
00028 #include "Cluster.h"
00029 #include "OneWayTunnel.h"
00030 #include "OneWayMultiTunnel.h"
00031 #include "Cache.h"
00032 
00033 
00034 struct TestProxy:Continuation
00035 {
00036   VConnection *vc;
00037   VConnection *vconnection_vector[2];
00038   VConnection *remote;
00039   MIOBuffer *inbuf;
00040   MIOBuffer *outbuf;
00041   VIO *clusterOutVIO;
00042   VIO *inVIO;
00043   char host[1024], *url, *url_end, amode;
00044   int port;
00045   char s[1024];
00046   ClusterVCToken token;
00047   OneWayTunnel *tunnel;
00048   char url_str[1024];
00049   VConnection *cachefile;
00050   URL *url_struct;
00051   HostDBInfo *hostdbinfo;
00052   CacheObjInfo *objinfo;
00053   HttpHeader *request_header;
00054 
00055   int done()
00056   {
00057     ink_assert(inbuf);
00058     if (inbuf)
00059       free_MIOBuffer(inbuf);
00060     inbuf = 0;
00061     if (outbuf)
00062       free_MIOBuffer(outbuf);
00063     if (vc)
00064       vc->do_io(VIO::CLOSE);
00065     if (remote)
00066       remote->do_io(VIO::CLOSE);
00067     if (cachefile)
00068       cachefile->do_io(VIO::CLOSE);
00069     if (tunnel)
00070       delete tunnel;
00071     delete this;
00072       return EVENT_DONE;
00073   }
00074 
00075   int gets(VIO * vio)
00076   {
00077     char *sx = s, *x;
00078     int t, i;
00079     for (x = vio->buffer.mbuf->start; *x && x < vio->buffer.mbuf->end; x++) {
00080       if (x - vio->buffer.mbuf->start > 1023)
00081         return -1;
00082       if (*x == '\n')
00083         break;
00084       *sx++ = *x;
00085     }
00086 
00087     t = 2;
00088     for (i = 0; t && s[i]; i++) {
00089       if (s[i] == ' ')
00090         --t;
00091     }
00092 
00093 
00094 
00095     if (s[i - 2] == 'X') {
00096       i -= 2;
00097       amode = 'x';
00098       while (s[i] != '\0') {
00099         s[i] = s[i + 1];
00100         ++i;
00101       }
00102       return x - vio->buffer.mbuf->start - 1;
00103     }
00104     return x - vio->buffer.mbuf->start;
00105   }
00106 
00107   int startEvent(int event, VIO * vio)
00108   {
00109     char *temp;
00110     if (event != VC_EVENT_READ_READY) {
00111       printf("TestProxy startEvent error %d %X\n", event, (unsigned int) vio->vc_server);
00112       return done();
00113     }
00114     inVIO = vio;
00115     vc = (NetVConnection *) vio->vc_server;
00116     int res = 0;
00117     char *thost = NULL;
00118     if ((res = gets(vio))) {
00119       if (res < 0) {
00120         printf("TestProxy startEvent line too long\n");
00121         return done();
00122       }
00123       
00124       s[res] = 0;
00125       if ((res > 0) && (s[res - 1] == '\r'))
00126         s[res - 1] = 0;
00127       
00128       if (s[4] == '/') {
00129         url = s + 5;
00130         url_end = strchr(url, ' ');
00131         *url_end = 0;
00132         SET_HANDLER(fileEvent);
00133         diskProcessor.open_vc(this, url, O_RDONLY);
00134         return EVENT_DONE;
00135       }
00136       else
00137         thost = s + 11;         
00138       url = strchr(thost, '/'); 
00139       temp = strchr(thost, ' ');
00140       ink_assert(temp - thost < 1024);
00141       ink_strlcpy(url_str, thost, sizeof(url_str));
00142       if (!url)
00143         return done();
00144       char *portStr = strchr(thost, ':');
00145       *url = 0;
00146       if (portStr == NULL) {
00147         port = 80;
00148         ink_strlcpy(host, thost, sizeof(host));
00149       } else {
00150         *portStr = '\0';        
00151         port = atoi(portStr + 1);
00152         ink_strlcpy(host, thost, sizeof(host));
00153         *portStr = ':';
00154       }
00155       url_end = strchr(url + 1, ' ');
00156       SET_HANDLER(dnsEvent);
00157       *url = '/';
00158       hostDBProcessor.getbyname(this, host);
00159       return EVENT_DONE;
00160     }
00161     return EVENT_CONT;
00162   }
00163 
00164   int clusterOpenEvent(int event, void *data)
00165   {
00166     if (event == CLUSTER_EVENT_OPEN_FAILED)
00167       return done();
00168     if (event == CLUSTER_EVENT_OPEN) {
00169       if (!data)
00170         return done();
00171       remote = (VConnection *) data;
00172       clusterOutVIO = remote->do_io(VIO::WRITE, this, INT64_MAX, inbuf);
00173       ink_assert(clusterOutVIO);
00174       SET_HANDLER(tunnelEvent);
00175       tunnel = new OneWayTunnel(remote, vc, this, TUNNEL_TILL_DONE, true, true, true);
00176     }
00177     return EVENT_CONT;
00178   }
00179 
00180   int clusterEvent(int event, VConnection * data)
00181   {
00182     (void) event;
00183     vc = data;
00184     if (!vc)
00185       return done();
00186     SET_HANDLER(startEvent);
00187     vc->do_io(VIO::READ, this, INT64_MAX, inbuf);
00188     return EVENT_CONT;
00189   }
00190 
00191   int fileEvent(int event, DiskVConnection * aremote)
00192   {
00193     if (event != DISK_EVENT_OPEN) {
00194       printf("TestProxy fileEvent error %d\n", event);
00195       return done();
00196     }
00197     remote = aremote;
00198     SET_HANDLER(tunnelEvent);
00199     tunnel = new OneWayTunnel(remote, vc, this, TUNNEL_TILL_DONE, true, true, true);
00200     return EVENT_CONT;
00201   }
00202 
00203   int dnsEvent(int event, HostDBInfo * info)
00204   {
00205     if (!info) {
00206       printf("TestProxy dnsEvent error %d\n", event);
00207       return done();
00208     }
00209     SET_HANDLER(cacheCheckEvent);
00210     url_struct = new URL((const char *) url_str, sizeof(url_str), true);
00211     hostdbinfo = info;
00212     cacheProcessor.lookup(this, url_struct, false);
00213     
00214     
00215     return EVENT_DONE;
00216   }
00217 
00218   int cacheCheckEvent(int event, void *data)
00219   {
00220     if (event == CACHE_EVENT_LOOKUP) {
00221       if (amode == 'x') {
00222         cout << "Removing object from the cache\n";
00223         SET_HANDLER(NULL);
00224         amode = 0;
00225         cacheProcessor.remove(&(((CacheObjInfoVector *) data)->data[0]), false);
00226         return done();
00227       } else {
00228         cout << "Serving the object from cache\n";
00229         SET_HANDLER(cacheReadEvent);
00230         cacheProcessor.open_read(this, &(((CacheObjInfoVector *) data)->data[0]), false);
00231         return EVENT_CONT;
00232       }
00233     } else if (event == CACHE_EVENT_LOOKUP_FAILED) {
00234       cout << "Getting the object from origin server\n";
00235       SET_HANDLER(cacheCreateCacheFileEvent);
00236       objinfo = new CacheObjInfo;
00237       request_header = new HttpHeader;
00238       request_header->m_url = *url_struct;
00239       objinfo->request = *request_header;
00240       cacheProcessor.open_write(this, objinfo, false, CACHE_UNKNOWN_SIZE);
00241       return EVENT_DONE;
00242     } else {
00243       printf("TestProxy cacheCheckEvent error %d\n", event);
00244       return done();
00245     }
00246   }
00247 
00248   int cacheReadEvent(int event, DiskVConnection * aremote)
00249   {
00250     if (event != CACHE_EVENT_OPEN_READ) {
00251       printf("TestProxy cacheReadEvent error %d\n", event);
00252       return done();
00253     }
00254     remote = aremote;
00255     SET_HANDLER(tunnelEvent);
00256     new OneWayTunnel(remote, vc, this, TUNNEL_TILL_DONE, true, true, true);
00257     return EVENT_CONT;
00258   }
00259   int cacheCreateCacheFileEvent(int event, VConnection * acachefile)
00260   {
00261     if (event != CACHE_EVENT_OPEN_WRITE) {
00262       printf("TestProxy cacheCreateCacheFileEvent error %d\n", event);
00263       cachefile = 0;
00264     } else
00265       cachefile = acachefile;
00266     SET_HANDLER(cacheSendGetEvent);
00267     netProcessor.connect(this, hostdbinfo->ip, port, host);
00268     return EVENT_CONT;
00269   }
00270   int cacheSendGetEvent(int event, NetVConnection * aremote)
00271   {
00272     if (event != NET_EVENT_OPEN) {
00273       printf("TestProxy cacheSendGetEvent error %d\n", event);
00274       return done();
00275     }
00276     remote = aremote;
00277     outbuf = new_MIOBuffer();
00278     SET_HANDLER(cacheTransRemoteToCacheFileEvent);
00279     
00280     
00281     *url_end = 0;
00282     sprintf(outbuf->start, "GET %s HTTP/1.0\nHost: %s\n\n", url, host);
00283     outbuf->fill(strlen(outbuf->start) + 1);
00284     remote->do_io(VIO::WRITE, this, INT64_MAX, outbuf);
00285     
00286     return EVENT_CONT;
00287   }
00288   int cacheTransRemoteToCacheFileEvent(int event, VIO * vio)
00289   {
00290     if (event != VC_EVENT_WRITE_READY) {
00291       printf("TestProxy cacheTransRemoteToCacheFileEvent error %d\n", event);
00292       return done();
00293     }
00294     if (vio->buffer.size())
00295       return EVENT_CONT;
00296     SET_HANDLER(tunnelEvent);
00297     vconnection_vector[0] = vc;
00298     vconnection_vector[1] = cachefile;
00299     {
00300       int n = cachefile ? 2 : 1;
00301       cachefile = 0;
00302       new OneWayMultiTunnel(remote, vconnection_vector, n, this, TUNNEL_TILL_DONE, true, true, true);
00303     }
00304     return EVENT_DONE;
00305   }
00306 
00307   int connectEvent(int event, NetVConnection * aremote)
00308   {
00309     if (event != NET_EVENT_OPEN) {
00310       printf("TestProxy connectEvent error %d\n", event);
00311       return done();
00312     }
00313     remote = aremote;
00314     outbuf = new_MIOBuffer();
00315     SET_HANDLER(sendEvent);
00316     *url_end = 0;
00317     sprintf(outbuf->start, "GET %s HTTP/1.0\nHost: %s\n\n", url, host);
00318     outbuf->fill(strlen(outbuf->start) + 1);
00319     remote->do_io(VIO::WRITE, this, INT64_MAX, outbuf);
00320     
00321     return EVENT_CONT;
00322   }
00323 
00324   int sendEvent(int event, VIO * vio)
00325   {
00326     if (event != VC_EVENT_WRITE_READY) {
00327       printf("TestProxy sendEvent error %d\n", event);
00328       return done();
00329     }
00330     if (vio->buffer.size())
00331       return EVENT_CONT;
00332     SET_HANDLER(tunnelEvent);
00333     clusterOutVIO = (VIO *) - 1;        
00334     if (((NetVConnectionBase *) vc)->closed) {
00335       printf("TestProxy sendEvent unexpected close %X\n", (unsigned int) vc);
00336       vc = 0;
00337       return done();
00338     }
00339     tunnel = new OneWayTunnel(remote, vc, this, TUNNEL_TILL_DONE, true, true, true);
00340     return EVENT_DONE;
00341   }
00342 
00343   int tunnelEvent(int event, Continuation * cont)
00344   {
00345     (void) cont;
00346     if ((VIO *) cont == clusterOutVIO || (VIO *) cont == inVIO) {
00347       if (event == VC_EVENT_WRITE_COMPLETE)
00348         return EVENT_DONE;
00349       if (event == VC_EVENT_ERROR || event == VC_EVENT_EOS)
00350         return EVENT_DONE;
00351       return EVENT_CONT;
00352     }
00353     remote = 0;
00354     vc = 0;
00355     if (event != VC_EVENT_EOS) {
00356       printf("TestProxy sendEvent error %d\n", event);
00357       return done();
00358     }
00359     
00360     return done();
00361   }
00362 
00363   TestProxy(MIOBuffer * abuf)
00364 :  Continuation(new_ProxyMutex()),
00365     vc(0), remote(0), inbuf(abuf), outbuf(0), clusterOutVIO(0),
00366     inVIO(0), url(0), url_end(0), amode(0), tunnel(0), cachefile(0) {
00367     SET_HANDLER(startEvent);
00368   }
00369 };
00370 
00371 struct TestAccept:Continuation
00372 {
00373   int startEvent(int event, NetVConnection * e)
00374   {
00375     if (event == NET_EVENT_ACCEPT) {
00376       MIOBuffer *buf = new_MIOBuffer();
00377         e->do_io(VIO::READ, new TestProxy(buf), INT64_MAX, buf);
00378     } else
00379     {
00380       printf("TestAccept error %d\n", event);
00381       return EVENT_DONE;
00382     }
00383     return EVENT_CONT;
00384   }
00385 TestAccept():Continuation(new_ProxyMutex()) {
00386     SET_HANDLER(startEvent);
00387   }
00388 };
00389 
00390 void
00391 redirect_test(Machine * m, void *data, int len)
00392 {
00393   (void) m;
00394   (void) len;
00395   MIOBuffer *buf = new_MIOBuffer();
00396   TestProxy *c = new TestProxy(buf);
00397   SET_CONTINUATION_HANDLER(c, clusterEvent);
00398   clusterProcessor.connect(c, *(ClusterVCToken *) data);
00399 }
00400 
00401 #ifndef SUB_TEST
00402 void
00403 test()
00404 {
00405   ptest_ClusterFunction = redirect_test;
00406   netProcessor.proxy_accept(new TestAccept);
00407 }
00408 #endif