• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

OneWayTunnel.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Implementation of OneWayTunnel, which copies data from a source virtual connection to a target virtual connection.
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026    OneWayTunnel.cc
00027 
00028    A OneWayTunnel is a module that connects two virtual connections, a
00029    source vc and a target vc, and copies the data between source and target.
00030 
00031    This class used to be called HttpTunnelVC, but it doesn't seem to have
00032    anything to do with HTTP, so it has been renamed to OneWayTunnel.
00033  ****************************************************************************/
00034 
00035 #include "P_EventSystem.h"
00036 #include "I_OneWayTunnel.h"
00037 
00038 // #define TEST
00039 
00040 //////////////////////////////////////////////////////////////////////////////
00041 //
00042 //      OneWayTunnel::OneWayTunnel()
00043 //
00044 //////////////////////////////////////////////////////////////////////////////
00045 
// Allocator used by OneWayTunnel::OneWayTunnel_alloc() and
// OneWayTunnel::OneWayTunnel_free() to obtain and release tunnel objects.
00046 ClassAllocator<OneWayTunnel> OneWayTunnelAllocator("OneWayTunnelAllocator");
00047 
00048 inline void
00049 transfer_data(MIOBufferAccessor & in_buf, MIOBufferAccessor & out_buf)
00050 {
00051   ink_release_assert(!"Not Implemented.");
00052 
00053   int64_t n = in_buf.reader()->read_avail();
00054   int64_t o = out_buf.writer()->write_avail();
00055 
00056   if (n > o)
00057     n = o;
00058   if (!n)
00059     return;
00060   memcpy(in_buf.reader()->start(), out_buf.writer()->end(), n);
00061   in_buf.reader()->consume(n);
00062   out_buf.writer()->fill(n);
00063 }
00064 
00065 OneWayTunnel::OneWayTunnel():Continuation(0),
00066 vioSource(0), vioTarget(0), cont(0), manipulate_fn(0),
00067 n_connections(0), lerrno(0), single_buffer(0),
00068 close_source(0), close_target(0), tunnel_till_done(0), tunnel_peer(0), free_vcs(true)
00069 {
00070 }
00071 
00072 OneWayTunnel *
00073 OneWayTunnel::OneWayTunnel_alloc()
00074 {
00075   return OneWayTunnelAllocator.alloc();
00076 }
00077 
00078 void
00079 OneWayTunnel::OneWayTunnel_free(OneWayTunnel * pOWT)
00080 {
00081 
00082   pOWT->mutex = NULL;
00083   OneWayTunnelAllocator.free(pOWT);
00084 }
00085 
00086 void
00087 OneWayTunnel::SetupTwoWayTunnel(OneWayTunnel * east, OneWayTunnel * west)
00088 {
00089   //make sure the both use the same mutex
00090   ink_assert(east->mutex == west->mutex);
00091 
00092   east->tunnel_peer = west;
00093   west->tunnel_peer = east;
00094 }
00095 
00096 OneWayTunnel::~OneWayTunnel()
00097 {
00098 }
00099 
00100 OneWayTunnel::OneWayTunnel(Continuation * aCont, Transform_fn aManipulate_fn, bool aclose_source, bool aclose_target)
00101 :
00102 Continuation(aCont
00103              ? (ProxyMutex *) aCont->mutex
00104              : new_ProxyMutex()),
00105 cont(aCont),
00106 manipulate_fn(aManipulate_fn),
00107 n_connections(2),
00108 lerrno(0),
00109 single_buffer(true), close_source(aclose_source), close_target(aclose_target), tunnel_till_done(false), free_vcs(false)
00110 {
00111   ink_assert(!"This form of OneWayTunnel() constructor not supported");
00112 }
00113 
00114 void
00115 OneWayTunnel::init(VConnection * vcSource,
00116                    VConnection * vcTarget,
00117                    Continuation * aCont,
00118                    int size_estimate,
00119                    ProxyMutex * aMutex,
00120                    int64_t nbytes,
00121                    bool asingle_buffer,
00122                    bool aclose_source, bool aclose_target, Transform_fn aManipulate_fn, int water_mark)
00123 {
00124   mutex = aCont ? (ProxyMutex *) aCont->mutex : (aMutex ? aMutex : new_ProxyMutex());
00125   cont = aMutex ? NULL : aCont;
00126   single_buffer = asingle_buffer;
00127   manipulate_fn = aManipulate_fn;
00128   n_connections = 2;
00129   close_source = aclose_source;
00130   close_target = aclose_target;
00131   lerrno = 0;
00132   tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
00133 
00134   SET_HANDLER(&OneWayTunnel::startEvent);
00135 
00136   int64_t size_index = 0;
00137 
00138   if (size_estimate)
00139     size_index = buffer_size_to_index(size_estimate);
00140   else
00141     size_index = default_large_iobuffer_size;
00142 
00143   Debug("one_way_tunnel", "buffer size index [%" PRId64"] [%d]\n", size_index, size_estimate);
00144 
00145   // enqueue read request on vcSource.
00146   MIOBuffer *buf1 = new_MIOBuffer(size_index);
00147   MIOBuffer *buf2 = NULL;
00148   if (single_buffer)
00149     buf2 = buf1;
00150   else
00151     buf2 = new_MIOBuffer(size_index);
00152 
00153   buf1->water_mark = water_mark;
00154 
00155   MUTEX_LOCK(lock, mutex, this_ethread());
00156   vioSource = vcSource->do_io_read(this, nbytes, buf1);
00157   vioTarget = vcTarget->do_io_write(this, nbytes, buf2->alloc_reader(), 0);
00158   ink_assert(vioSource && vioTarget);
00159 
00160   return;
00161 }
00162 
00163 void
00164 OneWayTunnel::init(VConnection * vcSource,
00165                    VConnection * vcTarget,
00166                    Continuation * aCont,
00167                    VIO * SourceVio, IOBufferReader * reader, bool aclose_source, bool aclose_target)
00168 {
00169   (void) vcSource;
00170   mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00171   cont = aCont;
00172   single_buffer = true;
00173   manipulate_fn = 0;
00174   n_connections = 2;
00175   close_source = aclose_source;
00176   close_target = aclose_target;
00177   tunnel_till_done = true;
00178 
00179   // Prior to constructing the OneWayTunnel, we initiated a do_io(VIO::READ)
00180   // on the source VC.  We wish to use the same MIO buffer in the tunnel.
00181 
00182   // do_io() read already posted on vcSource.
00183   SET_HANDLER(&OneWayTunnel::startEvent);
00184 
00185   SourceVio->set_continuation(this);
00186   MUTEX_LOCK(lock, mutex, this_ethread());
00187   vioSource = SourceVio;
00188 
00189   vioTarget = vcTarget->do_io_write(this, TUNNEL_TILL_DONE, reader, 0);
00190   ink_assert(vioSource && vioTarget);
00191 }
00192 
00193 void
00194 OneWayTunnel::init(Continuation * aCont, VIO * SourceVio, VIO * TargetVio, bool aclose_source, bool aclose_target)
00195 {
00196   mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00197   cont = aCont;
00198   single_buffer = true;
00199   manipulate_fn = 0;
00200   n_connections = 2;
00201   close_source = aclose_source;
00202   close_target = aclose_target;
00203   tunnel_till_done = true;
00204 
00205   // do_io_read() read already posted on vcSource.
00206   // do_io_write() already posted on vcTarget
00207   SET_HANDLER(&OneWayTunnel::startEvent);
00208 
00209   ink_assert(SourceVio && TargetVio);
00210 
00211   SourceVio->set_continuation(this);
00212   TargetVio->set_continuation(this);
00213   vioSource = SourceVio;
00214   vioTarget = TargetVio;
00215 }
00216 
00217 
00218 void
00219 OneWayTunnel::transform(MIOBufferAccessor & in_buf, MIOBufferAccessor & out_buf)
00220 {
00221   if (manipulate_fn)
00222     manipulate_fn(in_buf, out_buf);
00223   else if (in_buf.writer() != out_buf.writer())
00224     transfer_data(in_buf, out_buf);
00225 }
00226 
00227 //////////////////////////////////////////////////////////////////////////////
00228 //
00229 //      int OneWayTunnel::startEvent()
00230 //
00231 //////////////////////////////////////////////////////////////////////////////
00232 
00233 //
00234 // tunnel was invoked with an event
00235 //
00236 int
00237 OneWayTunnel::startEvent(int event, void *data)
00238 {
00239   VIO *vio = (VIO *) data;
00240   int ret = VC_EVENT_DONE;
00241   int result = 0;
00242 
00243 #ifdef TEST
00244   const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
00245   printf("OneWayTunnel --- %s received from %s VC\n", event_name, event_origin);
00246 #endif
00247 
00248   if (!vioTarget)
00249     goto Lerror;
00250 
00251   // handle the event
00252   //
00253   switch (event) {
00254 
00255   case ONE_WAY_TUNNEL_EVENT_PEER_CLOSE:
00256     /* This event is sent out by our peer */
00257     ink_assert(tunnel_peer);
00258     tunnel_peer = NULL;
00259     free_vcs = false;
00260     goto Ldone;
00261 
00262   case VC_EVENT_READ_READY:
00263     transform(vioSource->buffer, vioTarget->buffer);
00264     vioTarget->reenable();
00265     ret = VC_EVENT_CONT;
00266     break;
00267 
00268   case VC_EVENT_WRITE_READY:
00269     if (vioSource)
00270       vioSource->reenable();
00271     ret = VC_EVENT_CONT;
00272     break;
00273 
00274   case VC_EVENT_EOS:
00275     if (!tunnel_till_done && vio->ntodo())
00276       goto Lerror;
00277     if (vio == vioSource) {
00278       transform(vioSource->buffer, vioTarget->buffer);
00279       goto Lread_complete;
00280     } else
00281       goto Ldone;
00282 
00283   Lread_complete:
00284   case VC_EVENT_READ_COMPLETE:
00285     // set write nbytes to the current buffer size
00286     //
00287     vioTarget->nbytes = vioTarget->ndone + vioTarget->buffer.reader()->read_avail();
00288     if (vioTarget->nbytes == vioTarget->ndone)
00289       goto Ldone;
00290     vioTarget->reenable();
00291     if (!tunnel_peer)
00292       close_source_vio(0);
00293     break;
00294 
00295   Lerror:
00296   case VC_EVENT_ERROR:
00297     lerrno = ((VIO *) data)->vc_server->lerrno;
00298   case VC_EVENT_INACTIVITY_TIMEOUT:
00299   case VC_EVENT_ACTIVE_TIMEOUT:
00300     result = -1;
00301   Ldone:
00302   case VC_EVENT_WRITE_COMPLETE:
00303     if (tunnel_peer) {
00304       //inform the peer:
00305       tunnel_peer->startEvent(ONE_WAY_TUNNEL_EVENT_PEER_CLOSE, data);
00306     }
00307     close_source_vio(result);
00308     close_target_vio(result);
00309     connection_closed(result);
00310     break;
00311 
00312   default:
00313     ink_assert(!"bad case");
00314     ret = VC_EVENT_CONT;
00315     break;
00316   }
00317 #ifdef TEST
00318   printf("    (OneWayTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
00319 #endif
00320   return ret;
00321 }
00322 
00323 // If result is Non-zero, the vc should be aborted.
00324 void
00325 OneWayTunnel::close_source_vio(int result)
00326 {
00327 
00328   if (vioSource) {
00329     if (last_connection() || !single_buffer) {
00330       free_MIOBuffer(vioSource->buffer.writer());
00331       vioSource->buffer.clear();
00332     }
00333     if (close_source && free_vcs) {
00334       vioSource->vc_server->do_io_close(result ? lerrno : -1);
00335     }
00336     vioSource = NULL;
00337     n_connections--;
00338   }
00339 }
00340 
00341 void
00342 OneWayTunnel::close_target_vio(int result, VIO * vio)
00343 {
00344 
00345   (void) vio;
00346   if (vioTarget) {
00347     if (last_connection() || !single_buffer) {
00348       free_MIOBuffer(vioTarget->buffer.writer());
00349       vioTarget->buffer.clear();
00350     }
00351     if (close_target && free_vcs) {
00352       vioTarget->vc_server->do_io_close(result ? lerrno : -1);
00353     }
00354     vioTarget = NULL;
00355     n_connections--;
00356   }
00357 }
00358 
00359 //////////////////////////////////////////////////////////////////////////////
00360 //
00361 //      void OneWayTunnel::connection_closed
00362 //
00363 //////////////////////////////////////////////////////////////////////////////
00364 void
00365 OneWayTunnel::connection_closed(int result)
00366 {
00367   if (cont) {
00368 #ifdef TEST
00369     cout << "OneWayTunnel::connection_closed() ... calling cont" << endl;
00370 #endif
00371     cont->handleEvent(result ? VC_EVENT_ERROR : VC_EVENT_EOS, cont);
00372   } else {
00373     OneWayTunnel_free(this);
00374   }
00375 }
00376 
00377 void
00378 OneWayTunnel::reenable_all()
00379 {
00380   if (vioSource)
00381     vioSource->reenable();
00382   if (vioTarget)
00383     vioTarget->reenable();
00384 }
00385 
00386 bool
00387 OneWayTunnel::last_connection()
00388 {
00389   return n_connections == 1;
00390 }

Generated by  doxygen 1.7.1