00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #ifndef _HTTP_TUNNEL_H_
00034 #define _HTTP_TUNNEL_H_
00035
00036 #include "libts.h"
00037 #include "P_EventSystem.h"
00038
00039
// Capacity of the fixed producer / consumer arrays held by HttpTunnel.
// Any definition inherited from an earlier header is discarded first so the
// values below always apply.
#ifdef MAX_PRODUCERS
#undef MAX_PRODUCERS
#endif
#ifdef MAX_CONSUMERS
#undef MAX_CONSUMERS
#endif
#define MAX_PRODUCERS 2
#define MAX_CONSUMERS 4

// Tunnel event codes, expressed as offsets from HTTP_TUNNEL_EVENTS_START
// (which is not defined in the visible part of this header).
#define HTTP_TUNNEL_EVENT_DONE (HTTP_TUNNEL_EVENTS_START + 1)
#define HTTP_TUNNEL_EVENT_PRECOMPLETE (HTTP_TUNNEL_EVENTS_START + 2)
#define HTTP_TUNNEL_EVENT_CONSUMER_DETACH (HTTP_TUNNEL_EVENTS_START + 3)

// Non-NULL sentinel VConnection pointer (the value of `!0` cast to a pointer).
// NOTE(review): presumably the `vc` of a producer that has no real
// VConnection (see HT_STATIC) -- confirm against add_producer() callers.
// The replacement list is fully parenthesized so the cast cannot be split
// apart by neighbouring operators (e.g. `sizeof X`, `X->member`).
#define HTTP_TUNNEL_STATIC_PRODUCER ((VConnection *)!0)

// NOTE(review): integer selectors; their consumers are outside this header.
#define ALLOCATE_AND_WRITE_TO_BUF 1
#define WRITE_TO_BUF 2
00058
// Forward declarations for types that are referenced below before (or
// without) a full definition being visible.
class HttpSM;
class HttpPagesHandler;
struct HttpTunnelProducer;
struct HttpTunnelConsumer;

// Pointer-to-member-function types for HttpSM callbacks. Each callback takes
// an event code plus either an opaque data pointer, the producer, or the
// consumer concerned, and returns an int.
typedef int (HttpSM::*HttpSMHandler)(int event, void *data);
typedef int (HttpSM::*HttpProducerHandler)(int event, HttpTunnelProducer *p);
typedef int (HttpSM::*HttpConsumerHandler)(int event, HttpTunnelConsumer *c);
00068
// Kind of endpoint attached to a tunnel producer or consumer (stored in
// their `vc_type` fields). Values are spelled out; they match the implicit
// numbering of the original declaration order.
enum HttpTunnelType_t {
  HT_HTTP_SERVER = 0,
  HT_HTTP_CLIENT = 1,
  HT_CACHE_READ = 2,
  HT_CACHE_WRITE = 3,
  HT_TRANSFORM = 4,
  HT_STATIC = 5
};
00078
// Chunking treatment selected for a producer (stored in
// HttpTunnelProducer::chunking_action). Values are spelled out; they match
// the implicit numbering of the original declaration order.
enum TunnelChunkingAction_t {
  TCA_CHUNK_CONTENT = 0,
  TCA_DECHUNK_CONTENT = 1,
  TCA_PASSTHRU_CHUNKED_CONTENT = 2,
  TCA_PASSTHRU_DECHUNKED_CONTENT = 3
};
00086
00087 struct ChunkedHandler
00088 {
00089 enum ChunkedState {
00090 CHUNK_READ_CHUNK = 0,
00091 CHUNK_READ_SIZE_START,
00092 CHUNK_READ_SIZE,
00093 CHUNK_READ_SIZE_CRLF,
00094 CHUNK_READ_TRAILER_BLANK,
00095 CHUNK_READ_TRAILER_CR,
00096 CHUNK_READ_TRAILER_LINE,
00097 CHUNK_READ_ERROR,
00098 CHUNK_READ_DONE,
00099 CHUNK_WRITE_CHUNK,
00100 CHUNK_WRITE_DONE,
00101 CHUNK_FLOW_CONTROL
00102 };
00103
00104 static int const DEFAULT_MAX_CHUNK_SIZE = 4096;
00105
00106 enum Action {
00107 ACTION_DOCHUNK = 0,
00108 ACTION_DECHUNK,
00109 ACTION_PASSTHRU,
00110 };
00111
00112 Action action;
00113
00114 IOBufferReader *chunked_reader;
00115 MIOBuffer *dechunked_buffer;
00116 int64_t dechunked_size;
00117
00118 IOBufferReader *dechunked_reader;
00119 MIOBuffer *chunked_buffer;
00120 int64_t chunked_size;
00121
00122 bool truncation;
00123 int64_t skip_bytes;
00124
00125 ChunkedState state;
00126 int64_t cur_chunk_size;
00127 int64_t bytes_left;
00128 int last_server_event;
00129
00130
00131 int running_sum;
00132 int num_digits;
00133
00134
00135
00136
00137
00138 int64_t max_chunk_size;
00139
00140
00141
00142 char max_chunk_header[16];
00143 int max_chunk_header_len;
00144
00145 ChunkedHandler();
00146
00147 void init(IOBufferReader *buffer_in, HttpTunnelProducer *p);
00148 void init_by_action(IOBufferReader *buffer_in, Action action);
00149 void clear();
00150
00151
00152
00153 void set_max_chunk_size(int64_t size);
00154
00155
00156 bool process_chunked_content();
00157 bool generate_chunked_content();
00158
00159 private:
00160 void read_size();
00161 void read_chunk();
00162 void read_trailer();
00163 int64_t transfer_bytes();
00164 };
00165
00166 struct HttpTunnelConsumer
00167 {
00168 HttpTunnelConsumer();
00169
00170 LINK(HttpTunnelConsumer, link);
00171 HttpTunnelProducer *producer;
00172 HttpTunnelProducer *self_producer;
00173
00174 HttpTunnelType_t vc_type;
00175 VConnection *vc;
00176 IOBufferReader *buffer_reader;
00177 HttpConsumerHandler vc_handler;
00178 VIO *write_vio;
00179
00180 int64_t skip_bytes;
00181 int64_t bytes_written;
00182 int handler_state;
00183
00184 bool alive;
00185 bool write_success;
00186 const char *name;
00187
00188
00189
00190
00191
00192 bool is_downstream_from(VConnection* vc);
00193
00194
00195
00196 bool is_sink() const;
00197 };
00198
00199 struct HttpTunnelProducer
00200 {
00201 HttpTunnelProducer();
00202
00203 DLL<HttpTunnelConsumer> consumer_list;
00204 HttpTunnelConsumer *self_consumer;
00205 VConnection *vc;
00206 HttpProducerHandler vc_handler;
00207 VIO *read_vio;
00208 MIOBuffer *read_buffer;
00209 IOBufferReader *buffer_start;
00210 HttpTunnelType_t vc_type;
00211
00212 ChunkedHandler chunked_handler;
00213 TunnelChunkingAction_t chunking_action;
00214
00215 bool do_chunking;
00216 bool do_dechunking;
00217 bool do_chunked_passthru;
00218
00219 int64_t init_bytes_done;
00220 int64_t nbytes;
00221 int64_t ntodo;
00222 int64_t bytes_read;
00223 int handler_state;
00224 int last_event;
00225
00226 int num_consumers;
00227
00228 bool alive;
00229 bool read_success;
00230
00231
00232
00233 HttpTunnelProducer* flow_control_source;
00234 const char *name;
00235
00236
00237
00238
00239
00240 uint64_t backlog(
00241 uint64_t limit = UINT64_MAX
00242 );
00243
00244
00245 bool is_source() const;
00246
00247 void throttle();
00248
00249 void unthrottle();
00250
00251 bool is_throttled() const;
00252
00253
00254
00255
00256
00257
00258
00259 void set_throttle_src(
00260 HttpTunnelProducer* srcp
00261 );
00262 };
00263
00264 class PostDataBuffers
00265 {
00266 public:
00267 PostDataBuffers()
00268 : postdata_producer_buffer(NULL), postdata_copy_buffer(NULL), postdata_producer_reader(NULL),
00269 postdata_copy_buffer_start(NULL), ua_buffer_reader(NULL)
00270 { Debug("http_redirect", "[PostDataBuffers::PostDataBuffers]"); }
00271
00272 MIOBuffer *postdata_producer_buffer;
00273 MIOBuffer *postdata_copy_buffer;
00274 IOBufferReader *postdata_producer_reader;
00275 IOBufferReader *postdata_copy_buffer_start;
00276 IOBufferReader *ua_buffer_reader;
00277 };
00278
00279 class HttpTunnel:public Continuation
00280 {
00281 friend class HttpPagesHandler;
00282 friend class CoreUtils;
00283
00284
00285
00286
00287
00288
00289
00290
00291 struct FlowControl {
00292
00293 static uint64_t const DEFAULT_WATER_MARK = 1<<16;
00294
00295 uint64_t high_water;
00296 uint64_t low_water;
00297 bool enabled_p;
00298
00299
00300 FlowControl();
00301 };
00302
00303 public:
00304 HttpTunnel();
00305
00306 void init(HttpSM * sm_arg, ProxyMutex * amutex);
00307 void reset();
00308 void kill_tunnel();
00309 bool is_tunnel_active() const { return active; }
00310 bool is_tunnel_alive() const;
00311 bool has_cache_writer() const;
00312
00313
00314 void copy_partial_post_data();
00315 void allocate_redirect_postdata_producer_buffer();
00316 void allocate_redirect_postdata_buffers(IOBufferReader * ua_reader);
00317 void deallocate_redirect_postdata_buffers();
00318
00319 HttpTunnelProducer *add_producer(VConnection * vc,
00320 int64_t nbytes,
00321 IOBufferReader * reader_start,
00322 HttpProducerHandler sm_handler, HttpTunnelType_t vc_type, const char *name);
00323
00324 void set_producer_chunking_action(HttpTunnelProducer * p, int64_t skip_bytes, TunnelChunkingAction_t action);
00325
00326 void set_producer_chunking_size(HttpTunnelProducer* producer, int64_t size);
00327
00328 HttpTunnelConsumer *add_consumer(VConnection * vc,
00329 VConnection * producer,
00330 HttpConsumerHandler sm_handler,
00331 HttpTunnelType_t vc_type, const char *name, int64_t skip_bytes = 0);
00332
00333 int deallocate_buffers();
00334 DLL<HttpTunnelConsumer> *get_consumers(VConnection * vc);
00335 HttpTunnelProducer *get_producer(VConnection * vc);
00336 HttpTunnelConsumer *get_consumer(VConnection * vc);
00337 void tunnel_run(HttpTunnelProducer * p = NULL);
00338
00339 int main_handler(int event, void *data);
00340 bool consumer_reenable(HttpTunnelConsumer* c);
00341 bool consumer_handler(int event, HttpTunnelConsumer * c);
00342 bool producer_handler(int event, HttpTunnelProducer * p);
00343 int producer_handler_dechunked(int event, HttpTunnelProducer * p);
00344 int producer_handler_chunked(int event, HttpTunnelProducer * p);
00345 void local_finish_all(HttpTunnelProducer * p);
00346 void chain_finish_all(HttpTunnelProducer * p);
00347 void chain_abort_cache_write(HttpTunnelProducer * p);
00348 void chain_abort_all(HttpTunnelProducer * p);
00349 void abort_cache_write_finish_others(HttpTunnelProducer * p);
00350 void append_message_to_producer_buffer(HttpTunnelProducer * p, const char *msg, int64_t msg_len);
00351
00352
00353
00354
00355
00356
00357
00358
00359 void chain(
00360 HttpTunnelConsumer* c,
00361 HttpTunnelProducer* p
00362 );
00363
00364 void close_vc(HttpTunnelProducer * p);
00365 void close_vc(HttpTunnelConsumer * c);
00366
00367 private:
00368
00369 void internal_error();
00370 void finish_all_internal(HttpTunnelProducer * p, bool chain);
00371 void update_stats_after_abort(HttpTunnelType_t t);
00372 void producer_run(HttpTunnelProducer * p);
00373
00374 HttpTunnelProducer *get_producer(VIO * vio);
00375 HttpTunnelConsumer *get_consumer(VIO * vio);
00376
00377 HttpTunnelProducer *alloc_producer();
00378 HttpTunnelConsumer *alloc_consumer();
00379
00380 int num_producers;
00381 int num_consumers;
00382 HttpTunnelConsumer consumers[MAX_CONSUMERS];
00383 HttpTunnelProducer producers[MAX_PRODUCERS];
00384 HttpSM *sm;
00385
00386 bool active;
00387
00388
00389 FlowControl flow_state;
00390
00391 public:
00392 PostDataBuffers * postbuf;
00393 };
00394
00395
00396
00397
00398
00399
00400 inline void
00401 HttpTunnel::abort_cache_write_finish_others(HttpTunnelProducer * p)
00402 {
00403 chain_abort_cache_write(p);
00404 local_finish_all(p);
00405 }
00406
00407
00408
00409
00410
00411
00412 inline void
00413 HttpTunnel::local_finish_all(HttpTunnelProducer * p)
00414 {
00415 finish_all_internal(p, false);
00416 }
00417
00418
00419
00420
00421
00422
00423
00424 inline void
00425 HttpTunnel::chain_finish_all(HttpTunnelProducer * p)
00426 {
00427 finish_all_internal(p, true);
00428 }
00429
00430 inline bool
00431 HttpTunnel::is_tunnel_alive() const
00432 {
00433 bool tunnel_alive = false;
00434
00435 for (int i = 0; i < MAX_PRODUCERS; i++) {
00436 if (producers[i].alive == true) {
00437 tunnel_alive = true;
00438 break;
00439 }
00440 }
00441 if (!tunnel_alive) {
00442 for (int i = 0; i < MAX_CONSUMERS; i++) {
00443 if (consumers[i].alive == true) {
00444 tunnel_alive = true;
00445 break;
00446 }
00447 }
00448
00449 }
00450
00451 return tunnel_alive;
00452 }
00453
00454 inline HttpTunnelProducer *
00455 HttpTunnel::get_producer(VConnection * vc)
00456 {
00457 for (int i = 0; i < MAX_PRODUCERS; i++) {
00458 if (producers[i].vc == vc) {
00459 return producers + i;
00460 }
00461 }
00462 return NULL;
00463 }
00464
00465 inline HttpTunnelConsumer *
00466 HttpTunnel::get_consumer(VConnection * vc)
00467 {
00468 for (int i = 0; i < MAX_CONSUMERS; i++) {
00469 if (consumers[i].vc == vc) {
00470 return consumers + i;
00471 }
00472 }
00473 return NULL;
00474 }
00475
00476 inline HttpTunnelProducer *
00477 HttpTunnel::get_producer(VIO * vio)
00478 {
00479 for (int i = 0; i < MAX_PRODUCERS; i++) {
00480 if (producers[i].read_vio == vio) {
00481 return producers + i;
00482 }
00483 }
00484 return NULL;
00485 }
00486
00487 inline HttpTunnelConsumer *
00488 HttpTunnel::get_consumer(VIO * vio)
00489 {
00490 for (int i = 0; i < MAX_CONSUMERS; i++) {
00491 if (consumers[i].write_vio == vio) {
00492 return consumers + i;
00493 }
00494 }
00495 return NULL;
00496 }
00497
00498 inline void
00499 HttpTunnel::append_message_to_producer_buffer(HttpTunnelProducer * p, const char *msg, int64_t msg_len)
00500 {
00501 if (p == NULL || p->read_buffer == NULL)
00502 return;
00503
00504 p->read_buffer->write(msg, msg_len);
00505 p->nbytes += msg_len;
00506 p->bytes_read += msg_len;
00507 }
00508
00509 inline bool
00510 HttpTunnel::has_cache_writer() const
00511 {
00512 for (int i = 0; i < MAX_CONSUMERS; i++) {
00513 if (consumers[i].vc_type == HT_CACHE_WRITE && consumers[i].vc != NULL) {
00514 return true;
00515 }
00516 }
00517 return false;
00518 }
00519
00520 inline bool
00521 HttpTunnelConsumer::is_downstream_from(VConnection *vc)
00522 {
00523 HttpTunnelProducer* p = producer;
00524 HttpTunnelConsumer* c;
00525 while (p) {
00526 if (p->vc == vc) return true;
00527
00528
00529
00530 c = p->self_consumer;
00531 p = (c && c != this) ? c->producer : 0;
00532 }
00533 return false;
00534 }
00535
00536 inline bool
00537 HttpTunnelConsumer::is_sink() const
00538 {
00539 return HT_HTTP_CLIENT == vc_type || HT_CACHE_WRITE == vc_type;
00540 }
00541
00542 inline bool
00543 HttpTunnelProducer::is_source() const
00544 {
00545
00546
00547 return HT_HTTP_SERVER == vc_type || HT_CACHE_READ == vc_type || HT_HTTP_CLIENT == vc_type;
00548 }
00549
00550 inline bool
00551 HttpTunnelProducer::is_throttled() const
00552 {
00553 return 0 != flow_control_source;
00554 }
00555
00556 inline void
00557 HttpTunnelProducer::throttle()
00558 {
00559 if (!this->is_throttled())
00560 this->set_throttle_src(this);
00561 }
00562
00563 inline void
00564 HttpTunnelProducer::unthrottle()
00565 {
00566 if (this->is_throttled())
00567 this->set_throttle_src(0);
00568 }
00569
00570 inline
00571 HttpTunnel::FlowControl::FlowControl()
00572 : high_water(DEFAULT_WATER_MARK)
00573 , low_water(DEFAULT_WATER_MARK)
00574 , enabled_p(false)
00575 {
00576 }
00577
00578 #endif