• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

FetchSM.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Implements callin functions for plugins
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 #include "ink_config.h"
00025 #include "FetchSM.h"
00026 #include <stdio.h>
00027 #include "HTTP.h"
00028 #include "PluginVC.h"
00029 
00030 #define DEBUG_TAG "FetchSM"
00031 #define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
00032 
00033 ClassAllocator < FetchSM > FetchSMAllocator("FetchSMAllocator");
00034 void
00035 FetchSM::cleanUp()
00036 {
00037   Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__);
00038 
00039   if (resp_is_chunked > 0 && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
00040     chunked_handler.clear();
00041    }
00042 
00043   free_MIOBuffer(req_buffer);
00044   free_MIOBuffer(resp_buffer);
00045   mutex.clear();
00046   http_parser_clear(&http_parser);
00047   client_response_hdr.destroy();
00048   ats_free(client_response);
00049   cont_mutex.clear();
00050   http_vc->do_io_close();
00051   FetchSMAllocator.free(this);
00052 }
00053 
00054 void
00055 FetchSM::httpConnect()
00056 {
00057   PluginIdentity* pi = dynamic_cast<PluginIdentity*>(contp);
00058   char const* tag = pi ? pi->getPluginTag() : "fetchSM";
00059   int64_t id = pi ? pi->getPluginId() : 0;
00060 
00061   Debug(DEBUG_TAG, "[%s] calling httpconnect write", __FUNCTION__);
00062   http_vc = reinterpret_cast<PluginVC*>(TSHttpConnectWithPluginId(&_addr.sa, tag, id));
00063 
00064   /*
00065    * TS-2906: We need a way to unset internal request when using FetchSM, the use case for this
00066    * is SPDY when it creates outgoing requests it uses FetchSM and the outgoing requests
00067    * are spawned via SPDY SYN packets which are definitely not internal requests.
00068    */
00069   if (!is_internal_request) {
00070     PluginVC* other_side = reinterpret_cast<PluginVC*>(http_vc)->get_other_side();
00071     if (other_side != NULL) {
00072       other_side->set_is_internal_request(false);
00073     }
00074   }
00075 
00076   read_vio = http_vc->do_io_read(this, INT64_MAX, resp_buffer);
00077   write_vio = http_vc->do_io_write(this, getReqLen() + req_content_length, req_reader);
00078 }
00079 
00080 char* FetchSM::resp_get(int *length) {
00081   *length = client_bytes;
00082   return client_response;
00083 }
00084 
00085 int
00086 FetchSM::InvokePlugin(int event, void *data)
00087 {
00088   EThread *mythread = this_ethread();
00089 
00090   MUTEX_TAKE_LOCK(contp->mutex,mythread);
00091 
00092   int ret = contp->handleEvent(event,data);
00093 
00094   MUTEX_UNTAKE_LOCK(contp->mutex,mythread);
00095 
00096   return ret;
00097 }
00098 
00099 bool
00100 FetchSM::has_body()
00101 {
00102   int status_code;
00103   HTTPHdr *hdr;
00104 
00105   if (!header_done)
00106     return false;
00107 
00108   //
00109   // The following code comply with HTTP/1.1:
00110   // http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4
00111   //
00112 
00113   hdr = &client_response_hdr;
00114 
00115   status_code = hdr->status_get();
00116   if (status_code < 200 || status_code == 204 || status_code == 304)
00117     return false;
00118 
00119   if (check_chunked())
00120     return true;
00121 
00122   resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
00123   if (!resp_content_length)
00124     return false;
00125 
00126   return true;
00127 }
00128 
00129 bool
00130 FetchSM::check_body_done()
00131 {
00132   if (!check_chunked()) {
00133     if (resp_content_length == resp_recived_body_len + resp_reader->read_avail())
00134       return true;
00135 
00136     return false;
00137   }
00138 
00139   //
00140   // TODO: check whether the chunked body is done
00141   //
00142   return true;
00143 }
00144 
00145 bool
00146 FetchSM::check_chunked()
00147 {
00148   int ret;
00149   StrList slist;
00150   HTTPHdr *hdr = &client_response_hdr;
00151   if (resp_is_chunked >= 0)
00152     return resp_is_chunked;
00153 
00154   ink_release_assert(header_done);
00155 
00156   resp_is_chunked = 0;
00157   ret = hdr->value_get_comma_list(MIME_FIELD_TRANSFER_ENCODING,
00158                                   MIME_LEN_TRANSFER_ENCODING, &slist);
00159   if (ret) {
00160     for (Str *f = slist.head; f != NULL; f = f->next) {
00161       if (f->len == 0)
00162         continue;
00163 
00164       size_t len = sizeof("chunked") - 1;
00165       len = len > f->len ? f->len : len;
00166       if (!strncasecmp(f->str, "chunked", len)) {
00167         resp_is_chunked = 1;
00168         if (fetch_flags & TS_FETCH_FLAGS_DECHUNK) {
00169           ChunkedHandler *ch = &chunked_handler;
00170           ch->init_by_action(resp_reader, ChunkedHandler::ACTION_DECHUNK);
00171           ch->dechunked_reader = ch->dechunked_buffer->alloc_reader();
00172           ch->state = ChunkedHandler::CHUNK_READ_SIZE;
00173           resp_reader->dealloc();
00174         }
00175         return true;
00176       }
00177     }
00178   }
00179 
00180   return resp_is_chunked;
00181 }
00182 
00183 int
00184 FetchSM::dechunk_body()
00185 {
00186   ink_assert(resp_is_chunked > 0);
00187   //
00188   // Return Value:
00189   //  - 0: need to read more data.
00190   //  - TS_FETCH_EVENT_EXT_BODY_READY.
00191   //  - TS_FETCH_EVENT_EXT_BODY_DONE.
00192   //
00193   if (chunked_handler.process_chunked_content())
00194     return TS_FETCH_EVENT_EXT_BODY_DONE;
00195 
00196   if (chunked_handler.dechunked_reader->read_avail())
00197     return TS_FETCH_EVENT_EXT_BODY_READY;
00198 
00199   return 0;
00200 }
00201 
00202 void
00203 FetchSM::InvokePluginExt(int error_event)
00204 {
00205   int event;
00206   EThread *mythread = this_ethread();
00207 
00208   //
00209   // Increasing *recursion* to prevent
00210   // FetchSM being deleted by callback.
00211   //
00212   recursion++;
00213 
00214   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00215     MUTEX_TAKE_LOCK(cont_mutex, mythread);
00216   }
00217 
00218   if (!contp)
00219     goto out;
00220 
00221   if (error_event) {
00222     contp->handleEvent(error_event, this);
00223     goto out;
00224   }
00225 
00226   if (!has_sent_header) {
00227     contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this);
00228     has_sent_header = true;
00229   }
00230 
00231   if (!has_body()) {
00232     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00233     goto out;
00234   }
00235 
00236   Debug(DEBUG_TAG, "[%s] chunked:%d, content_len: %" PRId64 ", recived_len: %" PRId64 ", avail: %" PRId64 "\n",
00237         __FUNCTION__, resp_is_chunked, resp_content_length, resp_recived_body_len,
00238         resp_is_chunked > 0 ? chunked_handler.chunked_reader->read_avail() : resp_reader->read_avail());
00239 
00240   if (resp_is_chunked > 0) {
00241     if (!chunked_handler.chunked_reader->read_avail())
00242       goto out;
00243   } else if (!resp_reader->read_avail()) {
00244       goto out;
00245   }
00246 
00247   if (!check_chunked()) {
00248     if (!check_body_done())
00249       contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
00250     else
00251       contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00252   } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK){
00253     do {
00254       if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
00255         chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
00256       }
00257 
00258       event = dechunk_body();
00259       if (!event) {
00260         read_vio->reenable();
00261         goto out;
00262       }
00263 
00264       contp->handleEvent(event, this);
00265     } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
00266   } else if (check_body_done()){
00267     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00268   } else {
00269     contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
00270   }
00271 
00272 out:
00273   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00274     MUTEX_UNTAKE_LOCK(cont_mutex, mythread);
00275   }
00276   recursion--;
00277 
00278   if (!contp && !recursion)
00279     cleanUp();
00280 
00281   return;
00282 }
00283 
00284 void
00285 FetchSM::get_info_from_buffer(IOBufferReader *the_reader)
00286 {
00287   char *buf, *info;
00288   int64_t read_avail, read_done;
00289   IOBufferBlock *blk;
00290   IOBufferReader *reader = the_reader;
00291 
00292   if (!reader) {
00293     client_bytes = 0;
00294     return ;
00295   }
00296 
00297   read_avail = reader->read_avail();
00298   Debug(DEBUG_TAG, "[%s] total avail %" PRId64 , __FUNCTION__, read_avail);
00299   if (!read_avail) {
00300     client_bytes = 0;
00301     return;
00302   }
00303 
00304   info = (char *)ats_malloc(sizeof(char) * (read_avail+1));
00305   client_response = info;
00306 
00307   // To maintain backwards compatability we don't allow chunking when it's not streaming.
00308   if (!(fetch_flags & TS_FETCH_FLAGS_STREAM) || !check_chunked()) {
00309     /* Read the data out of the reader */
00310     while (read_avail > 0) {
00311       if (reader->block != NULL)
00312         reader->skip_empty_blocks();
00313       blk = reader->block;
00314 
00315       // This is the equivalent of TSIOBufferBlockReadStart()
00316       buf = blk->start() + reader->start_offset;
00317       read_done = blk->read_avail() - reader->start_offset;
00318 
00319       if (read_done > 0) {
00320         memcpy(info, buf, read_done);
00321         reader->consume(read_done);
00322         read_avail -= read_done;
00323         info += read_done;
00324         client_bytes += read_done;
00325       }
00326     }
00327     client_response[client_bytes] = '\0';
00328     return;
00329   }
00330 
00331   reader = chunked_handler.dechunked_reader;
00332   do {
00333     if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
00334       chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
00335     }
00336 
00337     if (!dechunk_body())
00338       break;
00339 
00340     /* Read the data out of the reader */
00341     read_avail = reader->read_avail();
00342     while (read_avail > 0) {
00343       if (reader->block != NULL)
00344         reader->skip_empty_blocks();
00345       blk = reader->block;
00346 
00347       // This is the equivalent of TSIOBufferBlockReadStart()
00348       buf = blk->start() + reader->start_offset;
00349       read_done = blk->read_avail() - reader->start_offset;
00350 
00351       if (read_done > 0) {
00352         memcpy(info, buf, read_done);
00353         reader->consume(read_done);
00354         read_avail -= read_done;
00355         info += read_done;
00356         client_bytes += read_done;
00357       }
00358     }
00359   } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
00360 
00361   client_response[client_bytes] = '\0';
00362   return;
00363 }
00364 
00365 void
00366 FetchSM::process_fetch_read(int event)
00367 {
00368   Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__);
00369   int64_t bytes;
00370   int bytes_used;
00371   int64_t total_bytes_copied = 0;
00372 
00373   switch (event) {
00374   case TS_EVENT_VCONN_READ_READY:
00375     bytes = resp_reader->read_avail();
00376     Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64, __FUNCTION__, bytes);
00377 
00378 
00379     while (total_bytes_copied < bytes) {
00380        int64_t actual_bytes_copied;
00381        actual_bytes_copied = resp_buffer->write(resp_reader, bytes, 0);
00382        Debug(DEBUG_TAG, "[%s] copied %" PRId64 " bytes", __FUNCTION__, actual_bytes_copied);
00383        if (actual_bytes_copied <= 0) {
00384            break;
00385        }
00386        total_bytes_copied += actual_bytes_copied;
00387     }
00388     Debug(DEBUG_TAG, "[%s] total copied %" PRId64 " bytes", __FUNCTION__, total_bytes_copied);
00389     resp_reader->consume(total_bytes_copied);
00390 
00391     if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || callback_options == AFTER_HEADER)) {
00392       if (client_response_hdr.parse_resp(&http_parser, resp_reader, &bytes_used, 0) == PARSE_DONE) {
00393         header_done = 1;
00394         if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00395           return InvokePluginExt();
00396         else
00397           InvokePlugin( callback_events.success_event_id, (void *) &client_response_hdr);
00398       }
00399     } else {
00400       if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00401         return InvokePluginExt();
00402     }
00403     read_vio->reenable();
00404     break;
00405   case TS_EVENT_VCONN_READ_COMPLETE:
00406   case TS_EVENT_VCONN_EOS:
00407     if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00408       return InvokePluginExt();
00409     if(callback_options == AFTER_HEADER || callback_options == AFTER_BODY) {
00410       get_info_from_buffer(resp_reader);
00411       InvokePlugin( callback_events.success_event_id, (void *) this);
00412     }
00413     Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__);
00414     cleanUp();
00415     break;
00416   case TS_EVENT_ERROR:
00417   default:
00418     if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00419       return InvokePluginExt(event);
00420     InvokePlugin( callback_events.failure_event_id, NULL);
00421     cleanUp();
00422     break;
00423   }
00424 }
00425 
00426 void
00427 FetchSM::process_fetch_write(int event)
00428 {
00429   Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__);
00430   switch (event) {
00431   case TS_EVENT_VCONN_WRITE_COMPLETE:
00432     req_finished = true;
00433     break;
00434   case TS_EVENT_VCONN_WRITE_READY:
00435     // data is processed in chunks of 32k; if there is more than 32k
00436     // of input data, we have to continue reenabling until all data is
00437     // read (we have already written all the data to the buffer)
00438     if (req_reader->read_avail() > 0)
00439       ((PluginVC *) http_vc)->reenable(write_vio);
00440     break;
00441   case TS_EVENT_ERROR:
00442     if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00443       return InvokePluginExt(event);
00444     InvokePlugin( callback_events.failure_event_id, NULL);
00445     cleanUp();
00446   default:
00447     break;
00448   }
00449 }
00450 
00451 int
00452 FetchSM::fetch_handler(int event, void *edata)
00453 {
00454   Debug(DEBUG_TAG, "[%s] calling fetch_plugin", __FUNCTION__);
00455 
00456   if (edata == read_vio) {
00457     process_fetch_read(event);
00458   } else if (edata == write_vio) {
00459     process_fetch_write(event);
00460   } else {
00461     if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
00462       InvokePluginExt(event);
00463       return 1;
00464     }
00465     InvokePlugin( callback_events.failure_event_id, NULL);
00466     cleanUp();
00467   }
00468   return 1;
00469 }
00470 
00471 void
00472 FetchSM::ext_init(Continuation *cont, const char *method,
00473                   const char *url, const char *version,
00474                   const sockaddr *client_addr, int flags)
00475 {
00476   init_comm();
00477 
00478   if (flags & TS_FETCH_FLAGS_NEWLOCK) {
00479     mutex = new_ProxyMutex();
00480     cont_mutex = cont->mutex;
00481   } else {
00482     mutex = cont->mutex;
00483   }
00484 
00485   contp = cont;
00486   _addr.assign(client_addr);
00487 
00488   //
00489   // Enable stream IO automatically.
00490   //
00491   fetch_flags = (TS_FETCH_FLAGS_STREAM | flags);
00492   if (fetch_flags & TS_FETCH_FLAGS_NOT_INTERNAL_REQUEST) {
00493     set_internal_request(false);
00494   }
00495 
00496   //
00497   // These options are not used when enable
00498   // stream IO.
00499   //
00500   memset(&callback_options, 0, sizeof(callback_options));
00501   memset(&callback_events, 0, sizeof(callback_events));
00502 
00503   req_buffer->write(method, strlen(method));
00504   req_buffer->write(" ", 1);
00505   req_buffer->write(url, strlen(url));
00506   req_buffer->write(" ", 1);
00507   req_buffer->write(version, strlen(version));
00508   req_buffer->write("\r\n", 2);
00509 }
00510 
00511 void
00512 FetchSM::ext_add_header(const char *name, int name_len,
00513                         const char *value, int value_len)
00514 {
00515   if (TS_MIME_LEN_CONTENT_LENGTH == name_len &&
00516       !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) {
00517     req_content_length = atoll(value);
00518   }
00519 
00520   req_buffer->write(name, name_len);
00521   req_buffer->write(": ", 2);
00522   req_buffer->write(value, value_len);
00523   req_buffer->write("\r\n", 2);
00524 }
00525 
00526 void
00527 FetchSM::ext_lanuch()
00528 {
00529   req_buffer->write("\r\n", 2);
00530   httpConnect();
00531 }
00532 
00533 void
00534 FetchSM::ext_write_data(const void *data, size_t len)
00535 {
00536   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00537     MUTEX_TAKE_LOCK(mutex, this_ethread());
00538   }
00539   req_buffer->write(data, len);
00540 
00541   Debug(DEBUG_TAG, "[%s] re-enabling write_vio, header_done %u", __FUNCTION__, header_done);
00542   write_vio->reenable();
00543 
00544   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00545     MUTEX_UNTAKE_LOCK(mutex, this_ethread());
00546   }
00547 }
00548 
00549 ssize_t
00550 FetchSM::ext_read_data(char *buf, size_t len)
00551 {
00552   const char *start;
00553   TSIOBufferReader reader;
00554   TSIOBufferBlock blk, next_blk;
00555   int64_t already, blk_len, need, wavail;
00556 
00557   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00558     MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00559     if (!lock)
00560       return 0;
00561   }
00562 
00563   if (!header_done)
00564     return 0;
00565 
00566   if (check_chunked() && (fetch_flags & TS_FETCH_FLAGS_DECHUNK))
00567     reader = (tsapi_bufferreader*)chunked_handler.dechunked_reader;
00568   else
00569     reader = (TSIOBufferReader)resp_reader;
00570 
00571   already = 0;
00572   blk = TSIOBufferReaderStart(reader);
00573 
00574   while (blk) {
00575 
00576     wavail = len - already;
00577 
00578     next_blk = TSIOBufferBlockNext(blk);
00579     start = TSIOBufferBlockReadStart(blk, reader, &blk_len);
00580 
00581     need = blk_len > wavail ? wavail : blk_len;
00582 
00583     memcpy(&buf[already], start, need);
00584     already += need;
00585 
00586     if (already >= (int64_t)len)
00587       break;
00588 
00589     blk = next_blk;
00590   }
00591 
00592   resp_recived_body_len += already;
00593   TSIOBufferReaderConsume(reader, already);
00594 
00595   read_vio->reenable();
00596   return already;
00597 }
00598 
00599 void
00600 FetchSM::ext_destroy()
00601 {
00602   contp = NULL;
00603 
00604   if (recursion)
00605     return;
00606 
00607   if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00608     MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00609     if (!lock) {
00610       eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME);
00611       return;
00612     }
00613   }
00614 
00615   cleanUp();
00616 }
00617 
00618 void
00619 FetchSM::ext_set_user_data(void *data)
00620 {
00621   user_data = data;
00622 }
00623 
00624 void*
00625 FetchSM::ext_get_user_data()
00626 {
00627   return user_data;
00628 }
00629 
00630 TSMBuffer
00631 FetchSM::resp_hdr_bufp()
00632 {
00633   HdrHeapSDKHandle *heap;
00634   heap = (HdrHeapSDKHandle *)&client_response_hdr;
00635 
00636   return (TSMBuffer)heap;
00637 }
00638 
00639 TSMLoc
00640 FetchSM::resp_hdr_mloc()
00641 {
00642   return (TSMLoc)client_response_hdr.m_http;
00643 }

Generated by  doxygen 1.7.1