00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 #include "ink_config.h"
00025 #include "FetchSM.h"
00026 #include <stdio.h>
00027 #include "HTTP.h"
00028 #include "PluginVC.h"
00029 
00030 #define DEBUG_TAG "FetchSM"
00031 #define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
00032 
00033 ClassAllocator < FetchSM > FetchSMAllocator("FetchSMAllocator");
00034 void
00035 FetchSM::cleanUp()
00036 {
00037   Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__);
00038 
00039   if (resp_is_chunked > 0 && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
00040     chunked_handler.clear();
00041    }
00042 
00043   free_MIOBuffer(req_buffer);
00044   free_MIOBuffer(resp_buffer);
00045   mutex.clear();
00046   http_parser_clear(&http_parser);
00047   client_response_hdr.destroy();
00048   ats_free(client_response);
00049   cont_mutex.clear();
00050   http_vc->do_io_close();
00051   FetchSMAllocator.free(this);
00052 }
00053 
00054 void
00055 FetchSM::httpConnect()
00056 {
00057   PluginIdentity* pi = dynamic_cast<PluginIdentity*>(contp);
00058   char const* tag = pi ? pi->getPluginTag() : "fetchSM";
00059   int64_t id = pi ? pi->getPluginId() : 0;
00060 
00061   Debug(DEBUG_TAG, "[%s] calling httpconnect write", __FUNCTION__);
00062   http_vc = reinterpret_cast<PluginVC*>(TSHttpConnectWithPluginId(&_addr.sa, tag, id));
00063 
00064   
00065 
00066 
00067 
00068 
00069   if (!is_internal_request) {
00070     PluginVC* other_side = reinterpret_cast<PluginVC*>(http_vc)->get_other_side();
00071     if (other_side != NULL) {
00072       other_side->set_is_internal_request(false);
00073     }
00074   }
00075 
00076   read_vio = http_vc->do_io_read(this, INT64_MAX, resp_buffer);
00077   write_vio = http_vc->do_io_write(this, getReqLen() + req_content_length, req_reader);
00078 }
00079 
00080 char* FetchSM::resp_get(int *length) {
00081   *length = client_bytes;
00082   return client_response;
00083 }
00084 
00085 int
00086 FetchSM::InvokePlugin(int event, void *data)
00087 {
00088   EThread *mythread = this_ethread();
00089 
00090   MUTEX_TAKE_LOCK(contp->mutex,mythread);
00091 
00092   int ret = contp->handleEvent(event,data);
00093 
00094   MUTEX_UNTAKE_LOCK(contp->mutex,mythread);
00095 
00096   return ret;
00097 }
00098 
00099 bool
00100 FetchSM::has_body()
00101 {
00102   int status_code;
00103   HTTPHdr *hdr;
00104 
00105   if (!header_done)
00106     return false;
00107 
00108   
00109   
00110   
00111   
00112 
00113   hdr = &client_response_hdr;
00114 
00115   status_code = hdr->status_get();
00116   if (status_code < 200 || status_code == 204 || status_code == 304)
00117     return false;
00118 
00119   if (check_chunked())
00120     return true;
00121 
00122   resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
00123   if (!resp_content_length)
00124     return false;
00125 
00126   return true;
00127 }
00128 
00129 bool
00130 FetchSM::check_body_done()
00131 {
00132   if (!check_chunked()) {
00133     if (resp_content_length == resp_recived_body_len + resp_reader->read_avail())
00134       return true;
00135 
00136     return false;
00137   }
00138 
00139   
00140   
00141   
00142   return true;
00143 }
00144 
00145 bool
00146 FetchSM::check_chunked()
00147 {
00148   int ret;
00149   StrList slist;
00150   HTTPHdr *hdr = &client_response_hdr;
00151   if (resp_is_chunked >= 0)
00152     return resp_is_chunked;
00153 
00154   ink_release_assert(header_done);
00155 
00156   resp_is_chunked = 0;
00157   ret = hdr->value_get_comma_list(MIME_FIELD_TRANSFER_ENCODING,
00158                                   MIME_LEN_TRANSFER_ENCODING, &slist);
00159   if (ret) {
00160     for (Str *f = slist.head; f != NULL; f = f->next) {
00161       if (f->len == 0)
00162         continue;
00163 
00164       size_t len = sizeof("chunked") - 1;
00165       len = len > f->len ? f->len : len;
00166       if (!strncasecmp(f->str, "chunked", len)) {
00167         resp_is_chunked = 1;
00168         if (fetch_flags & TS_FETCH_FLAGS_DECHUNK) {
00169           ChunkedHandler *ch = &chunked_handler;
00170           ch->init_by_action(resp_reader, ChunkedHandler::ACTION_DECHUNK);
00171           ch->dechunked_reader = ch->dechunked_buffer->alloc_reader();
00172           ch->state = ChunkedHandler::CHUNK_READ_SIZE;
00173           resp_reader->dealloc();
00174         }
00175         return true;
00176       }
00177     }
00178   }
00179 
00180   return resp_is_chunked;
00181 }
00182 
00183 int
00184 FetchSM::dechunk_body()
00185 {
00186   ink_assert(resp_is_chunked > 0);
00187   
00188   
00189   
00190   
00191   
00192   
00193   if (chunked_handler.process_chunked_content())
00194     return TS_FETCH_EVENT_EXT_BODY_DONE;
00195 
00196   if (chunked_handler.dechunked_reader->read_avail())
00197     return TS_FETCH_EVENT_EXT_BODY_READY;
00198 
00199   return 0;
00200 }
00201 
00202 void
00203 FetchSM::InvokePluginExt(int error_event)
00204 {
00205   int event;
00206   EThread *mythread = this_ethread();
00207 
00208   
00209   
00210   
00211   
00212   recursion++;
00213 
00214   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00215     MUTEX_TAKE_LOCK(cont_mutex, mythread);
00216   }
00217 
00218   if (!contp)
00219     goto out;
00220 
00221   if (error_event) {
00222     contp->handleEvent(error_event, this);
00223     goto out;
00224   }
00225 
00226   if (!has_sent_header) {
00227     contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this);
00228     has_sent_header = true;
00229   }
00230 
00231   if (!has_body()) {
00232     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00233     goto out;
00234   }
00235 
00236   Debug(DEBUG_TAG, "[%s] chunked:%d, content_len: %" PRId64 ", recived_len: %" PRId64 ", avail: %" PRId64 "\n",
00237         __FUNCTION__, resp_is_chunked, resp_content_length, resp_recived_body_len,
00238         resp_is_chunked > 0 ? chunked_handler.chunked_reader->read_avail() : resp_reader->read_avail());
00239 
00240   if (resp_is_chunked > 0) {
00241     if (!chunked_handler.chunked_reader->read_avail())
00242       goto out;
00243   } else if (!resp_reader->read_avail()) {
00244       goto out;
00245   }
00246 
00247   if (!check_chunked()) {
00248     if (!check_body_done())
00249       contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
00250     else
00251       contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00252   } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK){
00253     do {
00254       if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
00255         chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
00256       }
00257 
00258       event = dechunk_body();
00259       if (!event) {
00260         read_vio->reenable();
00261         goto out;
00262       }
00263 
00264       contp->handleEvent(event, this);
00265     } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
00266   } else if (check_body_done()){
00267     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00268   } else {
00269     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
00270   }
00271 
00272 out:
00273   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00274     MUTEX_UNTAKE_LOCK(cont_mutex, mythread);
00275   }
00276   recursion--;
00277 
00278   if (!contp && !recursion)
00279     cleanUp();
00280 
00281   return;
00282 }
00283 
00284 void
00285 FetchSM::get_info_from_buffer(IOBufferReader *the_reader)
00286 {
00287   char *buf, *info;
00288   int64_t read_avail, read_done;
00289   IOBufferBlock *blk;
00290   IOBufferReader *reader = the_reader;
00291 
00292   if (!reader) {
00293     client_bytes = 0;
00294     return ;
00295   }
00296 
00297   read_avail = reader->read_avail();
00298   Debug(DEBUG_TAG, "[%s] total avail %" PRId64 , __FUNCTION__, read_avail);
00299   if (!read_avail) {
00300     client_bytes = 0;
00301     return;
00302   }
00303 
00304   info = (char *)ats_malloc(sizeof(char) * (read_avail+1));
00305   client_response = info;
00306 
00307   
00308   if (!(fetch_flags & TS_FETCH_FLAGS_STREAM) || !check_chunked()) {
00309     
00310     while (read_avail > 0) {
00311       if (reader->block != NULL)
00312         reader->skip_empty_blocks();
00313       blk = reader->block;
00314 
00315       
00316       buf = blk->start() + reader->start_offset;
00317       read_done = blk->read_avail() - reader->start_offset;
00318 
00319       if (read_done > 0) {
00320         memcpy(info, buf, read_done);
00321         reader->consume(read_done);
00322         read_avail -= read_done;
00323         info += read_done;
00324         client_bytes += read_done;
00325       }
00326     }
00327     client_response[client_bytes] = '\0';
00328     return;
00329   }
00330 
00331   reader = chunked_handler.dechunked_reader;
00332   do {
00333     if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
00334       chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
00335     }
00336 
00337     if (!dechunk_body())
00338       break;
00339 
00340     
00341     read_avail = reader->read_avail();
00342     while (read_avail > 0) {
00343       if (reader->block != NULL)
00344         reader->skip_empty_blocks();
00345       blk = reader->block;
00346 
00347       
00348       buf = blk->start() + reader->start_offset;
00349       read_done = blk->read_avail() - reader->start_offset;
00350 
00351       if (read_done > 0) {
00352         memcpy(info, buf, read_done);
00353         reader->consume(read_done);
00354         read_avail -= read_done;
00355         info += read_done;
00356         client_bytes += read_done;
00357       }
00358     }
00359   } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
00360 
00361   client_response[client_bytes] = '\0';
00362   return;
00363 }
00364 
00365 void
00366 FetchSM::process_fetch_read(int event)
00367 {
00368   Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__);
00369   int64_t bytes;
00370   int bytes_used;
00371   int64_t total_bytes_copied = 0;
00372 
00373   switch (event) {
00374   case TS_EVENT_VCONN_READ_READY:
00375     bytes = resp_reader->read_avail();
00376     Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64, __FUNCTION__, bytes);
00377 
00378 
00379     while (total_bytes_copied < bytes) {
00380        int64_t actual_bytes_copied;
00381        actual_bytes_copied = resp_buffer->write(resp_reader, bytes, 0);
00382        Debug(DEBUG_TAG, "[%s] copied %" PRId64 " bytes", __FUNCTION__, actual_bytes_copied);
00383        if (actual_bytes_copied <= 0) {
00384            break;
00385        }
00386        total_bytes_copied += actual_bytes_copied;
00387     }
00388     Debug(DEBUG_TAG, "[%s] total copied %" PRId64 " bytes", __FUNCTION__, total_bytes_copied);
00389     resp_reader->consume(total_bytes_copied);
00390 
00391     if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || callback_options == AFTER_HEADER)) {
00392       if (client_response_hdr.parse_resp(&http_parser, resp_reader, &bytes_used, 0) == PARSE_DONE) {
00393         header_done = 1;
00394         if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00395           return InvokePluginExt();
00396         else
00397           InvokePlugin( callback_events.success_event_id, (void *) &client_response_hdr);
00398       }
00399     } else {
00400       if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00401         return InvokePluginExt();
00402     }
00403     read_vio->reenable();
00404     break;
00405   case TS_EVENT_VCONN_READ_COMPLETE:
00406   case TS_EVENT_VCONN_EOS:
00407     if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00408       return InvokePluginExt();
00409     if(callback_options == AFTER_HEADER || callback_options == AFTER_BODY) {
00410       get_info_from_buffer(resp_reader);
00411       InvokePlugin( callback_events.success_event_id, (void *) this);
00412     }
00413     Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__);
00414     cleanUp();
00415     break;
00416   case TS_EVENT_ERROR:
00417   default:
00418     if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00419       return InvokePluginExt(event);
00420     InvokePlugin( callback_events.failure_event_id, NULL);
00421     cleanUp();
00422     break;
00423   }
00424 }
00425 
00426 void
00427 FetchSM::process_fetch_write(int event)
00428 {
00429   Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__);
00430   switch (event) {
00431   case TS_EVENT_VCONN_WRITE_COMPLETE:
00432     req_finished = true;
00433     break;
00434   case TS_EVENT_VCONN_WRITE_READY:
00435     
00436     
00437     
00438     if (req_reader->read_avail() > 0)
00439       ((PluginVC *) http_vc)->reenable(write_vio);
00440     break;
00441   case TS_EVENT_ERROR:
00442     if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00443       return InvokePluginExt(event);
00444     InvokePlugin( callback_events.failure_event_id, NULL);
00445     cleanUp();
00446   default:
00447     break;
00448   }
00449 }
00450 
00451 int
00452 FetchSM::fetch_handler(int event, void *edata)
00453 {
00454   Debug(DEBUG_TAG, "[%s] calling fetch_plugin", __FUNCTION__);
00455 
00456   if (edata == read_vio) {
00457     process_fetch_read(event);
00458   } else if (edata == write_vio) {
00459     process_fetch_write(event);
00460   } else {
00461     if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
00462       InvokePluginExt(event);
00463       return 1;
00464     }
00465     InvokePlugin( callback_events.failure_event_id, NULL);
00466     cleanUp();
00467   }
00468   return 1;
00469 }
00470 
00471 void
00472 FetchSM::ext_init(Continuation *cont, const char *method,
00473                   const char *url, const char *version,
00474                   const sockaddr *client_addr, int flags)
00475 {
00476   init_comm();
00477 
00478   if (flags & TS_FETCH_FLAGS_NEWLOCK) {
00479     mutex = new_ProxyMutex();
00480     cont_mutex = cont->mutex;
00481   } else {
00482     mutex = cont->mutex;
00483   }
00484 
00485   contp = cont;
00486   _addr.assign(client_addr);
00487 
00488   
00489   
00490   
00491   fetch_flags = (TS_FETCH_FLAGS_STREAM | flags);
00492   if (fetch_flags & TS_FETCH_FLAGS_NOT_INTERNAL_REQUEST) {
00493     set_internal_request(false);
00494   }
00495 
00496   
00497   
00498   
00499   
00500   memset(&callback_options, 0, sizeof(callback_options));
00501   memset(&callback_events, 0, sizeof(callback_events));
00502 
00503   req_buffer->write(method, strlen(method));
00504   req_buffer->write(" ", 1);
00505   req_buffer->write(url, strlen(url));
00506   req_buffer->write(" ", 1);
00507   req_buffer->write(version, strlen(version));
00508   req_buffer->write("\r\n", 2);
00509 }
00510 
00511 void
00512 FetchSM::ext_add_header(const char *name, int name_len,
00513                         const char *value, int value_len)
00514 {
00515   if (TS_MIME_LEN_CONTENT_LENGTH == name_len &&
00516       !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) {
00517     req_content_length = atoll(value);
00518   }
00519 
00520   req_buffer->write(name, name_len);
00521   req_buffer->write(": ", 2);
00522   req_buffer->write(value, value_len);
00523   req_buffer->write("\r\n", 2);
00524 }
00525 
00526 void
00527 FetchSM::ext_lanuch()
00528 {
00529   req_buffer->write("\r\n", 2);
00530   httpConnect();
00531 }
00532 
00533 void
00534 FetchSM::ext_write_data(const void *data, size_t len)
00535 {
00536   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00537     MUTEX_TAKE_LOCK(mutex, this_ethread());
00538   }
00539   req_buffer->write(data, len);
00540 
00541   Debug(DEBUG_TAG, "[%s] re-enabling write_vio, header_done %u", __FUNCTION__, header_done);
00542   write_vio->reenable();
00543 
00544   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00545     MUTEX_UNTAKE_LOCK(mutex, this_ethread());
00546   }
00547 }
00548 
00549 ssize_t
00550 FetchSM::ext_read_data(char *buf, size_t len)
00551 {
00552   const char *start;
00553   TSIOBufferReader reader;
00554   TSIOBufferBlock blk, next_blk;
00555   int64_t already, blk_len, need, wavail;
00556 
00557   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00558     MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00559     if (!lock)
00560       return 0;
00561   }
00562 
00563   if (!header_done)
00564     return 0;
00565 
00566   if (check_chunked() && (fetch_flags & TS_FETCH_FLAGS_DECHUNK))
00567     reader = (tsapi_bufferreader*)chunked_handler.dechunked_reader;
00568   else
00569     reader = (TSIOBufferReader)resp_reader;
00570 
00571   already = 0;
00572   blk = TSIOBufferReaderStart(reader);
00573 
00574   while (blk) {
00575 
00576     wavail = len - already;
00577 
00578     next_blk = TSIOBufferBlockNext(blk);
00579     start = TSIOBufferBlockReadStart(blk, reader, &blk_len);
00580 
00581     need = blk_len > wavail ? wavail : blk_len;
00582 
00583     memcpy(&buf[already], start, need);
00584     already += need;
00585 
00586     if (already >= (int64_t)len)
00587       break;
00588 
00589     blk = next_blk;
00590   }
00591 
00592   resp_recived_body_len += already;
00593   TSIOBufferReaderConsume(reader, already);
00594 
00595   read_vio->reenable();
00596   return already;
00597 }
00598 
00599 void
00600 FetchSM::ext_destroy()
00601 {
00602   contp = NULL;
00603 
00604   if (recursion)
00605     return;
00606 
00607   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00608     MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00609     if (!lock) {
00610       eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME);
00611       return;
00612     }
00613   }
00614 
00615   cleanUp();
00616 }
00617 
00618 void
00619 FetchSM::ext_set_user_data(void *data)
00620 {
00621   user_data = data;
00622 }
00623 
00624 void*
00625 FetchSM::ext_get_user_data()
00626 {
00627   return user_data;
00628 }
00629 
00630 TSMBuffer
00631 FetchSM::resp_hdr_bufp()
00632 {
00633   HdrHeapSDKHandle *heap;
00634   heap = (HdrHeapSDKHandle *)&client_response_hdr;
00635 
00636   return (TSMBuffer)heap;
00637 }
00638 
00639 TSMLoc
00640 FetchSM::resp_hdr_mloc()
00641 {
00642   return (TSMLoc)client_response_hdr.m_http;
00643 }