00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 
00051 
00052 
00053 
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 
00062 #include "ProxyConfig.h"
00063 #include "P_Net.h"
00064 #include "MimeTable.h"
00065 #include "TransformInternal.h"
00066 #include "HdrUtils.h"
00067 #include "Log.h"
00068 
00069 
// Numeric tags; neither is referenced in the visible part of this file.
// NOTE(review): presumably legacy transform-type identifiers - confirm before removing.
#define ART                   1
#define AGIF                  2


// Process-wide TransformProcessor instance used by the code below
// (e.g. TransformControl calls transformProcessor.open / null_transform).
TransformProcessor transformProcessor;
00075 
00076 
00077 
00078 
00079 
00080 void
00081 TransformProcessor::start()
00082 {
00083 #ifdef PREFETCH
00084   prefetchProcessor.start();
00085 #endif
00086 }
00087 
00088 
00089 
00090 
00091 VConnection *
00092 TransformProcessor::open(Continuation *cont, APIHook *hooks)
00093 {
00094   if (hooks) {
00095     return new TransformVConnection(cont, hooks);
00096   } else {
00097     return NULL;
00098   }
00099 }
00100 
00101 
00102 
00103 
00104 INKVConnInternal *
00105 TransformProcessor::null_transform(ProxyMutex *mutex)
00106 {
00107   return new NullTransform(mutex);
00108 }
00109 
00110 
00111 
00112 
00113 
00114 INKVConnInternal *
00115 TransformProcessor::range_transform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp, const char * content_type, int content_type_len, int64_t content_length)
00116 {
00117   RangeTransform *range_transform = new RangeTransform(mut, ranges, num_fields, transform_resp, content_type, content_type_len, content_length);
00118   return range_transform;
00119 }
00120 
00121 
00122 
00123 
00124 
00125 TransformTerminus::TransformTerminus(TransformVConnection *tvc)
00126   : VConnection(tvc->mutex),
00127     m_tvc(tvc), m_read_vio(), m_write_vio(), m_event_count(0), m_deletable(0), m_closed(0), m_called_user(0)
00128 {
00129   SET_HANDLER(&TransformTerminus::handle_event);
00130 }
00131 
00132 
// RETRY(): used by TransformTerminus::handle_event when a try-lock fails.
// Takes an extra reference on m_event_count, reschedules this continuation
// on ET_NET in 10 ms and returns 0 from the *enclosing* function.
//
// The body is wrapped in do { ... } while (0) so that `RETRY();` expands to
// exactly one statement and stays correct even under an unbraced if/else.
#define RETRY() \
    do { \
        if (ink_atomic_increment ((int*) &m_event_count, 1) < 0) { \
            ink_assert (!"not reached"); \
        } \
        eventProcessor.schedule_in (this, HRTIME_MSECONDS (10), ET_NET); \
        return 0; \
    } while (0)
00139 
00140 
00141 int
00142 TransformTerminus::handle_event(int event, void * )
00143 {
00144   int val;
00145 
00146   m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0));
00147 
00148   val = ink_atomic_increment((int *) &m_event_count, -1);
00149 
00150   Debug("transform", "[TransformTerminus::handle_event] event_count %d", m_event_count);
00151 
00152   if (val <= 0) {
00153     ink_assert(!"not reached");
00154   }
00155 
00156   m_deletable = m_deletable && (val == 1);
00157 
00158   if (m_closed != 0 && m_tvc->m_closed != 0) {
00159     if (m_deletable) {
00160       Debug("transform", "TransformVConnection destroy [0x%lx]", (long) m_tvc);
00161       delete m_tvc;
00162       return 0;
00163     }
00164   } else if (m_write_vio.op == VIO::WRITE) {
00165     if (m_read_vio.op == VIO::NONE) {
00166       if (!m_called_user) {
00167         Debug("transform", "TransformVConnection calling user: %d %d [0x%lx] [0x%lx]",
00168               m_event_count, event, (long) m_tvc, (long) m_tvc->m_cont);
00169 
00170         m_called_user = 1;
00171         
00172         
00173         m_tvc->m_cont->handleEvent(TRANSFORM_READ_READY, (void *)&m_write_vio.nbytes);
00174       }
00175     } else {
00176       int64_t towrite;
00177 
00178       MUTEX_TRY_LOCK(trylock1, m_write_vio.mutex, this_ethread());
00179       if (!trylock1) {
00180         RETRY();
00181       }
00182 
00183       MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
00184       if (!trylock2) {
00185         RETRY();
00186       }
00187 
00188       if (m_closed != 0) {
00189         return 0;
00190       }
00191 
00192       if (m_write_vio.op == VIO::NONE) {
00193         return 0;
00194       }
00195 
00196       towrite = m_write_vio.ntodo();
00197       if (towrite > 0) {
00198         if (towrite > m_write_vio.get_reader()->read_avail()) {
00199           towrite = m_write_vio.get_reader()->read_avail();
00200         }
00201         if (towrite > m_read_vio.ntodo()) {
00202           towrite = m_read_vio.ntodo();
00203         }
00204 
00205         if (towrite > 0) {
00206           m_read_vio.get_writer()->write(m_write_vio.get_reader(), towrite);
00207           m_read_vio.ndone += towrite;
00208 
00209           m_write_vio.get_reader()->consume(towrite);
00210           m_write_vio.ndone += towrite;
00211         }
00212       }
00213 
00214       if (m_write_vio.ntodo() > 0) {
00215         if (towrite > 0) {
00216           m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00217         }
00218       } else {
00219         m_write_vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
00220       }
00221 
00222       
00223       if (m_closed != 0 && m_tvc->m_closed != 0) {
00224         return 0;
00225       }
00226 
00227       if (m_read_vio.ntodo() > 0) {
00228         if (m_write_vio.ntodo() <= 0) {
00229           m_read_vio._cont->handleEvent(VC_EVENT_EOS, &m_read_vio);
00230         } else if (towrite > 0) {
00231           m_read_vio._cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio);
00232         }
00233       } else {
00234         m_read_vio._cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio);
00235       }
00236     }
00237   } else {
00238     MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
00239     if (!trylock2) {
00240       RETRY();
00241     }
00242 
00243     if (m_closed != 0) {
00244       
00245       
00246       
00247       
00248       
00249       
00250       
00251       
00252       if (m_tvc->m_closed == 0) {
00253         int ev = (m_closed == TS_VC_CLOSE_ABORT) ? VC_EVENT_ERROR : VC_EVENT_EOS;
00254 
00255         if (!m_called_user) {
00256           m_called_user = 1;
00257           m_tvc->m_cont->handleEvent(ev, NULL);
00258         } else {
00259           ink_assert(m_read_vio._cont != NULL);
00260           m_read_vio._cont->handleEvent(ev, &m_read_vio);
00261         }
00262       }
00263 
00264       return 0;
00265     }
00266   }
00267 
00268   return 0;
00269 }
00270 
00271 
00272 
00273 
00274 VIO *
00275 TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00276 {
00277   m_read_vio.buffer.writer_for(buf);
00278   m_read_vio.op = VIO::READ;
00279   m_read_vio.set_continuation(c);
00280   m_read_vio.nbytes = nbytes;
00281   m_read_vio.ndone = 0;
00282   m_read_vio.vc_server = this;
00283 
00284   if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00285     ink_assert(!"not reached");
00286   }
00287   Debug("transform", "[TransformTerminus::do_io_read] event_count %d", m_event_count);
00288 
00289   eventProcessor.schedule_imm(this, ET_NET);
00290 
00291   return &m_read_vio;
00292 }
00293 
00294 
00295 
00296 
00297 VIO *
00298 TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
00299 {
00300   
00301   ink_assert(!owner);
00302   m_write_vio.buffer.reader_for(buf);
00303   m_write_vio.op = VIO::WRITE;
00304   m_write_vio.set_continuation(c);
00305   m_write_vio.nbytes = nbytes;
00306   m_write_vio.ndone = 0;
00307   m_write_vio.vc_server = this;
00308 
00309   if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00310     ink_assert(!"not reached");
00311   }
00312   Debug("transform", "[TransformTerminus::do_io_write] event_count %d", m_event_count);
00313 
00314   eventProcessor.schedule_imm(this, ET_NET);
00315 
00316   return &m_write_vio;
00317 }
00318 
00319 
00320 
00321 
00322 void
00323 TransformTerminus::do_io_close(int error)
00324 {
00325   if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00326     ink_assert(!"not reached");
00327   }
00328 
00329   INK_WRITE_MEMORY_BARRIER;
00330 
00331   if (error != -1) {
00332     lerrno = error;
00333     m_closed = TS_VC_CLOSE_ABORT;
00334   } else {
00335     m_closed = TS_VC_CLOSE_NORMAL;
00336   }
00337 
00338   m_read_vio.op = VIO::NONE;
00339   m_read_vio.buffer.clear();
00340 
00341   m_write_vio.op = VIO::NONE;
00342   m_write_vio.buffer.clear();
00343 
00344   eventProcessor.schedule_imm(this, ET_NET);
00345 }
00346 
00347 
00348 
00349 
00350 void
00351 TransformTerminus::do_io_shutdown(ShutdownHowTo_t howto)
00352 {
00353   if ((howto == IO_SHUTDOWN_READ) || (howto == IO_SHUTDOWN_READWRITE)) {
00354     m_read_vio.op = VIO::NONE;
00355     m_read_vio.buffer.clear();
00356   }
00357 
00358   if ((howto == IO_SHUTDOWN_WRITE) || (howto == IO_SHUTDOWN_READWRITE)) {
00359     m_write_vio.op = VIO::NONE;
00360     m_write_vio.buffer.clear();
00361   }
00362 }
00363 
00364 
00365 
00366 
00367 void
00368 TransformTerminus::reenable(VIO *vio)
00369 {
00370   ink_assert((vio == &m_read_vio) || (vio == &m_write_vio));
00371 
00372   if (m_event_count == 0) {
00373 
00374     if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00375       ink_assert(!"not reached");
00376     }
00377     Debug("transform", "[TransformTerminus::reenable] event_count %d", m_event_count);
00378     eventProcessor.schedule_imm(this, ET_NET);
00379   } else {
00380     Debug("transform", "[TransformTerminus::reenable] skipping due to " "pending events");
00381   }
00382 }
00383 
00384 
00385 
00386 
00387 
00388 TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks)
00389 :TransformVCChain(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0)
00390 {
00391   INKVConnInternal *xform;
00392 
00393   SET_HANDLER(&TransformVConnection::handle_event);
00394 
00395   ink_assert(hooks != NULL);
00396 
00397   m_transform = hooks->m_cont;
00398   while (hooks->m_link.next) {
00399     xform = (INKVConnInternal *) hooks->m_cont;
00400     hooks = hooks->m_link.next;
00401     xform->do_io_transform(hooks->m_cont);
00402   }
00403   xform = (INKVConnInternal *) hooks->m_cont;
00404   xform->do_io_transform(&m_terminus);
00405 
00406   Debug("transform", "TransformVConnection create [0x%lx]", (long) this);
00407 }
00408 
00409 
00410 
00411 
00412 TransformVConnection::~TransformVConnection()
00413 {
00414   
00415   
00416   m_terminus.m_read_vio.set_continuation(NULL);
00417   m_terminus.m_write_vio.set_continuation(NULL);
00418   m_terminus.mutex = NULL;
00419   this->mutex = NULL;
00420 }
00421 
00422 
00423 
00424 
00425 int
00426 TransformVConnection::handle_event(int , void * )
00427 {
00428   ink_assert(!"not reached");
00429   return 0;
00430 }
00431 
00432 
00433 
00434 
00435 VIO *
00436 TransformVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00437 {
00438   Debug("transform", "TransformVConnection do_io_read: 0x%lx [0x%lx]", (long) c, (long) this);
00439 
00440   return m_terminus.do_io_read(c, nbytes, buf);
00441 }
00442 
00443 
00444 
00445 
00446 VIO *
00447 TransformVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf,
00448                                   bool )
00449 {
00450   Debug("transform", "TransformVConnection do_io_write: 0x%lx [0x%lx]", (long) c, (long) this);
00451 
00452   return m_transform->do_io_write(c, nbytes, buf);
00453 }
00454 
00455 
00456 
00457 
00458 void
00459 TransformVConnection::do_io_close(int error)
00460 {
00461   Debug("transform", "TransformVConnection do_io_close: %d [0x%lx]", error, (long) this);
00462 
00463   if (error != -1) {
00464     m_closed = TS_VC_CLOSE_ABORT;
00465   } else {
00466     m_closed = TS_VC_CLOSE_NORMAL;
00467   }
00468 
00469   m_transform->do_io_close(error);
00470 }
00471 
00472 
00473 
00474 
00475 void
00476 TransformVConnection::do_io_shutdown(ShutdownHowTo_t howto)
00477 {
00478   ink_assert(howto == IO_SHUTDOWN_WRITE);
00479 
00480   Debug("transform", "TransformVConnection do_io_shutdown: %d [0x%lx]", howto, (long) this);
00481 
00482   m_transform->do_io_shutdown(howto);
00483 }
00484 
00485 
00486 
00487 
00488 void
00489 TransformVConnection::reenable(VIO * )
00490 {
00491   ink_assert(!"not reached");
00492 }
00493 
00494 
00495 
00496 
00497 uint64_t
00498 TransformVConnection::backlog(uint64_t limit)
00499 {
00500   uint64_t b = 0; 
00501   VConnection* raw_vc = m_transform;
00502   MIOBuffer* w;
00503   while (raw_vc && raw_vc != &m_terminus) {
00504     INKVConnInternal* vc = static_cast<INKVConnInternal*>(raw_vc);
00505     if (0 != (w = vc->m_read_vio.buffer.writer()))
00506       b += w->max_read_avail();
00507     if (b >= limit) return b;
00508     raw_vc = vc->m_output_vc;
00509   }
00510   if (0 != (w = m_terminus.m_read_vio.buffer.writer()))
00511     b += w->max_read_avail();
00512   if (b >= limit) return b;
00513 
00514   IOBufferReader* r = m_terminus.m_write_vio.get_reader();
00515   if (r)
00516     b += r->read_avail();
00517   return b;
00518 }
00519 
00520 
00521 
00522 
00523 TransformControl::TransformControl()
00524 :Continuation(new_ProxyMutex()), m_hooks(), m_tvc(NULL), m_read_buf(NULL), m_write_buf(NULL)
00525 {
00526   SET_HANDLER(&TransformControl::handle_event);
00527 
00528   m_hooks.append(transformProcessor.null_transform(new_ProxyMutex()));
00529 }
00530 
00531 
00532 
00533 
00534 int
00535 TransformControl::handle_event(int event, void * )
00536 {
00537   switch (event) {
00538   case EVENT_IMMEDIATE:
00539     {
00540       char *s, *e;
00541 
00542       ink_assert(m_tvc == NULL);
00543       if (http_global_hooks && http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK)) {
00544         m_tvc = transformProcessor.open(this, http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK));
00545       } else {
00546         m_tvc = transformProcessor.open(this, m_hooks.get());
00547       }
00548       ink_assert(m_tvc != NULL);
00549 
00550       m_write_buf = new_MIOBuffer();
00551       s = m_write_buf->end();
00552       e = m_write_buf->buf_end();
00553 
00554       memset(s, 'a', e - s);
00555       m_write_buf->fill(e - s);
00556 
00557       m_tvc->do_io_write(this, 4 * 1024, m_write_buf->alloc_reader());
00558       break;
00559     }
00560 
00561   case TRANSFORM_READ_READY:
00562     {
00563       MIOBuffer *buf = new_empty_MIOBuffer();
00564   
00565       m_read_buf = buf->alloc_reader();
00566       m_tvc->do_io_read(this, INT64_MAX, buf);
00567       break;
00568     }
00569 
00570   case VC_EVENT_READ_COMPLETE:
00571   case VC_EVENT_EOS:
00572     m_tvc->do_io_close();
00573 
00574     free_MIOBuffer(m_read_buf->mbuf);
00575     m_read_buf = NULL;
00576 
00577     free_MIOBuffer(m_write_buf);
00578     m_write_buf = NULL;
00579     break;
00580 
00581   case VC_EVENT_WRITE_COMPLETE:
00582     break;
00583 
00584   default:
00585     ink_assert(!"not reached");
00586     break;
00587   }
00588 
00589   return 0;
00590 }
00591 
00592 
00593 
00594 
00595 NullTransform::NullTransform(ProxyMutex *_mutex)
00596   : INKVConnInternal(NULL, reinterpret_cast<TSMutex>(_mutex)),
00597     m_output_buf(NULL), m_output_reader(NULL), m_output_vio(NULL)
00598 {
00599   SET_HANDLER(&NullTransform::handle_event);
00600 
00601   Debug("transform", "NullTransform create [0x%lx]", (long) this);
00602 }
00603 
00604 
00605 
00606 
00607 NullTransform::~NullTransform()
00608 {
00609   if (m_output_buf) {
00610     free_MIOBuffer(m_output_buf);
00611   }
00612 }
00613 
00614 
00615 
00616 
00617 int
00618 NullTransform::handle_event(int event, void *edata)
00619 {
00620   handle_event_count(event);
00621 
00622   Debug("transform", "[NullTransform::handle_event] event count %d", m_event_count);
00623 
00624   if (m_closed) {
00625     if (m_deletable) {
00626       Debug("transform", "NullTransform destroy: %" PRId64" [%p]", m_output_vio ? m_output_vio->ndone : 0, this);
00627       delete this;
00628     }
00629   } else {
00630     switch (event) {
00631     case VC_EVENT_ERROR:
00632       m_write_vio._cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
00633       break;
00634     case VC_EVENT_WRITE_COMPLETE:
00635       ink_assert(m_output_vio == (VIO *) edata);
00636 
00637       
00638       
00639       
00640       ink_assert(m_write_vio.ntodo() == 0);
00641 
00642       m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
00643       break;
00644     case VC_EVENT_WRITE_READY:
00645     default:
00646       {
00647         int64_t towrite;
00648         int64_t avail;
00649 
00650         ink_assert(m_output_vc != NULL);
00651 
00652         if (!m_output_vio) {
00653           m_output_buf = new_empty_MIOBuffer();
00654           m_output_reader = m_output_buf->alloc_reader();
00655           m_output_vio = m_output_vc->do_io_write(this, m_write_vio.nbytes, m_output_reader);
00656         }
00657 
00658         MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
00659         if (!trylock) {
00660           retry(10);
00661           return 0;
00662         }
00663 
00664         if (m_closed) {
00665           return 0;
00666         }
00667 
00668         if (m_write_vio.op == VIO::NONE) {
00669           m_output_vio->nbytes = m_write_vio.ndone;
00670           m_output_vio->reenable();
00671           return 0;
00672         }
00673 
00674         towrite = m_write_vio.ntodo();
00675         if (towrite > 0) {
00676           avail = m_write_vio.get_reader()->read_avail();
00677           if (towrite > avail) {
00678             towrite = avail;
00679           }
00680 
00681           if (towrite > 0) {
00682             Debug("transform", "[NullTransform::handle_event] " "writing %" PRId64" bytes to output", towrite);
00683             m_output_buf->write(m_write_vio.get_reader(), towrite);
00684 
00685             m_write_vio.get_reader()->consume(towrite);
00686             m_write_vio.ndone += towrite;
00687           }
00688         }
00689 
00690         if (m_write_vio.ntodo() > 0) {
00691           if (towrite > 0) {
00692             m_output_vio->reenable();
00693             m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00694           }
00695         } else {
00696           m_output_vio->nbytes = m_write_vio.ndone;
00697           m_output_vio->reenable();
00698           m_write_vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
00699         }
00700 
00701         break;
00702       }
00703     }
00704   }
00705 
00706   return 0;
00707 }
00708 
00709 
00710 
00711 
00712 
00713 
00714 
00715 
00716 
00717 
00718 
00719 
00720 
00721 
00722 
00723 
00724 
00725 
#ifdef TS_HAS_TESTS
// Kick off the transform self-test, but only when the "transform_test"
// action tag is set. The test runs as a TransformControl on ET_NET.
void
TransformTest::run()
{
  if (!is_action_tag_set("transform_test")) {
    return;
  }

  TransformControl *driver = new TransformControl();
  eventProcessor.schedule_imm(driver, ET_NET);
}
#endif
00735 
00736 
00737 
00738 
00739 
00740 
00741 RangeTransform::RangeTransform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr * transform_resp, const char * content_type, int content_type_len, int64_t content_length)
00742   : INKVConnInternal(NULL, reinterpret_cast<TSMutex>(mut)),
00743   m_output_buf(NULL),
00744   m_output_reader(NULL),
00745   m_transform_resp(transform_resp),
00746   m_output_vio(NULL),
00747   m_range_content_length(0),
00748   m_num_range_fields(num_fields),
00749   m_current_range(0), m_content_type(content_type), m_content_type_len(content_type_len), m_ranges(ranges), m_output_cl(content_length), m_done(0)
00750 {
00751   SET_HANDLER(&RangeTransform::handle_event);
00752 
00753   m_num_chars_for_cl = num_chars_for_int(m_range_content_length);
00754   Debug("http_trans", "RangeTransform creation finishes");
00755 }
00756 
00757 
00758 
00759 
00760 
00761 RangeTransform::~RangeTransform()
00762 {
00763   if (m_output_buf)
00764     free_MIOBuffer(m_output_buf);
00765 }
00766 
00767 
00768 
00769 
00770 
00771 int
00772 RangeTransform::handle_event(int event, void *edata)
00773 {
00774   handle_event_count(event);
00775 
00776   if (m_closed) {
00777     if (m_deletable) {
00778       Debug("http_trans", "RangeTransform destroy: %p ndone=%" PRId64, this, m_output_vio ? m_output_vio->ndone : 0);
00779       delete this;
00780     }
00781   } else {
00782     switch (event) {
00783     case VC_EVENT_ERROR:
00784       m_write_vio._cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
00785       break;
00786     case VC_EVENT_WRITE_COMPLETE:
00787       ink_assert(m_output_vio == (VIO *) edata);
00788       m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
00789       break;
00790     case VC_EVENT_WRITE_READY:
00791     default:
00792       ink_assert(m_output_vc != NULL);
00793 
00794       if (!m_output_vio) {
00795         m_output_buf = new_empty_MIOBuffer();
00796         m_output_reader = m_output_buf->alloc_reader();
00797         m_output_vio = m_output_vc->do_io_write(this, m_output_cl, m_output_reader);
00798 
00799         change_response_header();
00800 
00801         if (m_num_range_fields > 1) {
00802           add_boundary(false);
00803           add_sub_header(m_current_range);
00804         }
00805       }
00806 
00807       MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
00808       if (!trylock) {
00809         retry(10);
00810         return 0;
00811       }
00812 
00813       if (m_closed) {
00814         return 0;
00815       }
00816 
00817       if (m_write_vio.op == VIO::NONE) {
00818         m_output_vio->nbytes = m_done;
00819         m_output_vio->reenable();
00820         return 0;
00821       }
00822 
00823       transform_to_range();
00824       break;
00825     }
00826   }
00827 
00828   return 0;
00829 }
00830 
00831 
00832 
00833 
00834 
00835 void
00836 RangeTransform::transform_to_range()
00837 {
00838   IOBufferReader *reader = m_write_vio.get_reader();
00839   int64_t toskip, tosend, avail;
00840   const int64_t *end, *start;
00841   int64_t prev_end = 0;
00842   int64_t *done_byte;
00843 
00844   end = &m_ranges[m_current_range]._end;
00845   done_byte = &m_ranges[m_current_range]._done_byte;
00846   start = &m_ranges[m_current_range]._start;
00847   avail = reader->read_avail();
00848 
00849   while (true) {
00850     if (*done_byte < (*start - 1)) {
00851       toskip = *start - *done_byte - 1;
00852 
00853       if (toskip > avail)
00854         toskip = avail;
00855 
00856       if (toskip > 0) {
00857         reader->consume(toskip);
00858         *done_byte += toskip;
00859         avail = reader->read_avail();
00860       }
00861     }
00862 
00863     if (avail > 0) {
00864       tosend = *end - *done_byte;
00865 
00866       if (tosend > avail)
00867         tosend = avail;
00868 
00869       m_output_buf->write(reader, tosend);
00870       reader->consume(tosend);
00871 
00872       m_done += tosend;
00873       *done_byte += tosend;
00874     }
00875 
00876     if (*done_byte == *end)
00877       prev_end = *end;
00878 
00879     
00880     
00881     while (*done_byte == *end) {
00882       m_current_range++;
00883 
00884       if (m_current_range == m_num_range_fields) {
00885         if (m_num_range_fields > 1) {
00886           m_done += m_output_buf->write("\r\n", 2);
00887           add_boundary(true);
00888         }
00889 
00890         Debug("http_trans", "total bytes of Range response body is %" PRId64, m_done);
00891         m_output_vio->nbytes = m_done;
00892         m_output_vio->reenable();
00893 
00894         
00895         
00896         
00897         int cb_event = (m_write_vio.ntodo() > 0) ? VC_EVENT_EOS : VC_EVENT_WRITE_COMPLETE;
00898         m_write_vio._cont->handleEvent(cb_event, &m_write_vio);
00899         return;
00900       }
00901 
00902       end = &m_ranges[m_current_range]._end;
00903       done_byte = &m_ranges[m_current_range]._done_byte;
00904       start = &m_ranges[m_current_range]._start;
00905 
00906       
00907       if (*end != -1) {
00908         m_done += m_output_buf->write("\r\n", 2);
00909         add_boundary(false);
00910         add_sub_header(m_current_range);
00911 
00912         
00913         
00914         
00915         
00916         
00917         
00918         *done_byte = prev_end;
00919         
00920         
00921 
00922         break;
00923       }
00924     }
00925 
00926     
00927     avail = reader->read_avail();
00928     if (avail == 0)
00929       break;
00930   }
00931 
00932   m_output_vio->reenable();
00933   m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00934 }
00935 
00936 
00937 
00938 
00939 
00940 
00941 
00942 
00943 
00944 static char bound[] = "RANGE_SEPARATOR";
00945 static char range_type[] = "multipart/byteranges; boundary=RANGE_SEPARATOR";
00946 static char cont_type[] = "Content-type: ";
00947 static char cont_range[] = "Content-range: bytes ";
00948 
00949 void
00950 RangeTransform::add_boundary(bool end)
00951 {
00952   m_done += m_output_buf->write("--", 2);
00953   m_done += m_output_buf->write(bound, sizeof(bound) - 1);
00954 
00955   if (end)
00956     m_done += m_output_buf->write("--", 2);
00957 
00958   m_done += m_output_buf->write("\r\n", 2);
00959 }
00960 
00961 
00962 
00963 
00964 
00965 #define RANGE_NUMBERS_LENGTH 60
00966 
00967 void
00968 RangeTransform::add_sub_header(int index)
00969 {
00970   
00971   char numbers[RANGE_NUMBERS_LENGTH];
00972   int len;
00973 
00974   m_done += m_output_buf->write(cont_type, sizeof(cont_type) - 1);
00975   if (m_content_type)
00976     m_done += m_output_buf->write(m_content_type, m_content_type_len);
00977   m_done += m_output_buf->write("\r\n", 2);
00978   m_done += m_output_buf->write(cont_range, sizeof(cont_range) - 1);
00979 
00980   snprintf(numbers, sizeof(numbers), "%" PRId64 "-%" PRId64 "/%" PRId64 "", m_ranges[index]._start, m_ranges[index]._end, m_output_cl);
00981   len = strlen(numbers);
00982   if (len < RANGE_NUMBERS_LENGTH)
00983     m_done += m_output_buf->write(numbers, len);
00984   m_done += m_output_buf->write("\r\n\r\n", 4);
00985 }
00986 
00987 
00988 
00989 
00990 
00991 
00992 
00993 
00994 
00995 void
00996 RangeTransform::change_response_header()
00997 {
00998   MIMEField *field;
00999   char *reason_phrase;
01000   HTTPStatus status_code;
01001   
01002   ink_release_assert(m_transform_resp);
01003 
01004   status_code = HTTP_STATUS_PARTIAL_CONTENT;
01005   m_transform_resp->status_set(status_code);
01006   reason_phrase = (char *) (http_hdr_reason_lookup(status_code));
01007   m_transform_resp->reason_set(reason_phrase, strlen(reason_phrase));
01008 
01009   if (m_num_range_fields > 1) {
01010     
01011     field = m_transform_resp->field_find(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01012 
01013     if (field != NULL)
01014       m_transform_resp->field_delete(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01015 
01016 
01017     field = m_transform_resp->field_create(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01018     field->value_append(m_transform_resp->m_heap, m_transform_resp->m_mime, range_type, sizeof(range_type) - 1);
01019 
01020     m_transform_resp->field_attach(field);
01021   } else {
01022     char numbers[RANGE_NUMBERS_LENGTH];
01023     m_transform_resp->field_delete(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
01024     field = m_transform_resp->field_create(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
01025     snprintf(numbers, sizeof(numbers), "bytes %" PRId64"-%" PRId64"/%" PRId64, m_ranges[0]._start, m_ranges[0]._end, m_output_cl);
01026     field->value_set(m_transform_resp->m_heap, m_transform_resp->m_mime, numbers, strlen(numbers));
01027     m_transform_resp->field_attach(field);
01028   }
01029 }
01030 
01031 #undef RANGE_NUMBERS_LENGTH