00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062 #include "ProxyConfig.h"
00063 #include "P_Net.h"
00064 #include "MimeTable.h"
00065 #include "TransformInternal.h"
00066 #include "HdrUtils.h"
00067 #include "Log.h"
00068
00069
00070 #define ART 1
00071 #define AGIF 2
00072
00073
00074 TransformProcessor transformProcessor;
00075
00076
00077
00078
00079
00080 void
00081 TransformProcessor::start()
00082 {
00083 #ifdef PREFETCH
00084 prefetchProcessor.start();
00085 #endif
00086 }
00087
00088
00089
00090
00091 VConnection *
00092 TransformProcessor::open(Continuation *cont, APIHook *hooks)
00093 {
00094 if (hooks) {
00095 return new TransformVConnection(cont, hooks);
00096 } else {
00097 return NULL;
00098 }
00099 }
00100
00101
00102
00103
00104 INKVConnInternal *
00105 TransformProcessor::null_transform(ProxyMutex *mutex)
00106 {
00107 return new NullTransform(mutex);
00108 }
00109
00110
00111
00112
00113
00114 INKVConnInternal *
00115 TransformProcessor::range_transform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp, const char * content_type, int content_type_len, int64_t content_length)
00116 {
00117 RangeTransform *range_transform = new RangeTransform(mut, ranges, num_fields, transform_resp, content_type, content_type_len, content_length);
00118 return range_transform;
00119 }
00120
00121
00122
00123
00124
00125 TransformTerminus::TransformTerminus(TransformVConnection *tvc)
00126 : VConnection(tvc->mutex),
00127 m_tvc(tvc), m_read_vio(), m_write_vio(), m_event_count(0), m_deletable(0), m_closed(0), m_called_user(0)
00128 {
00129 SET_HANDLER(&TransformTerminus::handle_event);
00130 }
00131
00132
00133 #define RETRY() \
00134 if (ink_atomic_increment ((int*) &m_event_count, 1) < 0) { \
00135 ink_assert (!"not reached"); \
00136 } \
00137 eventProcessor.schedule_in (this, HRTIME_MSECONDS (10), ET_NET); \
00138 return 0;
00139
00140
00141 int
00142 TransformTerminus::handle_event(int event, void * )
00143 {
00144 int val;
00145
00146 m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0));
00147
00148 val = ink_atomic_increment((int *) &m_event_count, -1);
00149
00150 Debug("transform", "[TransformTerminus::handle_event] event_count %d", m_event_count);
00151
00152 if (val <= 0) {
00153 ink_assert(!"not reached");
00154 }
00155
00156 m_deletable = m_deletable && (val == 1);
00157
00158 if (m_closed != 0 && m_tvc->m_closed != 0) {
00159 if (m_deletable) {
00160 Debug("transform", "TransformVConnection destroy [0x%lx]", (long) m_tvc);
00161 delete m_tvc;
00162 return 0;
00163 }
00164 } else if (m_write_vio.op == VIO::WRITE) {
00165 if (m_read_vio.op == VIO::NONE) {
00166 if (!m_called_user) {
00167 Debug("transform", "TransformVConnection calling user: %d %d [0x%lx] [0x%lx]",
00168 m_event_count, event, (long) m_tvc, (long) m_tvc->m_cont);
00169
00170 m_called_user = 1;
00171
00172
00173 m_tvc->m_cont->handleEvent(TRANSFORM_READ_READY, (void *)&m_write_vio.nbytes);
00174 }
00175 } else {
00176 int64_t towrite;
00177
00178 MUTEX_TRY_LOCK(trylock1, m_write_vio.mutex, this_ethread());
00179 if (!trylock1) {
00180 RETRY();
00181 }
00182
00183 MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
00184 if (!trylock2) {
00185 RETRY();
00186 }
00187
00188 if (m_closed != 0) {
00189 return 0;
00190 }
00191
00192 if (m_write_vio.op == VIO::NONE) {
00193 return 0;
00194 }
00195
00196 towrite = m_write_vio.ntodo();
00197 if (towrite > 0) {
00198 if (towrite > m_write_vio.get_reader()->read_avail()) {
00199 towrite = m_write_vio.get_reader()->read_avail();
00200 }
00201 if (towrite > m_read_vio.ntodo()) {
00202 towrite = m_read_vio.ntodo();
00203 }
00204
00205 if (towrite > 0) {
00206 m_read_vio.get_writer()->write(m_write_vio.get_reader(), towrite);
00207 m_read_vio.ndone += towrite;
00208
00209 m_write_vio.get_reader()->consume(towrite);
00210 m_write_vio.ndone += towrite;
00211 }
00212 }
00213
00214 if (m_write_vio.ntodo() > 0) {
00215 if (towrite > 0) {
00216 m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00217 }
00218 } else {
00219 m_write_vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
00220 }
00221
00222
00223 if (m_closed != 0 && m_tvc->m_closed != 0) {
00224 return 0;
00225 }
00226
00227 if (m_read_vio.ntodo() > 0) {
00228 if (m_write_vio.ntodo() <= 0) {
00229 m_read_vio._cont->handleEvent(VC_EVENT_EOS, &m_read_vio);
00230 } else if (towrite > 0) {
00231 m_read_vio._cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio);
00232 }
00233 } else {
00234 m_read_vio._cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio);
00235 }
00236 }
00237 } else {
00238 MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
00239 if (!trylock2) {
00240 RETRY();
00241 }
00242
00243 if (m_closed != 0) {
00244
00245
00246
00247
00248
00249
00250
00251
00252 if (m_tvc->m_closed == 0) {
00253 int ev = (m_closed == TS_VC_CLOSE_ABORT) ? VC_EVENT_ERROR : VC_EVENT_EOS;
00254
00255 if (!m_called_user) {
00256 m_called_user = 1;
00257 m_tvc->m_cont->handleEvent(ev, NULL);
00258 } else {
00259 ink_assert(m_read_vio._cont != NULL);
00260 m_read_vio._cont->handleEvent(ev, &m_read_vio);
00261 }
00262 }
00263
00264 return 0;
00265 }
00266 }
00267
00268 return 0;
00269 }
00270
00271
00272
00273
00274 VIO *
00275 TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00276 {
00277 m_read_vio.buffer.writer_for(buf);
00278 m_read_vio.op = VIO::READ;
00279 m_read_vio.set_continuation(c);
00280 m_read_vio.nbytes = nbytes;
00281 m_read_vio.ndone = 0;
00282 m_read_vio.vc_server = this;
00283
00284 if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00285 ink_assert(!"not reached");
00286 }
00287 Debug("transform", "[TransformTerminus::do_io_read] event_count %d", m_event_count);
00288
00289 eventProcessor.schedule_imm(this, ET_NET);
00290
00291 return &m_read_vio;
00292 }
00293
00294
00295
00296
00297 VIO *
00298 TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
00299 {
00300
00301 ink_assert(!owner);
00302 m_write_vio.buffer.reader_for(buf);
00303 m_write_vio.op = VIO::WRITE;
00304 m_write_vio.set_continuation(c);
00305 m_write_vio.nbytes = nbytes;
00306 m_write_vio.ndone = 0;
00307 m_write_vio.vc_server = this;
00308
00309 if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00310 ink_assert(!"not reached");
00311 }
00312 Debug("transform", "[TransformTerminus::do_io_write] event_count %d", m_event_count);
00313
00314 eventProcessor.schedule_imm(this, ET_NET);
00315
00316 return &m_write_vio;
00317 }
00318
00319
00320
00321
00322 void
00323 TransformTerminus::do_io_close(int error)
00324 {
00325 if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00326 ink_assert(!"not reached");
00327 }
00328
00329 INK_WRITE_MEMORY_BARRIER;
00330
00331 if (error != -1) {
00332 lerrno = error;
00333 m_closed = TS_VC_CLOSE_ABORT;
00334 } else {
00335 m_closed = TS_VC_CLOSE_NORMAL;
00336 }
00337
00338 m_read_vio.op = VIO::NONE;
00339 m_read_vio.buffer.clear();
00340
00341 m_write_vio.op = VIO::NONE;
00342 m_write_vio.buffer.clear();
00343
00344 eventProcessor.schedule_imm(this, ET_NET);
00345 }
00346
00347
00348
00349
00350 void
00351 TransformTerminus::do_io_shutdown(ShutdownHowTo_t howto)
00352 {
00353 if ((howto == IO_SHUTDOWN_READ) || (howto == IO_SHUTDOWN_READWRITE)) {
00354 m_read_vio.op = VIO::NONE;
00355 m_read_vio.buffer.clear();
00356 }
00357
00358 if ((howto == IO_SHUTDOWN_WRITE) || (howto == IO_SHUTDOWN_READWRITE)) {
00359 m_write_vio.op = VIO::NONE;
00360 m_write_vio.buffer.clear();
00361 }
00362 }
00363
00364
00365
00366
00367 void
00368 TransformTerminus::reenable(VIO *vio)
00369 {
00370 ink_assert((vio == &m_read_vio) || (vio == &m_write_vio));
00371
00372 if (m_event_count == 0) {
00373
00374 if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00375 ink_assert(!"not reached");
00376 }
00377 Debug("transform", "[TransformTerminus::reenable] event_count %d", m_event_count);
00378 eventProcessor.schedule_imm(this, ET_NET);
00379 } else {
00380 Debug("transform", "[TransformTerminus::reenable] skipping due to " "pending events");
00381 }
00382 }
00383
00384
00385
00386
00387
00388 TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks)
00389 :TransformVCChain(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0)
00390 {
00391 INKVConnInternal *xform;
00392
00393 SET_HANDLER(&TransformVConnection::handle_event);
00394
00395 ink_assert(hooks != NULL);
00396
00397 m_transform = hooks->m_cont;
00398 while (hooks->m_link.next) {
00399 xform = (INKVConnInternal *) hooks->m_cont;
00400 hooks = hooks->m_link.next;
00401 xform->do_io_transform(hooks->m_cont);
00402 }
00403 xform = (INKVConnInternal *) hooks->m_cont;
00404 xform->do_io_transform(&m_terminus);
00405
00406 Debug("transform", "TransformVConnection create [0x%lx]", (long) this);
00407 }
00408
00409
00410
00411
00412 TransformVConnection::~TransformVConnection()
00413 {
00414
00415
00416 m_terminus.m_read_vio.set_continuation(NULL);
00417 m_terminus.m_write_vio.set_continuation(NULL);
00418 m_terminus.mutex = NULL;
00419 this->mutex = NULL;
00420 }
00421
00422
00423
00424
00425 int
00426 TransformVConnection::handle_event(int , void * )
00427 {
00428 ink_assert(!"not reached");
00429 return 0;
00430 }
00431
00432
00433
00434
00435 VIO *
00436 TransformVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00437 {
00438 Debug("transform", "TransformVConnection do_io_read: 0x%lx [0x%lx]", (long) c, (long) this);
00439
00440 return m_terminus.do_io_read(c, nbytes, buf);
00441 }
00442
00443
00444
00445
00446 VIO *
00447 TransformVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf,
00448 bool )
00449 {
00450 Debug("transform", "TransformVConnection do_io_write: 0x%lx [0x%lx]", (long) c, (long) this);
00451
00452 return m_transform->do_io_write(c, nbytes, buf);
00453 }
00454
00455
00456
00457
00458 void
00459 TransformVConnection::do_io_close(int error)
00460 {
00461 Debug("transform", "TransformVConnection do_io_close: %d [0x%lx]", error, (long) this);
00462
00463 if (error != -1) {
00464 m_closed = TS_VC_CLOSE_ABORT;
00465 } else {
00466 m_closed = TS_VC_CLOSE_NORMAL;
00467 }
00468
00469 m_transform->do_io_close(error);
00470 }
00471
00472
00473
00474
00475 void
00476 TransformVConnection::do_io_shutdown(ShutdownHowTo_t howto)
00477 {
00478 ink_assert(howto == IO_SHUTDOWN_WRITE);
00479
00480 Debug("transform", "TransformVConnection do_io_shutdown: %d [0x%lx]", howto, (long) this);
00481
00482 m_transform->do_io_shutdown(howto);
00483 }
00484
00485
00486
00487
00488 void
00489 TransformVConnection::reenable(VIO * )
00490 {
00491 ink_assert(!"not reached");
00492 }
00493
00494
00495
00496
00497 uint64_t
00498 TransformVConnection::backlog(uint64_t limit)
00499 {
00500 uint64_t b = 0;
00501 VConnection* raw_vc = m_transform;
00502 MIOBuffer* w;
00503 while (raw_vc && raw_vc != &m_terminus) {
00504 INKVConnInternal* vc = static_cast<INKVConnInternal*>(raw_vc);
00505 if (0 != (w = vc->m_read_vio.buffer.writer()))
00506 b += w->max_read_avail();
00507 if (b >= limit) return b;
00508 raw_vc = vc->m_output_vc;
00509 }
00510 if (0 != (w = m_terminus.m_read_vio.buffer.writer()))
00511 b += w->max_read_avail();
00512 if (b >= limit) return b;
00513
00514 IOBufferReader* r = m_terminus.m_write_vio.get_reader();
00515 if (r)
00516 b += r->read_avail();
00517 return b;
00518 }
00519
00520
00521
00522
00523 TransformControl::TransformControl()
00524 :Continuation(new_ProxyMutex()), m_hooks(), m_tvc(NULL), m_read_buf(NULL), m_write_buf(NULL)
00525 {
00526 SET_HANDLER(&TransformControl::handle_event);
00527
00528 m_hooks.append(transformProcessor.null_transform(new_ProxyMutex()));
00529 }
00530
00531
00532
00533
00534 int
00535 TransformControl::handle_event(int event, void * )
00536 {
00537 switch (event) {
00538 case EVENT_IMMEDIATE:
00539 {
00540 char *s, *e;
00541
00542 ink_assert(m_tvc == NULL);
00543 if (http_global_hooks && http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK)) {
00544 m_tvc = transformProcessor.open(this, http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK));
00545 } else {
00546 m_tvc = transformProcessor.open(this, m_hooks.get());
00547 }
00548 ink_assert(m_tvc != NULL);
00549
00550 m_write_buf = new_MIOBuffer();
00551 s = m_write_buf->end();
00552 e = m_write_buf->buf_end();
00553
00554 memset(s, 'a', e - s);
00555 m_write_buf->fill(e - s);
00556
00557 m_tvc->do_io_write(this, 4 * 1024, m_write_buf->alloc_reader());
00558 break;
00559 }
00560
00561 case TRANSFORM_READ_READY:
00562 {
00563 MIOBuffer *buf = new_empty_MIOBuffer();
00564
00565 m_read_buf = buf->alloc_reader();
00566 m_tvc->do_io_read(this, INT64_MAX, buf);
00567 break;
00568 }
00569
00570 case VC_EVENT_READ_COMPLETE:
00571 case VC_EVENT_EOS:
00572 m_tvc->do_io_close();
00573
00574 free_MIOBuffer(m_read_buf->mbuf);
00575 m_read_buf = NULL;
00576
00577 free_MIOBuffer(m_write_buf);
00578 m_write_buf = NULL;
00579 break;
00580
00581 case VC_EVENT_WRITE_COMPLETE:
00582 break;
00583
00584 default:
00585 ink_assert(!"not reached");
00586 break;
00587 }
00588
00589 return 0;
00590 }
00591
00592
00593
00594
00595 NullTransform::NullTransform(ProxyMutex *_mutex)
00596 : INKVConnInternal(NULL, reinterpret_cast<TSMutex>(_mutex)),
00597 m_output_buf(NULL), m_output_reader(NULL), m_output_vio(NULL)
00598 {
00599 SET_HANDLER(&NullTransform::handle_event);
00600
00601 Debug("transform", "NullTransform create [0x%lx]", (long) this);
00602 }
00603
00604
00605
00606
00607 NullTransform::~NullTransform()
00608 {
00609 if (m_output_buf) {
00610 free_MIOBuffer(m_output_buf);
00611 }
00612 }
00613
00614
00615
00616
00617 int
00618 NullTransform::handle_event(int event, void *edata)
00619 {
00620 handle_event_count(event);
00621
00622 Debug("transform", "[NullTransform::handle_event] event count %d", m_event_count);
00623
00624 if (m_closed) {
00625 if (m_deletable) {
00626 Debug("transform", "NullTransform destroy: %" PRId64" [%p]", m_output_vio ? m_output_vio->ndone : 0, this);
00627 delete this;
00628 }
00629 } else {
00630 switch (event) {
00631 case VC_EVENT_ERROR:
00632 m_write_vio._cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
00633 break;
00634 case VC_EVENT_WRITE_COMPLETE:
00635 ink_assert(m_output_vio == (VIO *) edata);
00636
00637
00638
00639
00640 ink_assert(m_write_vio.ntodo() == 0);
00641
00642 m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
00643 break;
00644 case VC_EVENT_WRITE_READY:
00645 default:
00646 {
00647 int64_t towrite;
00648 int64_t avail;
00649
00650 ink_assert(m_output_vc != NULL);
00651
00652 if (!m_output_vio) {
00653 m_output_buf = new_empty_MIOBuffer();
00654 m_output_reader = m_output_buf->alloc_reader();
00655 m_output_vio = m_output_vc->do_io_write(this, m_write_vio.nbytes, m_output_reader);
00656 }
00657
00658 MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
00659 if (!trylock) {
00660 retry(10);
00661 return 0;
00662 }
00663
00664 if (m_closed) {
00665 return 0;
00666 }
00667
00668 if (m_write_vio.op == VIO::NONE) {
00669 m_output_vio->nbytes = m_write_vio.ndone;
00670 m_output_vio->reenable();
00671 return 0;
00672 }
00673
00674 towrite = m_write_vio.ntodo();
00675 if (towrite > 0) {
00676 avail = m_write_vio.get_reader()->read_avail();
00677 if (towrite > avail) {
00678 towrite = avail;
00679 }
00680
00681 if (towrite > 0) {
00682 Debug("transform", "[NullTransform::handle_event] " "writing %" PRId64" bytes to output", towrite);
00683 m_output_buf->write(m_write_vio.get_reader(), towrite);
00684
00685 m_write_vio.get_reader()->consume(towrite);
00686 m_write_vio.ndone += towrite;
00687 }
00688 }
00689
00690 if (m_write_vio.ntodo() > 0) {
00691 if (towrite > 0) {
00692 m_output_vio->reenable();
00693 m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00694 }
00695 } else {
00696 m_output_vio->nbytes = m_write_vio.ndone;
00697 m_output_vio->reenable();
00698 m_write_vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
00699 }
00700
00701 break;
00702 }
00703 }
00704 }
00705
00706 return 0;
00707 }
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726 #ifdef TS_HAS_TESTS
00727 void
00728 TransformTest::run()
00729 {
00730 if (is_action_tag_set("transform_test")) {
00731 eventProcessor.schedule_imm(new TransformControl(), ET_NET);
00732 }
00733 }
00734 #endif
00735
00736
00737
00738
00739
00740
00741 RangeTransform::RangeTransform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr * transform_resp, const char * content_type, int content_type_len, int64_t content_length)
00742 : INKVConnInternal(NULL, reinterpret_cast<TSMutex>(mut)),
00743 m_output_buf(NULL),
00744 m_output_reader(NULL),
00745 m_transform_resp(transform_resp),
00746 m_output_vio(NULL),
00747 m_range_content_length(0),
00748 m_num_range_fields(num_fields),
00749 m_current_range(0), m_content_type(content_type), m_content_type_len(content_type_len), m_ranges(ranges), m_output_cl(content_length), m_done(0)
00750 {
00751 SET_HANDLER(&RangeTransform::handle_event);
00752
00753 m_num_chars_for_cl = num_chars_for_int(m_range_content_length);
00754 Debug("http_trans", "RangeTransform creation finishes");
00755 }
00756
00757
00758
00759
00760
00761 RangeTransform::~RangeTransform()
00762 {
00763 if (m_output_buf)
00764 free_MIOBuffer(m_output_buf);
00765 }
00766
00767
00768
00769
00770
00771 int
00772 RangeTransform::handle_event(int event, void *edata)
00773 {
00774 handle_event_count(event);
00775
00776 if (m_closed) {
00777 if (m_deletable) {
00778 Debug("http_trans", "RangeTransform destroy: %p ndone=%" PRId64, this, m_output_vio ? m_output_vio->ndone : 0);
00779 delete this;
00780 }
00781 } else {
00782 switch (event) {
00783 case VC_EVENT_ERROR:
00784 m_write_vio._cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
00785 break;
00786 case VC_EVENT_WRITE_COMPLETE:
00787 ink_assert(m_output_vio == (VIO *) edata);
00788 m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
00789 break;
00790 case VC_EVENT_WRITE_READY:
00791 default:
00792 ink_assert(m_output_vc != NULL);
00793
00794 if (!m_output_vio) {
00795 m_output_buf = new_empty_MIOBuffer();
00796 m_output_reader = m_output_buf->alloc_reader();
00797 m_output_vio = m_output_vc->do_io_write(this, m_output_cl, m_output_reader);
00798
00799 change_response_header();
00800
00801 if (m_num_range_fields > 1) {
00802 add_boundary(false);
00803 add_sub_header(m_current_range);
00804 }
00805 }
00806
00807 MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
00808 if (!trylock) {
00809 retry(10);
00810 return 0;
00811 }
00812
00813 if (m_closed) {
00814 return 0;
00815 }
00816
00817 if (m_write_vio.op == VIO::NONE) {
00818 m_output_vio->nbytes = m_done;
00819 m_output_vio->reenable();
00820 return 0;
00821 }
00822
00823 transform_to_range();
00824 break;
00825 }
00826 }
00827
00828 return 0;
00829 }
00830
00831
00832
00833
00834
00835 void
00836 RangeTransform::transform_to_range()
00837 {
00838 IOBufferReader *reader = m_write_vio.get_reader();
00839 int64_t toskip, tosend, avail;
00840 const int64_t *end, *start;
00841 int64_t prev_end = 0;
00842 int64_t *done_byte;
00843
00844 end = &m_ranges[m_current_range]._end;
00845 done_byte = &m_ranges[m_current_range]._done_byte;
00846 start = &m_ranges[m_current_range]._start;
00847 avail = reader->read_avail();
00848
00849 while (true) {
00850 if (*done_byte < (*start - 1)) {
00851 toskip = *start - *done_byte - 1;
00852
00853 if (toskip > avail)
00854 toskip = avail;
00855
00856 if (toskip > 0) {
00857 reader->consume(toskip);
00858 *done_byte += toskip;
00859 avail = reader->read_avail();
00860 }
00861 }
00862
00863 if (avail > 0) {
00864 tosend = *end - *done_byte;
00865
00866 if (tosend > avail)
00867 tosend = avail;
00868
00869 m_output_buf->write(reader, tosend);
00870 reader->consume(tosend);
00871
00872 m_done += tosend;
00873 *done_byte += tosend;
00874 }
00875
00876 if (*done_byte == *end)
00877 prev_end = *end;
00878
00879
00880
00881 while (*done_byte == *end) {
00882 m_current_range++;
00883
00884 if (m_current_range == m_num_range_fields) {
00885 if (m_num_range_fields > 1) {
00886 m_done += m_output_buf->write("\r\n", 2);
00887 add_boundary(true);
00888 }
00889
00890 Debug("http_trans", "total bytes of Range response body is %" PRId64, m_done);
00891 m_output_vio->nbytes = m_done;
00892 m_output_vio->reenable();
00893
00894
00895
00896
00897 int cb_event = (m_write_vio.ntodo() > 0) ? VC_EVENT_EOS : VC_EVENT_WRITE_COMPLETE;
00898 m_write_vio._cont->handleEvent(cb_event, &m_write_vio);
00899 return;
00900 }
00901
00902 end = &m_ranges[m_current_range]._end;
00903 done_byte = &m_ranges[m_current_range]._done_byte;
00904 start = &m_ranges[m_current_range]._start;
00905
00906
00907 if (*end != -1) {
00908 m_done += m_output_buf->write("\r\n", 2);
00909 add_boundary(false);
00910 add_sub_header(m_current_range);
00911
00912
00913
00914
00915
00916
00917
00918 *done_byte = prev_end;
00919
00920
00921
00922 break;
00923 }
00924 }
00925
00926
00927 avail = reader->read_avail();
00928 if (avail == 0)
00929 break;
00930 }
00931
00932 m_output_vio->reenable();
00933 m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00934 }
00935
00936
00937
00938
00939
00940
00941
00942
00943
00944 static char bound[] = "RANGE_SEPARATOR";
00945 static char range_type[] = "multipart/byteranges; boundary=RANGE_SEPARATOR";
00946 static char cont_type[] = "Content-type: ";
00947 static char cont_range[] = "Content-range: bytes ";
00948
00949 void
00950 RangeTransform::add_boundary(bool end)
00951 {
00952 m_done += m_output_buf->write("--", 2);
00953 m_done += m_output_buf->write(bound, sizeof(bound) - 1);
00954
00955 if (end)
00956 m_done += m_output_buf->write("--", 2);
00957
00958 m_done += m_output_buf->write("\r\n", 2);
00959 }
00960
00961
00962
00963
00964
00965 #define RANGE_NUMBERS_LENGTH 60
00966
00967 void
00968 RangeTransform::add_sub_header(int index)
00969 {
00970
00971 char numbers[RANGE_NUMBERS_LENGTH];
00972 int len;
00973
00974 m_done += m_output_buf->write(cont_type, sizeof(cont_type) - 1);
00975 if (m_content_type)
00976 m_done += m_output_buf->write(m_content_type, m_content_type_len);
00977 m_done += m_output_buf->write("\r\n", 2);
00978 m_done += m_output_buf->write(cont_range, sizeof(cont_range) - 1);
00979
00980 snprintf(numbers, sizeof(numbers), "%" PRId64 "-%" PRId64 "/%" PRId64 "", m_ranges[index]._start, m_ranges[index]._end, m_output_cl);
00981 len = strlen(numbers);
00982 if (len < RANGE_NUMBERS_LENGTH)
00983 m_done += m_output_buf->write(numbers, len);
00984 m_done += m_output_buf->write("\r\n\r\n", 4);
00985 }
00986
00987
00988
00989
00990
00991
00992
00993
00994
00995 void
00996 RangeTransform::change_response_header()
00997 {
00998 MIMEField *field;
00999 char *reason_phrase;
01000 HTTPStatus status_code;
01001
01002 ink_release_assert(m_transform_resp);
01003
01004 status_code = HTTP_STATUS_PARTIAL_CONTENT;
01005 m_transform_resp->status_set(status_code);
01006 reason_phrase = (char *) (http_hdr_reason_lookup(status_code));
01007 m_transform_resp->reason_set(reason_phrase, strlen(reason_phrase));
01008
01009 if (m_num_range_fields > 1) {
01010
01011 field = m_transform_resp->field_find(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01012
01013 if (field != NULL)
01014 m_transform_resp->field_delete(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01015
01016
01017 field = m_transform_resp->field_create(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01018 field->value_append(m_transform_resp->m_heap, m_transform_resp->m_mime, range_type, sizeof(range_type) - 1);
01019
01020 m_transform_resp->field_attach(field);
01021 } else {
01022 char numbers[RANGE_NUMBERS_LENGTH];
01023 m_transform_resp->field_delete(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
01024 field = m_transform_resp->field_create(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
01025 snprintf(numbers, sizeof(numbers), "bytes %" PRId64"-%" PRId64"/%" PRId64, m_ranges[0]._start, m_ranges[0]._end, m_output_cl);
01026 field->value_set(m_transform_resp->m_heap, m_transform_resp->m_mime, numbers, strlen(numbers));
01027 m_transform_resp->field_attach(field);
01028 }
01029 }
01030
01031 #undef RANGE_NUMBERS_LENGTH