• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

OneWayMultiTunnel.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Implementation of OneWayMultiTunnel: a one-way tunnel that reads from a single source and writes to multiple targets.
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026    OneWayMultiTunnel.cc
00027  ****************************************************************************/
00028 
00029 #include "P_EventSystem.h"
00030 #include "I_OneWayMultiTunnel.h"
00031 
00032 // #define TEST
00033 
00034 //////////////////////////////////////////////////////////////////////////////
00035 //
00036 //      OneWayMultiTunnel::OneWayMultiTunnel()
00037 //
00038 //////////////////////////////////////////////////////////////////////////////
00039 
00040 ClassAllocator<OneWayMultiTunnel> OneWayMultiTunnelAllocator("OneWayMultiTunnelAllocator");
00041 
00042 OneWayMultiTunnel::OneWayMultiTunnel():
00043 OneWayTunnel(), n_vioTargets(0), source_read_previously_completed(false)
00044 {
00045 }
00046 
00047 OneWayMultiTunnel *
00048 OneWayMultiTunnel::OneWayMultiTunnel_alloc()
00049 {
00050   return OneWayMultiTunnelAllocator.alloc();
00051 }
00052 
00053 void
00054 OneWayMultiTunnel::OneWayMultiTunnel_free(OneWayMultiTunnel * pOWT)
00055 {
00056 
00057   pOWT->mutex = NULL;
00058   OneWayMultiTunnelAllocator.free(pOWT);
00059 }
00060 
00061 void
00062 OneWayMultiTunnel::init(VConnection * vcSource, VConnection ** vcTargets, int n_vcTargets, Continuation * aCont, int size_estimate, int64_t nbytes, bool asingle_buffer,    /* = true */
00063                         bool aclose_source,     /* = false */
00064                         bool aclose_targets,    /* = false */
00065                         Transform_fn aManipulate_fn, int water_mark)
00066 {
00067   mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00068   cont = aCont;
00069   manipulate_fn = aManipulate_fn;
00070   close_source = aclose_source;
00071   close_target = aclose_targets;
00072   source_read_previously_completed = false;
00073 
00074   SET_HANDLER(&OneWayMultiTunnel::startEvent);
00075 
00076   n_connections = n_vioTargets + 1;
00077 
00078   int64_t size_index = 0;
00079   if (size_estimate)
00080     size_index = buffer_size_to_index(size_estimate, default_large_iobuffer_size);
00081   else
00082     size_index = default_large_iobuffer_size;
00083 
00084   tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
00085 
00086   MIOBuffer *buf1 = new_MIOBuffer(size_index);
00087   MIOBuffer *buf2 = NULL;
00088 
00089   single_buffer = asingle_buffer;
00090 
00091   if (single_buffer)
00092     buf2 = buf1;
00093   else
00094     buf2 = new_MIOBuffer(size_index);
00095   topOutBuffer.writer_for(buf2);
00096 
00097   buf1->water_mark = water_mark;
00098 
00099   vioSource = vcSource->do_io(VIO::READ, this, nbytes, buf1, 0);
00100 
00101   ink_assert(n_vcTargets <= ONE_WAY_MULTI_TUNNEL_LIMIT);
00102   for (int i = 0; i < n_vcTargets; i++)
00103     vioTargets[i] = vcTargets[i]->do_io(VIO::WRITE, this, INT64_MAX, buf2, 0);
00104 
00105   return;
00106 }
00107 
00108 void
00109 OneWayMultiTunnel::init(Continuation * aCont,
00110                         VIO * SourceVio, VIO ** TargetVios, int n_TargetVios, bool aclose_source, bool aclose_targets)
00111 {
00112 
00113   mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00114   cont = aCont;
00115   single_buffer = true;
00116   manipulate_fn = 0;
00117   n_connections = n_TargetVios + 1;;
00118   close_source = aclose_source;
00119   close_target = aclose_targets;
00120   // The read on the source vio may have already been completed, yet
00121   // we still need to write data into the target buffers.  Note this
00122   // fact as we'll not get a VC_EVENT_READ_COMPLETE callback later.
00123   source_read_previously_completed = (SourceVio->ntodo() == 0);
00124   tunnel_till_done = true;
00125   n_vioTargets = n_TargetVios;
00126   topOutBuffer.writer_for(SourceVio->buffer.writer());
00127 
00128   // do_io_read() read already posted on vcSource.
00129   // do_io_write() already posted on vcTargets
00130   SET_HANDLER(&OneWayMultiTunnel::startEvent);
00131 
00132   SourceVio->set_continuation(this);
00133   vioSource = SourceVio;
00134 
00135   for (int i = 0; i < n_vioTargets; i++) {
00136     vioTargets[i] = TargetVios[i];
00137     vioTargets[i]->set_continuation(this);
00138   }
00139 
00140 }
00141 
00142 //////////////////////////////////////////////////////////////////////////////
00143 //
00144 //      int OneWayMultiTunnel::startEvent()
00145 //
00146 //////////////////////////////////////////////////////////////////////////////
00147 
00148 int
00149 OneWayMultiTunnel::startEvent(int event, void *data)
00150 {
00151   VIO *vio = (VIO *) data;
00152   int ret = VC_EVENT_DONE;
00153   int result = 0;
00154 
00155 #ifdef TEST
00156   const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
00157   printf("OneWayMultiTunnel::startEvent --- %s received from %s VC\n", event_name, event_origin);
00158 #endif
00159 
00160   // handle the event
00161   //
00162   switch (event) {
00163 
00164   case VC_EVENT_READ_READY:{   // SunCC uses old scoping rules
00165       transform(vioSource->buffer, topOutBuffer);
00166       for (int i = 0; i < n_vioTargets; i++)
00167         if (vioTargets[i])
00168           vioTargets[i]->reenable();
00169       ret = VC_EVENT_CONT;
00170       break;
00171     }
00172 
00173   case VC_EVENT_WRITE_READY:
00174     if (vioSource)
00175       vioSource->reenable();
00176     ret = VC_EVENT_CONT;
00177     break;
00178 
00179   case VC_EVENT_EOS:
00180     if (!tunnel_till_done && vio->ntodo())
00181       goto Lerror;
00182     if (vio == vioSource) {
00183       transform(vioSource->buffer, topOutBuffer);
00184       goto Lread_complete;
00185     } else
00186       goto Lwrite_complete;
00187 
00188   Lread_complete:
00189   case VC_EVENT_READ_COMPLETE:{// SunCC uses old scoping rules
00190       // set write nbytes to the current buffer size
00191       //
00192       for (int i = 0; i < n_vioTargets; i++)
00193         if (vioTargets[i]) {
00194           vioTargets[i]->nbytes = vioTargets[i]->ndone + vioTargets[i]->buffer.reader()->read_avail();
00195           vioTargets[i]->reenable();
00196         }
00197       close_source_vio(0);
00198       ret = VC_EVENT_DONE;
00199       break;
00200     }
00201 
00202   Lwrite_complete:
00203   case VC_EVENT_WRITE_COMPLETE:
00204     close_target_vio(0, (VIO *) data);
00205     if ((n_connections == 0) || (n_connections == 1 && source_read_previously_completed))
00206       goto Ldone;
00207     else if (vioSource)
00208       vioSource->reenable();
00209     break;
00210 
00211   Lerror:
00212   case VC_EVENT_ERROR:
00213   case VC_EVENT_INACTIVITY_TIMEOUT:
00214   case VC_EVENT_ACTIVE_TIMEOUT:
00215     result = -1;
00216   Ldone:
00217     close_source_vio(result);
00218     close_target_vio(result);
00219     connection_closed(result);
00220     break;
00221 
00222   default:
00223     ret = VC_EVENT_CONT;
00224     break;
00225   }
00226 #ifdef TEST
00227   printf("    (OneWayMultiTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
00228 #endif
00229   return (ret);
00230 }
00231 
00232 void
00233 OneWayMultiTunnel::close_target_vio(int result, VIO * vio)
00234 {
00235   for (int i = 0; i < n_vioTargets; i++) {
00236     VIO *v = vioTargets[i];
00237     if (v && (!vio || v == vio)) {
00238       if (last_connection() || !single_buffer)
00239         free_MIOBuffer(v->buffer.writer());
00240       if (close_target)
00241         v->vc_server->do_io(result ? VIO::ABORT : VIO::CLOSE);
00242       vioTargets[i] = NULL;
00243       n_connections--;
00244     }
00245   }
00246 }
00247 
00248 void
00249 OneWayMultiTunnel::reenable_all()
00250 {
00251   for (int i = 0; i < n_vioTargets; i++)
00252     if (vioTargets[i])
00253       vioTargets[i]->reenable();
00254   if (vioSource)
00255     vioSource->reenable();
00256 }

Generated by  doxygen 1.7.1