• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

Transform.cc

Go to the documentation of this file.
00001 /** @file
00002 
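00003   Implements the transform processor, the TransformVConnection chain and its
00003   terminus, and the built-in null and range transforms.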
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022 
00023   @section thoughts Transform thoughts
00024 
00025     - Must be able to handle a chain of transformations.
00026     - Any transformation in the chain may fail.
00027       Failure options:
00028         - abort the client (if transformed data already sent)
00029         - serve the client the untransformed document
00030         - remove the failing transformation from the chain and attempt the transformation again (difficult to do)
00031         - never send untransformed document to client if client would not understand it (e.g. a set top box)
00032     - Must be able to change response header fields up until the point that TRANSFORM_READ_READY is sent to the user.
00033 
00034   @section usage Transform usage
00035 
00036     -# transformProcessor.open (cont, hooks); - returns "tvc", a TransformVConnection if 'hooks != NULL'
00037     -# tvc->do_io_write (cont, nbytes, buffer1);
00038     -# cont->handleEvent (TRANSFORM_READ_READY, NULL);
00039     -# tvc->do_io_read (cont, nbytes, buffer2);
00040     -# tvc->do_io_close ();
00041 
00042   @section visualization Transform visualization
00043 
00044   @verbatim
00045          +----+     +----+     +----+     +----+
00046     -IB->| T1 |-B1->| T2 |-B2->| T3 |-B3->| T4 |-OB->
00047          +----+     +----+     +----+     +----+
00048   @endverbatim
00049 
00050   Data flows into the first transform in the form of the buffer
00051   passed to TransformVConnection::do_io_write (IB). Data flows
00052   out of the last transform in the form of the buffer passed to
00053   TransformVConnection::do_io_read (OB). Between each transformation is
00054   another buffer (B1, B2 and B3).
00055 
00056   A transformation is a Continuation. The continuation is called with the
00057   event TRANSFORM_IO_WRITE to initialize the write and TRANSFORM_IO_READ
00058   to initialize the read.
00059 
00060 */
00061 
00062 #include "ProxyConfig.h"
00063 #include "P_Net.h"
00064 #include "MimeTable.h"
00065 #include "TransformInternal.h"
00066 #include "HdrUtils.h"
00067 #include "Log.h"
00068 
00069 
00070 #define ART                   1
00071 #define AGIF                  2
00072 
00073 
00074 TransformProcessor transformProcessor;
00075 
00076 
00077 /*-------------------------------------------------------------------------
00078   -------------------------------------------------------------------------*/
00079 
00080 void
00081 TransformProcessor::start()
00082 {
00083 #ifdef PREFETCH
00084   prefetchProcessor.start();
00085 #endif
00086 }
00087 
00088 /*-------------------------------------------------------------------------
00089   -------------------------------------------------------------------------*/
00090 
00091 VConnection *
00092 TransformProcessor::open(Continuation *cont, APIHook *hooks)
00093 {
00094   if (hooks) {
00095     return new TransformVConnection(cont, hooks);
00096   } else {
00097     return NULL;
00098   }
00099 }
00100 
00101 /*-------------------------------------------------------------------------
00102   -------------------------------------------------------------------------*/
00103 
00104 INKVConnInternal *
00105 TransformProcessor::null_transform(ProxyMutex *mutex)
00106 {
00107   return new NullTransform(mutex);
00108 }
00109 
00110 
00111 /*-------------------------------------------------------------------------
00112   -------------------------------------------------------------------------*/
00113 
00114 INKVConnInternal *
00115 TransformProcessor::range_transform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr *transform_resp, const char * content_type, int content_type_len, int64_t content_length)
00116 {
00117   RangeTransform *range_transform = new RangeTransform(mut, ranges, num_fields, transform_resp, content_type, content_type_len, content_length);
00118   return range_transform;
00119 }
00120 
00121 
00122 /*-------------------------------------------------------------------------
00123   -------------------------------------------------------------------------*/
00124 
00125 TransformTerminus::TransformTerminus(TransformVConnection *tvc)
00126   : VConnection(tvc->mutex),
00127     m_tvc(tvc), m_read_vio(), m_write_vio(), m_event_count(0), m_deletable(0), m_closed(0), m_called_user(0)
00128 {
00129   SET_HANDLER(&TransformTerminus::handle_event);
00130 }
00131 
00132 
// Used inside TransformTerminus::handle_event when a mutex could not be
// acquired: accounts for one more pending event, reschedules this
// continuation 10ms from now, and returns 0 from the enclosing function.
//
// Wrapped in do { ... } while (0) so the macro expands to a single statement
// and stays correct even when used as the body of an unbraced `if`.
#define RETRY() \
  do { \
    if (ink_atomic_increment((int *) &m_event_count, 1) < 0) { \
      ink_assert(!"not reached"); \
    } \
    eventProcessor.schedule_in(this, HRTIME_MSECONDS(10), ET_NET); \
    return 0; \
  } while (0)
00139 
00140 
00141 int
00142 TransformTerminus::handle_event(int event, void * /* edata ATS_UNUSED */)
00143 {
00144   int val;
00145 
00146   m_deletable = ((m_closed != 0) && (m_tvc->m_closed != 0));
00147 
00148   val = ink_atomic_increment((int *) &m_event_count, -1);
00149 
00150   Debug("transform", "[TransformTerminus::handle_event] event_count %d", m_event_count);
00151 
00152   if (val <= 0) {
00153     ink_assert(!"not reached");
00154   }
00155 
00156   m_deletable = m_deletable && (val == 1);
00157 
00158   if (m_closed != 0 && m_tvc->m_closed != 0) {
00159     if (m_deletable) {
00160       Debug("transform", "TransformVConnection destroy [0x%lx]", (long) m_tvc);
00161       delete m_tvc;
00162       return 0;
00163     }
00164   } else if (m_write_vio.op == VIO::WRITE) {
00165     if (m_read_vio.op == VIO::NONE) {
00166       if (!m_called_user) {
00167         Debug("transform", "TransformVConnection calling user: %d %d [0x%lx] [0x%lx]",
00168               m_event_count, event, (long) m_tvc, (long) m_tvc->m_cont);
00169 
00170         m_called_user = 1;
00171         // It is our belief this is safe to pass a reference, i.e. its scope
00172         // and locking ought to be safe across the lifetime of the continuation.
00173         m_tvc->m_cont->handleEvent(TRANSFORM_READ_READY, (void *)&m_write_vio.nbytes);
00174       }
00175     } else {
00176       int64_t towrite;
00177 
00178       MUTEX_TRY_LOCK(trylock1, m_write_vio.mutex, this_ethread());
00179       if (!trylock1) {
00180         RETRY();
00181       }
00182 
00183       MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
00184       if (!trylock2) {
00185         RETRY();
00186       }
00187 
00188       if (m_closed != 0) {
00189         return 0;
00190       }
00191 
00192       if (m_write_vio.op == VIO::NONE) {
00193         return 0;
00194       }
00195 
00196       towrite = m_write_vio.ntodo();
00197       if (towrite > 0) {
00198         if (towrite > m_write_vio.get_reader()->read_avail()) {
00199           towrite = m_write_vio.get_reader()->read_avail();
00200         }
00201         if (towrite > m_read_vio.ntodo()) {
00202           towrite = m_read_vio.ntodo();
00203         }
00204 
00205         if (towrite > 0) {
00206           m_read_vio.get_writer()->write(m_write_vio.get_reader(), towrite);
00207           m_read_vio.ndone += towrite;
00208 
00209           m_write_vio.get_reader()->consume(towrite);
00210           m_write_vio.ndone += towrite;
00211         }
00212       }
00213 
00214       if (m_write_vio.ntodo() > 0) {
00215         if (towrite > 0) {
00216           m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00217         }
00218       } else {
00219         m_write_vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
00220       }
00221 
00222       // We could have closed on the write callback
00223       if (m_closed != 0 && m_tvc->m_closed != 0) {
00224         return 0;
00225       }
00226 
00227       if (m_read_vio.ntodo() > 0) {
00228         if (m_write_vio.ntodo() <= 0) {
00229           m_read_vio._cont->handleEvent(VC_EVENT_EOS, &m_read_vio);
00230         } else if (towrite > 0) {
00231           m_read_vio._cont->handleEvent(VC_EVENT_READ_READY, &m_read_vio);
00232         }
00233       } else {
00234         m_read_vio._cont->handleEvent(VC_EVENT_READ_COMPLETE, &m_read_vio);
00235       }
00236     }
00237   } else {
00238     MUTEX_TRY_LOCK(trylock2, m_read_vio.mutex, this_ethread());
00239     if (!trylock2) {
00240       RETRY();
00241     }
00242 
00243     if (m_closed != 0) {
00244       // The terminus was closed, but the enclosing transform
00245       // vconnection wasn't. If the terminus was aborted then we
00246       // call the read_vio cont back with VC_EVENT_ERROR. If it
00247       // was closed normally then we call it back with
00248       // VC_EVENT_EOS. If a read operation hasn't been initiated
00249       // yet and we haven't called the user back then we call
00250       // the user back instead of the read_vio cont (which won't
00251       // exist).
00252       if (m_tvc->m_closed == 0) {
00253         int ev = (m_closed == TS_VC_CLOSE_ABORT) ? VC_EVENT_ERROR : VC_EVENT_EOS;
00254 
00255         if (!m_called_user) {
00256           m_called_user = 1;
00257           m_tvc->m_cont->handleEvent(ev, NULL);
00258         } else {
00259           ink_assert(m_read_vio._cont != NULL);
00260           m_read_vio._cont->handleEvent(ev, &m_read_vio);
00261         }
00262       }
00263 
00264       return 0;
00265     }
00266   }
00267 
00268   return 0;
00269 }
00270 
00271 /*-------------------------------------------------------------------------
00272   -------------------------------------------------------------------------*/
00273 
00274 VIO *
00275 TransformTerminus::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00276 {
00277   m_read_vio.buffer.writer_for(buf);
00278   m_read_vio.op = VIO::READ;
00279   m_read_vio.set_continuation(c);
00280   m_read_vio.nbytes = nbytes;
00281   m_read_vio.ndone = 0;
00282   m_read_vio.vc_server = this;
00283 
00284   if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00285     ink_assert(!"not reached");
00286   }
00287   Debug("transform", "[TransformTerminus::do_io_read] event_count %d", m_event_count);
00288 
00289   eventProcessor.schedule_imm(this, ET_NET);
00290 
00291   return &m_read_vio;
00292 }
00293 
00294 /*-------------------------------------------------------------------------
00295   -------------------------------------------------------------------------*/
00296 
00297 VIO *
00298 TransformTerminus::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner)
00299 {
00300   // In the process of eliminating 'owner' mode so asserting against it
00301   ink_assert(!owner);
00302   m_write_vio.buffer.reader_for(buf);
00303   m_write_vio.op = VIO::WRITE;
00304   m_write_vio.set_continuation(c);
00305   m_write_vio.nbytes = nbytes;
00306   m_write_vio.ndone = 0;
00307   m_write_vio.vc_server = this;
00308 
00309   if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00310     ink_assert(!"not reached");
00311   }
00312   Debug("transform", "[TransformTerminus::do_io_write] event_count %d", m_event_count);
00313 
00314   eventProcessor.schedule_imm(this, ET_NET);
00315 
00316   return &m_write_vio;
00317 }
00318 
00319 /*-------------------------------------------------------------------------
00320   -------------------------------------------------------------------------*/
00321 
00322 void
00323 TransformTerminus::do_io_close(int error)
00324 {
00325   if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00326     ink_assert(!"not reached");
00327   }
00328 
00329   INK_WRITE_MEMORY_BARRIER;
00330 
00331   if (error != -1) {
00332     lerrno = error;
00333     m_closed = TS_VC_CLOSE_ABORT;
00334   } else {
00335     m_closed = TS_VC_CLOSE_NORMAL;
00336   }
00337 
00338   m_read_vio.op = VIO::NONE;
00339   m_read_vio.buffer.clear();
00340 
00341   m_write_vio.op = VIO::NONE;
00342   m_write_vio.buffer.clear();
00343 
00344   eventProcessor.schedule_imm(this, ET_NET);
00345 }
00346 
00347 /*-------------------------------------------------------------------------
00348   -------------------------------------------------------------------------*/
00349 
00350 void
00351 TransformTerminus::do_io_shutdown(ShutdownHowTo_t howto)
00352 {
00353   if ((howto == IO_SHUTDOWN_READ) || (howto == IO_SHUTDOWN_READWRITE)) {
00354     m_read_vio.op = VIO::NONE;
00355     m_read_vio.buffer.clear();
00356   }
00357 
00358   if ((howto == IO_SHUTDOWN_WRITE) || (howto == IO_SHUTDOWN_READWRITE)) {
00359     m_write_vio.op = VIO::NONE;
00360     m_write_vio.buffer.clear();
00361   }
00362 }
00363 
00364 /*-------------------------------------------------------------------------
00365   -------------------------------------------------------------------------*/
00366 
00367 void
00368 TransformTerminus::reenable(VIO *vio)
00369 {
00370   ink_assert((vio == &m_read_vio) || (vio == &m_write_vio));
00371 
00372   if (m_event_count == 0) {
00373 
00374     if (ink_atomic_increment((int *) &m_event_count, 1) < 0) {
00375       ink_assert(!"not reached");
00376     }
00377     Debug("transform", "[TransformTerminus::reenable] event_count %d", m_event_count);
00378     eventProcessor.schedule_imm(this, ET_NET);
00379   } else {
00380     Debug("transform", "[TransformTerminus::reenable] skipping due to " "pending events");
00381   }
00382 }
00383 
00384 
00385 /*-------------------------------------------------------------------------
00386   -------------------------------------------------------------------------*/
00387 
00388 TransformVConnection::TransformVConnection(Continuation *cont, APIHook *hooks)
00389 :TransformVCChain(cont->mutex), m_cont(cont), m_terminus(this), m_closed(0)
00390 {
00391   INKVConnInternal *xform;
00392 
00393   SET_HANDLER(&TransformVConnection::handle_event);
00394 
00395   ink_assert(hooks != NULL);
00396 
00397   m_transform = hooks->m_cont;
00398   while (hooks->m_link.next) {
00399     xform = (INKVConnInternal *) hooks->m_cont;
00400     hooks = hooks->m_link.next;
00401     xform->do_io_transform(hooks->m_cont);
00402   }
00403   xform = (INKVConnInternal *) hooks->m_cont;
00404   xform->do_io_transform(&m_terminus);
00405 
00406   Debug("transform", "TransformVConnection create [0x%lx]", (long) this);
00407 }
00408 
00409 /*-------------------------------------------------------------------------
00410   -------------------------------------------------------------------------*/
00411 
00412 TransformVConnection::~TransformVConnection()
00413 {
00414   // Clear the continuations in terminus VConnections so that
00415   //  mutex's get released (INKqa05596)
00416   m_terminus.m_read_vio.set_continuation(NULL);
00417   m_terminus.m_write_vio.set_continuation(NULL);
00418   m_terminus.mutex = NULL;
00419   this->mutex = NULL;
00420 }
00421 
00422 /*-------------------------------------------------------------------------
00423   -------------------------------------------------------------------------*/
00424 
00425 int
00426 TransformVConnection::handle_event(int /* event ATS_UNUSED */, void * /* edata ATS_UNUSED */)
00427 {
00428   ink_assert(!"not reached");
00429   return 0;
00430 }
00431 
00432 /*-------------------------------------------------------------------------
00433   -------------------------------------------------------------------------*/
00434 
00435 VIO *
00436 TransformVConnection::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf)
00437 {
00438   Debug("transform", "TransformVConnection do_io_read: 0x%lx [0x%lx]", (long) c, (long) this);
00439 
00440   return m_terminus.do_io_read(c, nbytes, buf);
00441 }
00442 
00443 /*-------------------------------------------------------------------------
00444   -------------------------------------------------------------------------*/
00445 
00446 VIO *
00447 TransformVConnection::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf,
00448                                   bool /* owner ATS_UNUSED */)
00449 {
00450   Debug("transform", "TransformVConnection do_io_write: 0x%lx [0x%lx]", (long) c, (long) this);
00451 
00452   return m_transform->do_io_write(c, nbytes, buf);
00453 }
00454 
00455 /*-------------------------------------------------------------------------
00456   -------------------------------------------------------------------------*/
00457 
00458 void
00459 TransformVConnection::do_io_close(int error)
00460 {
00461   Debug("transform", "TransformVConnection do_io_close: %d [0x%lx]", error, (long) this);
00462 
00463   if (error != -1) {
00464     m_closed = TS_VC_CLOSE_ABORT;
00465   } else {
00466     m_closed = TS_VC_CLOSE_NORMAL;
00467   }
00468 
00469   m_transform->do_io_close(error);
00470 }
00471 
00472 /*-------------------------------------------------------------------------
00473   -------------------------------------------------------------------------*/
00474 
00475 void
00476 TransformVConnection::do_io_shutdown(ShutdownHowTo_t howto)
00477 {
00478   ink_assert(howto == IO_SHUTDOWN_WRITE);
00479 
00480   Debug("transform", "TransformVConnection do_io_shutdown: %d [0x%lx]", howto, (long) this);
00481 
00482   m_transform->do_io_shutdown(howto);
00483 }
00484 
00485 /*-------------------------------------------------------------------------
00486   -------------------------------------------------------------------------*/
00487 
00488 void
00489 TransformVConnection::reenable(VIO * /* vio ATS_UNUSED */)
00490 {
00491   ink_assert(!"not reached");
00492 }
00493 
00494 /*-------------------------------------------------------------------------
00495   -------------------------------------------------------------------------*/
00496 
00497 uint64_t
00498 TransformVConnection::backlog(uint64_t limit)
00499 {
00500   uint64_t b = 0; // backlog
00501   VConnection* raw_vc = m_transform;
00502   MIOBuffer* w;
00503   while (raw_vc && raw_vc != &m_terminus) {
00504     INKVConnInternal* vc = static_cast<INKVConnInternal*>(raw_vc);
00505     if (0 != (w = vc->m_read_vio.buffer.writer()))
00506       b += w->max_read_avail();
00507     if (b >= limit) return b;
00508     raw_vc = vc->m_output_vc;
00509   }
00510   if (0 != (w = m_terminus.m_read_vio.buffer.writer()))
00511     b += w->max_read_avail();
00512   if (b >= limit) return b;
00513 
00514   IOBufferReader* r = m_terminus.m_write_vio.get_reader();
00515   if (r)
00516     b += r->read_avail();
00517   return b;
00518 }
00519 
00520 /*-------------------------------------------------------------------------
00521   -------------------------------------------------------------------------*/
00522 
00523 TransformControl::TransformControl()
00524 :Continuation(new_ProxyMutex()), m_hooks(), m_tvc(NULL), m_read_buf(NULL), m_write_buf(NULL)
00525 {
00526   SET_HANDLER(&TransformControl::handle_event);
00527 
00528   m_hooks.append(transformProcessor.null_transform(new_ProxyMutex()));
00529 }
00530 
00531 /*-------------------------------------------------------------------------
00532   -------------------------------------------------------------------------*/
00533 
00534 int
00535 TransformControl::handle_event(int event, void * /* edata ATS_UNUSED */)
00536 {
00537   switch (event) {
00538   case EVENT_IMMEDIATE:
00539     {
00540       char *s, *e;
00541 
00542       ink_assert(m_tvc == NULL);
00543       if (http_global_hooks && http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK)) {
00544         m_tvc = transformProcessor.open(this, http_global_hooks->get(TS_HTTP_RESPONSE_TRANSFORM_HOOK));
00545       } else {
00546         m_tvc = transformProcessor.open(this, m_hooks.get());
00547       }
00548       ink_assert(m_tvc != NULL);
00549 
00550       m_write_buf = new_MIOBuffer();
00551       s = m_write_buf->end();
00552       e = m_write_buf->buf_end();
00553 
00554       memset(s, 'a', e - s);
00555       m_write_buf->fill(e - s);
00556 
00557       m_tvc->do_io_write(this, 4 * 1024, m_write_buf->alloc_reader());
00558       break;
00559     }
00560 
00561   case TRANSFORM_READ_READY:
00562     {
00563       MIOBuffer *buf = new_empty_MIOBuffer();
00564   
00565       m_read_buf = buf->alloc_reader();
00566       m_tvc->do_io_read(this, INT64_MAX, buf);
00567       break;
00568     }
00569 
00570   case VC_EVENT_READ_COMPLETE:
00571   case VC_EVENT_EOS:
00572     m_tvc->do_io_close();
00573 
00574     free_MIOBuffer(m_read_buf->mbuf);
00575     m_read_buf = NULL;
00576 
00577     free_MIOBuffer(m_write_buf);
00578     m_write_buf = NULL;
00579     break;
00580 
00581   case VC_EVENT_WRITE_COMPLETE:
00582     break;
00583 
00584   default:
00585     ink_assert(!"not reached");
00586     break;
00587   }
00588 
00589   return 0;
00590 }
00591 
00592 /*-------------------------------------------------------------------------
00593   -------------------------------------------------------------------------*/
00594 
00595 NullTransform::NullTransform(ProxyMutex *_mutex)
00596   : INKVConnInternal(NULL, reinterpret_cast<TSMutex>(_mutex)),
00597     m_output_buf(NULL), m_output_reader(NULL), m_output_vio(NULL)
00598 {
00599   SET_HANDLER(&NullTransform::handle_event);
00600 
00601   Debug("transform", "NullTransform create [0x%lx]", (long) this);
00602 }
00603 
00604 /*-------------------------------------------------------------------------
00605   -------------------------------------------------------------------------*/
00606 
00607 NullTransform::~NullTransform()
00608 {
00609   if (m_output_buf) {
00610     free_MIOBuffer(m_output_buf);
00611   }
00612 }
00613 
00614 /*-------------------------------------------------------------------------
00615   -------------------------------------------------------------------------*/
00616 
00617 int
00618 NullTransform::handle_event(int event, void *edata)
00619 {
00620   handle_event_count(event);
00621 
00622   Debug("transform", "[NullTransform::handle_event] event count %d", m_event_count);
00623 
00624   if (m_closed) {
00625     if (m_deletable) {
00626       Debug("transform", "NullTransform destroy: %" PRId64" [%p]", m_output_vio ? m_output_vio->ndone : 0, this);
00627       delete this;
00628     }
00629   } else {
00630     switch (event) {
00631     case VC_EVENT_ERROR:
00632       m_write_vio._cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
00633       break;
00634     case VC_EVENT_WRITE_COMPLETE:
00635       ink_assert(m_output_vio == (VIO *) edata);
00636 
00637       // The write to the output vconnection completed. This
00638       // could only be the case if the data being fed into us
00639       // has also completed.
00640       ink_assert(m_write_vio.ntodo() == 0);
00641 
00642       m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
00643       break;
00644     case VC_EVENT_WRITE_READY:
00645     default:
00646       {
00647         int64_t towrite;
00648         int64_t avail;
00649 
00650         ink_assert(m_output_vc != NULL);
00651 
00652         if (!m_output_vio) {
00653           m_output_buf = new_empty_MIOBuffer();
00654           m_output_reader = m_output_buf->alloc_reader();
00655           m_output_vio = m_output_vc->do_io_write(this, m_write_vio.nbytes, m_output_reader);
00656         }
00657 
00658         MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
00659         if (!trylock) {
00660           retry(10);
00661           return 0;
00662         }
00663 
00664         if (m_closed) {
00665           return 0;
00666         }
00667 
00668         if (m_write_vio.op == VIO::NONE) {
00669           m_output_vio->nbytes = m_write_vio.ndone;
00670           m_output_vio->reenable();
00671           return 0;
00672         }
00673 
00674         towrite = m_write_vio.ntodo();
00675         if (towrite > 0) {
00676           avail = m_write_vio.get_reader()->read_avail();
00677           if (towrite > avail) {
00678             towrite = avail;
00679           }
00680 
00681           if (towrite > 0) {
00682             Debug("transform", "[NullTransform::handle_event] " "writing %" PRId64" bytes to output", towrite);
00683             m_output_buf->write(m_write_vio.get_reader(), towrite);
00684 
00685             m_write_vio.get_reader()->consume(towrite);
00686             m_write_vio.ndone += towrite;
00687           }
00688         }
00689 
00690         if (m_write_vio.ntodo() > 0) {
00691           if (towrite > 0) {
00692             m_output_vio->reenable();
00693             m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00694           }
00695         } else {
00696           m_output_vio->nbytes = m_write_vio.ndone;
00697           m_output_vio->reenable();
00698           m_write_vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, &m_write_vio);
00699         }
00700 
00701         break;
00702       }
00703     }
00704   }
00705 
00706   return 0;
00707 }
00708 
00709 
00710 /*-------------------------------------------------------------------------
00711   Reasons the JG transform cannot currently be a plugin:
00712     a) Uses the config system
00713        - Easily avoided by using the plugin.config file to pass the config
00714          values as parameters to the plugin initialization routine.
00715     b) Uses the stat system
00716        - FIXME: should probably solve this.
00717   -------------------------------------------------------------------------*/
00718 
00719 /* the JG transform is now a plugin. All the JG code,
00720    config variables and stats are removed from Transform.cc */
00721 
00722 
00723 /*-------------------------------------------------------------------------
00724   -------------------------------------------------------------------------*/
00725 
#ifdef TS_HAS_TESTS
// Schedules a TransformControl test run when the "transform_test" action tag
// is set; otherwise does nothing.
void
TransformTest::run()
{
  if (!is_action_tag_set("transform_test")) {
    return;
  }

  eventProcessor.schedule_imm(new TransformControl(), ET_NET);
}
#endif
00735 
00736 
00737 
00738 /*-------------------------------------------------------------------------
00739   -------------------------------------------------------------------------*/
00740 
00741 RangeTransform::RangeTransform(ProxyMutex *mut, RangeRecord *ranges, int num_fields, HTTPHdr * transform_resp, const char * content_type, int content_type_len, int64_t content_length)
00742   : INKVConnInternal(NULL, reinterpret_cast<TSMutex>(mut)),
00743   m_output_buf(NULL),
00744   m_output_reader(NULL),
00745   m_transform_resp(transform_resp),
00746   m_output_vio(NULL),
00747   m_range_content_length(0),
00748   m_num_range_fields(num_fields),
00749   m_current_range(0), m_content_type(content_type), m_content_type_len(content_type_len), m_ranges(ranges), m_output_cl(content_length), m_done(0)
00750 {
00751   SET_HANDLER(&RangeTransform::handle_event);
00752 
00753   m_num_chars_for_cl = num_chars_for_int(m_range_content_length);
00754   Debug("http_trans", "RangeTransform creation finishes");
00755 }
00756 
00757 
00758 /*-------------------------------------------------------------------------
00759   -------------------------------------------------------------------------*/
00760 
00761 RangeTransform::~RangeTransform()
00762 {
00763   if (m_output_buf)
00764     free_MIOBuffer(m_output_buf);
00765 }
00766 
00767 
00768 /*-------------------------------------------------------------------------
00769   -------------------------------------------------------------------------*/
00770 
00771 int
00772 RangeTransform::handle_event(int event, void *edata)
00773 {
00774   handle_event_count(event);
00775 
00776   if (m_closed) {
00777     if (m_deletable) {
00778       Debug("http_trans", "RangeTransform destroy: %p ndone=%" PRId64, this, m_output_vio ? m_output_vio->ndone : 0);
00779       delete this;
00780     }
00781   } else {
00782     switch (event) {
00783     case VC_EVENT_ERROR:
00784       m_write_vio._cont->handleEvent(VC_EVENT_ERROR, &m_write_vio);
00785       break;
00786     case VC_EVENT_WRITE_COMPLETE:
00787       ink_assert(m_output_vio == (VIO *) edata);
00788       m_output_vc->do_io_shutdown(IO_SHUTDOWN_WRITE);
00789       break;
00790     case VC_EVENT_WRITE_READY:
00791     default:
00792       ink_assert(m_output_vc != NULL);
00793 
00794       if (!m_output_vio) {
00795         m_output_buf = new_empty_MIOBuffer();
00796         m_output_reader = m_output_buf->alloc_reader();
00797         m_output_vio = m_output_vc->do_io_write(this, m_output_cl, m_output_reader);
00798 
00799         change_response_header();
00800 
00801         if (m_num_range_fields > 1) {
00802           add_boundary(false);
00803           add_sub_header(m_current_range);
00804         }
00805       }
00806 
00807       MUTEX_TRY_LOCK(trylock, m_write_vio.mutex, this_ethread());
00808       if (!trylock) {
00809         retry(10);
00810         return 0;
00811       }
00812 
00813       if (m_closed) {
00814         return 0;
00815       }
00816 
00817       if (m_write_vio.op == VIO::NONE) {
00818         m_output_vio->nbytes = m_done;
00819         m_output_vio->reenable();
00820         return 0;
00821       }
00822 
00823       transform_to_range();
00824       break;
00825     }
00826   }
00827 
00828   return 0;
00829 }
00830 
00831 
00832 /*-------------------------------------------------------------------------
00833   -------------------------------------------------------------------------*/
00834 
00835 void
00836 RangeTransform::transform_to_range()
00837 {
00838   IOBufferReader *reader = m_write_vio.get_reader();
00839   int64_t toskip, tosend, avail;
00840   const int64_t *end, *start;
00841   int64_t prev_end = 0;
00842   int64_t *done_byte;
00843 
00844   end = &m_ranges[m_current_range]._end;
00845   done_byte = &m_ranges[m_current_range]._done_byte;
00846   start = &m_ranges[m_current_range]._start;
00847   avail = reader->read_avail();
00848 
00849   while (true) {
00850     if (*done_byte < (*start - 1)) {
00851       toskip = *start - *done_byte - 1;
00852 
00853       if (toskip > avail)
00854         toskip = avail;
00855 
00856       if (toskip > 0) {
00857         reader->consume(toskip);
00858         *done_byte += toskip;
00859         avail = reader->read_avail();
00860       }
00861     }
00862 
00863     if (avail > 0) {
00864       tosend = *end - *done_byte;
00865 
00866       if (tosend > avail)
00867         tosend = avail;
00868 
00869       m_output_buf->write(reader, tosend);
00870       reader->consume(tosend);
00871 
00872       m_done += tosend;
00873       *done_byte += tosend;
00874     }
00875 
00876     if (*done_byte == *end)
00877       prev_end = *end;
00878 
00879     // move to next Range if done one
00880     // ignore bad Range: _done_byte -1, _end -1
00881     while (*done_byte == *end) {
00882       m_current_range++;
00883 
00884       if (m_current_range == m_num_range_fields) {
00885         if (m_num_range_fields > 1) {
00886           m_done += m_output_buf->write("\r\n", 2);
00887           add_boundary(true);
00888         }
00889 
00890         Debug("http_trans", "total bytes of Range response body is %" PRId64, m_done);
00891         m_output_vio->nbytes = m_done;
00892         m_output_vio->reenable();
00893 
00894         // if we are detaching before processing all the
00895         //   input data, send VC_EVENT_EOS to let the upstream know
00896         //   that it can rely on us consuming any more data
00897         int cb_event = (m_write_vio.ntodo() > 0) ? VC_EVENT_EOS : VC_EVENT_WRITE_COMPLETE;
00898         m_write_vio._cont->handleEvent(cb_event, &m_write_vio);
00899         return;
00900       }
00901 
00902       end = &m_ranges[m_current_range]._end;
00903       done_byte = &m_ranges[m_current_range]._done_byte;
00904       start = &m_ranges[m_current_range]._start;
00905 
00906       // if this is a good Range
00907       if (*end != -1) {
00908         m_done += m_output_buf->write("\r\n", 2);
00909         add_boundary(false);
00910         add_sub_header(m_current_range);
00911 
00912         // keep this part for future support of out-of-order Range
00913         // if this is NOT a sequential Range compared to the previous one -
00914         // start of current Range is larger than the end of last Range, do
00915         // not need to go back to the start of the IOBuffereReader.
00916         // Otherwise, reset the IOBufferReader.
00917         //if ( *start > prev_end )
00918         *done_byte = prev_end;
00919         //else
00920         //  reader->reset();
00921 
00922         break;
00923       }
00924     }
00925 
00926     // When we need to read and there is nothing available
00927     avail = reader->read_avail();
00928     if (avail == 0)
00929       break;
00930   }
00931 
00932   m_output_vio->reenable();
00933   m_write_vio._cont->handleEvent(VC_EVENT_WRITE_READY, &m_write_vio);
00934 }
00935 
00936 
00937 /*-------------------------------------------------------------------------
00938   -------------------------------------------------------------------------*/
00939 
/*
 * Strings used when building a multipart/byteranges response.
 *
 * The boundary token appears both on its own (bound) and as the boundary
 * parameter of the Content-Type value (range_type).  Both are built from
 * the single literal below, so they cannot get out of sync.
 */
#define RANGE_BOUNDARY_TOKEN "RANGE_SEPARATOR"

// boundary token written between the parts of the body
static char bound[] = RANGE_BOUNDARY_TOKEN;
// Content-Type value of a multi-Range response
static char range_type[] = "multipart/byteranges; boundary=" RANGE_BOUNDARY_TOKEN;
// field-name prefixes of the per-part sub-header
static char cont_type[] = "Content-type: ";
static char cont_range[] = "Content-range: bytes ";

#undef RANGE_BOUNDARY_TOKEN
00948 
00949 void
00950 RangeTransform::add_boundary(bool end)
00951 {
00952   m_done += m_output_buf->write("--", 2);
00953   m_done += m_output_buf->write(bound, sizeof(bound) - 1);
00954 
00955   if (end)
00956     m_done += m_output_buf->write("--", 2);
00957 
00958   m_done += m_output_buf->write("\r\n", 2);
00959 }
00960 
00961 
00962 /*-------------------------------------------------------------------------
00963   -------------------------------------------------------------------------*/
00964 
00965 #define RANGE_NUMBERS_LENGTH 60
00966 
00967 void
00968 RangeTransform::add_sub_header(int index)
00969 {
00970   // this should be large enough to hold three integers!
00971   char numbers[RANGE_NUMBERS_LENGTH];
00972   int len;
00973 
00974   m_done += m_output_buf->write(cont_type, sizeof(cont_type) - 1);
00975   if (m_content_type)
00976     m_done += m_output_buf->write(m_content_type, m_content_type_len);
00977   m_done += m_output_buf->write("\r\n", 2);
00978   m_done += m_output_buf->write(cont_range, sizeof(cont_range) - 1);
00979 
00980   snprintf(numbers, sizeof(numbers), "%" PRId64 "-%" PRId64 "/%" PRId64 "", m_ranges[index]._start, m_ranges[index]._end, m_output_cl);
00981   len = strlen(numbers);
00982   if (len < RANGE_NUMBERS_LENGTH)
00983     m_done += m_output_buf->write(numbers, len);
00984   m_done += m_output_buf->write("\r\n\r\n", 4);
00985 }
00986 
00987 /*-------------------------------------------------------------------------
00988   -------------------------------------------------------------------------*/
00989 
00990 /*
00991  * this function changes the response header to reflect this is
00992  * a Range response.
00993  */
00994 
00995 void
00996 RangeTransform::change_response_header()
00997 {
00998   MIMEField *field;
00999   char *reason_phrase;
01000   HTTPStatus status_code;
01001   
01002   ink_release_assert(m_transform_resp);
01003 
01004   status_code = HTTP_STATUS_PARTIAL_CONTENT;
01005   m_transform_resp->status_set(status_code);
01006   reason_phrase = (char *) (http_hdr_reason_lookup(status_code));
01007   m_transform_resp->reason_set(reason_phrase, strlen(reason_phrase));
01008 
01009   if (m_num_range_fields > 1) {
01010     // set the right Content-Type for multiple entry Range
01011     field = m_transform_resp->field_find(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01012 
01013     if (field != NULL)
01014       m_transform_resp->field_delete(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01015 
01016 
01017     field = m_transform_resp->field_create(MIME_FIELD_CONTENT_TYPE, MIME_LEN_CONTENT_TYPE);
01018     field->value_append(m_transform_resp->m_heap, m_transform_resp->m_mime, range_type, sizeof(range_type) - 1);
01019 
01020     m_transform_resp->field_attach(field);
01021   } else {
01022     char numbers[RANGE_NUMBERS_LENGTH];
01023     m_transform_resp->field_delete(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
01024     field = m_transform_resp->field_create(MIME_FIELD_CONTENT_RANGE, MIME_LEN_CONTENT_RANGE);
01025     snprintf(numbers, sizeof(numbers), "bytes %" PRId64"-%" PRId64"/%" PRId64, m_ranges[0]._start, m_ranges[0]._end, m_output_cl);
01026     field->value_set(m_transform_resp->m_heap, m_transform_resp->m_mime, numbers, strlen(numbers));
01027     m_transform_resp->field_attach(field);
01028   }
01029 }
01030 
01031 #undef RANGE_NUMBERS_LENGTH

Generated by  doxygen 1.7.1