00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 #ifndef _HTTP_TUNNEL_H_
00034 #define _HTTP_TUNNEL_H_
00035 
00036 #include "libts.h"
00037 #include "P_EventSystem.h"
00038 
00039 
// Tunnel capacity limits. Any definition of these names made before this
// point is discarded so that the values below are the ones in effect.
#ifdef MAX_PRODUCERS
#undef MAX_PRODUCERS
#endif
#ifdef MAX_CONSUMERS
#undef MAX_CONSUMERS
#endif
#define MAX_PRODUCERS   2
#define MAX_CONSUMERS   4

// Tunnel event codes, numbered relative to HTTP_TUNNEL_EVENTS_START
// (which is defined outside this header).
#define HTTP_TUNNEL_EVENT_DONE             (HTTP_TUNNEL_EVENTS_START + 1)
#define HTTP_TUNNEL_EVENT_PRECOMPLETE      (HTTP_TUNNEL_EVENTS_START + 2)
#define HTTP_TUNNEL_EVENT_CONSUMER_DETACH  (HTTP_TUNNEL_EVENTS_START + 3)

// Sentinel VConnection pointer: the value !0 cast to VConnection*.
// The expansion is wrapped in parentheses so that it always acts as a single
// operand (for example under sizeof, or next to a postfix operator).
// NOTE(review): presumably marks a producer that has no real VConnection
// (see HT_STATIC) -- confirm against the implementation file.
#define HTTP_TUNNEL_STATIC_PRODUCER  ((VConnection*)!0)


// NOTE(review): these look like mode selectors for a buffer-writing helper;
// their users are not visible in this header.
#define ALLOCATE_AND_WRITE_TO_BUF 1
#define WRITE_TO_BUF 2
00058 
// Forward declarations: this header only needs these types by pointer or as
// the class of a pointer-to-member-function.
class HttpSM;
class HttpPagesHandler;
struct HttpTunnelProducer;
struct HttpTunnelConsumer;

// Pointer-to-member-function types on HttpSM. Each handler receives an event
// code plus, respectively, an opaque data pointer, the producer, or the
// consumer the event concerns.
typedef int (HttpSM::*HttpSMHandler)(int event, void *data);
typedef int (HttpSM::*HttpProducerHandler)(int event, HttpTunnelProducer *p);
typedef int (HttpSM::*HttpConsumerHandler)(int event, HttpTunnelConsumer *c);
00068 
/// Kind of endpoint a tunnel producer or consumer is attached to.
/// HttpTunnelProducer::is_source() accepts HT_HTTP_SERVER, HT_CACHE_READ and
/// HT_HTTP_CLIENT; HttpTunnelConsumer::is_sink() accepts HT_HTTP_CLIENT and
/// HT_CACHE_WRITE.
enum HttpTunnelType_t {
  HT_HTTP_SERVER,
  HT_HTTP_CLIENT,
  HT_CACHE_READ,
  HT_CACHE_WRITE,
  HT_TRANSFORM,
  HT_STATIC
};
00078 
/// Chunking treatment requested for a producer; passed to
/// HttpTunnel::set_producer_chunking_action() and stored in
/// HttpTunnelProducer::chunking_action.
enum TunnelChunkingAction_t {
  TCA_CHUNK_CONTENT,
  TCA_DECHUNK_CONTENT,
  TCA_PASSTHRU_CHUNKED_CONTENT,
  TCA_PASSTHRU_DECHUNKED_CONTENT
};
00086 
00087 struct ChunkedHandler
00088 {
00089   enum ChunkedState {
00090     CHUNK_READ_CHUNK = 0,
00091     CHUNK_READ_SIZE_START,
00092     CHUNK_READ_SIZE,
00093     CHUNK_READ_SIZE_CRLF,
00094     CHUNK_READ_TRAILER_BLANK,
00095     CHUNK_READ_TRAILER_CR,
00096     CHUNK_READ_TRAILER_LINE,
00097     CHUNK_READ_ERROR,
00098     CHUNK_READ_DONE,
00099     CHUNK_WRITE_CHUNK,
00100     CHUNK_WRITE_DONE,
00101     CHUNK_FLOW_CONTROL
00102   };
00103 
00104   static int const DEFAULT_MAX_CHUNK_SIZE = 4096;
00105 
00106   enum Action {
00107     ACTION_DOCHUNK = 0,
00108     ACTION_DECHUNK,
00109     ACTION_PASSTHRU,
00110   };
00111 
00112   Action action;
00113 
00114   IOBufferReader *chunked_reader;
00115   MIOBuffer *dechunked_buffer;
00116   int64_t dechunked_size;
00117 
00118   IOBufferReader *dechunked_reader;
00119   MIOBuffer *chunked_buffer;
00120   int64_t chunked_size;
00121 
00122   bool truncation;
00123   int64_t skip_bytes;
00124 
00125   ChunkedState state;
00126   int64_t cur_chunk_size;
00127   int64_t bytes_left;
00128   int last_server_event;
00129 
00130   
00131   int running_sum;
00132   int num_digits;
00133 
00134 
00135 
00136 
00137 
00138   int64_t max_chunk_size;
00139 
00140 
00141 
00142   char max_chunk_header[16];
00143   int max_chunk_header_len;
00144 
00145   ChunkedHandler();
00146 
00147   void init(IOBufferReader *buffer_in, HttpTunnelProducer *p);
00148   void init_by_action(IOBufferReader *buffer_in, Action action);
00149   void clear();
00150 
00151 
00152 
00153   void set_max_chunk_size(int64_t size);
00154 
00155   
00156   bool process_chunked_content();
00157   bool generate_chunked_content();
00158 
00159 private:
00160   void read_size();
00161   void read_chunk();
00162   void read_trailer();
00163   int64_t transfer_bytes();
00164 };
00165 
00166 struct HttpTunnelConsumer
00167 {
00168   HttpTunnelConsumer();
00169 
00170   LINK(HttpTunnelConsumer, link);
00171   HttpTunnelProducer *producer;
00172   HttpTunnelProducer *self_producer;
00173 
00174   HttpTunnelType_t vc_type;
00175   VConnection *vc;
00176   IOBufferReader *buffer_reader;
00177   HttpConsumerHandler vc_handler;
00178   VIO *write_vio;
00179 
00180   int64_t skip_bytes;               
00181   int64_t bytes_written;            
00182   int handler_state;              
00183 
00184   bool alive;
00185   bool write_success;
00186   const char *name;
00187 
00188 
00189 
00190 
00191 
00192   bool is_downstream_from(VConnection* vc);
00193 
00194 
00195 
00196   bool is_sink() const;
00197 };
00198 
00199 struct HttpTunnelProducer
00200 {
00201   HttpTunnelProducer();
00202 
00203   DLL<HttpTunnelConsumer> consumer_list;
00204   HttpTunnelConsumer *self_consumer;
00205   VConnection *vc;
00206   HttpProducerHandler vc_handler;
00207   VIO *read_vio;
00208   MIOBuffer *read_buffer;
00209   IOBufferReader *buffer_start;
00210   HttpTunnelType_t vc_type;
00211 
00212   ChunkedHandler chunked_handler;
00213   TunnelChunkingAction_t chunking_action;
00214 
00215   bool do_chunking;
00216   bool do_dechunking;
00217   bool do_chunked_passthru;
00218 
00219   int64_t init_bytes_done;          
00220   int64_t nbytes;                   
00221   int64_t ntodo;                    
00222   int64_t bytes_read;               
00223   int handler_state;              
00224   int last_event;                   
00225 
00226   int num_consumers;
00227 
00228   bool alive;
00229   bool read_success;
00230 
00231 
00232 
00233   HttpTunnelProducer* flow_control_source;
00234   const char *name;
00235 
00236 
00237 
00238 
00239 
00240   uint64_t backlog(
00241                    uint64_t limit = UINT64_MAX 
00242                    );
00243 
00244 
00245   bool is_source() const;
00246 
00247   void throttle();
00248 
00249   void unthrottle();
00250 
00251   bool is_throttled() const;
00252 
00253 
00254 
00255 
00256 
00257 
00258 
00259   void set_throttle_src(
00260                         HttpTunnelProducer* srcp 
00261                         );
00262 };
00263 
00264 class PostDataBuffers
00265 {
00266 public:
00267   PostDataBuffers()
00268     : postdata_producer_buffer(NULL), postdata_copy_buffer(NULL), postdata_producer_reader(NULL),
00269       postdata_copy_buffer_start(NULL), ua_buffer_reader(NULL)
00270   { Debug("http_redirect", "[PostDataBuffers::PostDataBuffers]");  }
00271 
00272   MIOBuffer *postdata_producer_buffer;
00273   MIOBuffer *postdata_copy_buffer;
00274   IOBufferReader *postdata_producer_reader;
00275   IOBufferReader *postdata_copy_buffer_start;
00276   IOBufferReader *ua_buffer_reader;
00277 };
00278 
00279 class HttpTunnel:public Continuation
00280 {
00281   friend class HttpPagesHandler;
00282   friend class CoreUtils;
00283 
00284 
00285 
00286 
00287 
00288 
00289 
00290 
00291   struct FlowControl {
00292     
00293     static uint64_t const DEFAULT_WATER_MARK = 1<<16;
00294 
00295     uint64_t high_water; 
00296     uint64_t low_water; 
00297     bool enabled_p; 
00298 
00299 
00300     FlowControl();
00301   };
00302 
00303 public:
00304   HttpTunnel();
00305 
00306   void init(HttpSM * sm_arg, ProxyMutex * amutex);
00307   void reset();
00308   void kill_tunnel();
00309   bool is_tunnel_active() const { return active; }
00310   bool is_tunnel_alive() const;
00311   bool has_cache_writer() const;
00312 
00313   
00314   void copy_partial_post_data();
00315   void allocate_redirect_postdata_producer_buffer();
00316   void allocate_redirect_postdata_buffers(IOBufferReader * ua_reader);
00317   void deallocate_redirect_postdata_buffers();
00318 
00319   HttpTunnelProducer *add_producer(VConnection * vc,
00320                                    int64_t nbytes,
00321                                    IOBufferReader * reader_start,
00322                                    HttpProducerHandler sm_handler, HttpTunnelType_t vc_type, const char *name);
00323 
00324   void set_producer_chunking_action(HttpTunnelProducer * p, int64_t skip_bytes, TunnelChunkingAction_t action);
00325 
00326   void set_producer_chunking_size(HttpTunnelProducer* producer, int64_t size);
00327 
00328   HttpTunnelConsumer *add_consumer(VConnection * vc,
00329                                    VConnection * producer,
00330                                    HttpConsumerHandler sm_handler,
00331                                    HttpTunnelType_t vc_type, const char *name, int64_t skip_bytes = 0);
00332 
00333   int deallocate_buffers();
00334   DLL<HttpTunnelConsumer> *get_consumers(VConnection * vc);
00335   HttpTunnelProducer *get_producer(VConnection * vc);
00336   HttpTunnelConsumer *get_consumer(VConnection * vc);
00337   void tunnel_run(HttpTunnelProducer * p = NULL);
00338 
00339   int main_handler(int event, void *data);
00340   bool consumer_reenable(HttpTunnelConsumer* c);
00341   bool consumer_handler(int event, HttpTunnelConsumer * c);
00342   bool producer_handler(int event, HttpTunnelProducer * p);
00343   int producer_handler_dechunked(int event, HttpTunnelProducer * p);
00344   int producer_handler_chunked(int event, HttpTunnelProducer * p);
00345   void local_finish_all(HttpTunnelProducer * p);
00346   void chain_finish_all(HttpTunnelProducer * p);
00347   void chain_abort_cache_write(HttpTunnelProducer * p);
00348   void chain_abort_all(HttpTunnelProducer * p);
00349   void abort_cache_write_finish_others(HttpTunnelProducer * p);
00350   void append_message_to_producer_buffer(HttpTunnelProducer * p, const char *msg, int64_t msg_len);
00351 
00352 
00353 
00354 
00355 
00356 
00357 
00358 
00359   void chain(
00360              HttpTunnelConsumer* c,  
00361              HttpTunnelProducer* p   
00362              );
00363 
00364   void close_vc(HttpTunnelProducer * p);
00365   void close_vc(HttpTunnelConsumer * c);
00366 
00367 private:
00368 
00369   void internal_error();
00370   void finish_all_internal(HttpTunnelProducer * p, bool chain);
00371   void update_stats_after_abort(HttpTunnelType_t t);
00372   void producer_run(HttpTunnelProducer * p);
00373 
00374   HttpTunnelProducer *get_producer(VIO * vio);
00375   HttpTunnelConsumer *get_consumer(VIO * vio);
00376 
00377   HttpTunnelProducer *alloc_producer();
00378   HttpTunnelConsumer *alloc_consumer();
00379 
00380   int num_producers;
00381   int num_consumers;
00382   HttpTunnelConsumer consumers[MAX_CONSUMERS];
00383   HttpTunnelProducer producers[MAX_PRODUCERS];
00384   HttpSM *sm;
00385 
00386   bool active;
00387 
00388 
00389   FlowControl flow_state;
00390 
00391 public:
00392   PostDataBuffers * postbuf;
00393 };
00394 
00395 
00396 
00397 
00398 
00399 
00400 inline void
00401 HttpTunnel::abort_cache_write_finish_others(HttpTunnelProducer * p)
00402 {
00403   chain_abort_cache_write(p);
00404   local_finish_all(p);
00405 }
00406 
00407 
00408 
00409 
00410 
00411 
00412 inline void
00413 HttpTunnel::local_finish_all(HttpTunnelProducer * p)
00414 {
00415   finish_all_internal(p, false);
00416 }
00417 
00418 
00419 
00420 
00421 
00422 
00423 
00424 inline void
00425 HttpTunnel::chain_finish_all(HttpTunnelProducer * p)
00426 {
00427   finish_all_internal(p, true);
00428 }
00429 
00430 inline bool
00431 HttpTunnel::is_tunnel_alive() const
00432 {
00433   bool tunnel_alive = false;
00434 
00435   for (int i = 0; i < MAX_PRODUCERS; i++) {
00436     if (producers[i].alive == true) {
00437       tunnel_alive = true;
00438       break;
00439     }
00440   }
00441   if (!tunnel_alive) {
00442     for (int i = 0; i < MAX_CONSUMERS; i++) {
00443       if (consumers[i].alive == true) {
00444         tunnel_alive = true;
00445         break;
00446       }
00447     }
00448 
00449   }
00450 
00451   return tunnel_alive;
00452 }
00453 
00454 inline HttpTunnelProducer *
00455 HttpTunnel::get_producer(VConnection * vc)
00456 {
00457   for (int i = 0; i < MAX_PRODUCERS; i++) {
00458     if (producers[i].vc == vc) {
00459       return producers + i;
00460     }
00461   }
00462   return NULL;
00463 }
00464 
00465 inline HttpTunnelConsumer *
00466 HttpTunnel::get_consumer(VConnection * vc)
00467 {
00468   for (int i = 0; i < MAX_CONSUMERS; i++) {
00469     if (consumers[i].vc == vc) {
00470       return consumers + i;
00471     }
00472   }
00473   return NULL;
00474 }
00475 
00476 inline HttpTunnelProducer *
00477 HttpTunnel::get_producer(VIO * vio)
00478 {
00479   for (int i = 0; i < MAX_PRODUCERS; i++) {
00480     if (producers[i].read_vio == vio) {
00481       return producers + i;
00482     }
00483   }
00484   return NULL;
00485 }
00486 
00487 inline HttpTunnelConsumer *
00488 HttpTunnel::get_consumer(VIO * vio)
00489 {
00490   for (int i = 0; i < MAX_CONSUMERS; i++) {
00491     if (consumers[i].write_vio == vio) {
00492       return consumers + i;
00493     }
00494   }
00495   return NULL;
00496 }
00497 
00498 inline void
00499 HttpTunnel::append_message_to_producer_buffer(HttpTunnelProducer * p, const char *msg, int64_t msg_len)
00500 {
00501   if (p == NULL || p->read_buffer == NULL)
00502     return;
00503 
00504   p->read_buffer->write(msg, msg_len);
00505   p->nbytes += msg_len;
00506   p->bytes_read += msg_len;
00507 }
00508 
00509 inline bool
00510 HttpTunnel::has_cache_writer() const
00511 {
00512   for (int i = 0; i < MAX_CONSUMERS; i++) {
00513     if (consumers[i].vc_type == HT_CACHE_WRITE && consumers[i].vc != NULL) {
00514       return true;
00515     }
00516   }
00517   return false;
00518 }
00519 
00520 inline bool
00521 HttpTunnelConsumer::is_downstream_from(VConnection *vc)
00522 {
00523   HttpTunnelProducer* p = producer;
00524   HttpTunnelConsumer* c;
00525   while (p) {
00526     if (p->vc == vc) return true;
00527     
00528     
00529     
00530     c = p->self_consumer;
00531     p = (c && c != this) ? c->producer : 0;
00532   }
00533   return false;
00534 }
00535 
00536 inline bool
00537 HttpTunnelConsumer::is_sink() const
00538 {
00539   return HT_HTTP_CLIENT == vc_type || HT_CACHE_WRITE == vc_type;
00540 }
00541 
00542 inline bool
00543 HttpTunnelProducer::is_source() const
00544 {
00545   
00546   
00547   return HT_HTTP_SERVER == vc_type || HT_CACHE_READ == vc_type || HT_HTTP_CLIENT == vc_type;
00548 }
00549 
00550 inline bool
00551 HttpTunnelProducer::is_throttled() const
00552 {
00553   return 0 != flow_control_source;
00554 }
00555 
00556 inline void
00557 HttpTunnelProducer::throttle()
00558 {
00559   if (!this->is_throttled())
00560     this->set_throttle_src(this);
00561 }
00562 
00563 inline void
00564 HttpTunnelProducer::unthrottle()
00565 {
00566   if (this->is_throttled())
00567     this->set_throttle_src(0);
00568 }
00569 
00570 inline
00571 HttpTunnel::FlowControl::FlowControl()
00572           : high_water(DEFAULT_WATER_MARK)
00573           , low_water(DEFAULT_WATER_MARK)
00574           , enabled_p(false)
00575 {
00576 }
00577 
00578 #endif