00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 #include "ink_config.h"
00034 #include "HttpConfig.h"
00035 #include "HttpTunnel.h"
00036 #include "HttpSM.h"
00037 #include "HttpDebugNames.h"
00038 #include "ParseRules.h"
00039 
00040 static const int min_block_transfer_bytes = 256;
00041 static char const * const CHUNK_HEADER_FMT = "%" PRIx64"\r\n";
00042 
00043 
00044 
00045 static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE;
00046 
00047 char
00048 VcTypeCode(HttpTunnelType_t t) {
00049   char zret = ' ';
00050   switch (t) {
00051   case HT_HTTP_CLIENT: zret = 'U'; break;
00052   case HT_HTTP_SERVER: zret = 'S'; break;
00053   case HT_TRANSFORM: zret = 'T'; break;
00054   case HT_CACHE_READ: zret = 'R'; break;
00055   case HT_CACHE_WRITE: zret = 'W'; break;
00056   default: break;
00057   }
00058   return zret;
00059 }
00060 
00061 ChunkedHandler::ChunkedHandler()
00062   : chunked_reader(NULL), dechunked_buffer(NULL), dechunked_size(0), dechunked_reader(NULL), chunked_buffer(NULL),
00063     chunked_size(0), truncation(false), skip_bytes(0), state(CHUNK_READ_CHUNK), cur_chunk_size(0),
00064     bytes_left(0), last_server_event(VC_EVENT_NONE), running_sum(0), num_digits(0),
00065     max_chunk_size(DEFAULT_MAX_CHUNK_SIZE), max_chunk_header_len(0)
00066 {
00067 }
00068 
00069 void
00070 ChunkedHandler::init(IOBufferReader * buffer_in, HttpTunnelProducer * p)
00071 {
00072   if (p->do_chunking)
00073     init_by_action(buffer_in, ACTION_DOCHUNK);
00074   else if (p->do_dechunking)
00075     init_by_action(buffer_in, ACTION_DECHUNK);
00076   else
00077     init_by_action(buffer_in, ACTION_PASSTHRU);
00078   return;
00079 }
00080 
00081 void
00082 ChunkedHandler::init_by_action(IOBufferReader *buffer_in, Action action)
00083 {
00084   running_sum = 0;
00085   num_digits = 0;
00086   cur_chunk_size = 0;
00087   bytes_left = 0;
00088   truncation = false;
00089   this->action = action;
00090 
00091   switch (action) {
00092   case ACTION_DOCHUNK:
00093     dechunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00094     dechunked_reader->mbuf->water_mark = min_block_transfer_bytes;
00095     chunked_buffer = new_MIOBuffer(CHUNK_IOBUFFER_SIZE_INDEX);
00096     chunked_size = 0;
00097     break;
00098   case ACTION_DECHUNK:
00099     chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00100     dechunked_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_256);
00101     dechunked_size = 0;
00102     break;
00103   case ACTION_PASSTHRU:
00104     chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00105     break;
00106   default:
00107     ink_release_assert(!"Unknown action");
00108   }
00109 
00110   return;
00111 }
00112 
00113 void
00114 ChunkedHandler::clear()
00115 {
00116   switch (action) {
00117   case ACTION_DOCHUNK:
00118     free_MIOBuffer(chunked_buffer);
00119     break;
00120   case ACTION_DECHUNK:
00121     free_MIOBuffer(dechunked_buffer);
00122     break;
00123   case ACTION_PASSTHRU:
00124   default:
00125     break;
00126   }
00127 
00128   return;
00129 }
00130 
00131 void
00132 ChunkedHandler::set_max_chunk_size(int64_t size)
00133 {
00134   max_chunk_size = size ? size : DEFAULT_MAX_CHUNK_SIZE;
00135   max_chunk_header_len = snprintf(max_chunk_header, sizeof(max_chunk_header), CHUNK_HEADER_FMT, max_chunk_size);
00136 }
00137 
00138 void
00139 ChunkedHandler::read_size()
00140 {
00141   int64_t bytes_used;
00142   bool done = false;
00143 
00144   while (chunked_reader->read_avail() > 0 && !done) {
00145     const char *tmp = chunked_reader->start();
00146     int64_t data_size = chunked_reader->block_read_avail();
00147 
00148     ink_assert(data_size > 0);
00149     bytes_used = 0;
00150 
00151     while (data_size > 0) {
00152       bytes_used++;
00153       if (state == CHUNK_READ_SIZE) {
00154         
00155         if (ParseRules::is_hex(*tmp)) {
00156           num_digits++;
00157           running_sum *= 16;
00158 
00159           if (ParseRules::is_digit(*tmp)) {
00160             running_sum += *tmp - '0';
00161           } else {
00162             running_sum += ParseRules::ink_tolower(*tmp) - 'a' + 10;
00163           }
00164         } else {
00165           
00166           if (num_digits == 0 || running_sum < 0) {
00167             
00168             state = CHUNK_READ_ERROR;
00169             done = true;
00170             break;
00171           } else {
00172             state = CHUNK_READ_SIZE_CRLF;       
00173           }
00174         }
00175       } else if (state == CHUNK_READ_SIZE_CRLF) {       
00176         if (ParseRules::is_lf(*tmp)) {
00177           Debug("http_chunk", "read chunk size of %d bytes", running_sum);
00178           bytes_left = (cur_chunk_size = running_sum);
00179           state = (running_sum == 0) ? CHUNK_READ_TRAILER_BLANK : CHUNK_READ_CHUNK;
00180           done = true;
00181           break;
00182         }
00183       } else if (state == CHUNK_READ_SIZE_START) {
00184         if (ParseRules::is_lf(*tmp)) {
00185           running_sum = 0;
00186           num_digits = 0;
00187           state = CHUNK_READ_SIZE;
00188         }
00189       }
00190       tmp++;
00191       data_size--;
00192     }
00193     chunked_reader->consume(bytes_used);
00194   }
00195 }
00196 
00197 
00198 
00199 
00200 
00201 
00202 
00203 int64_t
00204 ChunkedHandler::transfer_bytes()
00205 {
00206   int64_t block_read_avail, moved, to_move, total_moved = 0;
00207 
00208   
00209   if (!dechunked_buffer) {
00210     moved = MIN(bytes_left, chunked_reader->read_avail());
00211     chunked_reader->consume(moved);
00212     bytes_left = bytes_left - moved;
00213     return moved;
00214   }
00215 
00216   while (bytes_left > 0) {
00217     block_read_avail = chunked_reader->block_read_avail();
00218 
00219     to_move = MIN(bytes_left, block_read_avail);
00220     if (to_move <= 0)
00221       break;
00222 
00223     if (to_move >= min_block_transfer_bytes) {
00224       moved = dechunked_buffer->write(chunked_reader, bytes_left);
00225     } else {
00226       
00227       
00228       
00229       
00230       moved = dechunked_buffer->write(chunked_reader->start(), to_move);
00231     }
00232 
00233     if (moved > 0) {
00234       chunked_reader->consume(moved);
00235       bytes_left = bytes_left - moved;
00236       dechunked_size += moved;
00237       total_moved += moved;
00238     } else
00239       break;
00240   }
00241   return total_moved;
00242 }
00243 
00244 void
00245 ChunkedHandler::read_chunk()
00246 {
00247   int64_t b = transfer_bytes();
00248 
00249   ink_assert(bytes_left >= 0);
00250   if (bytes_left == 0) {
00251     Debug("http_chunk", "completed read of chunk of %" PRId64" bytes", cur_chunk_size);
00252 
00253     state = CHUNK_READ_SIZE_START;
00254   } else if (bytes_left > 0) {
00255     Debug("http_chunk", "read %" PRId64" bytes of an %" PRId64" chunk", b, cur_chunk_size);
00256   }
00257 }
00258 
00259 void
00260 ChunkedHandler::read_trailer()
00261 {
00262   int64_t bytes_used;
00263   bool done = false;
00264 
00265   while (chunked_reader->is_read_avail_more_than(0) && !done) {
00266     const char *tmp = chunked_reader->start();
00267     int64_t data_size = chunked_reader->block_read_avail();
00268 
00269     ink_assert(data_size > 0);
00270     for (bytes_used = 0; data_size > 0; data_size--) {
00271       bytes_used++;
00272 
00273       if (ParseRules::is_cr(*tmp)) {
00274         
00275         
00276         
00277         state = (state == CHUNK_READ_TRAILER_BLANK) ? CHUNK_READ_TRAILER_CR : CHUNK_READ_TRAILER_LINE;
00278       } else if (ParseRules::is_lf(*tmp)) {
00279         
00280         
00281         
00282         if (state == CHUNK_READ_TRAILER_CR || state == CHUNK_READ_TRAILER_BLANK) {
00283           state = CHUNK_READ_DONE;
00284           Debug("http_chunk", "completed read of trailers");
00285           done = true;
00286           break;
00287         } else {
00288           
00289           
00290           state = CHUNK_READ_TRAILER_BLANK;
00291         }
00292       } else {
00293         
00294         
00295         state = CHUNK_READ_TRAILER_LINE;
00296       }
00297       tmp++;
00298     }
00299     chunked_reader->consume(bytes_used);
00300   }
00301 }
00302 
00303 bool ChunkedHandler::process_chunked_content()
00304 {
00305   while (chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
00306     switch (state) {
00307     case CHUNK_READ_SIZE:
00308     case CHUNK_READ_SIZE_CRLF:
00309     case CHUNK_READ_SIZE_START:
00310       read_size();
00311       break;
00312     case CHUNK_READ_CHUNK:
00313       read_chunk();
00314       break;
00315     case CHUNK_READ_TRAILER_BLANK:
00316     case CHUNK_READ_TRAILER_CR:
00317     case CHUNK_READ_TRAILER_LINE:
00318       read_trailer();
00319       break;
00320     case CHUNK_FLOW_CONTROL:
00321       return false;
00322     default:
00323       ink_release_assert(0);
00324       break;
00325     }
00326   }
00327   return (state == CHUNK_READ_DONE || state == CHUNK_READ_ERROR);
00328 }
00329 
00330 bool ChunkedHandler::generate_chunked_content()
00331 {
00332   char tmp[16];
00333   bool server_done = false;
00334   int64_t r_avail;
00335 
00336   ink_assert(max_chunk_header_len);
00337 
00338   switch (last_server_event) {
00339   case VC_EVENT_EOS:
00340   case VC_EVENT_READ_COMPLETE:
00341   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
00342     server_done = true;
00343     break;
00344   }
00345 
00346   while ((r_avail = dechunked_reader->read_avail()) > 0 && state != CHUNK_WRITE_DONE) {
00347     int64_t write_val = MIN(max_chunk_size, r_avail);
00348 
00349     state = CHUNK_WRITE_CHUNK;
00350     Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", write_val);
00351 
00352     
00353     if (write_val != max_chunk_size) {
00354       int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
00355       chunked_buffer->write(tmp, len);
00356       chunked_size += len;
00357     } else {
00358       chunked_buffer->write(max_chunk_header, max_chunk_header_len);
00359       chunked_size += max_chunk_header_len;
00360     }
00361 
00362     
00363     
00364     
00365     
00366     
00367     
00368     
00369     
00370     
00371     
00372     chunked_buffer->write(dechunked_reader, write_val);
00373     chunked_size += write_val;
00374     dechunked_reader->consume(write_val);
00375 
00376     
00377     chunked_buffer->write("\r\n", 2);
00378     chunked_size += 2;
00379   }
00380 
00381   if (server_done) {
00382     state = CHUNK_WRITE_DONE;
00383 
00384     
00385     chunked_buffer->write("0\r\n\r\n", 5);
00386     chunked_size += 5;
00387     return true;
00388   }
00389   return false;
00390 }
00391 
00392 HttpTunnelProducer::HttpTunnelProducer()
00393   : consumer_list(), self_consumer(NULL),
00394     vc(NULL), vc_handler(NULL), read_vio(NULL), read_buffer(NULL),
00395     buffer_start(NULL), vc_type(HT_HTTP_SERVER), chunking_action(TCA_PASSTHRU_DECHUNKED_CONTENT),
00396     do_chunking(false), do_dechunking(false), do_chunked_passthru(false),
00397     init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0),
00398     handler_state(0), last_event(0), num_consumers(0), alive(false),
00399     read_success(false), flow_control_source(0), name(NULL)
00400 {
00401 }
00402 
00403 uint64_t
00404 HttpTunnelProducer::backlog(uint64_t limit) {
00405   uint64_t zret = 0;
00406   
00407   
00408   
00409   for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
00410     if (c->alive && c->write_vio) {
00411       uint64_t n = 0;
00412       if (HT_TRANSFORM == c->vc_type) {
00413         n += static_cast<TransformVCChain*>(c->vc)->backlog(limit);
00414       } else {
00415         IOBufferReader* r = c->write_vio->get_reader();
00416         if (r) {
00417           n += static_cast<uint64_t>(r->read_avail());
00418         }
00419       }
00420       if (n >= limit) return n;
00421 
00422       if (!c->is_sink()) {
00423         HttpTunnelProducer* dsp = c->self_producer;
00424         if (dsp) {
00425           n += dsp->backlog();
00426         }
00427       }
00428       if (n >= limit) return n;
00429       if (n > zret) zret = n;
00430     }
00431   }
00432 
00433   if (chunked_handler.chunked_reader) {
00434     zret += static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail());
00435   }
00436 
00437   return zret;
00438 }
00439 
00440 
00441 
00442 
00443 
00444 
00445 void
00446 HttpTunnelProducer::set_throttle_src(HttpTunnelProducer* srcp) {
00447   HttpTunnelProducer* p = this;
00448   p->flow_control_source = srcp;
00449   for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
00450     if (!c->is_sink()) {
00451       p = c->self_producer;
00452       if (p)
00453         p->set_throttle_src(srcp);
00454     }
00455   }
00456 }
00457 
00458 HttpTunnelConsumer::HttpTunnelConsumer()
00459   : link(), producer(NULL), self_producer(NULL), vc_type(HT_HTTP_CLIENT), vc(NULL), buffer_reader(NULL),
00460     vc_handler(NULL), write_vio(NULL), skip_bytes(0), bytes_written(0), handler_state(0), alive(false),
00461     write_success(false), name(NULL)
00462 {
00463 }
00464 
00465 HttpTunnel::HttpTunnel()
00466   : Continuation(NULL), num_producers(0), num_consumers(0), sm(NULL), active(false)
00467 {
00468 }
00469 
00470 void
00471 HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex)
00472 {
00473   HttpConfigParams* params = sm_arg->t_state.http_config_param;
00474   sm = sm_arg;
00475   active = false;
00476   mutex = amutex;
00477   SET_HANDLER(&HttpTunnel::main_handler);
00478   flow_state.enabled_p = params->oride.flow_control_enabled;
00479   if (params->oride.flow_low_water_mark > 0)
00480     flow_state.low_water = params->oride.flow_low_water_mark;
00481   if (params->oride.flow_high_water_mark > 0)
00482     flow_state.high_water = params->oride.flow_high_water_mark;
00483   
00484   ink_assert(flow_state.low_water <= flow_state.high_water);
00485 }
00486 
00487 void
00488 HttpTunnel::reset()
00489 {
00490   ink_assert(active == false);
00491 #ifdef DEBUG
00492   for (int i = 0; i < MAX_PRODUCERS; ++i) {
00493     ink_assert(producers[i].alive == false);
00494   }
00495   for (int j = 0; j < MAX_CONSUMERS; ++j) {
00496     ink_assert(consumers[j].alive == false);
00497   }
00498 #endif
00499 
00500   num_producers = 0;
00501   num_consumers = 0;
00502   memset(consumers, 0, sizeof(consumers));
00503   memset(producers, 0, sizeof(producers));
00504 }
00505 
00506 void
00507 HttpTunnel::kill_tunnel()
00508 {
00509   for (int i = 0; i < MAX_PRODUCERS; ++i) {
00510     if (producers[i].vc != NULL) {
00511       chain_abort_all(&producers[i]);
00512     }
00513     ink_assert(producers[i].alive == false);
00514   }
00515   active = false;
00516   this->deallocate_buffers();
00517   this->deallocate_redirect_postdata_buffers();
00518   this->reset();
00519 }
00520 
00521 HttpTunnelProducer *
00522 HttpTunnel::alloc_producer()
00523 {
00524   for (int i = 0; i < MAX_PRODUCERS; ++i) {
00525     if (producers[i].vc == NULL) {
00526       num_producers++;
00527       ink_assert(num_producers <= MAX_PRODUCERS);
00528       return producers + i;
00529     }
00530   }
00531   ink_release_assert(0);
00532   return NULL;
00533 }
00534 
00535 HttpTunnelConsumer *
00536 HttpTunnel::alloc_consumer()
00537 {
00538   for (int i = 0; i < MAX_CONSUMERS; i++) {
00539     if (consumers[i].vc == NULL) {
00540       num_consumers++;
00541       ink_assert(num_consumers <= MAX_CONSUMERS);
00542       return consumers + i;
00543     }
00544   }
00545   ink_release_assert(0);
00546   return NULL;
00547 }
00548 
00549 int
00550 HttpTunnel::deallocate_buffers()
00551 {
00552   int num = 0;
00553   ink_release_assert(active == false);
00554   for (int i = 0; i < MAX_PRODUCERS; ++i) {
00555     if (producers[i].read_buffer != NULL) {
00556       ink_assert(producers[i].vc != NULL);
00557       free_MIOBuffer(producers[i].read_buffer);
00558       producers[i].read_buffer = NULL;
00559       producers[i].buffer_start = NULL;
00560       num++;
00561     }
00562 
00563     if (producers[i].chunked_handler.dechunked_buffer != NULL) {
00564       ink_assert(producers[i].vc != NULL);
00565       free_MIOBuffer(producers[i].chunked_handler.dechunked_buffer);
00566       producers[i].chunked_handler.dechunked_buffer = NULL;
00567       num++;
00568     }
00569 
00570     if (producers[i].chunked_handler.chunked_buffer != NULL) {
00571       ink_assert(producers[i].vc != NULL);
00572       free_MIOBuffer(producers[i].chunked_handler.chunked_buffer);
00573       producers[i].chunked_handler.chunked_buffer = NULL;
00574       num++;
00575     }
00576     producers[i].chunked_handler.max_chunk_header_len = 0;
00577   }
00578   return num;
00579 }
00580 
00581 void
00582 HttpTunnel::set_producer_chunking_action(HttpTunnelProducer * p, int64_t skip_bytes, TunnelChunkingAction_t action)
00583 {
00584   p->chunked_handler.skip_bytes = skip_bytes;
00585   p->chunking_action = action;
00586 
00587   switch (action) {
00588   case TCA_CHUNK_CONTENT:
00589     p->chunked_handler.state = p->chunked_handler.CHUNK_WRITE_CHUNK;
00590     break;
00591   case TCA_DECHUNK_CONTENT:
00592   case TCA_PASSTHRU_CHUNKED_CONTENT:
00593     p->chunked_handler.state = p->chunked_handler.CHUNK_READ_SIZE;
00594     break;
00595   default:
00596     break;
00597   };
00598 }
00599 
00600 void
00601 HttpTunnel::set_producer_chunking_size(HttpTunnelProducer* p, int64_t size)
00602 {
00603   p->chunked_handler.set_max_chunk_size(size);
00604 }
00605 
00606 
00607 
00608 
00609 
00610 HttpTunnelProducer *
00611 HttpTunnel::add_producer(VConnection * vc,
00612                          int64_t nbytes_arg,
00613                          IOBufferReader * reader_start,
00614                          HttpProducerHandler sm_handler, HttpTunnelType_t vc_type, const char *name_arg)
00615 {
00616   HttpTunnelProducer *p;
00617 
00618   Debug("http_tunnel", "[%" PRId64 "] adding producer '%s'", sm->sm_id, name_arg);
00619 
00620   ink_assert(reader_start->mbuf);
00621   if ((p = alloc_producer()) != NULL) {
00622     p->vc = vc;
00623     p->nbytes = nbytes_arg;
00624     p->buffer_start = reader_start;
00625     p->read_buffer = reader_start->mbuf;
00626     p->vc_handler = sm_handler;
00627     p->vc_type = vc_type;
00628     p->name = name_arg;
00629     p->chunking_action = TCA_PASSTHRU_DECHUNKED_CONTENT;
00630 
00631     p->do_chunking = false;
00632     p->do_dechunking = false;
00633     p->do_chunked_passthru = false;
00634 
00635     p->init_bytes_done = reader_start->read_avail();
00636     if (p->nbytes < 0) {
00637       p->ntodo = p->nbytes;
00638     } else {                    
00639       
00640       
00641       
00642       
00643       p->ntodo = p->nbytes - p->init_bytes_done;
00644       ink_assert(p->ntodo >= 0);
00645     }
00646 
00647     
00648     
00649     if (vc == HTTP_TUNNEL_STATIC_PRODUCER) {
00650       ink_assert(p->ntodo == 0);
00651       p->alive = false;
00652       p->read_success = true;
00653     } else {
00654       p->alive = true;
00655     }
00656   }
00657   return p;
00658 }
00659 
00660 
00661 
00662 
00663 
00664 
00665 
00666 
00667 
00668 
00669 HttpTunnelConsumer *
00670 HttpTunnel::add_consumer(VConnection * vc,
00671                          VConnection * producer,
00672                          HttpConsumerHandler sm_handler, HttpTunnelType_t vc_type, const char *name_arg, int64_t skip_bytes)
00673 {
00674   Debug("http_tunnel", "[%" PRId64 "] adding consumer '%s'", sm->sm_id, name_arg);
00675 
00676   
00677   HttpTunnelProducer *p = get_producer(producer);
00678   ink_release_assert(p);
00679 
00680   
00681   
00682   if (p->alive == false && p->read_success == false) {
00683     Debug("http_tunnel", "[%" PRId64 "] consumer '%s' not added due to producer failure", sm->sm_id, name_arg);
00684     return NULL;
00685   }
00686   
00687   HttpTunnelConsumer *c = alloc_consumer();
00688   c->producer = p;
00689   c->vc = vc;
00690   c->alive = true;
00691   c->skip_bytes = skip_bytes;
00692   c->vc_handler = sm_handler;
00693   c->vc_type = vc_type;
00694   c->name = name_arg;
00695 
00696   
00697   p->consumer_list.push(c);
00698   p->num_consumers++;
00699 
00700   return c;
00701 }
00702 
00703 void
00704 HttpTunnel::chain(HttpTunnelConsumer* c, HttpTunnelProducer* p)
00705 {
00706   p->self_consumer = c;
00707   c->self_producer = p;
00708   
00709   if (c->producer->is_throttled())
00710     p->set_throttle_src(c->producer->flow_control_source);
00711 }
00712 
00713 
00714 
00715 
00716 
00717 void
00718 HttpTunnel::tunnel_run(HttpTunnelProducer * p_arg)
00719 {
00720   Debug("http_tunnel", "tunnel_run started, p_arg is %s", p_arg ? "provided" : "NULL");
00721 
00722   if (p_arg) {
00723     producer_run(p_arg);
00724   } else {
00725     HttpTunnelProducer *p;
00726 
00727     ink_assert(active == false);
00728 
00729     for (int i = 0 ; i < MAX_PRODUCERS ; ++i) {
00730       p = producers + i;
00731       if (p->vc != NULL) {
00732         producer_run(p);
00733       }
00734     }
00735   }
00736 
00737   
00738   
00739   
00740   
00741   if (!is_tunnel_alive()) {
00742     active = false;
00743     sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
00744   }
00745 }
00746 
00747 void
00748 HttpTunnel::producer_run(HttpTunnelProducer * p)
00749 {
00750   
00751   
00752   
00753   HttpTunnelConsumer *c, *cache_write_consumer = NULL;
00754   bool transform_consumer = false;
00755 
00756   for (c = p->consumer_list.head; c; c = c->link.next) {
00757     if (c->vc_type == HT_CACHE_WRITE) {
00758       cache_write_consumer = c;
00759       break;
00760     }
00761   }
00762 
00763   
00764   for (c = p->consumer_list.head; c; c = c->link.next) {
00765     if (c->vc_type == HT_TRANSFORM) {
00766       transform_consumer = true;
00767       break;
00768     }
00769   }
00770 
00771   
00772   
00773   TunnelChunkingAction_t action = p->chunking_action;
00774 
00775   
00776   if (p->vc != HTTP_TUNNEL_STATIC_PRODUCER) {
00777     if (action == TCA_CHUNK_CONTENT)
00778       p->do_chunking = true;
00779     else if (action == TCA_DECHUNK_CONTENT)
00780       p->do_dechunking = true;
00781     else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
00782       p->do_chunked_passthru = true;
00783 
00784       
00785       if (cache_write_consumer != NULL)
00786         p->do_dechunking = true;
00787     }
00788   }
00789 
00790   int64_t consumer_n;
00791   int64_t producer_n;
00792 
00793   ink_assert(p->vc != NULL);
00794   active = true;
00795 
00796   IOBufferReader *chunked_buffer_start = NULL, *dechunked_buffer_start = NULL;
00797   if (p->do_chunking || p->do_dechunking || p->do_chunked_passthru) {
00798     p->chunked_handler.init(p->buffer_start, p);
00799 
00800     
00801     if (p->do_chunking) {
00802       
00803       chunked_buffer_start = p->chunked_handler.chunked_buffer->alloc_reader();
00804       p->chunked_handler.chunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
00805     } else if (p->do_dechunking) {
00806       
00807       Debug("http_tunnel",
00808             "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64"",
00809             p->chunked_handler.chunked_reader->read_avail());
00810 
00811       
00812       dechunked_buffer_start = p->chunked_handler.dechunked_buffer->alloc_reader();
00813 
00814       
00815       
00816       if (!transform_consumer) {
00817         p->chunked_handler.dechunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
00818 
00819         Debug("http_tunnel", "[producer_run] do_dechunking::Copied header of size %" PRId64"", p->chunked_handler.skip_bytes);
00820       }
00821     }
00822   }
00823 
00824   int64_t read_start_pos = 0;
00825   if (p->vc_type == HT_CACHE_READ && sm->t_state.range_setup == HttpTransact::RANGE_NOT_TRANSFORM_REQUESTED) {
00826     ink_assert(sm->t_state.num_range_fields == 1); 
00827     read_start_pos = sm->t_state.ranges[0]._start;
00828     producer_n = (sm->t_state.ranges[0]._end - sm->t_state.ranges[0]._start)+1;
00829     consumer_n = (producer_n + sm->client_response_hdr_bytes);
00830   } else if (p->nbytes >= 0) {
00831     consumer_n = p->nbytes;
00832     producer_n = p->ntodo;
00833   } else {
00834     consumer_n = (producer_n = INT64_MAX);
00835   }
00836 
00837   
00838   
00839   
00840   ink_release_assert(p->num_consumers > 0);
00841   for (c = p->consumer_list.head; c;) {
00842     
00843     
00844     if (c->vc_type == HT_CACHE_WRITE) {
00845       switch (action) {
00846       case TCA_CHUNK_CONTENT:
00847       case TCA_PASSTHRU_DECHUNKED_CONTENT:
00848         c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
00849         break;
00850       case TCA_DECHUNK_CONTENT:
00851       case TCA_PASSTHRU_CHUNKED_CONTENT:
00852         c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
00853         break;
00854       default:
00855         break;
00856       }
00857     }
00858     
00859     else if (action == TCA_CHUNK_CONTENT) {
00860       c->buffer_reader = p->chunked_handler.chunked_buffer->clone_reader(chunked_buffer_start);
00861     } else if (action == TCA_DECHUNK_CONTENT) {
00862       c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
00863     } else {
00864       c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
00865     }
00866 
00867     
00868     if (c->skip_bytes > 0) {
00869       ink_assert(c->skip_bytes <= c->buffer_reader->read_avail());
00870       c->buffer_reader->consume(c->skip_bytes);
00871     }
00872     int64_t c_write = consumer_n;
00873 
00874     
00875     
00876     
00877     
00878     
00879     if (c_write != INT64_MAX) {
00880       c_write -= c->skip_bytes;
00881     }
00882     
00883     
00884     
00885     if (p->do_chunking == true) {
00886       c_write = INT64_MAX;
00887     }
00888 
00889     if (c_write == 0) {
00890       
00891       c->write_vio = NULL;
00892       consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
00893     } else {
00894       c->write_vio = c->vc->do_io_write(this, c_write, c->buffer_reader);
00895       ink_assert(c_write > 0);
00896     }
00897 
00898     c = c->link.next;
00899   }
00900 
00901   
00902   
00903   
00904   if (p->alive && sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection && (p->vc_type == HT_HTTP_CLIENT)) {
00905     Debug("http_redirect", "[HttpTunnel::producer_run] client post: %" PRId64" max size: %" PRId64"",
00906           p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
00907 
00908     
00909     if (p->buffer_start->read_avail() > HttpConfig::m_master.post_copy_size) {
00910       Debug("http_redirect", "[HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64" limit=%" PRId64"",
00911             p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
00912       sm->enable_redirection = false;
00913     } else {
00914       
00915       allocate_redirect_postdata_buffers(p->read_buffer->clone_reader(p->buffer_start));
00916       copy_partial_post_data();
00917     }
00918   }                             
00919 
00920   if (p->do_chunking) {
00921     
00922     p->chunked_handler.chunked_buffer->dealloc_reader(chunked_buffer_start);
00923     p->chunked_handler.dechunked_reader->consume(p->chunked_handler.skip_bytes);
00924 
00925     
00926     producer_handler(VC_EVENT_READ_READY, p);
00927   } else if (p->do_dechunking || p->do_chunked_passthru) {
00928     
00929     if (p->do_dechunking)
00930       p->chunked_handler.dechunked_buffer->dealloc_reader(dechunked_buffer_start);
00931 
00932     
00933     
00934     Debug("http_tunnel",
00935           "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64"",
00936           p->chunked_handler.chunked_reader->read_avail());
00937     if (!transform_consumer && (p->chunked_handler.chunked_reader->read_avail() >= p->chunked_handler.skip_bytes)) {
00938       p->chunked_handler.chunked_reader->consume(p->chunked_handler.skip_bytes);
00939       Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.skip_bytes = %" PRId64"",
00940             p->chunked_handler.skip_bytes);
00941     }
00942     
00943     
00944     
00945 
00946     producer_handler(VC_EVENT_READ_READY, p);
00947     if (!p->chunked_handler.chunked_reader->read_avail() && sm->redirection_tries > 0 && p->vc_type == HT_HTTP_CLIENT) {     
00948       
00949       
00950       
00951       
00952       producer_n = 0;
00953     }
00954   }
00955 
00956   if (p->alive) {
00957     ink_assert(producer_n >= 0);
00958 
00959     if (producer_n == 0) {
00960       
00961       
00962       
00963       p->alive = false;
00964       p->read_success = true;
00965       Debug("http_tunnel", "[%" PRId64 "] [tunnel_run] producer already done", sm->sm_id);
00966       producer_handler(HTTP_TUNNEL_EVENT_PRECOMPLETE, p);
00967     } else {
00968       if (read_start_pos > 0) {
00969         p->read_vio = ((CacheVC*)p->vc)->do_io_pread(this, producer_n, p->read_buffer, read_start_pos);
00970       }
00971       else {
00972         p->read_vio = p->vc->do_io_read(this, producer_n, p->read_buffer);
00973       }
00974     }
00975   }
00976 
00977   
00978   
00979   p->read_buffer->dealloc_reader(p->buffer_start);
00980   p->buffer_start = NULL;
00981 }
00982 
00983 int
00984 HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer * p)
00985 {
00986   ink_assert(p->do_chunking);
00987 
00988   Debug("http_tunnel", "[%" PRId64 "] producer_handler_dechunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
00989 
00990   
00991   switch (event) {
00992   case VC_EVENT_READ_READY:
00993   case VC_EVENT_READ_COMPLETE:
00994   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
00995   case VC_EVENT_EOS:
00996     p->last_event =
00997       p->chunked_handler.last_server_event = event;
00998     
00999     p->chunked_handler.generate_chunked_content();
01000     break;
01001   };
01002   
01003   
01004   
01005   return event;
01006 }
01007 
01008 
01009 
01010 
01011 
01012 
01013 
01014 int
01015 HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer * p)
01016 {
01017   ink_assert(p->do_dechunking || p->do_chunked_passthru);
01018 
01019   Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
01020 
01021   
01022   switch (event) {
01023   case VC_EVENT_READ_READY:
01024   case VC_EVENT_READ_COMPLETE:
01025   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01026   case VC_EVENT_EOS:
01027     break;
01028   default:
01029     return event;
01030   }
01031 
01032   p->last_event =
01033     p->chunked_handler.last_server_event = event;
01034   bool done = p->chunked_handler.process_chunked_content();
01035 
01036   
01037   
01038   if (p->chunked_handler.state == ChunkedHandler::CHUNK_READ_ERROR) {
01039     Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s chunk decoding error]", sm->sm_id, p->name);
01040     p->chunked_handler.truncation = true;
01041     
01042     
01043     
01044     return VC_EVENT_EOS;
01045   }
01046 
01047   switch (event) {
01048   case VC_EVENT_READ_READY:
01049     if (done) {
01050       return VC_EVENT_READ_COMPLETE;
01051     }
01052     break;
01053   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01054   case VC_EVENT_EOS:
01055   case VC_EVENT_READ_COMPLETE:
01056     if (!done) {
01057       p->chunked_handler.truncation = true;
01058     }
01059     break;
01060   }
01061 
01062   return event;
01063 }
01064 
01065 
01066 
01067 
01068 
01069 
01070 
01071 
01072 
01073 
01074 
01075 
01076 bool HttpTunnel::producer_handler(int event, HttpTunnelProducer * p)
01077 {
01078   HttpTunnelConsumer *c;
01079   HttpProducerHandler jump_point;
01080   bool sm_callback = false;
01081 
01082   Debug("http_tunnel", "[%" PRId64 "] producer_handler [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
01083 
01084   
01085   if (p->do_chunking) {
01086     event = producer_handler_dechunked(event, p);
01087 
01088     
01089     
01090     
01091     
01092     if (event == HTTP_TUNNEL_EVENT_PRECOMPLETE) {
01093       event = VC_EVENT_EOS;
01094     }
01095   } else if (p->do_dechunking || p->do_chunked_passthru) {
01096     event = producer_handler_chunked(event, p);
01097   } else {
01098     p->last_event = event;
01099   }
01100 
01101   
01102   
01103   
01104   if (sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection &&
01105       (event == VC_EVENT_READ_READY || event == VC_EVENT_READ_COMPLETE) && (p->vc_type == HT_HTTP_CLIENT)) {
01106     Debug("http_redirect", "[HttpTunnel::producer_handler] [%s %s]", p->name, HttpDebugNames::get_event_name(event));
01107 
01108     if ((postbuf->postdata_copy_buffer_start->read_avail() + postbuf->ua_buffer_reader->read_avail())
01109         > HttpConfig::m_master.post_copy_size) {
01110       Debug("http_redirect",
01111             "[HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64" reader_avail=%" PRId64" limit=%" PRId64"",
01112             postbuf->postdata_copy_buffer_start->read_avail(), postbuf->ua_buffer_reader->read_avail(),
01113             HttpConfig::m_master.post_copy_size);
01114       deallocate_redirect_postdata_buffers();
01115       sm->enable_redirection = false;
01116     } else {
01117       copy_partial_post_data();
01118     }
01119   }                             
01120 
01121   Debug("http_redirect", "[HttpTunnel::producer_handler] enable_redirection: [%d %d %d] event: %d",
01122         p->alive == true, sm->enable_redirection, (p->self_consumer && p->self_consumer->alive == true), event);
01123   ink_assert(p->alive == true || event == HTTP_TUNNEL_EVENT_PRECOMPLETE || event == VC_EVENT_EOS ||
01124              sm->enable_redirection || (p->self_consumer && p->self_consumer->alive == true));
01125 
01126   switch (event) {
01127   case VC_EVENT_READ_READY:
01128     
01129     for (c = p->consumer_list.head; c; c = c->link.next) {
01130       if (c->alive) {
01131         c->write_vio->reenable();
01132       }
01133     }
01134     break;
01135 
01136   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01137     
01138     
01139     
01140     
01141     
01142     
01143     p->bytes_read = 0;
01144     jump_point = p->vc_handler;
01145     (sm->*jump_point) (event, p);
01146     sm_callback = true;
01147     break;
01148 
01149   case VC_EVENT_READ_COMPLETE:
01150   case VC_EVENT_EOS:
01151     
01152     p->alive = false;
01153     if (p->read_vio) {
01154       p->bytes_read = p->read_vio->ndone;
01155     } else {
01156       
01157       
01158       
01159       
01160       
01161       p->bytes_read = 0;
01162     }
01163 
01164     
01165     
01166     
01167     
01168     
01169     
01170     
01171     jump_point = p->vc_handler;
01172     (sm->*jump_point) (event, p);
01173     sm_callback = true;
01174 
01175     
01176     for (c = p->consumer_list.head; c; c = c->link.next) {
01177       if (c->alive) {
01178         c->write_vio->reenable();
01179       }
01180     }
01181     break;
01182 
01183   case VC_EVENT_ERROR:
01184   case VC_EVENT_ACTIVE_TIMEOUT:
01185   case VC_EVENT_INACTIVITY_TIMEOUT:
01186   case HTTP_TUNNEL_EVENT_CONSUMER_DETACH:
01187     p->alive = false;
01188     p->bytes_read = p->read_vio->ndone;
01189     
01190     jump_point = p->vc_handler;
01191     (sm->*jump_point) (event, p);
01192     sm_callback = true;
01193     break;
01194 
01195   case VC_EVENT_WRITE_READY:
01196   case VC_EVENT_WRITE_COMPLETE:
01197   default:
01198     
01199     ink_release_assert(0);
01200     break;
01201   }
01202 
01203   return sm_callback;
01204 }
01205 
01206 bool
01207 HttpTunnel::consumer_reenable(HttpTunnelConsumer* c)
01208 {
01209   HttpTunnelProducer* p = c->producer;
01210   HttpTunnelProducer* srcp = p->flow_control_source;
01211   if (p->alive
01212 #ifndef LAZY_BUF_ALLOC
01213       && p->read_buffer->write_avail() > 0
01214 #endif
01215     ) {
01216     
01217     
01218     
01219     
01220     
01221     uint64_t backlog = (flow_state.enabled_p && p->is_source())
01222       ? p->backlog(flow_state.high_water) : 0;
01223 
01224     if (backlog >= flow_state.high_water) {
01225       if (is_debug_tag_set("http_tunnel"))
01226         Debug("http_tunnel", "Throttle   %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
01227       p->throttle(); 
01228     } else {
01229       if (srcp && srcp->alive && c->is_sink()) {
01230         
01231         
01232         
01233         
01234         
01235         if (srcp != p)
01236           backlog = srcp->backlog(flow_state.low_water);
01237         if (backlog < flow_state.low_water) {
01238           if (is_debug_tag_set("http_tunnel"))
01239             Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
01240           srcp->unthrottle();
01241           srcp->read_vio->reenable();
01242           
01243           this->producer_handler(VC_EVENT_READ_READY, srcp);
01244         }
01245       }
01246       p->read_vio->reenable();
01247     }
01248   }
01249   return p->is_throttled();
01250 }
01251 
01252 
01253 
01254 
01255 
01256 
01257 
01258 
01259 
01260 
01261 
01262 
01263 bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c)
01264 {
01265   bool sm_callback = false;
01266   HttpConsumerHandler jump_point;
01267   HttpTunnelProducer* p = c->producer;
01268 
01269   Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, c->name, HttpDebugNames::get_event_name(event));
01270 
01271   ink_assert(c->alive == true);
01272 
01273   switch (event) {
01274   case VC_EVENT_WRITE_READY:
01275     this->consumer_reenable(c);
01276     break;
01277 
01278   case VC_EVENT_WRITE_COMPLETE:
01279   case VC_EVENT_EOS:
01280   case VC_EVENT_ERROR:
01281   case VC_EVENT_ACTIVE_TIMEOUT:
01282   case VC_EVENT_INACTIVITY_TIMEOUT:
01283     ink_assert(c->alive);
01284     ink_assert(c->buffer_reader);
01285     c->alive = false;
01286 
01287     c->bytes_written = c->write_vio ? c->write_vio->ndone : 0;
01288 
01289     
01290     jump_point = c->vc_handler;
01291     (sm->*jump_point) (event, c);
01292     sm_callback = true;
01293 
01294     
01295     
01296     
01297     c->buffer_reader->mbuf->dealloc_reader(c->buffer_reader);
01298     c->buffer_reader = NULL;
01299 
01300     
01301     
01302     
01303     
01304     
01305     
01306     if (p->alive && p->read_vio
01307 #ifndef LAZY_BUF_ALLOC
01308         && p->read_buffer->write_avail() > 0
01309 #endif
01310       ) {
01311       if (p->is_throttled())
01312         this->consumer_reenable(c);
01313       else
01314         p->read_vio->reenable();
01315     }
01316     
01317     
01318     if (p->is_throttled())
01319       Debug("http_tunnel", "Special event %s on %p with flow control on", HttpDebugNames::get_event_name(event), p);
01320     break;
01321 
01322   case VC_EVENT_READ_READY:
01323   case VC_EVENT_READ_COMPLETE:
01324   default:
01325     
01326     ink_release_assert(0);
01327     break;
01328   }
01329 
01330   return sm_callback;
01331 }
01332 
01333 
01334 
01335 
01336 
01337 
01338 
01339 void
01340 HttpTunnel::chain_abort_all(HttpTunnelProducer * p)
01341 {
01342   HttpTunnelConsumer *c = p->consumer_list.head;
01343 
01344   while (c) {
01345     if (c->alive) {
01346       c->alive = false;
01347       c->write_vio = NULL;
01348       c->vc->do_io_close(EHTTP_ERROR);
01349       update_stats_after_abort(c->vc_type);
01350     }
01351 
01352     if (c->self_producer) {
01353       chain_abort_all(c->self_producer);
01354     }
01355 
01356     c = c->link.next;
01357   }
01358 
01359   if (p->alive) {
01360     p->alive = false;
01361     p->bytes_read = p->read_vio->ndone;
01362     if (p->self_consumer) {
01363       p->self_consumer->alive = false;
01364     }
01365     p->read_vio = NULL;
01366     p->vc->do_io_close(EHTTP_ERROR);
01367     update_stats_after_abort(p->vc_type);
01368   }
01369 }
01370 
01371 
01372 
01373 
01374 
01375 
01376 
01377 void
01378 HttpTunnel::finish_all_internal(HttpTunnelProducer * p, bool chain)
01379 {
01380   ink_assert(p->alive == false);
01381   HttpTunnelConsumer *c = p->consumer_list.head;
01382   int64_t total_bytes = 0;
01383   TunnelChunkingAction_t action = p->chunking_action;
01384 
01385   while (c) {
01386     if (c->alive) {
01387       if (c->vc_type == HT_CACHE_WRITE) {
01388         switch (action) {
01389         case TCA_CHUNK_CONTENT:
01390         case TCA_PASSTHRU_DECHUNKED_CONTENT:
01391           total_bytes = p->bytes_read + p->init_bytes_done;
01392           break;
01393         case TCA_DECHUNK_CONTENT:
01394         case TCA_PASSTHRU_CHUNKED_CONTENT:
01395           total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
01396           break;
01397         default:
01398           break;
01399         }
01400       } else if (action == TCA_CHUNK_CONTENT) {
01401         total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.chunked_size;
01402       } else if (action == TCA_DECHUNK_CONTENT) {
01403         total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
01404       } else
01405         total_bytes = p->bytes_read + p->init_bytes_done;
01406 
01407       c->write_vio->nbytes = total_bytes - c->skip_bytes;
01408       ink_assert(c->write_vio->nbytes >= 0);
01409 
01410       if (c->write_vio->nbytes < 0) {
01411         
01412         fprintf(stderr,
01413                 "[HttpTunnel::finish_all_internal] ERROR: Incorrect total_bytes - c->skip_bytes = %" PRId64 "\n",
01414                 (int64_t) (total_bytes - c->skip_bytes));
01415       }
01416 
01417       if (chain == true && c->self_producer) {
01418         chain_finish_all(c->self_producer);
01419       }
01420       
01421       
01422       
01423       
01424       if (c->write_vio->nbytes == c->write_vio->ndone) {
01425         consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
01426       }
01427     }
01428 
01429     c = c->link.next;
01430   }
01431 }
01432 
01433 
01434 
01435 
01436 
01437 
01438 void
01439 HttpTunnel::chain_abort_cache_write(HttpTunnelProducer * p)
01440 {
01441   HttpTunnelConsumer *c = p->consumer_list.head;
01442 
01443   while (c) {
01444     if (c->alive) {
01445       if (c->vc_type == HT_CACHE_WRITE) {
01446         ink_assert(c->self_producer == NULL);
01447         c->write_vio = NULL;
01448         c->vc->do_io_close(EHTTP_ERROR);
01449         c->alive = false;
01450         HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
01451       } else if (c->self_producer) {
01452         chain_abort_cache_write(c->self_producer);
01453       }
01454     }
01455     c = c->link.next;
01456   }
01457 }
01458 
01459 
01460 
01461 
01462 
01463 
01464 void
01465 HttpTunnel::close_vc(HttpTunnelProducer * p)
01466 {
01467   ink_assert(p->alive == false);
01468   HttpTunnelConsumer *c = p->self_consumer;
01469 
01470   if (c && c->alive) {
01471     c->alive = false;
01472     if (c->write_vio) {
01473       c->bytes_written = c->write_vio->ndone;
01474     }
01475   }
01476 
01477   p->vc->do_io_close();
01478 }
01479 
01480 
01481 
01482 
01483 
01484 
01485 void
01486 HttpTunnel::close_vc(HttpTunnelConsumer * c)
01487 {
01488   ink_assert(c->alive == false);
01489   HttpTunnelProducer *p = c->self_producer;
01490 
01491   if (p && p->alive) {
01492     p->alive = false;
01493     if (p->read_vio) {
01494       p->bytes_read = p->read_vio->ndone;
01495     }
01496   }
01497 
01498   c->vc->do_io_close();
01499 }
01500 
01501 
01502 
01503 
01504 
01505 
01506 
01507 int
01508 HttpTunnel::main_handler(int event, void *data)
01509 {
01510   HttpTunnelProducer *p = NULL;
01511   HttpTunnelConsumer *c = NULL;
01512   bool sm_callback = false;
01513 
01514   ink_assert(sm->magic == HTTP_SM_MAGIC_ALIVE);
01515 
01516   
01517   if ((p = get_producer((VIO *) data)) != 0) {
01518     sm_callback = producer_handler(event, p);
01519   } else {
01520     if ((c = get_consumer((VIO *) data)) != 0) {
01521       ink_assert(c->write_vio == (VIO *) data);
01522       sm_callback = consumer_handler(event, c);
01523     } else {
01524       internal_error();         
01525     }
01526   }
01527 
01528   
01529   
01530   
01531   
01532   if (sm_callback && !is_tunnel_alive()) {
01533     active = false;
01534     sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
01535     return EVENT_DONE;
01536   }
01537   return EVENT_CONT;
01538 }
01539 
01540 void
01541 HttpTunnel::update_stats_after_abort(HttpTunnelType_t t)
01542 {
01543   switch (t) {
01544   case HT_CACHE_READ:
01545   case HT_CACHE_WRITE:
01546     HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
01547     break;
01548   default:
01549     
01550     
01551     
01552     break;
01553   };
01554 }
01555 
01556 void
01557 HttpTunnel::internal_error()
01558 {
01559 }
01560 
01561 
01562 
01563 
01564 void
01565 HttpTunnel::copy_partial_post_data()
01566 {
01567   postbuf->postdata_copy_buffer->write(postbuf->ua_buffer_reader);
01568   Debug("http_redirect", "[HttpTunnel::copy_partial_post_data] wrote %" PRId64" bytes to buffers %" PRId64"",
01569         postbuf->ua_buffer_reader->read_avail(), postbuf->postdata_copy_buffer_start->read_avail());
01570   postbuf->ua_buffer_reader->consume(postbuf->ua_buffer_reader->read_avail());
01571 }
01572 
01573 
01574 
01575 void
01576 HttpTunnel::allocate_redirect_postdata_producer_buffer()
01577 {
01578   int64_t alloc_index = buffer_size_to_index(sm->t_state.hdr_info.request_content_length);
01579 
01580   ink_release_assert(postbuf->postdata_producer_buffer == NULL);
01581 
01582   postbuf->postdata_producer_buffer = new_MIOBuffer(alloc_index);
01583   postbuf->postdata_producer_reader = postbuf->postdata_producer_buffer->alloc_reader();
01584 }
01585 
01586 
01587 
01588 void
01589 HttpTunnel::allocate_redirect_postdata_buffers(IOBufferReader * ua_reader)
01590 {
01591   int64_t alloc_index = buffer_size_to_index(sm->t_state.hdr_info.request_content_length);
01592 
01593   Debug("http_redirect", "[HttpTunnel::allocate_postdata_buffers]");
01594 
01595   
01596   
01597   if (postbuf == NULL) {
01598     postbuf = new PostDataBuffers();
01599   }
01600   postbuf->ua_buffer_reader = ua_reader;
01601   postbuf->postdata_copy_buffer = new_MIOBuffer(alloc_index);
01602   postbuf->postdata_copy_buffer_start = postbuf->postdata_copy_buffer->alloc_reader();
01603   allocate_redirect_postdata_producer_buffer();
01604 }
01605 
01606 
01607 
01608 
01609 void
01610 HttpTunnel::deallocate_redirect_postdata_buffers()
01611 {
01612   Debug("http_redirect", "[HttpTunnel::deallocate_postdata_copy_buffers]");
01613 
01614   if (postbuf != NULL) {
01615     if (postbuf->postdata_producer_buffer != NULL) {
01616       free_MIOBuffer(postbuf->postdata_producer_buffer);
01617       postbuf->postdata_producer_buffer = NULL;
01618       postbuf->postdata_producer_reader = NULL; 
01619     }
01620     if (postbuf->postdata_copy_buffer != NULL) {
01621       free_MIOBuffer(postbuf->postdata_copy_buffer);
01622       postbuf->postdata_copy_buffer = NULL;
01623       postbuf->postdata_copy_buffer_start = NULL;       
01624     }
01625     delete postbuf;
01626     postbuf = NULL;
01627   }
01628 }