00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include "ink_config.h"
00034 #include "HttpConfig.h"
00035 #include "HttpTunnel.h"
00036 #include "HttpSM.h"
00037 #include "HttpDebugNames.h"
00038 #include "ParseRules.h"
00039
00040 static const int min_block_transfer_bytes = 256;
00041 static char const * const CHUNK_HEADER_FMT = "%" PRIx64"\r\n";
00042
00043
00044
00045 static int const CHUNK_IOBUFFER_SIZE_INDEX = MIN_IOBUFFER_SIZE;
00046
00047 char
00048 VcTypeCode(HttpTunnelType_t t) {
00049 char zret = ' ';
00050 switch (t) {
00051 case HT_HTTP_CLIENT: zret = 'U'; break;
00052 case HT_HTTP_SERVER: zret = 'S'; break;
00053 case HT_TRANSFORM: zret = 'T'; break;
00054 case HT_CACHE_READ: zret = 'R'; break;
00055 case HT_CACHE_WRITE: zret = 'W'; break;
00056 default: break;
00057 }
00058 return zret;
00059 }
00060
00061 ChunkedHandler::ChunkedHandler()
00062 : chunked_reader(NULL), dechunked_buffer(NULL), dechunked_size(0), dechunked_reader(NULL), chunked_buffer(NULL),
00063 chunked_size(0), truncation(false), skip_bytes(0), state(CHUNK_READ_CHUNK), cur_chunk_size(0),
00064 bytes_left(0), last_server_event(VC_EVENT_NONE), running_sum(0), num_digits(0),
00065 max_chunk_size(DEFAULT_MAX_CHUNK_SIZE), max_chunk_header_len(0)
00066 {
00067 }
00068
00069 void
00070 ChunkedHandler::init(IOBufferReader * buffer_in, HttpTunnelProducer * p)
00071 {
00072 if (p->do_chunking)
00073 init_by_action(buffer_in, ACTION_DOCHUNK);
00074 else if (p->do_dechunking)
00075 init_by_action(buffer_in, ACTION_DECHUNK);
00076 else
00077 init_by_action(buffer_in, ACTION_PASSTHRU);
00078 return;
00079 }
00080
00081 void
00082 ChunkedHandler::init_by_action(IOBufferReader *buffer_in, Action action)
00083 {
00084 running_sum = 0;
00085 num_digits = 0;
00086 cur_chunk_size = 0;
00087 bytes_left = 0;
00088 truncation = false;
00089 this->action = action;
00090
00091 switch (action) {
00092 case ACTION_DOCHUNK:
00093 dechunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00094 dechunked_reader->mbuf->water_mark = min_block_transfer_bytes;
00095 chunked_buffer = new_MIOBuffer(CHUNK_IOBUFFER_SIZE_INDEX);
00096 chunked_size = 0;
00097 break;
00098 case ACTION_DECHUNK:
00099 chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00100 dechunked_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_256);
00101 dechunked_size = 0;
00102 break;
00103 case ACTION_PASSTHRU:
00104 chunked_reader = buffer_in->mbuf->clone_reader(buffer_in);
00105 break;
00106 default:
00107 ink_release_assert(!"Unknown action");
00108 }
00109
00110 return;
00111 }
00112
00113 void
00114 ChunkedHandler::clear()
00115 {
00116 switch (action) {
00117 case ACTION_DOCHUNK:
00118 free_MIOBuffer(chunked_buffer);
00119 break;
00120 case ACTION_DECHUNK:
00121 free_MIOBuffer(dechunked_buffer);
00122 break;
00123 case ACTION_PASSTHRU:
00124 default:
00125 break;
00126 }
00127
00128 return;
00129 }
00130
00131 void
00132 ChunkedHandler::set_max_chunk_size(int64_t size)
00133 {
00134 max_chunk_size = size ? size : DEFAULT_MAX_CHUNK_SIZE;
00135 max_chunk_header_len = snprintf(max_chunk_header, sizeof(max_chunk_header), CHUNK_HEADER_FMT, max_chunk_size);
00136 }
00137
00138 void
00139 ChunkedHandler::read_size()
00140 {
00141 int64_t bytes_used;
00142 bool done = false;
00143
00144 while (chunked_reader->read_avail() > 0 && !done) {
00145 const char *tmp = chunked_reader->start();
00146 int64_t data_size = chunked_reader->block_read_avail();
00147
00148 ink_assert(data_size > 0);
00149 bytes_used = 0;
00150
00151 while (data_size > 0) {
00152 bytes_used++;
00153 if (state == CHUNK_READ_SIZE) {
00154
00155 if (ParseRules::is_hex(*tmp)) {
00156 num_digits++;
00157 running_sum *= 16;
00158
00159 if (ParseRules::is_digit(*tmp)) {
00160 running_sum += *tmp - '0';
00161 } else {
00162 running_sum += ParseRules::ink_tolower(*tmp) - 'a' + 10;
00163 }
00164 } else {
00165
00166 if (num_digits == 0 || running_sum < 0) {
00167
00168 state = CHUNK_READ_ERROR;
00169 done = true;
00170 break;
00171 } else {
00172 state = CHUNK_READ_SIZE_CRLF;
00173 }
00174 }
00175 } else if (state == CHUNK_READ_SIZE_CRLF) {
00176 if (ParseRules::is_lf(*tmp)) {
00177 Debug("http_chunk", "read chunk size of %d bytes", running_sum);
00178 bytes_left = (cur_chunk_size = running_sum);
00179 state = (running_sum == 0) ? CHUNK_READ_TRAILER_BLANK : CHUNK_READ_CHUNK;
00180 done = true;
00181 break;
00182 }
00183 } else if (state == CHUNK_READ_SIZE_START) {
00184 if (ParseRules::is_lf(*tmp)) {
00185 running_sum = 0;
00186 num_digits = 0;
00187 state = CHUNK_READ_SIZE;
00188 }
00189 }
00190 tmp++;
00191 data_size--;
00192 }
00193 chunked_reader->consume(bytes_used);
00194 }
00195 }
00196
00197
00198
00199
00200
00201
00202
00203 int64_t
00204 ChunkedHandler::transfer_bytes()
00205 {
00206 int64_t block_read_avail, moved, to_move, total_moved = 0;
00207
00208
00209 if (!dechunked_buffer) {
00210 moved = MIN(bytes_left, chunked_reader->read_avail());
00211 chunked_reader->consume(moved);
00212 bytes_left = bytes_left - moved;
00213 return moved;
00214 }
00215
00216 while (bytes_left > 0) {
00217 block_read_avail = chunked_reader->block_read_avail();
00218
00219 to_move = MIN(bytes_left, block_read_avail);
00220 if (to_move <= 0)
00221 break;
00222
00223 if (to_move >= min_block_transfer_bytes) {
00224 moved = dechunked_buffer->write(chunked_reader, bytes_left);
00225 } else {
00226
00227
00228
00229
00230 moved = dechunked_buffer->write(chunked_reader->start(), to_move);
00231 }
00232
00233 if (moved > 0) {
00234 chunked_reader->consume(moved);
00235 bytes_left = bytes_left - moved;
00236 dechunked_size += moved;
00237 total_moved += moved;
00238 } else
00239 break;
00240 }
00241 return total_moved;
00242 }
00243
00244 void
00245 ChunkedHandler::read_chunk()
00246 {
00247 int64_t b = transfer_bytes();
00248
00249 ink_assert(bytes_left >= 0);
00250 if (bytes_left == 0) {
00251 Debug("http_chunk", "completed read of chunk of %" PRId64" bytes", cur_chunk_size);
00252
00253 state = CHUNK_READ_SIZE_START;
00254 } else if (bytes_left > 0) {
00255 Debug("http_chunk", "read %" PRId64" bytes of an %" PRId64" chunk", b, cur_chunk_size);
00256 }
00257 }
00258
00259 void
00260 ChunkedHandler::read_trailer()
00261 {
00262 int64_t bytes_used;
00263 bool done = false;
00264
00265 while (chunked_reader->is_read_avail_more_than(0) && !done) {
00266 const char *tmp = chunked_reader->start();
00267 int64_t data_size = chunked_reader->block_read_avail();
00268
00269 ink_assert(data_size > 0);
00270 for (bytes_used = 0; data_size > 0; data_size--) {
00271 bytes_used++;
00272
00273 if (ParseRules::is_cr(*tmp)) {
00274
00275
00276
00277 state = (state == CHUNK_READ_TRAILER_BLANK) ? CHUNK_READ_TRAILER_CR : CHUNK_READ_TRAILER_LINE;
00278 } else if (ParseRules::is_lf(*tmp)) {
00279
00280
00281
00282 if (state == CHUNK_READ_TRAILER_CR || state == CHUNK_READ_TRAILER_BLANK) {
00283 state = CHUNK_READ_DONE;
00284 Debug("http_chunk", "completed read of trailers");
00285 done = true;
00286 break;
00287 } else {
00288
00289
00290 state = CHUNK_READ_TRAILER_BLANK;
00291 }
00292 } else {
00293
00294
00295 state = CHUNK_READ_TRAILER_LINE;
00296 }
00297 tmp++;
00298 }
00299 chunked_reader->consume(bytes_used);
00300 }
00301 }
00302
00303 bool ChunkedHandler::process_chunked_content()
00304 {
00305 while (chunked_reader->is_read_avail_more_than(0) && state != CHUNK_READ_DONE && state != CHUNK_READ_ERROR) {
00306 switch (state) {
00307 case CHUNK_READ_SIZE:
00308 case CHUNK_READ_SIZE_CRLF:
00309 case CHUNK_READ_SIZE_START:
00310 read_size();
00311 break;
00312 case CHUNK_READ_CHUNK:
00313 read_chunk();
00314 break;
00315 case CHUNK_READ_TRAILER_BLANK:
00316 case CHUNK_READ_TRAILER_CR:
00317 case CHUNK_READ_TRAILER_LINE:
00318 read_trailer();
00319 break;
00320 case CHUNK_FLOW_CONTROL:
00321 return false;
00322 default:
00323 ink_release_assert(0);
00324 break;
00325 }
00326 }
00327 return (state == CHUNK_READ_DONE || state == CHUNK_READ_ERROR);
00328 }
00329
00330 bool ChunkedHandler::generate_chunked_content()
00331 {
00332 char tmp[16];
00333 bool server_done = false;
00334 int64_t r_avail;
00335
00336 ink_assert(max_chunk_header_len);
00337
00338 switch (last_server_event) {
00339 case VC_EVENT_EOS:
00340 case VC_EVENT_READ_COMPLETE:
00341 case HTTP_TUNNEL_EVENT_PRECOMPLETE:
00342 server_done = true;
00343 break;
00344 }
00345
00346 while ((r_avail = dechunked_reader->read_avail()) > 0 && state != CHUNK_WRITE_DONE) {
00347 int64_t write_val = MIN(max_chunk_size, r_avail);
00348
00349 state = CHUNK_WRITE_CHUNK;
00350 Debug("http_chunk", "creating a chunk of size %" PRId64 " bytes", write_val);
00351
00352
00353 if (write_val != max_chunk_size) {
00354 int len = snprintf(tmp, sizeof(tmp), CHUNK_HEADER_FMT, write_val);
00355 chunked_buffer->write(tmp, len);
00356 chunked_size += len;
00357 } else {
00358 chunked_buffer->write(max_chunk_header, max_chunk_header_len);
00359 chunked_size += max_chunk_header_len;
00360 }
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372 chunked_buffer->write(dechunked_reader, write_val);
00373 chunked_size += write_val;
00374 dechunked_reader->consume(write_val);
00375
00376
00377 chunked_buffer->write("\r\n", 2);
00378 chunked_size += 2;
00379 }
00380
00381 if (server_done) {
00382 state = CHUNK_WRITE_DONE;
00383
00384
00385 chunked_buffer->write("0\r\n\r\n", 5);
00386 chunked_size += 5;
00387 return true;
00388 }
00389 return false;
00390 }
00391
00392 HttpTunnelProducer::HttpTunnelProducer()
00393 : consumer_list(), self_consumer(NULL),
00394 vc(NULL), vc_handler(NULL), read_vio(NULL), read_buffer(NULL),
00395 buffer_start(NULL), vc_type(HT_HTTP_SERVER), chunking_action(TCA_PASSTHRU_DECHUNKED_CONTENT),
00396 do_chunking(false), do_dechunking(false), do_chunked_passthru(false),
00397 init_bytes_done(0), nbytes(0), ntodo(0), bytes_read(0),
00398 handler_state(0), last_event(0), num_consumers(0), alive(false),
00399 read_success(false), flow_control_source(0), name(NULL)
00400 {
00401 }
00402
00403 uint64_t
00404 HttpTunnelProducer::backlog(uint64_t limit) {
00405 uint64_t zret = 0;
00406
00407
00408
00409 for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
00410 if (c->alive && c->write_vio) {
00411 uint64_t n = 0;
00412 if (HT_TRANSFORM == c->vc_type) {
00413 n += static_cast<TransformVCChain*>(c->vc)->backlog(limit);
00414 } else {
00415 IOBufferReader* r = c->write_vio->get_reader();
00416 if (r) {
00417 n += static_cast<uint64_t>(r->read_avail());
00418 }
00419 }
00420 if (n >= limit) return n;
00421
00422 if (!c->is_sink()) {
00423 HttpTunnelProducer* dsp = c->self_producer;
00424 if (dsp) {
00425 n += dsp->backlog();
00426 }
00427 }
00428 if (n >= limit) return n;
00429 if (n > zret) zret = n;
00430 }
00431 }
00432
00433 if (chunked_handler.chunked_reader) {
00434 zret += static_cast<uint64_t>(chunked_handler.chunked_reader->read_avail());
00435 }
00436
00437 return zret;
00438 }
00439
00440
00441
00442
00443
00444
00445 void
00446 HttpTunnelProducer::set_throttle_src(HttpTunnelProducer* srcp) {
00447 HttpTunnelProducer* p = this;
00448 p->flow_control_source = srcp;
00449 for ( HttpTunnelConsumer* c = consumer_list.head ; c ; c = c->link.next ) {
00450 if (!c->is_sink()) {
00451 p = c->self_producer;
00452 if (p)
00453 p->set_throttle_src(srcp);
00454 }
00455 }
00456 }
00457
00458 HttpTunnelConsumer::HttpTunnelConsumer()
00459 : link(), producer(NULL), self_producer(NULL), vc_type(HT_HTTP_CLIENT), vc(NULL), buffer_reader(NULL),
00460 vc_handler(NULL), write_vio(NULL), skip_bytes(0), bytes_written(0), handler_state(0), alive(false),
00461 write_success(false), name(NULL)
00462 {
00463 }
00464
00465 HttpTunnel::HttpTunnel()
00466 : Continuation(NULL), num_producers(0), num_consumers(0), sm(NULL), active(false)
00467 {
00468 }
00469
00470 void
00471 HttpTunnel::init(HttpSM * sm_arg, ProxyMutex * amutex)
00472 {
00473 HttpConfigParams* params = sm_arg->t_state.http_config_param;
00474 sm = sm_arg;
00475 active = false;
00476 mutex = amutex;
00477 SET_HANDLER(&HttpTunnel::main_handler);
00478 flow_state.enabled_p = params->oride.flow_control_enabled;
00479 if (params->oride.flow_low_water_mark > 0)
00480 flow_state.low_water = params->oride.flow_low_water_mark;
00481 if (params->oride.flow_high_water_mark > 0)
00482 flow_state.high_water = params->oride.flow_high_water_mark;
00483
00484 ink_assert(flow_state.low_water <= flow_state.high_water);
00485 }
00486
00487 void
00488 HttpTunnel::reset()
00489 {
00490 ink_assert(active == false);
00491 #ifdef DEBUG
00492 for (int i = 0; i < MAX_PRODUCERS; ++i) {
00493 ink_assert(producers[i].alive == false);
00494 }
00495 for (int j = 0; j < MAX_CONSUMERS; ++j) {
00496 ink_assert(consumers[j].alive == false);
00497 }
00498 #endif
00499
00500 num_producers = 0;
00501 num_consumers = 0;
00502 memset(consumers, 0, sizeof(consumers));
00503 memset(producers, 0, sizeof(producers));
00504 }
00505
00506 void
00507 HttpTunnel::kill_tunnel()
00508 {
00509 for (int i = 0; i < MAX_PRODUCERS; ++i) {
00510 if (producers[i].vc != NULL) {
00511 chain_abort_all(&producers[i]);
00512 }
00513 ink_assert(producers[i].alive == false);
00514 }
00515 active = false;
00516 this->deallocate_buffers();
00517 this->deallocate_redirect_postdata_buffers();
00518 this->reset();
00519 }
00520
00521 HttpTunnelProducer *
00522 HttpTunnel::alloc_producer()
00523 {
00524 for (int i = 0; i < MAX_PRODUCERS; ++i) {
00525 if (producers[i].vc == NULL) {
00526 num_producers++;
00527 ink_assert(num_producers <= MAX_PRODUCERS);
00528 return producers + i;
00529 }
00530 }
00531 ink_release_assert(0);
00532 return NULL;
00533 }
00534
00535 HttpTunnelConsumer *
00536 HttpTunnel::alloc_consumer()
00537 {
00538 for (int i = 0; i < MAX_CONSUMERS; i++) {
00539 if (consumers[i].vc == NULL) {
00540 num_consumers++;
00541 ink_assert(num_consumers <= MAX_CONSUMERS);
00542 return consumers + i;
00543 }
00544 }
00545 ink_release_assert(0);
00546 return NULL;
00547 }
00548
00549 int
00550 HttpTunnel::deallocate_buffers()
00551 {
00552 int num = 0;
00553 ink_release_assert(active == false);
00554 for (int i = 0; i < MAX_PRODUCERS; ++i) {
00555 if (producers[i].read_buffer != NULL) {
00556 ink_assert(producers[i].vc != NULL);
00557 free_MIOBuffer(producers[i].read_buffer);
00558 producers[i].read_buffer = NULL;
00559 producers[i].buffer_start = NULL;
00560 num++;
00561 }
00562
00563 if (producers[i].chunked_handler.dechunked_buffer != NULL) {
00564 ink_assert(producers[i].vc != NULL);
00565 free_MIOBuffer(producers[i].chunked_handler.dechunked_buffer);
00566 producers[i].chunked_handler.dechunked_buffer = NULL;
00567 num++;
00568 }
00569
00570 if (producers[i].chunked_handler.chunked_buffer != NULL) {
00571 ink_assert(producers[i].vc != NULL);
00572 free_MIOBuffer(producers[i].chunked_handler.chunked_buffer);
00573 producers[i].chunked_handler.chunked_buffer = NULL;
00574 num++;
00575 }
00576 producers[i].chunked_handler.max_chunk_header_len = 0;
00577 }
00578 return num;
00579 }
00580
00581 void
00582 HttpTunnel::set_producer_chunking_action(HttpTunnelProducer * p, int64_t skip_bytes, TunnelChunkingAction_t action)
00583 {
00584 p->chunked_handler.skip_bytes = skip_bytes;
00585 p->chunking_action = action;
00586
00587 switch (action) {
00588 case TCA_CHUNK_CONTENT:
00589 p->chunked_handler.state = p->chunked_handler.CHUNK_WRITE_CHUNK;
00590 break;
00591 case TCA_DECHUNK_CONTENT:
00592 case TCA_PASSTHRU_CHUNKED_CONTENT:
00593 p->chunked_handler.state = p->chunked_handler.CHUNK_READ_SIZE;
00594 break;
00595 default:
00596 break;
00597 };
00598 }
00599
00600 void
00601 HttpTunnel::set_producer_chunking_size(HttpTunnelProducer* p, int64_t size)
00602 {
00603 p->chunked_handler.set_max_chunk_size(size);
00604 }
00605
00606
00607
00608
00609
00610 HttpTunnelProducer *
00611 HttpTunnel::add_producer(VConnection * vc,
00612 int64_t nbytes_arg,
00613 IOBufferReader * reader_start,
00614 HttpProducerHandler sm_handler, HttpTunnelType_t vc_type, const char *name_arg)
00615 {
00616 HttpTunnelProducer *p;
00617
00618 Debug("http_tunnel", "[%" PRId64 "] adding producer '%s'", sm->sm_id, name_arg);
00619
00620 ink_assert(reader_start->mbuf);
00621 if ((p = alloc_producer()) != NULL) {
00622 p->vc = vc;
00623 p->nbytes = nbytes_arg;
00624 p->buffer_start = reader_start;
00625 p->read_buffer = reader_start->mbuf;
00626 p->vc_handler = sm_handler;
00627 p->vc_type = vc_type;
00628 p->name = name_arg;
00629 p->chunking_action = TCA_PASSTHRU_DECHUNKED_CONTENT;
00630
00631 p->do_chunking = false;
00632 p->do_dechunking = false;
00633 p->do_chunked_passthru = false;
00634
00635 p->init_bytes_done = reader_start->read_avail();
00636 if (p->nbytes < 0) {
00637 p->ntodo = p->nbytes;
00638 } else {
00639
00640
00641
00642
00643 p->ntodo = p->nbytes - p->init_bytes_done;
00644 ink_assert(p->ntodo >= 0);
00645 }
00646
00647
00648
00649 if (vc == HTTP_TUNNEL_STATIC_PRODUCER) {
00650 ink_assert(p->ntodo == 0);
00651 p->alive = false;
00652 p->read_success = true;
00653 } else {
00654 p->alive = true;
00655 }
00656 }
00657 return p;
00658 }
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669 HttpTunnelConsumer *
00670 HttpTunnel::add_consumer(VConnection * vc,
00671 VConnection * producer,
00672 HttpConsumerHandler sm_handler, HttpTunnelType_t vc_type, const char *name_arg, int64_t skip_bytes)
00673 {
00674 Debug("http_tunnel", "[%" PRId64 "] adding consumer '%s'", sm->sm_id, name_arg);
00675
00676
00677 HttpTunnelProducer *p = get_producer(producer);
00678 ink_release_assert(p);
00679
00680
00681
00682 if (p->alive == false && p->read_success == false) {
00683 Debug("http_tunnel", "[%" PRId64 "] consumer '%s' not added due to producer failure", sm->sm_id, name_arg);
00684 return NULL;
00685 }
00686
00687 HttpTunnelConsumer *c = alloc_consumer();
00688 c->producer = p;
00689 c->vc = vc;
00690 c->alive = true;
00691 c->skip_bytes = skip_bytes;
00692 c->vc_handler = sm_handler;
00693 c->vc_type = vc_type;
00694 c->name = name_arg;
00695
00696
00697 p->consumer_list.push(c);
00698 p->num_consumers++;
00699
00700 return c;
00701 }
00702
00703 void
00704 HttpTunnel::chain(HttpTunnelConsumer* c, HttpTunnelProducer* p)
00705 {
00706 p->self_consumer = c;
00707 c->self_producer = p;
00708
00709 if (c->producer->is_throttled())
00710 p->set_throttle_src(c->producer->flow_control_source);
00711 }
00712
00713
00714
00715
00716
00717 void
00718 HttpTunnel::tunnel_run(HttpTunnelProducer * p_arg)
00719 {
00720 Debug("http_tunnel", "tunnel_run started, p_arg is %s", p_arg ? "provided" : "NULL");
00721
00722 if (p_arg) {
00723 producer_run(p_arg);
00724 } else {
00725 HttpTunnelProducer *p;
00726
00727 ink_assert(active == false);
00728
00729 for (int i = 0 ; i < MAX_PRODUCERS ; ++i) {
00730 p = producers + i;
00731 if (p->vc != NULL) {
00732 producer_run(p);
00733 }
00734 }
00735 }
00736
00737
00738
00739
00740
00741 if (!is_tunnel_alive()) {
00742 active = false;
00743 sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
00744 }
00745 }
00746
00747 void
00748 HttpTunnel::producer_run(HttpTunnelProducer * p)
00749 {
00750
00751
00752
00753 HttpTunnelConsumer *c, *cache_write_consumer = NULL;
00754 bool transform_consumer = false;
00755
00756 for (c = p->consumer_list.head; c; c = c->link.next) {
00757 if (c->vc_type == HT_CACHE_WRITE) {
00758 cache_write_consumer = c;
00759 break;
00760 }
00761 }
00762
00763
00764 for (c = p->consumer_list.head; c; c = c->link.next) {
00765 if (c->vc_type == HT_TRANSFORM) {
00766 transform_consumer = true;
00767 break;
00768 }
00769 }
00770
00771
00772
00773 TunnelChunkingAction_t action = p->chunking_action;
00774
00775
00776 if (p->vc != HTTP_TUNNEL_STATIC_PRODUCER) {
00777 if (action == TCA_CHUNK_CONTENT)
00778 p->do_chunking = true;
00779 else if (action == TCA_DECHUNK_CONTENT)
00780 p->do_dechunking = true;
00781 else if (action == TCA_PASSTHRU_CHUNKED_CONTENT) {
00782 p->do_chunked_passthru = true;
00783
00784
00785 if (cache_write_consumer != NULL)
00786 p->do_dechunking = true;
00787 }
00788 }
00789
00790 int64_t consumer_n;
00791 int64_t producer_n;
00792
00793 ink_assert(p->vc != NULL);
00794 active = true;
00795
00796 IOBufferReader *chunked_buffer_start = NULL, *dechunked_buffer_start = NULL;
00797 if (p->do_chunking || p->do_dechunking || p->do_chunked_passthru) {
00798 p->chunked_handler.init(p->buffer_start, p);
00799
00800
00801 if (p->do_chunking) {
00802
00803 chunked_buffer_start = p->chunked_handler.chunked_buffer->alloc_reader();
00804 p->chunked_handler.chunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
00805 } else if (p->do_dechunking) {
00806
00807 Debug("http_tunnel",
00808 "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64"",
00809 p->chunked_handler.chunked_reader->read_avail());
00810
00811
00812 dechunked_buffer_start = p->chunked_handler.dechunked_buffer->alloc_reader();
00813
00814
00815
00816 if (!transform_consumer) {
00817 p->chunked_handler.dechunked_buffer->write(p->buffer_start, p->chunked_handler.skip_bytes);
00818
00819 Debug("http_tunnel", "[producer_run] do_dechunking::Copied header of size %" PRId64"", p->chunked_handler.skip_bytes);
00820 }
00821 }
00822 }
00823
00824 int64_t read_start_pos = 0;
00825 if (p->vc_type == HT_CACHE_READ && sm->t_state.range_setup == HttpTransact::RANGE_NOT_TRANSFORM_REQUESTED) {
00826 ink_assert(sm->t_state.num_range_fields == 1);
00827 read_start_pos = sm->t_state.ranges[0]._start;
00828 producer_n = (sm->t_state.ranges[0]._end - sm->t_state.ranges[0]._start)+1;
00829 consumer_n = (producer_n + sm->client_response_hdr_bytes);
00830 } else if (p->nbytes >= 0) {
00831 consumer_n = p->nbytes;
00832 producer_n = p->ntodo;
00833 } else {
00834 consumer_n = (producer_n = INT64_MAX);
00835 }
00836
00837
00838
00839
00840 ink_release_assert(p->num_consumers > 0);
00841 for (c = p->consumer_list.head; c;) {
00842
00843
00844 if (c->vc_type == HT_CACHE_WRITE) {
00845 switch (action) {
00846 case TCA_CHUNK_CONTENT:
00847 case TCA_PASSTHRU_DECHUNKED_CONTENT:
00848 c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
00849 break;
00850 case TCA_DECHUNK_CONTENT:
00851 case TCA_PASSTHRU_CHUNKED_CONTENT:
00852 c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
00853 break;
00854 default:
00855 break;
00856 }
00857 }
00858
00859 else if (action == TCA_CHUNK_CONTENT) {
00860 c->buffer_reader = p->chunked_handler.chunked_buffer->clone_reader(chunked_buffer_start);
00861 } else if (action == TCA_DECHUNK_CONTENT) {
00862 c->buffer_reader = p->chunked_handler.dechunked_buffer->clone_reader(dechunked_buffer_start);
00863 } else {
00864 c->buffer_reader = p->read_buffer->clone_reader(p->buffer_start);
00865 }
00866
00867
00868 if (c->skip_bytes > 0) {
00869 ink_assert(c->skip_bytes <= c->buffer_reader->read_avail());
00870 c->buffer_reader->consume(c->skip_bytes);
00871 }
00872 int64_t c_write = consumer_n;
00873
00874
00875
00876
00877
00878
00879 if (c_write != INT64_MAX) {
00880 c_write -= c->skip_bytes;
00881 }
00882
00883
00884
00885 if (p->do_chunking == true) {
00886 c_write = INT64_MAX;
00887 }
00888
00889 if (c_write == 0) {
00890
00891 c->write_vio = NULL;
00892 consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
00893 } else {
00894 c->write_vio = c->vc->do_io_write(this, c_write, c->buffer_reader);
00895 ink_assert(c_write > 0);
00896 }
00897
00898 c = c->link.next;
00899 }
00900
00901
00902
00903
00904 if (p->alive && sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection && (p->vc_type == HT_HTTP_CLIENT)) {
00905 Debug("http_redirect", "[HttpTunnel::producer_run] client post: %" PRId64" max size: %" PRId64"",
00906 p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
00907
00908
00909 if (p->buffer_start->read_avail() > HttpConfig::m_master.post_copy_size) {
00910 Debug("http_redirect", "[HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64" limit=%" PRId64"",
00911 p->buffer_start->read_avail(), HttpConfig::m_master.post_copy_size);
00912 sm->enable_redirection = false;
00913 } else {
00914
00915 allocate_redirect_postdata_buffers(p->read_buffer->clone_reader(p->buffer_start));
00916 copy_partial_post_data();
00917 }
00918 }
00919
00920 if (p->do_chunking) {
00921
00922 p->chunked_handler.chunked_buffer->dealloc_reader(chunked_buffer_start);
00923 p->chunked_handler.dechunked_reader->consume(p->chunked_handler.skip_bytes);
00924
00925
00926 producer_handler(VC_EVENT_READ_READY, p);
00927 } else if (p->do_dechunking || p->do_chunked_passthru) {
00928
00929 if (p->do_dechunking)
00930 p->chunked_handler.dechunked_buffer->dealloc_reader(dechunked_buffer_start);
00931
00932
00933
00934 Debug("http_tunnel",
00935 "[producer_run] do_dechunking p->chunked_handler.chunked_reader->read_avail() = %" PRId64"",
00936 p->chunked_handler.chunked_reader->read_avail());
00937 if (!transform_consumer && (p->chunked_handler.chunked_reader->read_avail() >= p->chunked_handler.skip_bytes)) {
00938 p->chunked_handler.chunked_reader->consume(p->chunked_handler.skip_bytes);
00939 Debug("http_tunnel", "[producer_run] do_dechunking p->chunked_handler.skip_bytes = %" PRId64"",
00940 p->chunked_handler.skip_bytes);
00941 }
00942
00943
00944
00945
00946 producer_handler(VC_EVENT_READ_READY, p);
00947 if (!p->chunked_handler.chunked_reader->read_avail() && sm->redirection_tries > 0 && p->vc_type == HT_HTTP_CLIENT) {
00948
00949
00950
00951
00952 producer_n = 0;
00953 }
00954 }
00955
00956 if (p->alive) {
00957 ink_assert(producer_n >= 0);
00958
00959 if (producer_n == 0) {
00960
00961
00962
00963 p->alive = false;
00964 p->read_success = true;
00965 Debug("http_tunnel", "[%" PRId64 "] [tunnel_run] producer already done", sm->sm_id);
00966 producer_handler(HTTP_TUNNEL_EVENT_PRECOMPLETE, p);
00967 } else {
00968 if (read_start_pos > 0) {
00969 p->read_vio = ((CacheVC*)p->vc)->do_io_pread(this, producer_n, p->read_buffer, read_start_pos);
00970 }
00971 else {
00972 p->read_vio = p->vc->do_io_read(this, producer_n, p->read_buffer);
00973 }
00974 }
00975 }
00976
00977
00978
00979 p->read_buffer->dealloc_reader(p->buffer_start);
00980 p->buffer_start = NULL;
00981 }
00982
00983 int
00984 HttpTunnel::producer_handler_dechunked(int event, HttpTunnelProducer * p)
00985 {
00986 ink_assert(p->do_chunking);
00987
00988 Debug("http_tunnel", "[%" PRId64 "] producer_handler_dechunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
00989
00990
00991 switch (event) {
00992 case VC_EVENT_READ_READY:
00993 case VC_EVENT_READ_COMPLETE:
00994 case HTTP_TUNNEL_EVENT_PRECOMPLETE:
00995 case VC_EVENT_EOS:
00996 p->last_event =
00997 p->chunked_handler.last_server_event = event;
00998
00999 p->chunked_handler.generate_chunked_content();
01000 break;
01001 };
01002
01003
01004
01005 return event;
01006 }
01007
01008
01009
01010
01011
01012
01013
01014 int
01015 HttpTunnel::producer_handler_chunked(int event, HttpTunnelProducer * p)
01016 {
01017 ink_assert(p->do_dechunking || p->do_chunked_passthru);
01018
01019 Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
01020
01021
01022 switch (event) {
01023 case VC_EVENT_READ_READY:
01024 case VC_EVENT_READ_COMPLETE:
01025 case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01026 case VC_EVENT_EOS:
01027 break;
01028 default:
01029 return event;
01030 }
01031
01032 p->last_event =
01033 p->chunked_handler.last_server_event = event;
01034 bool done = p->chunked_handler.process_chunked_content();
01035
01036
01037
01038 if (p->chunked_handler.state == ChunkedHandler::CHUNK_READ_ERROR) {
01039 Debug("http_tunnel", "[%" PRId64 "] producer_handler_chunked [%s chunk decoding error]", sm->sm_id, p->name);
01040 p->chunked_handler.truncation = true;
01041
01042
01043
01044 return VC_EVENT_EOS;
01045 }
01046
01047 switch (event) {
01048 case VC_EVENT_READ_READY:
01049 if (done) {
01050 return VC_EVENT_READ_COMPLETE;
01051 }
01052 break;
01053 case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01054 case VC_EVENT_EOS:
01055 case VC_EVENT_READ_COMPLETE:
01056 if (!done) {
01057 p->chunked_handler.truncation = true;
01058 }
01059 break;
01060 }
01061
01062 return event;
01063 }
01064
01065
01066
01067
01068
01069
01070
01071
01072
01073
01074
01075
01076 bool HttpTunnel::producer_handler(int event, HttpTunnelProducer * p)
01077 {
01078 HttpTunnelConsumer *c;
01079 HttpProducerHandler jump_point;
01080 bool sm_callback = false;
01081
01082 Debug("http_tunnel", "[%" PRId64 "] producer_handler [%s %s]", sm->sm_id, p->name, HttpDebugNames::get_event_name(event));
01083
01084
01085 if (p->do_chunking) {
01086 event = producer_handler_dechunked(event, p);
01087
01088
01089
01090
01091
01092 if (event == HTTP_TUNNEL_EVENT_PRECOMPLETE) {
01093 event = VC_EVENT_EOS;
01094 }
01095 } else if (p->do_dechunking || p->do_chunked_passthru) {
01096 event = producer_handler_chunked(event, p);
01097 } else {
01098 p->last_event = event;
01099 }
01100
01101
01102
01103
01104 if (sm->t_state.method == HTTP_WKSIDX_POST && sm->enable_redirection &&
01105 (event == VC_EVENT_READ_READY || event == VC_EVENT_READ_COMPLETE) && (p->vc_type == HT_HTTP_CLIENT)) {
01106 Debug("http_redirect", "[HttpTunnel::producer_handler] [%s %s]", p->name, HttpDebugNames::get_event_name(event));
01107
01108 if ((postbuf->postdata_copy_buffer_start->read_avail() + postbuf->ua_buffer_reader->read_avail())
01109 > HttpConfig::m_master.post_copy_size) {
01110 Debug("http_redirect",
01111 "[HttpTunnel::producer_handler] post exceeds buffer limit, buffer_avail=%" PRId64" reader_avail=%" PRId64" limit=%" PRId64"",
01112 postbuf->postdata_copy_buffer_start->read_avail(), postbuf->ua_buffer_reader->read_avail(),
01113 HttpConfig::m_master.post_copy_size);
01114 deallocate_redirect_postdata_buffers();
01115 sm->enable_redirection = false;
01116 } else {
01117 copy_partial_post_data();
01118 }
01119 }
01120
01121 Debug("http_redirect", "[HttpTunnel::producer_handler] enable_redirection: [%d %d %d] event: %d",
01122 p->alive == true, sm->enable_redirection, (p->self_consumer && p->self_consumer->alive == true), event);
01123 ink_assert(p->alive == true || event == HTTP_TUNNEL_EVENT_PRECOMPLETE || event == VC_EVENT_EOS ||
01124 sm->enable_redirection || (p->self_consumer && p->self_consumer->alive == true));
01125
01126 switch (event) {
01127 case VC_EVENT_READ_READY:
01128
01129 for (c = p->consumer_list.head; c; c = c->link.next) {
01130 if (c->alive) {
01131 c->write_vio->reenable();
01132 }
01133 }
01134 break;
01135
01136 case HTTP_TUNNEL_EVENT_PRECOMPLETE:
01137
01138
01139
01140
01141
01142
01143 p->bytes_read = 0;
01144 jump_point = p->vc_handler;
01145 (sm->*jump_point) (event, p);
01146 sm_callback = true;
01147 break;
01148
01149 case VC_EVENT_READ_COMPLETE:
01150 case VC_EVENT_EOS:
01151
01152 p->alive = false;
01153 if (p->read_vio) {
01154 p->bytes_read = p->read_vio->ndone;
01155 } else {
01156
01157
01158
01159
01160
01161 p->bytes_read = 0;
01162 }
01163
01164
01165
01166
01167
01168
01169
01170
01171 jump_point = p->vc_handler;
01172 (sm->*jump_point) (event, p);
01173 sm_callback = true;
01174
01175
01176 for (c = p->consumer_list.head; c; c = c->link.next) {
01177 if (c->alive) {
01178 c->write_vio->reenable();
01179 }
01180 }
01181 break;
01182
01183 case VC_EVENT_ERROR:
01184 case VC_EVENT_ACTIVE_TIMEOUT:
01185 case VC_EVENT_INACTIVITY_TIMEOUT:
01186 case HTTP_TUNNEL_EVENT_CONSUMER_DETACH:
01187 p->alive = false;
01188 p->bytes_read = p->read_vio->ndone;
01189
01190 jump_point = p->vc_handler;
01191 (sm->*jump_point) (event, p);
01192 sm_callback = true;
01193 break;
01194
01195 case VC_EVENT_WRITE_READY:
01196 case VC_EVENT_WRITE_COMPLETE:
01197 default:
01198
01199 ink_release_assert(0);
01200 break;
01201 }
01202
01203 return sm_callback;
01204 }
01205
01206 bool
01207 HttpTunnel::consumer_reenable(HttpTunnelConsumer* c)
01208 {
01209 HttpTunnelProducer* p = c->producer;
01210 HttpTunnelProducer* srcp = p->flow_control_source;
01211 if (p->alive
01212 #ifndef LAZY_BUF_ALLOC
01213 && p->read_buffer->write_avail() > 0
01214 #endif
01215 ) {
01216
01217
01218
01219
01220
01221 uint64_t backlog = (flow_state.enabled_p && p->is_source())
01222 ? p->backlog(flow_state.high_water) : 0;
01223
01224 if (backlog >= flow_state.high_water) {
01225 if (is_debug_tag_set("http_tunnel"))
01226 Debug("http_tunnel", "Throttle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
01227 p->throttle();
01228 } else {
01229 if (srcp && srcp->alive && c->is_sink()) {
01230
01231
01232
01233
01234
01235 if (srcp != p)
01236 backlog = srcp->backlog(flow_state.low_water);
01237 if (backlog < flow_state.low_water) {
01238 if (is_debug_tag_set("http_tunnel"))
01239 Debug("http_tunnel", "Unthrottle %p %" PRId64 " / %" PRId64, p, backlog, p->backlog());
01240 srcp->unthrottle();
01241 srcp->read_vio->reenable();
01242
01243 this->producer_handler(VC_EVENT_READ_READY, srcp);
01244 }
01245 }
01246 p->read_vio->reenable();
01247 }
01248 }
01249 return p->is_throttled();
01250 }
01251
01252
01253
01254
01255
01256
01257
01258
01259
01260
01261
01262
01263 bool HttpTunnel::consumer_handler(int event, HttpTunnelConsumer * c)
01264 {
01265 bool sm_callback = false;
01266 HttpConsumerHandler jump_point;
01267 HttpTunnelProducer* p = c->producer;
01268
01269 Debug("http_tunnel", "[%" PRId64 "] consumer_handler [%s %s]", sm->sm_id, c->name, HttpDebugNames::get_event_name(event));
01270
01271 ink_assert(c->alive == true);
01272
01273 switch (event) {
01274 case VC_EVENT_WRITE_READY:
01275 this->consumer_reenable(c);
01276 break;
01277
01278 case VC_EVENT_WRITE_COMPLETE:
01279 case VC_EVENT_EOS:
01280 case VC_EVENT_ERROR:
01281 case VC_EVENT_ACTIVE_TIMEOUT:
01282 case VC_EVENT_INACTIVITY_TIMEOUT:
01283 ink_assert(c->alive);
01284 ink_assert(c->buffer_reader);
01285 c->alive = false;
01286
01287 c->bytes_written = c->write_vio ? c->write_vio->ndone : 0;
01288
01289
01290 jump_point = c->vc_handler;
01291 (sm->*jump_point) (event, c);
01292 sm_callback = true;
01293
01294
01295
01296
01297 c->buffer_reader->mbuf->dealloc_reader(c->buffer_reader);
01298 c->buffer_reader = NULL;
01299
01300
01301
01302
01303
01304
01305
01306 if (p->alive && p->read_vio
01307 #ifndef LAZY_BUF_ALLOC
01308 && p->read_buffer->write_avail() > 0
01309 #endif
01310 ) {
01311 if (p->is_throttled())
01312 this->consumer_reenable(c);
01313 else
01314 p->read_vio->reenable();
01315 }
01316
01317
01318 if (p->is_throttled())
01319 Debug("http_tunnel", "Special event %s on %p with flow control on", HttpDebugNames::get_event_name(event), p);
01320 break;
01321
01322 case VC_EVENT_READ_READY:
01323 case VC_EVENT_READ_COMPLETE:
01324 default:
01325
01326 ink_release_assert(0);
01327 break;
01328 }
01329
01330 return sm_callback;
01331 }
01332
01333
01334
01335
01336
01337
01338
01339 void
01340 HttpTunnel::chain_abort_all(HttpTunnelProducer * p)
01341 {
01342 HttpTunnelConsumer *c = p->consumer_list.head;
01343
01344 while (c) {
01345 if (c->alive) {
01346 c->alive = false;
01347 c->write_vio = NULL;
01348 c->vc->do_io_close(EHTTP_ERROR);
01349 update_stats_after_abort(c->vc_type);
01350 }
01351
01352 if (c->self_producer) {
01353 chain_abort_all(c->self_producer);
01354 }
01355
01356 c = c->link.next;
01357 }
01358
01359 if (p->alive) {
01360 p->alive = false;
01361 p->bytes_read = p->read_vio->ndone;
01362 if (p->self_consumer) {
01363 p->self_consumer->alive = false;
01364 }
01365 p->read_vio = NULL;
01366 p->vc->do_io_close(EHTTP_ERROR);
01367 update_stats_after_abort(p->vc_type);
01368 }
01369 }
01370
01371
01372
01373
01374
01375
01376
01377 void
01378 HttpTunnel::finish_all_internal(HttpTunnelProducer * p, bool chain)
01379 {
01380 ink_assert(p->alive == false);
01381 HttpTunnelConsumer *c = p->consumer_list.head;
01382 int64_t total_bytes = 0;
01383 TunnelChunkingAction_t action = p->chunking_action;
01384
01385 while (c) {
01386 if (c->alive) {
01387 if (c->vc_type == HT_CACHE_WRITE) {
01388 switch (action) {
01389 case TCA_CHUNK_CONTENT:
01390 case TCA_PASSTHRU_DECHUNKED_CONTENT:
01391 total_bytes = p->bytes_read + p->init_bytes_done;
01392 break;
01393 case TCA_DECHUNK_CONTENT:
01394 case TCA_PASSTHRU_CHUNKED_CONTENT:
01395 total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
01396 break;
01397 default:
01398 break;
01399 }
01400 } else if (action == TCA_CHUNK_CONTENT) {
01401 total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.chunked_size;
01402 } else if (action == TCA_DECHUNK_CONTENT) {
01403 total_bytes = p->chunked_handler.skip_bytes + p->chunked_handler.dechunked_size;
01404 } else
01405 total_bytes = p->bytes_read + p->init_bytes_done;
01406
01407 c->write_vio->nbytes = total_bytes - c->skip_bytes;
01408 ink_assert(c->write_vio->nbytes >= 0);
01409
01410 if (c->write_vio->nbytes < 0) {
01411
01412 fprintf(stderr,
01413 "[HttpTunnel::finish_all_internal] ERROR: Incorrect total_bytes - c->skip_bytes = %" PRId64 "\n",
01414 (int64_t) (total_bytes - c->skip_bytes));
01415 }
01416
01417 if (chain == true && c->self_producer) {
01418 chain_finish_all(c->self_producer);
01419 }
01420
01421
01422
01423
01424 if (c->write_vio->nbytes == c->write_vio->ndone) {
01425 consumer_handler(VC_EVENT_WRITE_COMPLETE, c);
01426 }
01427 }
01428
01429 c = c->link.next;
01430 }
01431 }
01432
01433
01434
01435
01436
01437
01438 void
01439 HttpTunnel::chain_abort_cache_write(HttpTunnelProducer * p)
01440 {
01441 HttpTunnelConsumer *c = p->consumer_list.head;
01442
01443 while (c) {
01444 if (c->alive) {
01445 if (c->vc_type == HT_CACHE_WRITE) {
01446 ink_assert(c->self_producer == NULL);
01447 c->write_vio = NULL;
01448 c->vc->do_io_close(EHTTP_ERROR);
01449 c->alive = false;
01450 HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
01451 } else if (c->self_producer) {
01452 chain_abort_cache_write(c->self_producer);
01453 }
01454 }
01455 c = c->link.next;
01456 }
01457 }
01458
01459
01460
01461
01462
01463
01464 void
01465 HttpTunnel::close_vc(HttpTunnelProducer * p)
01466 {
01467 ink_assert(p->alive == false);
01468 HttpTunnelConsumer *c = p->self_consumer;
01469
01470 if (c && c->alive) {
01471 c->alive = false;
01472 if (c->write_vio) {
01473 c->bytes_written = c->write_vio->ndone;
01474 }
01475 }
01476
01477 p->vc->do_io_close();
01478 }
01479
01480
01481
01482
01483
01484
01485 void
01486 HttpTunnel::close_vc(HttpTunnelConsumer * c)
01487 {
01488 ink_assert(c->alive == false);
01489 HttpTunnelProducer *p = c->self_producer;
01490
01491 if (p && p->alive) {
01492 p->alive = false;
01493 if (p->read_vio) {
01494 p->bytes_read = p->read_vio->ndone;
01495 }
01496 }
01497
01498 c->vc->do_io_close();
01499 }
01500
01501
01502
01503
01504
01505
01506
01507 int
01508 HttpTunnel::main_handler(int event, void *data)
01509 {
01510 HttpTunnelProducer *p = NULL;
01511 HttpTunnelConsumer *c = NULL;
01512 bool sm_callback = false;
01513
01514 ink_assert(sm->magic == HTTP_SM_MAGIC_ALIVE);
01515
01516
01517 if ((p = get_producer((VIO *) data)) != 0) {
01518 sm_callback = producer_handler(event, p);
01519 } else {
01520 if ((c = get_consumer((VIO *) data)) != 0) {
01521 ink_assert(c->write_vio == (VIO *) data);
01522 sm_callback = consumer_handler(event, c);
01523 } else {
01524 internal_error();
01525 }
01526 }
01527
01528
01529
01530
01531
01532 if (sm_callback && !is_tunnel_alive()) {
01533 active = false;
01534 sm->handleEvent(HTTP_TUNNEL_EVENT_DONE, this);
01535 return EVENT_DONE;
01536 }
01537 return EVENT_CONT;
01538 }
01539
01540 void
01541 HttpTunnel::update_stats_after_abort(HttpTunnelType_t t)
01542 {
01543 switch (t) {
01544 case HT_CACHE_READ:
01545 case HT_CACHE_WRITE:
01546 HTTP_DECREMENT_DYN_STAT(http_current_cache_connections_stat);
01547 break;
01548 default:
01549
01550
01551
01552 break;
01553 };
01554 }
01555
01556 void
01557 HttpTunnel::internal_error()
01558 {
01559 }
01560
01561
01562
01563
01564 void
01565 HttpTunnel::copy_partial_post_data()
01566 {
01567 postbuf->postdata_copy_buffer->write(postbuf->ua_buffer_reader);
01568 Debug("http_redirect", "[HttpTunnel::copy_partial_post_data] wrote %" PRId64" bytes to buffers %" PRId64"",
01569 postbuf->ua_buffer_reader->read_avail(), postbuf->postdata_copy_buffer_start->read_avail());
01570 postbuf->ua_buffer_reader->consume(postbuf->ua_buffer_reader->read_avail());
01571 }
01572
01573
01574
01575 void
01576 HttpTunnel::allocate_redirect_postdata_producer_buffer()
01577 {
01578 int64_t alloc_index = buffer_size_to_index(sm->t_state.hdr_info.request_content_length);
01579
01580 ink_release_assert(postbuf->postdata_producer_buffer == NULL);
01581
01582 postbuf->postdata_producer_buffer = new_MIOBuffer(alloc_index);
01583 postbuf->postdata_producer_reader = postbuf->postdata_producer_buffer->alloc_reader();
01584 }
01585
01586
01587
01588 void
01589 HttpTunnel::allocate_redirect_postdata_buffers(IOBufferReader * ua_reader)
01590 {
01591 int64_t alloc_index = buffer_size_to_index(sm->t_state.hdr_info.request_content_length);
01592
01593 Debug("http_redirect", "[HttpTunnel::allocate_postdata_buffers]");
01594
01595
01596
01597 if (postbuf == NULL) {
01598 postbuf = new PostDataBuffers();
01599 }
01600 postbuf->ua_buffer_reader = ua_reader;
01601 postbuf->postdata_copy_buffer = new_MIOBuffer(alloc_index);
01602 postbuf->postdata_copy_buffer_start = postbuf->postdata_copy_buffer->alloc_reader();
01603 allocate_redirect_postdata_producer_buffer();
01604 }
01605
01606
01607
01608
01609 void
01610 HttpTunnel::deallocate_redirect_postdata_buffers()
01611 {
01612 Debug("http_redirect", "[HttpTunnel::deallocate_postdata_copy_buffers]");
01613
01614 if (postbuf != NULL) {
01615 if (postbuf->postdata_producer_buffer != NULL) {
01616 free_MIOBuffer(postbuf->postdata_producer_buffer);
01617 postbuf->postdata_producer_buffer = NULL;
01618 postbuf->postdata_producer_reader = NULL;
01619 }
01620 if (postbuf->postdata_copy_buffer != NULL) {
01621 free_MIOBuffer(postbuf->postdata_copy_buffer);
01622 postbuf->postdata_copy_buffer = NULL;
01623 postbuf->postdata_copy_buffer_start = NULL;
01624 }
01625 delete postbuf;
01626 postbuf = NULL;
01627 }
01628 }