Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #include "P_EventSystem.h"
00030 #include "I_OneWayMultiTunnel.h"
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 ClassAllocator<OneWayMultiTunnel> OneWayMultiTunnelAllocator("OneWayMultiTunnelAllocator");
00041 
00042 OneWayMultiTunnel::OneWayMultiTunnel():
00043 OneWayTunnel(), n_vioTargets(0), source_read_previously_completed(false)
00044 {
00045 }
00046 
00047 OneWayMultiTunnel *
00048 OneWayMultiTunnel::OneWayMultiTunnel_alloc()
00049 {
00050   return OneWayMultiTunnelAllocator.alloc();
00051 }
00052 
00053 void
00054 OneWayMultiTunnel::OneWayMultiTunnel_free(OneWayMultiTunnel * pOWT)
00055 {
00056 
00057   pOWT->mutex = NULL;
00058   OneWayMultiTunnelAllocator.free(pOWT);
00059 }
00060 
00061 void
00062 OneWayMultiTunnel::init(VConnection * vcSource, VConnection ** vcTargets, int n_vcTargets, Continuation * aCont, int size_estimate, int64_t nbytes, bool asingle_buffer,    
00063                         bool aclose_source,     
00064                         bool aclose_targets,    
00065                         Transform_fn aManipulate_fn, int water_mark)
00066 {
00067   mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00068   cont = aCont;
00069   manipulate_fn = aManipulate_fn;
00070   close_source = aclose_source;
00071   close_target = aclose_targets;
00072   source_read_previously_completed = false;
00073 
00074   SET_HANDLER(&OneWayMultiTunnel::startEvent);
00075 
00076   n_connections = n_vioTargets + 1;
00077 
00078   int64_t size_index = 0;
00079   if (size_estimate)
00080     size_index = buffer_size_to_index(size_estimate, default_large_iobuffer_size);
00081   else
00082     size_index = default_large_iobuffer_size;
00083 
00084   tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
00085 
00086   MIOBuffer *buf1 = new_MIOBuffer(size_index);
00087   MIOBuffer *buf2 = NULL;
00088 
00089   single_buffer = asingle_buffer;
00090 
00091   if (single_buffer)
00092     buf2 = buf1;
00093   else
00094     buf2 = new_MIOBuffer(size_index);
00095   topOutBuffer.writer_for(buf2);
00096 
00097   buf1->water_mark = water_mark;
00098 
00099   vioSource = vcSource->do_io(VIO::READ, this, nbytes, buf1, 0);
00100 
00101   ink_assert(n_vcTargets <= ONE_WAY_MULTI_TUNNEL_LIMIT);
00102   for (int i = 0; i < n_vcTargets; i++)
00103     vioTargets[i] = vcTargets[i]->do_io(VIO::WRITE, this, INT64_MAX, buf2, 0);
00104 
00105   return;
00106 }
00107 
00108 void
00109 OneWayMultiTunnel::init(Continuation * aCont,
00110                         VIO * SourceVio, VIO ** TargetVios, int n_TargetVios, bool aclose_source, bool aclose_targets)
00111 {
00112 
00113   mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00114   cont = aCont;
00115   single_buffer = true;
00116   manipulate_fn = 0;
00117   n_connections = n_TargetVios + 1;;
00118   close_source = aclose_source;
00119   close_target = aclose_targets;
00120   
00121   
00122   
00123   source_read_previously_completed = (SourceVio->ntodo() == 0);
00124   tunnel_till_done = true;
00125   n_vioTargets = n_TargetVios;
00126   topOutBuffer.writer_for(SourceVio->buffer.writer());
00127 
00128   
00129   
00130   SET_HANDLER(&OneWayMultiTunnel::startEvent);
00131 
00132   SourceVio->set_continuation(this);
00133   vioSource = SourceVio;
00134 
00135   for (int i = 0; i < n_vioTargets; i++) {
00136     vioTargets[i] = TargetVios[i];
00137     vioTargets[i]->set_continuation(this);
00138   }
00139 
00140 }
00141 
00142 
00143 
00144 
00145 
00146 
00147 
00148 int
00149 OneWayMultiTunnel::startEvent(int event, void *data)
00150 {
00151   VIO *vio = (VIO *) data;
00152   int ret = VC_EVENT_DONE;
00153   int result = 0;
00154 
00155 #ifdef TEST
00156   const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
00157   printf("OneWayMultiTunnel::startEvent --- %s received from %s VC\n", event_name, event_origin);
00158 #endif
00159 
00160   
00161   
00162   switch (event) {
00163 
00164   case VC_EVENT_READ_READY:{   
00165       transform(vioSource->buffer, topOutBuffer);
00166       for (int i = 0; i < n_vioTargets; i++)
00167         if (vioTargets[i])
00168           vioTargets[i]->reenable();
00169       ret = VC_EVENT_CONT;
00170       break;
00171     }
00172 
00173   case VC_EVENT_WRITE_READY:
00174     if (vioSource)
00175       vioSource->reenable();
00176     ret = VC_EVENT_CONT;
00177     break;
00178 
00179   case VC_EVENT_EOS:
00180     if (!tunnel_till_done && vio->ntodo())
00181       goto Lerror;
00182     if (vio == vioSource) {
00183       transform(vioSource->buffer, topOutBuffer);
00184       goto Lread_complete;
00185     } else
00186       goto Lwrite_complete;
00187 
00188   Lread_complete:
00189   case VC_EVENT_READ_COMPLETE:{
00190       
00191       
00192       for (int i = 0; i < n_vioTargets; i++)
00193         if (vioTargets[i]) {
00194           vioTargets[i]->nbytes = vioTargets[i]->ndone + vioTargets[i]->buffer.reader()->read_avail();
00195           vioTargets[i]->reenable();
00196         }
00197       close_source_vio(0);
00198       ret = VC_EVENT_DONE;
00199       break;
00200     }
00201 
00202   Lwrite_complete:
00203   case VC_EVENT_WRITE_COMPLETE:
00204     close_target_vio(0, (VIO *) data);
00205     if ((n_connections == 0) || (n_connections == 1 && source_read_previously_completed))
00206       goto Ldone;
00207     else if (vioSource)
00208       vioSource->reenable();
00209     break;
00210 
00211   Lerror:
00212   case VC_EVENT_ERROR:
00213   case VC_EVENT_INACTIVITY_TIMEOUT:
00214   case VC_EVENT_ACTIVE_TIMEOUT:
00215     result = -1;
00216   Ldone:
00217     close_source_vio(result);
00218     close_target_vio(result);
00219     connection_closed(result);
00220     break;
00221 
00222   default:
00223     ret = VC_EVENT_CONT;
00224     break;
00225   }
00226 #ifdef TEST
00227   printf("    (OneWayMultiTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
00228 #endif
00229   return (ret);
00230 }
00231 
00232 void
00233 OneWayMultiTunnel::close_target_vio(int result, VIO * vio)
00234 {
00235   for (int i = 0; i < n_vioTargets; i++) {
00236     VIO *v = vioTargets[i];
00237     if (v && (!vio || v == vio)) {
00238       if (last_connection() || !single_buffer)
00239         free_MIOBuffer(v->buffer.writer());
00240       if (close_target)
00241         v->vc_server->do_io(result ? VIO::ABORT : VIO::CLOSE);
00242       vioTargets[i] = NULL;
00243       n_connections--;
00244     }
00245   }
00246 }
00247 
00248 void
00249 OneWayMultiTunnel::reenable_all()
00250 {
00251   for (int i = 0; i < n_vioTargets; i++)
00252     if (vioTargets[i])
00253       vioTargets[i]->reenable();
00254   if (vioSource)
00255     vioSource->reenable();
00256 }