HttpTunnel.cc

00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026    HttpTunnel.cc
00027 
00028    Description:
00029 
00030 
00031 ****************************************************************************/
00032 
00033 #include "ink_config.h"
00034 #include "HttpConfig.h"
00035 #include "HttpTunnel.h"
00036 #include "HttpSM.h"
00037 #include "HttpDebugNames.h"
00038 #include "ParseRules.h"
00039 
00040 static const int min_block_transfer_bytes = 256;
00041 static char const * const CHUNK_HEADER_FMT = "%" PRIx64"\r\n";
00042 // This should be as small as possible because it will only hold the
00043 // header and trailer per chunk - the chunk body will be a reference to
00044 // a block in the input stream.
00045 static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE;
00046 
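// Editor's note: the sketch below is illustrative and is not part of the
// original HttpTunnel.cc.  It only demonstrates the header form produced by
// CHUNK_HEADER_FMT (the chunk size in hexadecimal followed by CRLF); the
// function name is hypothetical.
static int
example_format_chunk_header(char *buf, size_t buf_len, int64_t chunk_size)
{
  // For chunk_size == 4096 this writes "1000\r\n" (4096 == 0x1000) and returns 6.
  return snprintf(buf, buf_len, CHUNK_HEADER_FMT, chunk_size);
}
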
00047 char
00048 VcTypeCode(HttpTunnelType_t t) {
00049   char zret = ' ';
00050   switch (t) {
00051   case HT_HTTP_CLIENT: zret = 'U'; break;
00052   case HT_HTTP_SERVER: zret = 'S'; break;
00053   case HT_TRANSFORM: zret = 'T'; break;
00054   case HT_CACHE_READ: zret = 'R'; break;
00055   case HT_CACHE_WRITE: zret = 'W'; break;
00056   default: break;
00057   }
00058   return zret;
00059 }
00060 
00061 ChunkedHandler::ChunkedHandler()
00062   : chunked_reader(NULL), dechunked_buffer(NULL), dechunked_size(0), dechunked_reader(NULL), chunked_buffer(NULL),
00063     chunked_size(0), truncation(false), skip_bytes(0), state(CHUNK_READ_CHUNK), cur_chunk_size(0),
00064     bytes_left(0), last_server_event(VC_EVENT_NONE), running_sum(0), num_digits(0),
00065     max_chunk_size(DEFAULT_MAX_CHUNK_SIZE), max_chunk_header_len(0)
00066 {
00067 }
00068 
00069 void
00070 ChunkedHandler::init(IOBufferReader * buffer_in, HttpTunnelProducer * p)
00071 {
00072   if (p->do_chunking)
00073     init_by_action(buffer_in, ACTION_DOCHUNK);
00074   else if (p->do_dechunking)
00075     init_by_action(buffer_in, ACTION_DECHUNK);
00076   else
00077     init_by_action(buffer_in, ACTION_PASSTHRU);
00078   return;
00079 }
00080 
00081 void
00082 ChunkedHandler::init_by_action(IOBufferReader *buffer_in, Action action)
00083 {
00084   running_sum = 0;
00085   num_digits = 0;
00086   cur_chunk_size = 0;
00087   bytes_left = 0;
00088   truncation = false;
00089   this->action = action;
00090 
00091   switch (action) {
00092   case ACTION_DOCHUNK:
00093     dechunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00094     dechunked_reader->mbuf->water_mark = min_block_transfer_bytes;
00095     chunked_buffer = new_MIOBuffer(CHUNK_IOBUFFER_SIZE_INDEX);
00096     chunked_size = 0;
00097     break;
00098   case ACTION_DECHUNK:
00099     chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00100     dechunked_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_256);
00101     dechunked_size = 0;
00102     break;
00103   case ACTION_PASSTHRU:
00104     chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00105     break;
00106   default:
00107     ink_release_assert(!"Unknown action");
00108   }
00109 
00110   return;
00111 }
00112 
00113 void
00114 ChunkedHandler::clear()
00115 {
00116   switch (action) {
00117   case ACTION_DOCHUNK:
00118     free_MIOBuffer(chunked_buffer);
00119     break;
00120   case ACTION_DECHUNK:
00121     free_MIOBuffer(dechunked_buffer);
00122     break;
00123   case ACTION_PASSTHRU:
00124   default:
00125     break;
00126   }
00127 
00128   return;
00129 }
00130 
00131 void
00132 ChunkedHandler::set_max_chunk_size(int64_t size)
00133 {
00134   max_chunk_size = size ? size : DEFAULT_MAX_CHUNK_SIZE;
00135   max_chunk_header_len = snprintf(max_chunk_header, sizeof(max_chunk_header), CHUNK_HEADER_FMT, max_chunk_size);
00136 }
00137 
00138 void
00139 ChunkedHandler::read_size()
00140 {
00141   int64_t bytes_used;
00142   bool done = false;
00143 
00144   while (chunked_reader->read_avail() > 0 && !done) {
00145     const char *tmp = chunked_reader->start();
00146     int64_t data_size = chunked_reader->block_read_avail();
00147 
00148     ink_assert(data_size > 0);
00149     bytes_used = 0;
00150 
00151     while (data_size > 0) {
00152       bytes_used++;
00153       if (state == CHUNK_READ_SIZE) {
00154         // The HTTP spec says the chunk size is always in hex
00155         if (ParseRules::is_hex(*tmp)) {
00156           num_digits++;
00157           running_sum *= 16;
00158 
00159           if (ParseRules::is_digit(*tmp)) {
00160             running_sum += *tmp - '0';
00161           } else {
00162             running_sum += ParseRules::ink_tolower(*tmp) - 'a' + 10;
00163           }
00164         } else {
00165           // We are done parsing size
00166           if (num_digits == 0 || running_sum < 0) {
00167             // Bogus chunk size
00168             state = CHUNK_READ_ERROR;
00169             done = true;
00170             break;
00171           } else {
00172             state = CHUNK_READ_SIZE_CRLF;       // now look for CRLF
00173           }
00174         }
00175       } else if (state == CHUNK_READ_SIZE_CRLF) {       // Scan for a linefeed
00176         if (ParseRules::is_lf(*tmp)) {
00177           Debug("http_chunk", "read chunk size of %d bytes", running_sum);
00178           bytes_left = (cur_chunk_size = running_sum);
00179           state = (running_sum == 0) ? CHUNK_READ_TRAILER_BLANK : CHUNK_READ_CHUNK;
00180           done = true;
00181           break;
00182         }
00183       } else if (state == CHUNK_READ_SIZE_START) {
00184         if (ParseRules::is_lf(*tmp)) {
00185           running_sum = 0;
00186           num_digits = 0;
00187           state = CHUNK_READ_SIZE;
00188         }
00189       }
00190       tmp++;
00191       data_size--;
00192     }
00193     chunked_reader->consume(bytes_used);
00194   }
00195 }
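
// Editor's note: an illustrative sketch, not part of the original file.  The
// loop above accumulates the hexadecimal chunk size one digit at a time; the
// accumulation step, pulled out into a standalone helper with a hypothetical
// name, looks like this:
static int64_t
example_accumulate_hex_digit(int64_t running_sum, char c)
{
  running_sum *= 16;
  if (ParseRules::is_digit(c))
    running_sum += c - '0';
  else
    running_sum += ParseRules::ink_tolower(c) - 'a' + 10;
  return running_sum;
}
// For the size line "1a3\r\n" the successive sums are 0x1 (1), 0x1a (26) and
// 0x1a3 (419), after which the CRLF scan takes over.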
00196 
00197 // int64_t ChunkedHandler::transfer_bytes()
00198 //
00199 //   Transfers bytes from chunked_reader to the dechunked buffer.
00200 //   Uses the block reference method when there is a sufficient
00201 //   amount of data to move; otherwise, copies the data with memcpy.
00202 //
00203 int64_t
00204 ChunkedHandler::transfer_bytes()
00205 {
00206   int64_t block_read_avail, moved, to_move, total_moved = 0;
00207 
00208   // Handle the case where we are doing chunked passthrough.
00209   if (!dechunked_buffer) {
00210     moved = MIN(bytes_left, chunked_reader->read_avail());
00211     chunked_reader->consume(moved);
00212     bytes_left = bytes_left - moved;
00213     return moved;
00214   }
00215 
00216   while (bytes_left > 0) {
00217     block_read_avail = chunked_reader->block_read_avail();
00218 
00219     to_move = MIN(bytes_left, block_read_avail);
00220     if (to_move <= 0)
00221       break;
00222 
00223     if (to_move >= min_block_transfer_bytes) {
00224       moved = dechunked_buffer->write(chunked_reader, bytes_left);
00225     } else {
00226       // Small amount of data available.  We want to copy the
00227       // data rather than add a block reference to prevent the
00228       // buildup of too many small blocks, which leads to stack
00229       // overflow on deallocation.
00230       moved = dechunked_buffer->write(chunked_reader->start(), to_move);
00231     }
00232 
00233     if (moved > 0) {
00234       chunked_reader->consume(moved);
00235       bytes_left = bytes_left - moved;
00236       dechunked_size += moved;
00237       total_moved += moved;
00238     } else
00239       break;
00240   }
00241   return total_moved;
00242 }
00243 
00244 void
00245 ChunkedHandler::read_chunk()
00246 {
00247   int64_t b = transfer_bytes();
00248 
00249   ink_assert(bytes_left >= 0);
00250   if (bytes_left == 0) {
00251     Debug("http_chunk", "completed read of chunk of %" PRId64" bytes", cur_chunk_size);
00252 
00253     state = CHUNK_READ_SIZE_START;
00254   } else if (bytes_left > 0) {
00255     Debug("http_chunk", "read %" PRId64" bytes of a %" PRId64" chunk", b, cur_chunk_size);
00256   }
00257 }
00258 
00259 void
00260 ChunkedHandler::read_trailer()
00261 {
00262   int64_t bytes_used;
00263   bool done = false;
00264 
00265   while (chunked_reader->is_read_avail_more_than(0) && !done) {
00266     const char *tmp = chunked_reader->start();
00267     int64_t data_size = chunked_reader->block_read_avail();
00268 
00269     ink_assert(data_size > 0);
00270     for (bytes_used = 0; data_size > 0; data_size--) {
00271       bytes_used++;
00272 
00273       if (ParseRules::is_cr(*tmp)) {
00274         // For a CR to signal we are almost done, the preceding
00275         //  part of the line must be blank and the next character
00276         //  must be a LF
00277         state = (state == CHUNK_READ_TRAILER_BLANK) ? CHUNK_READ_TRAILER_CR : CHUNK_READ_TRAILER_LINE;
00278       } else if (ParseRules::is_lf(*tmp)) {
00279         // For a LF to signal we are done reading the
00280         //   trailer, the line must have either been blank
00281         //   or must have only had a CR on it
00282         if (state == CHUNK_READ_TRAILER_CR || state == CHUNK_READ_TRAILER_BLANK) {
00283           state = CHUNK_READ_DONE;
00284           Debug("http_chunk", "completed read of trailers");
00285           done = true;
00286           break;
00287         } else {
00288           // A LF that does not terminate the trailer
00289           //  indicates a new line
00290           state = CHUNK_READ_TRAILER_BLANK;
00291         }
00292       } else {
00293         // A character that is not a CR or LF indicates
00294       //  that we are parsing a line of the trailer
00295         state = CHUNK_READ_TRAILER_LINE;
00296       }
00297       tmp++;
00298     }
00299     chunked_reader->consume(bytes_used);
00300   }
00301 }
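
// Editor's note: an illustrative trace, not part of the original file.  For a
// trailer such as "X-Example: 1\r\n\r\n" the scan above moves through
//   CHUNK_READ_TRAILER_BLANK -> CHUNK_READ_TRAILER_LINE (header characters and
//   the CR that ends that line) -> CHUNK_READ_TRAILER_BLANK (its LF starts a
//   new, blank line) -> CHUNK_READ_TRAILER_CR (the CR of the blank line) ->
//   CHUNK_READ_DONE (its LF terminates the trailer).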
00302 
00303 bool ChunkedHandler::process_chunked_content()
00304 {
00305   while (chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
00306     switch (state) {
00307     case CHUNK_READ_SIZE:
00308     case CHUNK_READ_SIZE_CRLF:
00309     case CHUNK_READ_SIZE_START:
00310       read_size();
00311       break;
00312     case CHUNK_READ_CHUNK:
00313       read_chunk();
00314       break;
00315     case CHUNK_READ_TRAILER_BLANK:
00316     case CHUNK_READ_TRAILER_CR:
00317     case CHUNK_READ_TRAILER_LINE:
00318       read_trailer();
00319       break;
00320     case CHUNK_FLOW_CONTROL:
00321       return false;
00322     default:
00323       ink_release_assert(0);
00324       break;
00325     }
00326   }
00327   return (state == CHUNK_READ_DONE || state == CHUNK_READ_ERROR);
00328 }
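
// Editor's note: an illustrative sketch, not part of the original file.  The
// return value above lets the caller turn "all chunked framing consumed" into
// a read-complete style event, roughly:
//
//   bool done = handler.process_chunked_content();
//   if (done && event == VC_EVENT_READ_READY)
//     event = VC_EVENT_READ_COMPLETE;
//
// which is what producer_handler_chunked() below does (it also flags
// truncation when the producer finishes before the framing is complete).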
00329 
00330 bool ChunkedHandler::generate_chunked_content()
00331 {
00332   char tmp[16];
00333   bool server_done = false;
00334   int64_t r_avail;
00335 
00336   ink_assert(max_chunk_header_len);
00337 
00338   switch (last_server_event) {
00339   case VC_EVENT_EOS:
00340   case VC_EVENT_READ_COMPLETE:
00341   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
00342     server_done = true;
00343     break;
00344   }
00345 
00346   while ((r_avail = dechunked_reader->read_avail()) > 0 && state != CHUNK_WRITE_DONE) {
00347     int64_t write_val = MIN(max_chunk_size, r_avail);
00348 
00349     state = CHUNK_WRITE_CHUNK;
00350     Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", write_val);
00351 
00352     // Output the chunk size.
00353     if (write_val != max_chunk_size) {
00354       int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
00355       chunked_buffer->write(tmp, len);
00356       chunked_size += len;
00357     } else {
00358       chunked_buffer->write(max_chunk_header, max_chunk_header_len);
00359       chunked_size += max_chunk_header_len;
00360     }
00361 
00362     // Output the chunk itself.
00363     //
00364     // BZ# 54395 Note - we really should only do a
00365     //   block transfer if there is a sizable amount of
00366     //   data (like we do for the case where we are
00367     //   removing chunked encoding in ChunkedHandler::transfer_bytes()).
00368     //   However, I want to do this fix with as small a risk
00369     //   as possible so I'm leaving this issue alone for
00370     //   now.
00371     //
00372     chunked_buffer->write(dechunked_reader, write_val);
00373     chunked_size += write_val;
00374     dechunked_reader->consume(write_val);
00375 
00376     // Output the trailing CRLF.
00377     chunked_buffer->write("\r\n", 2);
00378     chunked_size += 2;
00379   }
00380 
00381   if (server_done) {
00382     state = CHUNK_WRITE_DONE;
00383 
00384     // Add the chunked transfer coding trailer.
00385     chunked_buffer->write("0\r\n\r\n", 5);
00386     chunked_size += 5;
00387     return true;
00388   }
00389   return false;
00390 }
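
// Editor's note: an illustrative example, not part of the original file.  For
// a 16 byte body that fits in a single chunk, the writes above produce the
// standard chunked framing on the wire:
//
//   10\r\n                 <- chunk size in hexadecimal (0x10 == 16)
//   <16 bytes of body>\r\n
//   0\r\n\r\n              <- terminating chunk plus empty trailer
//
// for a chunked_size of 4 + 16 + 2 + 5 = 27 bytes once the server is done.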
00391 
00392 HttpTunnelProducer::HttpTunnelProducer()
00393   : consumer_list(), self_consumer(NULL),
00394     vc(NULL), vc_handler(NULL), read_vio(NULL), read_buffer(NULL),
00395     buffer_start(NULL), vc_type(HT_HTTP_SERVER), chunking_action(TCA_PASSTHRU_DECHUNKED_CONTENT),
00396     do_chunking(false), do_dechunking(false), do_chunked_passthru(false),
00397     init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0),
00398     handler_state(0), last_event(0), num_consumers(0), alive(false),
00399     read_success(false), flow_control_source(0), name(NULL)
00400 {
00401 }
00402 
00403 uint64_t
00404 HttpTunnelProducer::backlog(uint64_t limit) {
00405   uint64_t zret = 0;
00406   // Calculate the total backlog, the # of bytes inside ATS for this producer.
00407   // We go all the way through each chain to the ending sink and take the maximum
00408   // over those paths. We do need to be careful about loops, which can occur.
00409   for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
00410     if (c->alive && c->write_vio) {
00411       uint64_t n = 0;
00412       if (HT_TRANSFORM == c->vc_type) {
00413         n += static_cast<TransformVCChain*>(c->vc)->backlog(limit);
00414       } else {
00415         IOBufferReader* r = c->write_vio->get_reader();
00416         if (r) {
00417           n += static_cast<uint64_t>(r->read_avail());
00418         }
00419       }
00420       if (n >= limit) return n;
00421 
00422       if (!c->is_sink()) {
00423         HttpTunnelProducer* dsp = c->self_producer;
00424         if (dsp) {
00425           n += dsp->backlog();
00426         }
00427       }
00428       if (n >= limit) return n;
00429       if (n > zret) zret = n;
00430     }
00431   }
00432 
00433   if (chunked_handler.chunked_reader) {
00434     zret += static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail());
00435   }
00436 
00437   return zret;
00438 }
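
// Editor's note: an illustrative usage sketch, not part of the original file.
// The limit argument lets a caller stop walking the chain as soon as the
// backlog is already known to be at or above the threshold it cares about,
// for example:
//
//   if (p->backlog(flow_state.high_water) >= flow_state.high_water)
//     p->throttle();
//
// which is essentially how HttpTunnel::consumer_reenable() applies the high
// water mark when flow control is enabled.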
00439 
00440 /*  We set the producers in a flow chain specifically rather than
00441     using a tunnel level variable in order to handle bi-directional
00442     tunnels correctly. In such a case the flow control on producers is
00443     not related so a single value for the tunnel won't work.
00444 */
00445 void
00446 HttpTunnelProducer::set_throttle_src(HttpTunnelProducer* srcp) {
00447   HttpTunnelProducer* p = this;
00448   p->flow_control_source = srcp;
00449   for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
00450     if (!c->is_sink()) {
00451       p = c->self_producer;
00452       if (p)
00453         p->set_throttle_src(srcp);
00454     }
00455   }
00456 }
00457 
00458 HttpTunnelConsumer::HttpTunnelConsumer()
00459   : link(), producer(NULL), self_producer(NULL), vc_type(HT_HTTP_CLIENT), vc(NULL), buffer_reader(NULL),
00460     vc_handler(NULL), write_vio(NULL), skip_bytes(0), bytes_written(0), handler_state(0), alive(false),
00461     write_success(false), name(NULL)
00462 {
00463 }
00464 
00465 HttpTunnel::HttpTunnel()
00466   : Continuation(NULL), num_producers(0), num_consumers(0), sm(NULL), active(false)
00467 {
00468 }
00469 
00470 void
00471 HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex)
00472 {
00473   HttpConfigParams* params = sm_arg->t_state.http_config_param;
00474   sm = sm_arg;
00475   active = false;
00476   mutex = amutex;
00477   SET_HANDLER(&HttpTunnel::main_handler);
00478   flow_state.enabled_p = params->oride.flow_control_enabled;
00479   if (params->oride.flow_low_water_mark > 0)
00480     flow_state.low_water = params->oride.flow_low_water_mark;
00481   if (params->oride.flow_high_water_mark > 0)
00482     flow_state.high_water = params->oride.flow_high_water_mark;
00483   // This should always be true; we handled the default cases back in HttpConfig::reconfigure()
00484   ink_assert(flow_state.low_water <= flow_state.high_water);
00485 }
00486 
00487 void
00488 HttpTunnel::reset()
00489 {
00490   ink_assert(active == false);
00491 #ifdef DEBUG
00492   for (int i = 0; i < MAX_PRODUCERS; ++i) {
00493     ink_assert(producers[i].alive == false);
00494   }
00495   for (int j = 0; j < MAX_CONSUMERS; ++j) {
00496     ink_assert(consumers[j].alive == false);
00497   }
00498 #endif
00499 
00500   num_producers = 0;
00501   num_consumers = 0;
00502   memset(consumers, 0, sizeof(consumers));
00503   memset(producers, 0, sizeof(producers));
00504 }
00505 
00506 void
00507 HttpTunnel::kill_tunnel()
00508 {
00509   for (int i = 0; i < MAX_PRODUCERS; ++i) {
00510     if (producers[i].vc != NULL) {
00511       chain_abort_all(&producers[i]);
00512     }
00513     ink_assert(producers[i].alive == false);
00514   }
00515   active = false;
00516   this->deallocate_buffers();
00517   this->deallocate_redirect_postdata_buffers();
00518   this->reset();
00519 }
00520 
00521 HttpTunnelProducer *
00522 HttpTunnel::alloc_producer()
00523 {
00524   for (int i = 0; i < MAX_PRODUCERS; ++i) {
00525     if (producers[i].vc == NULL) {
00526       num_producers++;
00527       ink_assert(num_producers <= MAX_PRODUCERS);
00528       return producers + i;
00529     }
00530   }
00531   ink_release_assert(0);
00532   return NULL;
00533 }
00534 
00535 HttpTunnelConsumer *
00536 HttpTunnel::alloc_consumer()
00537 {
00538   for (int i = 0; i < MAX_CONSUMERS; i++) {
00539     if (consumers[i].vc == NULL) {
00540       num_consumers++;
00541       ink_assert(num_consumers <= MAX_CONSUMERS);
00542       return consumers + i;
00543     }
00544   }
00545   ink_release_assert(0);
00546   return NULL;
00547 }
00548 
00549 int
00550 HttpTunnel::deallocate_buffers()
00551 {
00552   int num = 0;
00553   ink_release_assert(active == false);
00554   for (int i = 0; i < MAX_PRODUCERS; ++i) {
00555     if (producers[i].read_buffer != NULL) {
00556       ink_assert(producers[i].vc != NULL);
00557       free_MIOBuffer(producers[i].read_buffer);
00558       producers[i].read_buffer = NULL;
00559       producers[i].buffer_start = NULL;
00560       num++;
00561     }
00562 
00563     if (producers[i].chunked_handler.dechunked_buffer != NULL) {
00564       ink_assert(producers[i].vc != NULL);
00565       free_MIOBuffer(producers[i].chunked_handler.dechunked_buffer);
00566       producers[i].chunked_handler.dechunked_buffer = NULL;
00567       num++;
00568     }
00569 
00570     if (producers[i].chunked_handler.chunked_buffer != NULL) {
00571       ink_assert(producers[i].vc != NULL);
00572       free_MIOBuffer(producers[i].chunked_handler.chunked_buffer);
00573       producers[i].chunked_handler.chunked_buffer = NULL;
00574       num++;
00575     }
00576     producers[i].chunked_handler.max_chunk_header_len = 0;
00577   }
00578   return num;
00579 }
00580 
00581 void
00582 HttpTunnel::set_producer_chunking_action(HttpTunnelProducer * p, int64_t skip_bytes, TunnelChunkingAction_t action)
00583 {
00584   p->chunked_handler.skip_bytes = skip_bytes;
00585   p->chunking_action = action;
00586 
00587   switch (action) {
00588   case TCA_CHUNK_CONTENT:
00589     p->chunked_handler.state = p->chunked_handler.CHUNK_WRITE_CHUNK;
00590     break;
00591   case TCA_DECHUNK_CONTENT:
00592   case TCA_PASSTHRU_CHUNKED_CONTENT:
00593     p->chunked_handler.state = p->chunked_handler.CHUNK_READ_SIZE;
00594     break;
00595   default:
00596     break;
00597   };
00598 }
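
// Editor's note: a hypothetical sketch, not part of the original file.  A
// caller would typically derive the chunking action from whether the incoming
// side is chunked and whether the outgoing side should be chunked; the helper
// below only illustrates one plausible mapping under that assumption.
static TunnelChunkingAction_t
example_pick_chunking_action(bool incoming_chunked, bool outgoing_chunked)
{
  if (incoming_chunked)
    return outgoing_chunked ? TCA_PASSTHRU_CHUNKED_CONTENT : TCA_DECHUNK_CONTENT;
  else
    return outgoing_chunked ? TCA_CHUNK_CONTENT : TCA_PASSTHRU_DECHUNKED_CONTENT;
}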
00599 
00600 void
00601 HttpTunnel::set_producer_chunking_size(HttpTunnelProducer* p, int64_t size)
00602 {
00603   p->chunked_handler.set_max_chunk_size(size);
00604 }
00605 
00606 // HttpTunnelProducer* HttpTunnel::add_producer
00607 //
00608 //   Adds a new producer to the tunnel
00609 //
00610 HttpTunnelProducer *
00611 HttpTunnel::add_producer(VConnection * vc,
00612                          int64_t nbytes_arg,
00613                          IOBufferReader * reader_start,
00614                          HttpProducerHandler sm_handler, HttpTunnelType_t vc_type, const char *name_arg)
00615 {
00616   HttpTunnelProducer *p;
00617 
00618   Debug("http_tunnel", "[%" PRId64 "] adding producer '%s'", sm->sm_id, name_arg);
00619 
00620   ink_assert(reader_start->mbuf);
00621   if ((p = alloc_producer()) != NULL) {
00622     p->vc = vc;
00623     p->nbytes = nbytes_arg;
00624     p->buffer_start = reader_start;
00625     p->read_buffer = reader_start->mbuf;
00626     p->vc_handler = sm_handler;
00627     p->vc_type = vc_type;
00628     p->name = name_arg;
00629     p->chunking_action = TCA_PASSTHRU_DECHUNKED_CONTENT;
00630 
00631     p->do_chunking = false;
00632     p->do_dechunking = false;
00633     p->do_chunked_passthru = false;
00634 
00635     p->init_bytes_done = reader_start->read_avail();
00636     if (p->nbytes < 0) {
00637       p->ntodo = p->nbytes;
00638     } else {                    // The byte count given to us includes bytes
00639       //  that may already be in the buffer.
00640       //  ntodo represents the number of bytes
00641       //  the tunneling mechanism needs to read
00642       //  for the producer.
00643       p->ntodo = p->nbytes - p->init_bytes_done;
00644       ink_assert(p->ntodo >= 0);
00645     }
00646 
00647     // A static producer is never "alive";
00648     //   it just has data in the buffer.
00649     if (vc == HTTP_TUNNEL_STATIC_PRODUCER) {
00650       ink_assert(p->ntodo == 0);
00651       p->alive = false;
00652       p->read_success = true;
00653     } else {
00654       p->alive = true;
00655     }
00656   }
00657   return p;
00658 }
00659 
00660 // HttpTunnelConsumer* HttpTunnel::add_consumer
00661 //
00662 //    Adds a new consumer to the tunnel.  The producer must
00663 //    be specified and already added to the tunnel.  Attaches
00664 //    the new consumer to the entry for the existing producer.
00665 //
00666 //    Returns the new consumer if it was successfully added.  Returns
00667 //    NULL if the consumer was not added because the source failed.
00668 //
00669 HttpTunnelConsumer *
00670 HttpTunnel::add_consumer(VConnection * vc,
00671                          VConnection * producer,
00672                          HttpConsumerHandler sm_handler, HttpTunnelType_t vc_type, const char *name_arg, int64_t skip_bytes)
00673 {
00674   Debug("http_tunnel", "[%" PRId64 "] adding consumer '%s'", sm->sm_id, name_arg);
00675 
00676   // Find the producer entry
00677   HttpTunnelProducer *p = get_producer(producer);
00678   ink_release_assert(p);
00679 
00680   // Check to see if the producer terminated
00681   //  without sending all of its data
00682   if (p->alive == false && p->read_success == false) {
00683     Debug("http_tunnel", "[%" PRId64 "] consumer '%s' not added due to producer failure", sm->sm_id, name_arg);
00684     return NULL;
00685   }
00686   // Initialize the consumer structure
00687   HttpTunnelConsumer *c = alloc_consumer();
00688   c->producer = p;
00689   c->vc = vc;
00690   c->alive = true;
00691   c->skip_bytes = skip_bytes;
00692   c->vc_handler = sm_handler;
00693   c->vc_type = vc_type;
00694   c->name = name_arg;
00695 
00696   // Register the consumer with the producer
00697   p->consumer_list.push(c);
00698   p->num_consumers++;
00699 
00700   return c;
00701 }
00702 
00703 void
00704 HttpTunnel::chain(HttpTunnelConsumer* c, HttpTunnelProducer* p)
00705 {
00706   p->self_consumer = c;
00707   c->self_producer = p;
00708   // If the flow is already throttled update the chained producer.
00709   if (c->producer->is_throttled())
00710     p->set_throttle_src(c->producer->flow_control_source);
00711 }
00712 
00713 // void HttpTunnel::tunnel_run()
00714 //
00715 //    Makes the tunnel go
00716 //
00717 void
00718 HttpTunnel::tunnel_run(HttpTunnelProducer * p_arg)
00719 {
00720   Debug("http_tunnel", "tunnel_run started, p_arg is %s", p_arg ? "provided" : "NULL");
00721 
00722   if (p_arg) {
00723     producer_run(p_arg);
00724   } else {
00725     HttpTunnelProducer *p;
00726 
00727     ink_assert(active == false);
00728 
00729     for (int i = 0 ; i < MAX_PRODUCERS ; ++i) {
00730       p = producers + i;
00731       if (p->vc != NULL) {
00732         producer_run(p);
00733       }
00734     }
00735   }
00736 
00737   // It is possible that there was nothing to do
00738   //   due to all transfers being zero length.
00739   //   If that is the case, call the state machine
00740   //   back to say we are done.
00741   if (!is_tunnel_alive()) {
00742     active = false;
00743     sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
00744   }
00745 }
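
// Editor's note: a hypothetical usage sketch, not part of the original file.
// A state machine wires a tunnel by adding a producer, attaching consumers to
// it, and then starting the transfer.  The VCs, reader and handlers here are
// placeholders supplied by the caller, and the tunnel is assumed to have been
// init()'ed already.
static HttpTunnelProducer *
example_wire_simple_tunnel(HttpTunnel *tunnel, VConnection *server_vc, VConnection *client_vc,
                           IOBufferReader *server_reader,
                           HttpProducerHandler p_handler, HttpConsumerHandler c_handler)
{
  // A negative byte count means the total length is not known up front.
  HttpTunnelProducer *p = tunnel->add_producer(server_vc, -1, server_reader,
                                               p_handler, HT_HTTP_SERVER, "example server");
  tunnel->add_consumer(client_vc, server_vc, c_handler, HT_HTTP_CLIENT, "example client", 0);
  tunnel->tunnel_run(p);
  return p;
}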
00746 
00747 void
00748 HttpTunnel::producer_run(HttpTunnelProducer * p)
00749 {
00750   // Determine whether the producer has a cache-write consumer,
00751   // since all chunked content read by the producer gets dechunked
00752   // prior to being written into the cache.
00753   HttpTunnelConsumer *c, *cache_write_consumer = NULL;
00754   bool transform_consumer = false;
00755 
00756   for (c = p->consumer_list.head; c; c = c->link.next) {
00757     if (c->vc_type == HT_CACHE_WRITE) {
00758       cache_write_consumer = c;
00759       break;
00760     }
00761   }
00762 
00763   // bz57413
00764   for (c = p->consumer_list.head; c; c = c->link.next) {
00765     if (c->vc_type == HT_TRANSFORM) {
00766       transform_consumer = true;
00767       break;
00768     }
00769   }
00770 
00771   // Determine whether the producer is to perform chunking,
00772   // dechunking, or chunked-passthrough of the incoming response.
00773   TunnelChunkingAction_t action = p->chunking_action;
00774 
00775   // [bug 2579251] static producers won't have handler set
00776   if (p->vc != HTTP_TUNNEL_STATIC_PRODUCER) {
00777     if (action == TCA_CHUNK_CONTENT)
00778       p->do_chunking = true;
00779     else if (action == TCA_DECHUNK_CONTENT)
00780       p->do_dechunking = true;
00781     else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
00782       p->do_chunked_passthru = true;
00783 
00784       // Dechunk the chunked content into the cache.
00785       if (cache_write_consumer != NULL)
00786         p->do_dechunking = true;
00787     }
00788   }
00789 
00790   int64_t consumer_n;
00791   int64_t producer_n;
00792 
00793   ink_assert(p->vc != NULL);
00794   active = true;
00795 
00796   IOBufferReader *chunked_buffer_start = NULL, *dechunked_buffer_start = NULL;
00797   if (p->do_chunking || p->do_dechunking || p->do_chunked_passthru) {
00798     p->chunked_handler.init(p->buffer_start, p);
00799 
00800     // Copy the header into the chunked/dechunked buffers.
00801     if (p->do_chunking) {
00802       // initialize a reader to chunked buffer start before writing to keep ref count
00803       chunked_buffer_start = p->chunked_handler.chunked_buffer->alloc_reader();
00804       p->chunked_handler.chunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
00805     } else if (p->do_dechunking) {
00806       // bz57413
00807       Debug("http_tunnel",
00808             "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64"",
00809             p->chunked_handler.chunked_reader->read_avail());
00810 
00811       // initialize a reader to dechunked buffer start before writing to keep ref count
00812       dechunked_buffer_start = p->chunked_handler.dechunked_buffer->alloc_reader();
00813 
00814       // If there is no transformation then add the header to the buffer; otherwise the
00815       // client has already got the header from us, so there is no need for it in the buffer.
00816       if (!transform_consumer) {
00817         p->chunked_handler.dechunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
00818 
00819         Debug("http_tunnel", "[producer_run] do_dechunking::Copied header of size %" PRId64"", p->chunked_handler.skip_bytes);
00820       }
00821     }
00822   }
00823 
00824   int64_t read_start_pos = 0;
00825   if (p->vc_type == HT_CACHE_READ && sm->t_state.range_setup == HttpTransact::RANGE_NOT_TRANSFORM_REQUESTED) {
00826     ink_assert(sm->t_state.num_range_fields == 1); // we currently support only one range entry
00827     read_start_pos = sm->t_state.ranges[0]._start;
00828     producer_n = (sm->t_state.ranges[0]._end - sm->t_state.ranges[0]._start)+1;
00829     consumer_n = (producer_n + sm->client_response_hdr_bytes);
00830   } else if (p->nbytes >= 0) {
00831     consumer_n = p->nbytes;
00832     producer_n = p->ntodo;
00833   } else {
00834     consumer_n = (producer_n = INT64_MAX);
00835   }
00836 
00837   // Do the IO on the consumers first so
00838   //  data doesn't disappear out from
00839   //  under the tunnel
00840   ink_release_assert(p->num_consumers > 0);
00841   for (c = p->consumer_list.head; c;) {
00842     // Create a reader for each consumer.  The reader allows
00843     // us to implement skip bytes
00844     if (c->vc_type == HT_CACHE_WRITE) {
00845       switch (action) {
00846       case TCA_CHUNK_CONTENT:
00847       case TCA_PASSTHRU_DECHUNKED_CONTENT:
00848         c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
00849         break;
00850       case TCA_DECHUNK_CONTENT:
00851       case TCA_PASSTHRU_CHUNKED_CONTENT:
00852         c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
00853         break;
00854       default:
00855         break;
00856       }
00857     }
00858     // Non-cache consumers.
00859     else if (action == TCA_CHUNK_CONTENT) {
00860       c->buffer_reader = p->chunked_handler.chunked_buffer->clone_reader(chunked_buffer_start);
00861     } else if (action == TCA_DECHUNK_CONTENT) {
00862       c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
00863     } else {
00864       c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
00865     }
00866 
00867     // Consume bytes of the reader if we are skipping bytes
00868     if (c->skip_bytes > 0) {
00869       ink_assert(c->skip_bytes <= c->buffer_reader->read_avail());
00870       c->buffer_reader->consume(c->skip_bytes);
00871     }
00872     int64_t c_write = consumer_n;
00873 
00874     // INKqa05109 - if we don't know the length leave it at
00875     //  INT64_MAX or else the cache may bounce the write
00876     //  because it thinks the document is too big.  INT64_MAX
00877     //  is a special case for the max document size code
00878     //  in the cache
00879     if (c_write != INT64_MAX) {
00880       c_write -= c->skip_bytes;
00881     }
00882     // Fix for problems with non-chunked content being chunked and
00883     // not all of the data being sent.  The content length grows when
00884     // it is being chunked.
00885     if (p->do_chunking == true) {
00886       c_write = INT64_MAX;
00887     }
00888 
00889     if (c_write == 0) {
00890       // Nothing to do, call back the cleanup handlers
00891       c->write_vio = NULL;
00892       consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
00893     } else {
00894       c->write_vio = c->vc->do_io_write(this, c_write, c->buffer_reader);
00895       ink_assert(c_write > 0);
00896     }
00897 
00898     c = c->link.next;
00899   }
00900 
00901   //YTS Team, yamsat Plugin
00902   // Allocate and copy partial POST data to buffers. Check for the various parameters
00903   // including the maximum configured post data size
00904   if (p->alive && sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection && (p->vc_type == HT_HTTP_CLIENT)) {
00905     Debug("http_redirect", "[HttpTunnel::producer_run] client post: %" PRId64" max size: %" PRId64"",
00906           p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
00907 
00908     // (note that since we are not dechunking POST, this is the chunked size if chunked)
00909     if (p->buffer_start->read_avail() > HttpConfig::m_master.post_copy_size) {
00910       Debug("http_redirect", "[HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64" limit=%" PRId64"",
00911             p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
00912       sm->enable_redirection = false;
00913     } else {
00914       // allocate post buffers with a new reader. The reader will be freed when p->read_buffer is freed
00915       allocate_redirect_postdata_buffers(p->read_buffer->clone_reader(p->buffer_start));
00916       copy_partial_post_data();
00917     }
00918   }                             //end of added logic for partial POST
00919 
00920   if (p->do_chunking) {
00921     // remove the chunked reader marker so that it doesn't act like a buffer guard
00922     p->chunked_handler.chunked_buffer->dealloc_reader(chunked_buffer_start);
00923     p->chunked_handler.dechunked_reader->consume(p->chunked_handler.skip_bytes);
00924 
00925     // If there is data to process in the buffer, do it now
00926     producer_handler(VC_EVENT_READ_READY, p);
00927   } else if (p->do_dechunking || p->do_chunked_passthru) {
00928     // remove the dechunked reader marker so that it doesn't act like a buffer guard
00929     if (p->do_dechunking)
00930       p->chunked_handler.dechunked_buffer->dealloc_reader(dechunked_buffer_start);
00931 
00932     // bz57413
00933     // If there is no transformation plugin, then we didn't add the header, hence no need to consume it
00934     Debug("http_tunnel",
00935           "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64"",
00936           p->chunked_handler.chunked_reader->read_avail());
00937     if (!transform_consumer && (p->chunked_handler.chunked_reader->read_avail() >= p->chunked_handler.skip_bytes)) {
00938       p->chunked_handler.chunked_reader->consume(p->chunked_handler.skip_bytes);
00939       Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.skip_bytes = %" PRId64"",
00940             p->chunked_handler.skip_bytes);
00941     }
00942     //if(p->chunked_handler.chunked_reader->read_avail() > 0)
00943     //p->chunked_handler.chunked_reader->consume(
00944     //p->chunked_handler.skip_bytes);
00945 
00946     producer_handler(VC_EVENT_READ_READY, p);
00947     if (!p->chunked_handler.chunked_reader->read_avail() && sm->redirection_tries > 0 && p->vc_type == HT_HTTP_CLIENT) {     // read_avail() == 0
00948       // [bug 2579251]
00949       // Ugh, this is horrible but in the redirect case they are running the tunnel again with the
00950       // now closed/empty producer to trigger PRECOMPLETE.  If the POST was chunked, producer_n is set
00951       // (incorrectly) to INT64_MAX.  It needs to be set to 0 to prevent triggering another read.
00952       producer_n = 0;
00953     }
00954   }
00955 
00956   if (p->alive) {
00957     ink_assert(producer_n >= 0);
00958 
00959     if (producer_n == 0) {
00960       // Everything is already in the buffer so mark the producer as done.  We need to notify
00961       // the state machine that everything is done.  We use a special event to say the producer is
00962       // done but we didn't do anything.
00963       p->alive = false;
00964       p->read_success = true;
00965       Debug("http_tunnel", "[%" PRId64 "] [tunnel_run] producer already done", sm->sm_id);
00966       producer_handler(HTTP_TUNNEL_EVENT_PRECOMPLETE, p);
00967     } else {
00968       if (read_start_pos > 0) {
00969         p->read_vio = ((CacheVC*)p->vc)->do_io_pread(this, producer_n, p->read_buffer, read_start_pos);
00970       }
00971       else {
00972         p->read_vio = p->vc->do_io_read(this, producer_n, p->read_buffer);
00973       }
00974     }
00975   }
00976 
00977   // Now that the tunnel has started, we must remove the producer's reader so
00978   // that it doesn't act like a buffer guard
00979   p->read_buffer->dealloc_reader(p->buffer_start);
00980   p->buffer_start = NULL;
00981 }
00982 
00983 int
00984 HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer * p)
00985 {
00986   ink_assert(p->do_chunking);
00987 
00988   Debug("http_tunnel", "[%" PRId64 "] producer_handler_dechunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
00989 
00990   // We are only interested in translating certain events
00991   switch (event) {
00992   case VC_EVENT_READ_READY:
00993   case VC_EVENT_READ_COMPLETE:
00994   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
00995   case VC_EVENT_EOS:
00996     p->last_event =
00997       p->chunked_handler.last_server_event = event;
00998     // TODO: Should we check the return code?
00999     p->chunked_handler.generate_chunked_content();
01000     break;
01001   };
01002   // Since we will consume all the data if the server is actually finished,
01003   //   we don't have to translate events like we do in
01004   //   producer_handler_chunked()
01005   return event;
01006 }
01007 
01008 // int HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer* p)
01009 //
01010 //   Handles events from chunked producers.  It calls the chunking handlers
01011 //    if appropriate and then translates the event we got into a suitable
01012 //    event to represent the unchunked state, and does chunked bookkeeping
01013 //
01014 int
01015 HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer * p)
01016 {
01017   ink_assert(p->do_dechunking || p->do_chunked_passthru);
01018 
01019   Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
01020 
01021   // We are only interested in translating certain events
01022   switch (event) {
01023   case VC_EVENT_READ_READY:
01024   case VC_EVENT_READ_COMPLETE:
01025   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01026   case VC_EVENT_EOS:
01027     break;
01028   default:
01029     return event;
01030   }
01031 
01032   p->last_event =
01033     p->chunked_handler.last_server_event = event;
01034   bool done = p->chunked_handler.process_chunked_content();
01035 
01036   // If we couldn't understand the encoding, return
01037   //   an error
01038   if (p->chunked_handler.state == ChunkedHandler::CHUNK_READ_ERROR) {
01039     Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s chunk decoding error]", sm->sm_id, p->name);
01040     p->chunked_handler.truncation = true;
01041     // FIX ME: we return EOS here since it will cause
01042     //  the client to be reenabled.  ERROR makes more
01043     //  sense but no reenables follow.
01044     return VC_EVENT_EOS;
01045   }
01046 
01047   switch (event) {
01048   case VC_EVENT_READ_READY:
01049     if (done) {
01050       return VC_EVENT_READ_COMPLETE;
01051     }
01052     break;
01053   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01054   case VC_EVENT_EOS:
01055   case VC_EVENT_READ_COMPLETE:
01056     if (!done) {
01057       p->chunked_handler.truncation = true;
01058     }
01059     break;
01060   }
01061 
01062   return event;
01063 }
01064 
01065 //
01066 // bool HttpTunnel::producer_handler(int event, HttpTunnelProducer* p)
01067 //
01068 //   Handles events from producers.
01069 //
01070 //   If the event is interesting only to the tunnel, this
01071 //    handler takes all necessary actions and returns false
01072 //    If the event is interesting to the state_machine,
01073 //    it calls back the state machine and returns true
01074 //
01075 //
01076 bool HttpTunnel::producer_handler(int event, HttpTunnelProducer * p)
01077 {
01078   HttpTunnelConsumer *c;
01079   HttpProducerHandler jump_point;
01080   bool sm_callback = false;
01081 
01082   Debug("http_tunnel", "[%" PRId64 "] producer_handler [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
01083 
01084   // Handle chunking/dechunking/chunked-passthrough if necessary.
01085   if (p->do_chunking) {
01086     event = producer_handler_dechunked(event, p);
01087 
01088     // If we were in PRECOMPLETE when this function was called
01089     // and we are doing chunking, then we just wrote the last
01090     // chunk in the function call above.  We are done with the
01091     // tunnel.
01092     if (event == HTTP_TUNNEL_EVENT_PRECOMPLETE) {
01093       event = VC_EVENT_EOS;
01094     }
01095   } else if (p->do_dechunking || p->do_chunked_passthru) {
01096     event = producer_handler_chunked(event, p);
01097   } else {
01098     p->last_event = event;
01099   }
01100 
01101   //YTS Team, yamsat Plugin
01102   //Copy partial POST data to buffers. Check for the various parameters including
01103   //the maximum configured post data size
01104   if (sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection &&
01105       (event == VC_EVENT_READ_READY || event == VC_EVENT_READ_COMPLETE) && (p->vc_type == HT_HTTP_CLIENT)) {
01106     Debug("http_redirect", "[HttpTunnel::producer_handler] [%s %s]", p->name, HttpDebugNames::get_event_name(event));
01107 
01108     if ((postbuf->postdata_copy_buffer_start->read_avail() + postbuf->ua_buffer_reader->read_avail())
01109         > HttpConfig::m_master.post_copy_size) {
01110       Debug("http_redirect",
01111             "[HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64" reader_avail=%" PRId64" limit=%" PRId64"",
01112             postbuf->postdata_copy_buffer_start->read_avail(), postbuf->ua_buffer_reader->read_avail(),
01113             HttpConfig::m_master.post_copy_size);
01114       deallocate_redirect_postdata_buffers();
01115       sm->enable_redirection = false;
01116     } else {
01117       copy_partial_post_data();
01118     }
01119   }                             //end of added logic for partial copy of POST
01120 
01121   Debug("http_redirect", "[HttpTunnel::producer_handler] enable_redirection: [%d %d %d] event: %d",
01122         p->alive == true, sm->enable_redirection, (p->self_consumer && p->self_consumer->alive == true), event);
01123   ink_assert(p->alive == true || event == HTTP_TUNNEL_EVENT_PRECOMPLETE || event == VC_EVENT_EOS ||
01124              sm->enable_redirection || (p->self_consumer && p->self_consumer->alive == true));
01125 
01126   switch (event) {
01127   case VC_EVENT_READ_READY:
01128     // Data read from producer, reenable consumers
01129     for (c = p->consumer_list.head; c; c = c->link.next) {
01130       if (c->alive) {
01131         c->write_vio->reenable();
01132       }
01133     }
01134     break;
01135 
01136   case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01137     // The producer had finished before the tunnel
01138     //  started, so just call the state machine back.
01139     //  We don't need to reenable since the consumers
01140     //  were just activated.  Likewise, we can't be
01141     //  done because the consumer couldn't have
01142     //  called us back yet.
01143     p->bytes_read = 0;
01144     jump_point = p->vc_handler;
01145     (sm->*jump_point) (event, p);
01146     sm_callback = true;
01147     break;
01148 
01149   case VC_EVENT_READ_COMPLETE:
01150   case VC_EVENT_EOS:
01151     // The producer completed
01152     p->alive = false;
01153     if (p->read_vio) {
01154       p->bytes_read = p->read_vio->ndone;
01155     } else {
01156       // If we are chunked, we can receive the whole document
01157       //   along with the header without knowing it (due to
01158       //   the message length being a property of the encoding).
01159       //   In that case, we won't have done a do_io so there
01160       //   will not be a vio.
01161       p->bytes_read = 0;
01162     }
01163 
01164     // callback the SM to notify of completion
01165     //  Note: we need to callback the SM before
01166     //  reenabling the consumers as the reenable may
01167     //  make the data visible to the consumer and
01168     //  initiate async I/O operation.  The SM needs to
01169     //  set how much I/O to do before async I/O is
01170     //  initiated
01171     jump_point = p->vc_handler;
01172     (sm->*jump_point) (event, p);
01173     sm_callback = true;
01174 
01175     // Data read from producer, reenable consumers
01176     for (c = p->consumer_list.head; c; c = c->link.next) {
01177       if (c->alive) {
01178         c->write_vio->reenable();
01179       }
01180     }
01181     break;
01182 
01183   case VC_EVENT_ERROR:
01184   case VC_EVENT_ACTIVE_TIMEOUT:
01185   case VC_EVENT_INACTIVITY_TIMEOUT:
01186   case HTTP_TUNNEL_EVENT_CONSUMER_DETACH:
01187     p->alive = false;
01188     p->bytes_read = p->read_vio->ndone;
01189     // Interesting tunnel event, call SM
01190     jump_point = p->vc_handler;
01191     (sm->*jump_point) (event, p);
01192     sm_callback = true;
01193     break;
01194 
01195   case VC_EVENT_WRITE_READY:
01196   case VC_EVENT_WRITE_COMPLETE:
01197   default:
01198     // Producers should not get these events
01199     ink_release_assert(0);
01200     break;
01201   }
01202 
01203   return sm_callback;
01204 }
01205 
01206 bool
01207 HttpTunnel::consumer_reenable(HttpTunnelConsumer* c)
01208 {
01209   HttpTunnelProducer* p = c->producer;
01210   HttpTunnelProducer* srcp = p->flow_control_source;
01211   if (p->alive
01212 #ifndef LAZY_BUF_ALLOC
01213       && p->read_buffer->write_avail() > 0
01214 #endif
01215     ) {
01216     // Only do flow control if enabled and the producer is an external
01217     // source.  Otherwise disable it by making the backlog zero. Because
01218     // the backlog computation short circuits when the value is equal to
01219     // (or greater than) the target, we use strict comparison only for
01220     // checking low water; otherwise the flow control can stall out.
01221     uint64_t backlog = (flow_state.enabled_p && p->is_source())
01222       ? p->backlog(flow_state.high_water) : 0;
01223 
01224     if (backlog >= flow_state.high_water) {
01225       if (is_debug_tag_set("http_tunnel"))
01226         Debug("http_tunnel", "Throttle   %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
01227       p->throttle(); // p becomes srcp for future calls to this method
01228     } else {
01229       if (srcp && srcp->alive && c->is_sink()) {
01230         // Check if backlog is below low water - note we need to check
01231         // against the source producer, not necessarily the producer
01232         // for this consumer. We don't have to recompute the backlog
01233         // if they are the same because we know low water <= high
01234         // water so the value is sufficiently accurate.
01235         if (srcp != p)
01236           backlog = srcp->backlog(flow_state.low_water);
01237         if (backlog < flow_state.low_water) {
01238           if (is_debug_tag_set("http_tunnel"))
01239             Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
01240           srcp->unthrottle();
01241           srcp->read_vio->reenable();
01242           // Kick source producer to get flow ... well, flowing.
01243           this->producer_handler(VC_EVENT_READ_READY, srcp);
01244         }
01245       }
01246       p->read_vio->reenable();
01247     }
01248   }
01249   return p->is_throttled();
01250 }
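
// Editor's note: an illustrative sketch, not part of the original file.  The
// throttle/unthrottle logic above is a hysteresis band between the low and
// high water marks; reduced to its essentials with placeholder names:
static bool
example_flow_control_decision(uint64_t backlog, uint64_t low_water, uint64_t high_water, bool throttled)
{
  // Start throttling at or above the high water mark; stop only once the
  // backlog has dropped strictly below the low water mark.
  if (!throttled && backlog >= high_water)
    return true;
  if (throttled && backlog < low_water)
    return false;
  return throttled;
}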
01251 
01252 //
01253 // bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer* p)
01254 //
01255 //   Handles events from consumers.
01256 //
01257 //   If the event is interesting only to the tunnel, this
01258 //    handler takes all necessary actions and returns false
01259 //    If the event is interesting to the state_machine,
01260 //    it calls back the state machine and returns true
01261 //
01262 //
01263 bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c)
01264 {
01265   bool sm_callback = false;
01266   HttpConsumerHandler jump_point;
01267   HttpTunnelProducer* p = c->producer;
01268 
01269   Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, c->name, HttpDebugNames::get_event_name(event));
01270 
01271   ink_assert(c->alive == true);
01272 
01273   switch (event) {
01274   case VC_EVENT_WRITE_READY:
01275     this->consumer_reenable(c);
01276     break;
01277 
01278   case VC_EVENT_WRITE_COMPLETE:
01279   case VC_EVENT_EOS:
01280   case VC_EVENT_ERROR:
01281   case VC_EVENT_ACTIVE_TIMEOUT:
01282   case VC_EVENT_INACTIVITY_TIMEOUT:
01283     ink_assert(c->alive);
01284     ink_assert(c->buffer_reader);
01285     c->alive = false;
01286 
01287     c->bytes_written = c->write_vio ? c->write_vio->ndone : 0;
01288 
01289     // Interesting tunnel event, call SM
01290     jump_point = c->vc_handler;
01291     (sm->*jump_point) (event, c);
01292     sm_callback = true;
01293 
01294     // Deallocate the reader after calling back the sm
01295     //  because buffer problems are easier to debug
01296     //  in the sm when the reader is still valid
01297     c->buffer_reader->mbuf->dealloc_reader(c->buffer_reader);
01298     c->buffer_reader = NULL;
01299 
01300     // Since we removed a consumer, it may now be
01301     //   possible to put more stuff in the buffer
01302     // Note: we reenable only after calling back
01303     //    the SM since the reenabling has the side effect
01304     //    of updating the buffer state for the VConnection
01305     //    that is being reenabled
01306     if (p->alive && p->read_vio
01307 #ifndef LAZY_BUF_ALLOC
01308         && p->read_buffer->write_avail() > 0
01309 #endif
01310       ) {
01311       if (p->is_throttled())
01312         this->consumer_reenable(c);
01313       else
01314         p->read_vio->reenable();
01315     }
01316     // [amc] I don't think this happens but we'll leave a debug trap
01317     // here just in case.
01318     if (p->is_throttled())
01319       Debug("http_tunnel", "Special event %s on %p with flow control on", HttpDebugNames::get_event_name(event), p);
01320     break;
01321 
01322   case VC_EVENT_READ_READY:
01323   case VC_EVENT_READ_COMPLETE:
01324   default:
01325     // Consumers should not get these events
01326     ink_release_assert(0);
01327     break;
01328   }
01329 
01330   return sm_callback;
01331 }
01332 
01333 
01334 // void HttpTunnel::chain_abort_all(HttpTunnelProducer* p)
01335 //
01336 //    Abort the producer and everyone still alive
01337 //     downstream of the producer
01338 //
01339 void
01340 HttpTunnel::chain_abort_all(HttpTunnelProducer * p)
01341 {
01342   HttpTunnelConsumer *c = p->consumer_list.head;
01343 
01344   while (c) {
01345     if (c->alive) {
01346       c->alive = false;
01347       c->write_vio = NULL;
01348       c->vc->do_io_close(EHTTP_ERROR);
01349       update_stats_after_abort(c->vc_type);
01350     }
01351 
01352     if (c->self_producer) {
01353       chain_abort_all(c->self_producer);
01354     }
01355 
01356     c = c->link.next;
01357   }
01358 
01359   if (p->alive) {
01360     p->alive = false;
01361     p->bytes_read = p->read_vio->ndone;
01362     if (p->self_consumer) {
01363       p->self_consumer->alive = false;
01364     }
01365     p->read_vio = NULL;
01366     p->vc->do_io_close(EHTTP_ERROR);
01367     update_stats_after_abort(p->vc_type);
01368   }
01369 }
01370 
01371 // void HttpTunnel::finish_all_internal(HttpTunnelProducer* p, bool chain)
01372 //
01373 //    Internal function for finishing all consumers.  Takes a
01374 //       chain argument indicating whether to finish just the immediate
01375 //       consumers or all those downstream
01376 //
01377 void
01378 HttpTunnel::finish_all_internal(HttpTunnelProducer * p, bool chain)
01379 {
01380   ink_assert(p->alive == false);
01381   HttpTunnelConsumer *c = p->consumer_list.head;
01382   int64_t total_bytes = 0;
01383   TunnelChunkingAction_t action = p->chunking_action;
01384 
01385   while (c) {
01386     if (c->alive) {
01387       if (c->vc_type == HT_CACHE_WRITE) {
01388         switch (action) {
01389         case TCA_CHUNK_CONTENT:
01390         case TCA_PASSTHRU_DECHUNKED_CONTENT:
01391           total_bytes = p->bytes_read + p->init_bytes_done;
01392           break;
01393         case TCA_DECHUNK_CONTENT:
01394         case TCA_PASSTHRU_CHUNKED_CONTENT:
01395           total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
01396           break;
01397         default:
01398           break;
01399         }
01400       } else if (action == TCA_CHUNK_CONTENT) {
01401         total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.chunked_size;
01402       } else if (action == TCA_DECHUNK_CONTENT) {
01403         total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
01404       } else
01405         total_bytes = p->bytes_read + p->init_bytes_done;
01406 
01407       c->write_vio->nbytes = total_bytes - c->skip_bytes;
01408       ink_assert(c->write_vio->nbytes >= 0);
01409 
01410       if (c->write_vio->nbytes < 0) {
01411         // TODO: Wtf, printf?
01412         fprintf(stderr,
01413                 "[HttpTunnel::finish_all_internal] ERROR: Incorrect total_bytes - c->skip_bytes = %" PRId64 "\n",
01414                 (int64_t) (total_bytes - c->skip_bytes));
01415       }
01416 
01417       if (chain == true && c->self_producer) {
01418         chain_finish_all(c->self_producer);
01419       }
01420       // The IO Core will not call us back if there
01421       //   is nothing to do.  Check to see if there is
01422       //   nothing to do and take the appropriate
01423       //   action
01424       if (c->write_vio->nbytes == c->write_vio->ndone) {
01425         consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
01426       }
01427     }
01428 
01429     c = c->link.next;
01430   }
01431 }
01432 
01433 // void HttpTunnel::chain_abort_cache_write(HttpTunnelProducer* p)
01434 //
01435 //    Terminates all cache writes.  Used to prevent truncated
01436 //     documents from being stored in the cache
01437 //
01438 void
01439 HttpTunnel::chain_abort_cache_write(HttpTunnelProducer * p)
01440 {
01441   HttpTunnelConsumer *c = p->consumer_list.head;
01442 
01443   while (c) {
01444     if (c->alive) {
01445       if (c->vc_type == HT_CACHE_WRITE) {
01446         ink_assert(c->self_producer == NULL);
01447         c->write_vio = NULL;
01448         c->vc->do_io_close(EHTTP_ERROR);
01449         c->alive = false;
01450         HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
01451       } else if (c->self_producer) {
01452         chain_abort_cache_write(c->self_producer);
01453       }
01454     }
01455     c = c->link.next;
01456   }
01457 }
01458 
01459 // void HttpTunnel::close_vc(HttpTunnelProducer* p)
01460 //
01461 //    Closes the vc associated with the producer and
01462 //      updates the state of the self_consumer
01463 //
01464 void
01465 HttpTunnel::close_vc(HttpTunnelProducer * p)
01466 {
01467   ink_assert(p->alive == false);
01468   HttpTunnelConsumer *c = p->self_consumer;
01469 
01470   if (c && c->alive) {
01471     c->alive = false;
01472     if (c->write_vio) {
01473       c->bytes_written = c->write_vio->ndone;
01474     }
01475   }
01476 
01477   p->vc->do_io_close();
01478 }
01479 
01480 // void HttpTunnel::close_vc(HttpTunnelConsumer* c)
01481 //
01482 //    Closes the vc associated with the consumer and
01483 //      updates the state of the self_producer
01484 //
01485 void
01486 HttpTunnel::close_vc(HttpTunnelConsumer * c)
01487 {
01488   ink_assert(c->alive == false);
01489   HttpTunnelProducer *p = c->self_producer;
01490 
01491   if (p && p->alive) {
01492     p->alive = false;
01493     if (p->read_vio) {
01494       p->bytes_read = p->read_vio->ndone;
01495     }
01496   }
01497 
01498   c->vc->do_io_close();
01499 }
01500 
01501 // int HttpTunnel::main_handler(int event, void* data)
01502 //
01503 //   Main handler for the tunnel.  Vectors events
01504 //   based on whether they are from consumers or
01505 //   producers
01506 //
01507 int
01508 HttpTunnel::main_handler(int event, void *data)
01509 {
01510   HttpTunnelProducer *p = NULL;
01511   HttpTunnelConsumer *c = NULL;
01512   bool sm_callback = false;
01513 
01514   ink_assert(sm->magic == HTTP_SM_MAGIC_ALIVE);
01515 
01516   // Find the appropriate entry
01517   if ((p = get_producer((VIO *) data)) != 0) {
01518     sm_callback = producer_handler(event, p);
01519   } else {
01520     if ((c = get_consumer((VIO *) data)) != 0) {
01521       ink_assert(c->write_vio == (VIO *) data);
01522       sm_callback = consumer_handler(event, c);
01523     } else {
01524       internal_error();         // do nothing
01525     }
01526   }
01527 
01528   // We called a vc handler; the tunnel might be
01529   //  finished.  Check to see if there are any remaining
01530   //  VConnections alive.  If not, notify the state machine
01531   //
01532   if (sm_callback && !is_tunnel_alive()) {
01533     active = false;
01534     sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
01535     return EVENT_DONE;
01536   }
01537   return EVENT_CONT;
01538 }
01539 
01540 void
01541 HttpTunnel::update_stats_after_abort(HttpTunnelType_t t)
01542 {
01543   switch (t) {
01544   case HT_CACHE_READ:
01545   case HT_CACHE_WRITE:
01546     HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
01547     break;
01548   default:
01549     // Handled here:
01550     // HT_HTTP_SERVER, HT_HTTP_CLIENT,
01551     // HT_TRANSFORM, HT_STATIC
01552     break;
01553   };
01554 }
01555 
01556 void
01557 HttpTunnel::internal_error()
01558 {
01559 }
01560 
01561 
01562 //YTS Team, yamsat Plugin
01563 //Function to copy the partial Post data while tunnelling
01564 void
01565 HttpTunnel::copy_partial_post_data()
01566 {
01567   postbuf->postdata_copy_buffer->write(postbuf->ua_buffer_reader);
01568   Debug("http_redirect", "[HttpTunnel::copy_partial_post_data] wrote %" PRId64" bytes to buffers %" PRId64"",
01569         postbuf->ua_buffer_reader->read_avail(), postbuf->postdata_copy_buffer_start->read_avail());
01570   postbuf->ua_buffer_reader->consume(postbuf->ua_buffer_reader->read_avail());
01571 }
01572 
01573 //YTS Team, yamsat Plugin
01574 //Allocate a new buffer for static producers
01575 void
01576 HttpTunnel::allocate_redirect_postdata_producer_buffer()
01577 {
01578   int64_t alloc_index = buffer_size_to_index(sm->t_state.hdr_info.request_content_length);
01579 
01580   ink_release_assert(postbuf->postdata_producer_buffer == NULL);
01581 
01582   postbuf->postdata_producer_buffer = new_MIOBuffer(alloc_index);
01583   postbuf->postdata_producer_reader = postbuf->postdata_producer_buffer->alloc_reader();
01584 }
01585 
01586 //YTS Team, yamsat Plugin
01587 //Allocating the post data buffers
01588 void
01589 HttpTunnel::allocate_redirect_postdata_buffers(IOBufferReader * ua_reader)
01590 {
01591   int64_t alloc_index = buffer_size_to_index(sm->t_state.hdr_info.request_content_length);
01592 
01593   Debug("http_redirect", "[HttpTunnel::allocate_postdata_buffers]");
01594 
01595   // TODO: This is uncool, shouldn't this use the class allocator or proxy allocator ?
01596   // If fixed, obviously also fix the deallocator.
01597   if (postbuf == NULL) {
01598     postbuf = new PostDataBuffers();
01599   }
01600   postbuf->ua_buffer_reader = ua_reader;
01601   postbuf->postdata_copy_buffer = new_MIOBuffer(alloc_index);
01602   postbuf->postdata_copy_buffer_start = postbuf->postdata_copy_buffer->alloc_reader();
01603   allocate_redirect_postdata_producer_buffer();
01604 }
01605 
01606 
01607 //YTS Team, yamsat Plugin
01608 //Deallocating the post data buffers
01609 void
01610 HttpTunnel::deallocate_redirect_postdata_buffers()
01611 {
01612   Debug("http_redirect", "[HttpTunnel::deallocate_postdata_copy_buffers]");
01613 
01614   if (postbuf != NULL) {
01615     if (postbuf->postdata_producer_buffer != NULL) {
01616       free_MIOBuffer(postbuf->postdata_producer_buffer);
01617       postbuf->postdata_producer_buffer = NULL;
01618       postbuf->postdata_producer_reader = NULL; //deallocated by the buffer
01619     }
01620     if (postbuf->postdata_copy_buffer != NULL) {
01621       free_MIOBuffer(postbuf->postdata_copy_buffer);
01622       postbuf->postdata_copy_buffer = NULL;
01623       postbuf->postdata_copy_buffer_start = NULL;       //deallocated by the buffer
01624     }
01625     delete postbuf;
01626     postbuf = NULL;
01627   }
01628 }
