00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ink_config.h"
00025 #include "FetchSM.h"
00026 #include <stdio.h>
00027 #include "HTTP.h"
00028 #include "PluginVC.h"
00029
00030 #define DEBUG_TAG "FetchSM"
00031 #define FETCH_LOCK_RETRY_TIME HRTIME_MSECONDS(10)
00032
00033 ClassAllocator < FetchSM > FetchSMAllocator("FetchSMAllocator");
00034 void
00035 FetchSM::cleanUp()
00036 {
00037 Debug(DEBUG_TAG, "[%s] calling cleanup", __FUNCTION__);
00038
00039 if (resp_is_chunked > 0 && (fetch_flags & TS_FETCH_FLAGS_DECHUNK)) {
00040 chunked_handler.clear();
00041 }
00042
00043 free_MIOBuffer(req_buffer);
00044 free_MIOBuffer(resp_buffer);
00045 mutex.clear();
00046 http_parser_clear(&http_parser);
00047 client_response_hdr.destroy();
00048 ats_free(client_response);
00049 cont_mutex.clear();
00050 http_vc->do_io_close();
00051 FetchSMAllocator.free(this);
00052 }
00053
00054 void
00055 FetchSM::httpConnect()
00056 {
00057 PluginIdentity* pi = dynamic_cast<PluginIdentity*>(contp);
00058 char const* tag = pi ? pi->getPluginTag() : "fetchSM";
00059 int64_t id = pi ? pi->getPluginId() : 0;
00060
00061 Debug(DEBUG_TAG, "[%s] calling httpconnect write", __FUNCTION__);
00062 http_vc = reinterpret_cast<PluginVC*>(TSHttpConnectWithPluginId(&_addr.sa, tag, id));
00063
00064
00065
00066
00067
00068
00069 if (!is_internal_request) {
00070 PluginVC* other_side = reinterpret_cast<PluginVC*>(http_vc)->get_other_side();
00071 if (other_side != NULL) {
00072 other_side->set_is_internal_request(false);
00073 }
00074 }
00075
00076 read_vio = http_vc->do_io_read(this, INT64_MAX, resp_buffer);
00077 write_vio = http_vc->do_io_write(this, getReqLen() + req_content_length, req_reader);
00078 }
00079
00080 char* FetchSM::resp_get(int *length) {
00081 *length = client_bytes;
00082 return client_response;
00083 }
00084
00085 int
00086 FetchSM::InvokePlugin(int event, void *data)
00087 {
00088 EThread *mythread = this_ethread();
00089
00090 MUTEX_TAKE_LOCK(contp->mutex,mythread);
00091
00092 int ret = contp->handleEvent(event,data);
00093
00094 MUTEX_UNTAKE_LOCK(contp->mutex,mythread);
00095
00096 return ret;
00097 }
00098
00099 bool
00100 FetchSM::has_body()
00101 {
00102 int status_code;
00103 HTTPHdr *hdr;
00104
00105 if (!header_done)
00106 return false;
00107
00108
00109
00110
00111
00112
00113 hdr = &client_response_hdr;
00114
00115 status_code = hdr->status_get();
00116 if (status_code < 200 || status_code == 204 || status_code == 304)
00117 return false;
00118
00119 if (check_chunked())
00120 return true;
00121
00122 resp_content_length = hdr->value_get_int64(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
00123 if (!resp_content_length)
00124 return false;
00125
00126 return true;
00127 }
00128
00129 bool
00130 FetchSM::check_body_done()
00131 {
00132 if (!check_chunked()) {
00133 if (resp_content_length == resp_recived_body_len + resp_reader->read_avail())
00134 return true;
00135
00136 return false;
00137 }
00138
00139
00140
00141
00142 return true;
00143 }
00144
00145 bool
00146 FetchSM::check_chunked()
00147 {
00148 int ret;
00149 StrList slist;
00150 HTTPHdr *hdr = &client_response_hdr;
00151 if (resp_is_chunked >= 0)
00152 return resp_is_chunked;
00153
00154 ink_release_assert(header_done);
00155
00156 resp_is_chunked = 0;
00157 ret = hdr->value_get_comma_list(MIME_FIELD_TRANSFER_ENCODING,
00158 MIME_LEN_TRANSFER_ENCODING, &slist);
00159 if (ret) {
00160 for (Str *f = slist.head; f != NULL; f = f->next) {
00161 if (f->len == 0)
00162 continue;
00163
00164 size_t len = sizeof("chunked") - 1;
00165 len = len > f->len ? f->len : len;
00166 if (!strncasecmp(f->str, "chunked", len)) {
00167 resp_is_chunked = 1;
00168 if (fetch_flags & TS_FETCH_FLAGS_DECHUNK) {
00169 ChunkedHandler *ch = &chunked_handler;
00170 ch->init_by_action(resp_reader, ChunkedHandler::ACTION_DECHUNK);
00171 ch->dechunked_reader = ch->dechunked_buffer->alloc_reader();
00172 ch->state = ChunkedHandler::CHUNK_READ_SIZE;
00173 resp_reader->dealloc();
00174 }
00175 return true;
00176 }
00177 }
00178 }
00179
00180 return resp_is_chunked;
00181 }
00182
00183 int
00184 FetchSM::dechunk_body()
00185 {
00186 ink_assert(resp_is_chunked > 0);
00187
00188
00189
00190
00191
00192
00193 if (chunked_handler.process_chunked_content())
00194 return TS_FETCH_EVENT_EXT_BODY_DONE;
00195
00196 if (chunked_handler.dechunked_reader->read_avail())
00197 return TS_FETCH_EVENT_EXT_BODY_READY;
00198
00199 return 0;
00200 }
00201
00202 void
00203 FetchSM::InvokePluginExt(int error_event)
00204 {
00205 int event;
00206 EThread *mythread = this_ethread();
00207
00208
00209
00210
00211
00212 recursion++;
00213
00214 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00215 MUTEX_TAKE_LOCK(cont_mutex, mythread);
00216 }
00217
00218 if (!contp)
00219 goto out;
00220
00221 if (error_event) {
00222 contp->handleEvent(error_event, this);
00223 goto out;
00224 }
00225
00226 if (!has_sent_header) {
00227 contp->handleEvent(TS_FETCH_EVENT_EXT_HEAD_DONE, this);
00228 has_sent_header = true;
00229 }
00230
00231 if (!has_body()) {
00232 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00233 goto out;
00234 }
00235
00236 Debug(DEBUG_TAG, "[%s] chunked:%d, content_len: %" PRId64 ", recived_len: %" PRId64 ", avail: %" PRId64 "\n",
00237 __FUNCTION__, resp_is_chunked, resp_content_length, resp_recived_body_len,
00238 resp_is_chunked > 0 ? chunked_handler.chunked_reader->read_avail() : resp_reader->read_avail());
00239
00240 if (resp_is_chunked > 0) {
00241 if (!chunked_handler.chunked_reader->read_avail())
00242 goto out;
00243 } else if (!resp_reader->read_avail()) {
00244 goto out;
00245 }
00246
00247 if (!check_chunked()) {
00248 if (!check_body_done())
00249 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
00250 else
00251 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00252 } else if (fetch_flags & TS_FETCH_FLAGS_DECHUNK){
00253 do {
00254 if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
00255 chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
00256 }
00257
00258 event = dechunk_body();
00259 if (!event) {
00260 read_vio->reenable();
00261 goto out;
00262 }
00263
00264 contp->handleEvent(event, this);
00265 } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
00266 } else if (check_body_done()){
00267 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_DONE, this);
00268 } else {
00269 contp->handleEvent(TS_FETCH_EVENT_EXT_BODY_READY, this);
00270 }
00271
00272 out:
00273 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00274 MUTEX_UNTAKE_LOCK(cont_mutex, mythread);
00275 }
00276 recursion--;
00277
00278 if (!contp && !recursion)
00279 cleanUp();
00280
00281 return;
00282 }
00283
00284 void
00285 FetchSM::get_info_from_buffer(IOBufferReader *the_reader)
00286 {
00287 char *buf, *info;
00288 int64_t read_avail, read_done;
00289 IOBufferBlock *blk;
00290 IOBufferReader *reader = the_reader;
00291
00292 if (!reader) {
00293 client_bytes = 0;
00294 return ;
00295 }
00296
00297 read_avail = reader->read_avail();
00298 Debug(DEBUG_TAG, "[%s] total avail %" PRId64 , __FUNCTION__, read_avail);
00299 if (!read_avail) {
00300 client_bytes = 0;
00301 return;
00302 }
00303
00304 info = (char *)ats_malloc(sizeof(char) * (read_avail+1));
00305 client_response = info;
00306
00307
00308 if (!(fetch_flags & TS_FETCH_FLAGS_STREAM) || !check_chunked()) {
00309
00310 while (read_avail > 0) {
00311 if (reader->block != NULL)
00312 reader->skip_empty_blocks();
00313 blk = reader->block;
00314
00315
00316 buf = blk->start() + reader->start_offset;
00317 read_done = blk->read_avail() - reader->start_offset;
00318
00319 if (read_done > 0) {
00320 memcpy(info, buf, read_done);
00321 reader->consume(read_done);
00322 read_avail -= read_done;
00323 info += read_done;
00324 client_bytes += read_done;
00325 }
00326 }
00327 client_response[client_bytes] = '\0';
00328 return;
00329 }
00330
00331 reader = chunked_handler.dechunked_reader;
00332 do {
00333 if (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL) {
00334 chunked_handler.state = ChunkedHandler::CHUNK_READ_SIZE_START;
00335 }
00336
00337 if (!dechunk_body())
00338 break;
00339
00340
00341 read_avail = reader->read_avail();
00342 while (read_avail > 0) {
00343 if (reader->block != NULL)
00344 reader->skip_empty_blocks();
00345 blk = reader->block;
00346
00347
00348 buf = blk->start() + reader->start_offset;
00349 read_done = blk->read_avail() - reader->start_offset;
00350
00351 if (read_done > 0) {
00352 memcpy(info, buf, read_done);
00353 reader->consume(read_done);
00354 read_avail -= read_done;
00355 info += read_done;
00356 client_bytes += read_done;
00357 }
00358 }
00359 } while (chunked_handler.state == ChunkedHandler::CHUNK_FLOW_CONTROL);
00360
00361 client_response[client_bytes] = '\0';
00362 return;
00363 }
00364
00365 void
00366 FetchSM::process_fetch_read(int event)
00367 {
00368 Debug(DEBUG_TAG, "[%s] I am here read", __FUNCTION__);
00369 int64_t bytes;
00370 int bytes_used;
00371 int64_t total_bytes_copied = 0;
00372
00373 switch (event) {
00374 case TS_EVENT_VCONN_READ_READY:
00375 bytes = resp_reader->read_avail();
00376 Debug(DEBUG_TAG, "[%s] number of bytes in read ready %" PRId64, __FUNCTION__, bytes);
00377
00378
00379 while (total_bytes_copied < bytes) {
00380 int64_t actual_bytes_copied;
00381 actual_bytes_copied = resp_buffer->write(resp_reader, bytes, 0);
00382 Debug(DEBUG_TAG, "[%s] copied %" PRId64 " bytes", __FUNCTION__, actual_bytes_copied);
00383 if (actual_bytes_copied <= 0) {
00384 break;
00385 }
00386 total_bytes_copied += actual_bytes_copied;
00387 }
00388 Debug(DEBUG_TAG, "[%s] total copied %" PRId64 " bytes", __FUNCTION__, total_bytes_copied);
00389 resp_reader->consume(total_bytes_copied);
00390
00391 if (header_done == 0 && ((fetch_flags & TS_FETCH_FLAGS_STREAM) || callback_options == AFTER_HEADER)) {
00392 if (client_response_hdr.parse_resp(&http_parser, resp_reader, &bytes_used, 0) == PARSE_DONE) {
00393 header_done = 1;
00394 if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00395 return InvokePluginExt();
00396 else
00397 InvokePlugin( callback_events.success_event_id, (void *) &client_response_hdr);
00398 }
00399 } else {
00400 if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00401 return InvokePluginExt();
00402 }
00403 read_vio->reenable();
00404 break;
00405 case TS_EVENT_VCONN_READ_COMPLETE:
00406 case TS_EVENT_VCONN_EOS:
00407 if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00408 return InvokePluginExt();
00409 if(callback_options == AFTER_HEADER || callback_options == AFTER_BODY) {
00410 get_info_from_buffer(resp_reader);
00411 InvokePlugin( callback_events.success_event_id, (void *) this);
00412 }
00413 Debug(DEBUG_TAG, "[%s] received EOS", __FUNCTION__);
00414 cleanUp();
00415 break;
00416 case TS_EVENT_ERROR:
00417 default:
00418 if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00419 return InvokePluginExt(event);
00420 InvokePlugin( callback_events.failure_event_id, NULL);
00421 cleanUp();
00422 break;
00423 }
00424 }
00425
00426 void
00427 FetchSM::process_fetch_write(int event)
00428 {
00429 Debug(DEBUG_TAG, "[%s] calling process write", __FUNCTION__);
00430 switch (event) {
00431 case TS_EVENT_VCONN_WRITE_COMPLETE:
00432 req_finished = true;
00433 break;
00434 case TS_EVENT_VCONN_WRITE_READY:
00435
00436
00437
00438 if (req_reader->read_avail() > 0)
00439 ((PluginVC *) http_vc)->reenable(write_vio);
00440 break;
00441 case TS_EVENT_ERROR:
00442 if (fetch_flags & TS_FETCH_FLAGS_STREAM)
00443 return InvokePluginExt(event);
00444 InvokePlugin( callback_events.failure_event_id, NULL);
00445 cleanUp();
00446 default:
00447 break;
00448 }
00449 }
00450
00451 int
00452 FetchSM::fetch_handler(int event, void *edata)
00453 {
00454 Debug(DEBUG_TAG, "[%s] calling fetch_plugin", __FUNCTION__);
00455
00456 if (edata == read_vio) {
00457 process_fetch_read(event);
00458 } else if (edata == write_vio) {
00459 process_fetch_write(event);
00460 } else {
00461 if (fetch_flags & TS_FETCH_FLAGS_STREAM) {
00462 InvokePluginExt(event);
00463 return 1;
00464 }
00465 InvokePlugin( callback_events.failure_event_id, NULL);
00466 cleanUp();
00467 }
00468 return 1;
00469 }
00470
00471 void
00472 FetchSM::ext_init(Continuation *cont, const char *method,
00473 const char *url, const char *version,
00474 const sockaddr *client_addr, int flags)
00475 {
00476 init_comm();
00477
00478 if (flags & TS_FETCH_FLAGS_NEWLOCK) {
00479 mutex = new_ProxyMutex();
00480 cont_mutex = cont->mutex;
00481 } else {
00482 mutex = cont->mutex;
00483 }
00484
00485 contp = cont;
00486 _addr.assign(client_addr);
00487
00488
00489
00490
00491 fetch_flags = (TS_FETCH_FLAGS_STREAM | flags);
00492 if (fetch_flags & TS_FETCH_FLAGS_NOT_INTERNAL_REQUEST) {
00493 set_internal_request(false);
00494 }
00495
00496
00497
00498
00499
00500 memset(&callback_options, 0, sizeof(callback_options));
00501 memset(&callback_events, 0, sizeof(callback_events));
00502
00503 req_buffer->write(method, strlen(method));
00504 req_buffer->write(" ", 1);
00505 req_buffer->write(url, strlen(url));
00506 req_buffer->write(" ", 1);
00507 req_buffer->write(version, strlen(version));
00508 req_buffer->write("\r\n", 2);
00509 }
00510
00511 void
00512 FetchSM::ext_add_header(const char *name, int name_len,
00513 const char *value, int value_len)
00514 {
00515 if (TS_MIME_LEN_CONTENT_LENGTH == name_len &&
00516 !strncasecmp(TS_MIME_FIELD_CONTENT_LENGTH, name, name_len)) {
00517 req_content_length = atoll(value);
00518 }
00519
00520 req_buffer->write(name, name_len);
00521 req_buffer->write(": ", 2);
00522 req_buffer->write(value, value_len);
00523 req_buffer->write("\r\n", 2);
00524 }
00525
00526 void
00527 FetchSM::ext_lanuch()
00528 {
00529 req_buffer->write("\r\n", 2);
00530 httpConnect();
00531 }
00532
00533 void
00534 FetchSM::ext_write_data(const void *data, size_t len)
00535 {
00536 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00537 MUTEX_TAKE_LOCK(mutex, this_ethread());
00538 }
00539 req_buffer->write(data, len);
00540
00541 Debug(DEBUG_TAG, "[%s] re-enabling write_vio, header_done %u", __FUNCTION__, header_done);
00542 write_vio->reenable();
00543
00544 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00545 MUTEX_UNTAKE_LOCK(mutex, this_ethread());
00546 }
00547 }
00548
00549 ssize_t
00550 FetchSM::ext_read_data(char *buf, size_t len)
00551 {
00552 const char *start;
00553 TSIOBufferReader reader;
00554 TSIOBufferBlock blk, next_blk;
00555 int64_t already, blk_len, need, wavail;
00556
00557 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00558 MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00559 if (!lock)
00560 return 0;
00561 }
00562
00563 if (!header_done)
00564 return 0;
00565
00566 if (check_chunked() && (fetch_flags & TS_FETCH_FLAGS_DECHUNK))
00567 reader = (tsapi_bufferreader*)chunked_handler.dechunked_reader;
00568 else
00569 reader = (TSIOBufferReader)resp_reader;
00570
00571 already = 0;
00572 blk = TSIOBufferReaderStart(reader);
00573
00574 while (blk) {
00575
00576 wavail = len - already;
00577
00578 next_blk = TSIOBufferBlockNext(blk);
00579 start = TSIOBufferBlockReadStart(blk, reader, &blk_len);
00580
00581 need = blk_len > wavail ? wavail : blk_len;
00582
00583 memcpy(&buf[already], start, need);
00584 already += need;
00585
00586 if (already >= (int64_t)len)
00587 break;
00588
00589 blk = next_blk;
00590 }
00591
00592 resp_recived_body_len += already;
00593 TSIOBufferReaderConsume(reader, already);
00594
00595 read_vio->reenable();
00596 return already;
00597 }
00598
00599 void
00600 FetchSM::ext_destroy()
00601 {
00602 contp = NULL;
00603
00604 if (recursion)
00605 return;
00606
00607 if (fetch_flags & TS_FETCH_FLAGS_NEWLOCK) {
00608 MUTEX_TRY_LOCK(lock, mutex, this_ethread());
00609 if (!lock) {
00610 eventProcessor.schedule_in(this, FETCH_LOCK_RETRY_TIME);
00611 return;
00612 }
00613 }
00614
00615 cleanUp();
00616 }
00617
00618 void
00619 FetchSM::ext_set_user_data(void *data)
00620 {
00621 user_data = data;
00622 }
00623
00624 void*
00625 FetchSM::ext_get_user_data()
00626 {
00627 return user_data;
00628 }
00629
00630 TSMBuffer
00631 FetchSM::resp_hdr_bufp()
00632 {
00633 HdrHeapSDKHandle *heap;
00634 heap = (HdrHeapSDKHandle *)&client_response_hdr;
00635
00636 return (TSMBuffer)heap;
00637 }
00638
00639 TSMLoc
00640 FetchSM::resp_hdr_mloc()
00641 {
00642 return (TSMLoc)client_response_hdr.m_http;
00643 }