00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <limits.h>
00024 #include "Net.h"
00025 #include "Disk.h"
00026 #include "Main.h"
00027 #include "HostDB.h"
00028 #include "Cluster.h"
00029 #include "OneWayTunnel.h"
00030 #include "OneWayMultiTunnel.h"
00031 #include "Cache.h"
00032
00033
00034 struct TestProxy:Continuation
00035 {
00036 VConnection *vc;
00037 VConnection *vconnection_vector[2];
00038 VConnection *remote;
00039 MIOBuffer *inbuf;
00040 MIOBuffer *outbuf;
00041 VIO *clusterOutVIO;
00042 VIO *inVIO;
00043 char host[1024], *url, *url_end, amode;
00044 int port;
00045 char s[1024];
00046 ClusterVCToken token;
00047 OneWayTunnel *tunnel;
00048 char url_str[1024];
00049 VConnection *cachefile;
00050 URL *url_struct;
00051 HostDBInfo *hostdbinfo;
00052 CacheObjInfo *objinfo;
00053 HttpHeader *request_header;
00054
00055 int done()
00056 {
00057 ink_assert(inbuf);
00058 if (inbuf)
00059 free_MIOBuffer(inbuf);
00060 inbuf = 0;
00061 if (outbuf)
00062 free_MIOBuffer(outbuf);
00063 if (vc)
00064 vc->do_io(VIO::CLOSE);
00065 if (remote)
00066 remote->do_io(VIO::CLOSE);
00067 if (cachefile)
00068 cachefile->do_io(VIO::CLOSE);
00069 if (tunnel)
00070 delete tunnel;
00071 delete this;
00072 return EVENT_DONE;
00073 }
00074
00075 int gets(VIO * vio)
00076 {
00077 char *sx = s, *x;
00078 int t, i;
00079 for (x = vio->buffer.mbuf->start; *x && x < vio->buffer.mbuf->end; x++) {
00080 if (x - vio->buffer.mbuf->start > 1023)
00081 return -1;
00082 if (*x == '\n')
00083 break;
00084 *sx++ = *x;
00085 }
00086
00087 t = 2;
00088 for (i = 0; t && s[i]; i++) {
00089 if (s[i] == ' ')
00090 --t;
00091 }
00092
00093
00094
00095 if (s[i - 2] == 'X') {
00096 i -= 2;
00097 amode = 'x';
00098 while (s[i] != '\0') {
00099 s[i] = s[i + 1];
00100 ++i;
00101 }
00102 return x - vio->buffer.mbuf->start - 1;
00103 }
00104 return x - vio->buffer.mbuf->start;
00105 }
00106
00107 int startEvent(int event, VIO * vio)
00108 {
00109 char *temp;
00110 if (event != VC_EVENT_READ_READY) {
00111 printf("TestProxy startEvent error %d %X\n", event, (unsigned int) vio->vc_server);
00112 return done();
00113 }
00114 inVIO = vio;
00115 vc = (NetVConnection *) vio->vc_server;
00116 int res = 0;
00117 char *thost = NULL;
00118 if ((res = gets(vio))) {
00119 if (res < 0) {
00120 printf("TestProxy startEvent line too long\n");
00121 return done();
00122 }
00123
00124 s[res] = 0;
00125 if ((res > 0) && (s[res - 1] == '\r'))
00126 s[res - 1] = 0;
00127
00128 if (s[4] == '/') {
00129 url = s + 5;
00130 url_end = strchr(url, ' ');
00131 *url_end = 0;
00132 SET_HANDLER(fileEvent);
00133 diskProcessor.open_vc(this, url, O_RDONLY);
00134 return EVENT_DONE;
00135 }
00136 else
00137 thost = s + 11;
00138 url = strchr(thost, '/');
00139 temp = strchr(thost, ' ');
00140 ink_assert(temp - thost < 1024);
00141 ink_strlcpy(url_str, thost, sizeof(url_str));
00142 if (!url)
00143 return done();
00144 char *portStr = strchr(thost, ':');
00145 *url = 0;
00146 if (portStr == NULL) {
00147 port = 80;
00148 ink_strlcpy(host, thost, sizeof(host));
00149 } else {
00150 *portStr = '\0';
00151 port = atoi(portStr + 1);
00152 ink_strlcpy(host, thost, sizeof(host));
00153 *portStr = ':';
00154 }
00155 url_end = strchr(url + 1, ' ');
00156 SET_HANDLER(dnsEvent);
00157 *url = '/';
00158 hostDBProcessor.getbyname(this, host);
00159 return EVENT_DONE;
00160 }
00161 return EVENT_CONT;
00162 }
00163
00164 int clusterOpenEvent(int event, void *data)
00165 {
00166 if (event == CLUSTER_EVENT_OPEN_FAILED)
00167 return done();
00168 if (event == CLUSTER_EVENT_OPEN) {
00169 if (!data)
00170 return done();
00171 remote = (VConnection *) data;
00172 clusterOutVIO = remote->do_io(VIO::WRITE, this, INT64_MAX, inbuf);
00173 ink_assert(clusterOutVIO);
00174 SET_HANDLER(tunnelEvent);
00175 tunnel = new OneWayTunnel(remote, vc, this, TUNNEL_TILL_DONE, true, true, true);
00176 }
00177 return EVENT_CONT;
00178 }
00179
00180 int clusterEvent(int event, VConnection * data)
00181 {
00182 (void) event;
00183 vc = data;
00184 if (!vc)
00185 return done();
00186 SET_HANDLER(startEvent);
00187 vc->do_io(VIO::READ, this, INT64_MAX, inbuf);
00188 return EVENT_CONT;
00189 }
00190
00191 int fileEvent(int event, DiskVConnection * aremote)
00192 {
00193 if (event != DISK_EVENT_OPEN) {
00194 printf("TestProxy fileEvent error %d\n", event);
00195 return done();
00196 }
00197 remote = aremote;
00198 SET_HANDLER(tunnelEvent);
00199 tunnel = new OneWayTunnel(remote, vc, this, TUNNEL_TILL_DONE, true, true, true);
00200 return EVENT_CONT;
00201 }
00202
00203 int dnsEvent(int event, HostDBInfo * info)
00204 {
00205 if (!info) {
00206 printf("TestProxy dnsEvent error %d\n", event);
00207 return done();
00208 }
00209 SET_HANDLER(cacheCheckEvent);
00210 url_struct = new URL((const char *) url_str, sizeof(url_str), true);
00211 hostdbinfo = info;
00212 cacheProcessor.lookup(this, url_struct, false);
00213
00214
00215 return EVENT_DONE;
00216 }
00217
00218 int cacheCheckEvent(int event, void *data)
00219 {
00220 if (event == CACHE_EVENT_LOOKUP) {
00221 if (amode == 'x') {
00222 cout << "Removing object from the cache\n";
00223 SET_HANDLER(NULL);
00224 amode = 0;
00225 cacheProcessor.remove(&(((CacheObjInfoVector *) data)->data[0]), false);
00226 return done();
00227 } else {
00228 cout << "Serving the object from cache\n";
00229 SET_HANDLER(cacheReadEvent);
00230 cacheProcessor.open_read(this, &(((CacheObjInfoVector *) data)->data[0]), false);
00231 return EVENT_CONT;
00232 }
00233 } else if (event == CACHE_EVENT_LOOKUP_FAILED) {
00234 cout << "Getting the object from origin server\n";
00235 SET_HANDLER(cacheCreateCacheFileEvent);
00236 objinfo = new CacheObjInfo;
00237 request_header = new HttpHeader;
00238 request_header->m_url = *url_struct;
00239 objinfo->request = *request_header;
00240 cacheProcessor.open_write(this, objinfo, false, CACHE_UNKNOWN_SIZE);
00241 return EVENT_DONE;
00242 } else {
00243 printf("TestProxy cacheCheckEvent error %d\n", event);
00244 return done();
00245 }
00246 }
00247
00248 int cacheReadEvent(int event, DiskVConnection * aremote)
00249 {
00250 if (event != CACHE_EVENT_OPEN_READ) {
00251 printf("TestProxy cacheReadEvent error %d\n", event);
00252 return done();
00253 }
00254 remote = aremote;
00255 SET_HANDLER(tunnelEvent);
00256 new OneWayTunnel(remote, vc, this, TUNNEL_TILL_DONE, true, true, true);
00257 return EVENT_CONT;
00258 }
00259 int cacheCreateCacheFileEvent(int event, VConnection * acachefile)
00260 {
00261 if (event != CACHE_EVENT_OPEN_WRITE) {
00262 printf("TestProxy cacheCreateCacheFileEvent error %d\n", event);
00263 cachefile = 0;
00264 } else
00265 cachefile = acachefile;
00266 SET_HANDLER(cacheSendGetEvent);
00267 netProcessor.connect(this, hostdbinfo->ip, port, host);
00268 return EVENT_CONT;
00269 }
00270 int cacheSendGetEvent(int event, NetVConnection * aremote)
00271 {
00272 if (event != NET_EVENT_OPEN) {
00273 printf("TestProxy cacheSendGetEvent error %d\n", event);
00274 return done();
00275 }
00276 remote = aremote;
00277 outbuf = new_MIOBuffer();
00278 SET_HANDLER(cacheTransRemoteToCacheFileEvent);
00279
00280
00281 *url_end = 0;
00282 sprintf(outbuf->start, "GET %s HTTP/1.0\nHost: %s\n\n", url, host);
00283 outbuf->fill(strlen(outbuf->start) + 1);
00284 remote->do_io(VIO::WRITE, this, INT64_MAX, outbuf);
00285
00286 return EVENT_CONT;
00287 }
00288 int cacheTransRemoteToCacheFileEvent(int event, VIO * vio)
00289 {
00290 if (event != VC_EVENT_WRITE_READY) {
00291 printf("TestProxy cacheTransRemoteToCacheFileEvent error %d\n", event);
00292 return done();
00293 }
00294 if (vio->buffer.size())
00295 return EVENT_CONT;
00296 SET_HANDLER(tunnelEvent);
00297 vconnection_vector[0] = vc;
00298 vconnection_vector[1] = cachefile;
00299 {
00300 int n = cachefile ? 2 : 1;
00301 cachefile = 0;
00302 new OneWayMultiTunnel(remote, vconnection_vector, n, this, TUNNEL_TILL_DONE, true, true, true);
00303 }
00304 return EVENT_DONE;
00305 }
00306
00307 int connectEvent(int event, NetVConnection * aremote)
00308 {
00309 if (event != NET_EVENT_OPEN) {
00310 printf("TestProxy connectEvent error %d\n", event);
00311 return done();
00312 }
00313 remote = aremote;
00314 outbuf = new_MIOBuffer();
00315 SET_HANDLER(sendEvent);
00316 *url_end = 0;
00317 sprintf(outbuf->start, "GET %s HTTP/1.0\nHost: %s\n\n", url, host);
00318 outbuf->fill(strlen(outbuf->start) + 1);
00319 remote->do_io(VIO::WRITE, this, INT64_MAX, outbuf);
00320
00321 return EVENT_CONT;
00322 }
00323
00324 int sendEvent(int event, VIO * vio)
00325 {
00326 if (event != VC_EVENT_WRITE_READY) {
00327 printf("TestProxy sendEvent error %d\n", event);
00328 return done();
00329 }
00330 if (vio->buffer.size())
00331 return EVENT_CONT;
00332 SET_HANDLER(tunnelEvent);
00333 clusterOutVIO = (VIO *) - 1;
00334 if (((NetVConnectionBase *) vc)->closed) {
00335 printf("TestProxy sendEvent unexpected close %X\n", (unsigned int) vc);
00336 vc = 0;
00337 return done();
00338 }
00339 tunnel = new OneWayTunnel(remote, vc, this, TUNNEL_TILL_DONE, true, true, true);
00340 return EVENT_DONE;
00341 }
00342
00343 int tunnelEvent(int event, Continuation * cont)
00344 {
00345 (void) cont;
00346 if ((VIO *) cont == clusterOutVIO || (VIO *) cont == inVIO) {
00347 if (event == VC_EVENT_WRITE_COMPLETE)
00348 return EVENT_DONE;
00349 if (event == VC_EVENT_ERROR || event == VC_EVENT_EOS)
00350 return EVENT_DONE;
00351 return EVENT_CONT;
00352 }
00353 remote = 0;
00354 vc = 0;
00355 if (event != VC_EVENT_EOS) {
00356 printf("TestProxy sendEvent error %d\n", event);
00357 return done();
00358 }
00359
00360 return done();
00361 }
00362
00363 TestProxy(MIOBuffer * abuf)
00364 : Continuation(new_ProxyMutex()),
00365 vc(0), remote(0), inbuf(abuf), outbuf(0), clusterOutVIO(0),
00366 inVIO(0), url(0), url_end(0), amode(0), tunnel(0), cachefile(0) {
00367 SET_HANDLER(startEvent);
00368 }
00369 };
00370
00371 struct TestAccept:Continuation
00372 {
00373 int startEvent(int event, NetVConnection * e)
00374 {
00375 if (event == NET_EVENT_ACCEPT) {
00376 MIOBuffer *buf = new_MIOBuffer();
00377 e->do_io(VIO::READ, new TestProxy(buf), INT64_MAX, buf);
00378 } else
00379 {
00380 printf("TestAccept error %d\n", event);
00381 return EVENT_DONE;
00382 }
00383 return EVENT_CONT;
00384 }
00385 TestAccept():Continuation(new_ProxyMutex()) {
00386 SET_HANDLER(startEvent);
00387 }
00388 };
00389
00390 void
00391 redirect_test(Machine * m, void *data, int len)
00392 {
00393 (void) m;
00394 (void) len;
00395 MIOBuffer *buf = new_MIOBuffer();
00396 TestProxy *c = new TestProxy(buf);
00397 SET_CONTINUATION_HANDLER(c, clusterEvent);
00398 clusterProcessor.connect(c, *(ClusterVCToken *) data);
00399 }
00400
00401 #ifndef SUB_TEST
00402 void
00403 test()
00404 {
00405 ptest_ClusterFunction = redirect_test;
00406 netProcessor.proxy_accept(new TestAccept);
00407 }
00408 #endif