Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "P_EventSystem.h"
00030 #include "I_OneWayMultiTunnel.h"
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 ClassAllocator<OneWayMultiTunnel> OneWayMultiTunnelAllocator("OneWayMultiTunnelAllocator");
00041
00042 OneWayMultiTunnel::OneWayMultiTunnel():
00043 OneWayTunnel(), n_vioTargets(0), source_read_previously_completed(false)
00044 {
00045 }
00046
00047 OneWayMultiTunnel *
00048 OneWayMultiTunnel::OneWayMultiTunnel_alloc()
00049 {
00050 return OneWayMultiTunnelAllocator.alloc();
00051 }
00052
00053 void
00054 OneWayMultiTunnel::OneWayMultiTunnel_free(OneWayMultiTunnel * pOWT)
00055 {
00056
00057 pOWT->mutex = NULL;
00058 OneWayMultiTunnelAllocator.free(pOWT);
00059 }
00060
00061 void
00062 OneWayMultiTunnel::init(VConnection * vcSource, VConnection ** vcTargets, int n_vcTargets, Continuation * aCont, int size_estimate, int64_t nbytes, bool asingle_buffer,
00063 bool aclose_source,
00064 bool aclose_targets,
00065 Transform_fn aManipulate_fn, int water_mark)
00066 {
00067 mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00068 cont = aCont;
00069 manipulate_fn = aManipulate_fn;
00070 close_source = aclose_source;
00071 close_target = aclose_targets;
00072 source_read_previously_completed = false;
00073
00074 SET_HANDLER(&OneWayMultiTunnel::startEvent);
00075
00076 n_connections = n_vioTargets + 1;
00077
00078 int64_t size_index = 0;
00079 if (size_estimate)
00080 size_index = buffer_size_to_index(size_estimate, default_large_iobuffer_size);
00081 else
00082 size_index = default_large_iobuffer_size;
00083
00084 tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
00085
00086 MIOBuffer *buf1 = new_MIOBuffer(size_index);
00087 MIOBuffer *buf2 = NULL;
00088
00089 single_buffer = asingle_buffer;
00090
00091 if (single_buffer)
00092 buf2 = buf1;
00093 else
00094 buf2 = new_MIOBuffer(size_index);
00095 topOutBuffer.writer_for(buf2);
00096
00097 buf1->water_mark = water_mark;
00098
00099 vioSource = vcSource->do_io(VIO::READ, this, nbytes, buf1, 0);
00100
00101 ink_assert(n_vcTargets <= ONE_WAY_MULTI_TUNNEL_LIMIT);
00102 for (int i = 0; i < n_vcTargets; i++)
00103 vioTargets[i] = vcTargets[i]->do_io(VIO::WRITE, this, INT64_MAX, buf2, 0);
00104
00105 return;
00106 }
00107
00108 void
00109 OneWayMultiTunnel::init(Continuation * aCont,
00110 VIO * SourceVio, VIO ** TargetVios, int n_TargetVios, bool aclose_source, bool aclose_targets)
00111 {
00112
00113 mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00114 cont = aCont;
00115 single_buffer = true;
00116 manipulate_fn = 0;
00117 n_connections = n_TargetVios + 1;;
00118 close_source = aclose_source;
00119 close_target = aclose_targets;
00120
00121
00122
00123 source_read_previously_completed = (SourceVio->ntodo() == 0);
00124 tunnel_till_done = true;
00125 n_vioTargets = n_TargetVios;
00126 topOutBuffer.writer_for(SourceVio->buffer.writer());
00127
00128
00129
00130 SET_HANDLER(&OneWayMultiTunnel::startEvent);
00131
00132 SourceVio->set_continuation(this);
00133 vioSource = SourceVio;
00134
00135 for (int i = 0; i < n_vioTargets; i++) {
00136 vioTargets[i] = TargetVios[i];
00137 vioTargets[i]->set_continuation(this);
00138 }
00139
00140 }
00141
00142
00143
00144
00145
00146
00147
00148 int
00149 OneWayMultiTunnel::startEvent(int event, void *data)
00150 {
00151 VIO *vio = (VIO *) data;
00152 int ret = VC_EVENT_DONE;
00153 int result = 0;
00154
00155 #ifdef TEST
00156 const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
00157 printf("OneWayMultiTunnel::startEvent --- %s received from %s VC\n", event_name, event_origin);
00158 #endif
00159
00160
00161
00162 switch (event) {
00163
00164 case VC_EVENT_READ_READY:{
00165 transform(vioSource->buffer, topOutBuffer);
00166 for (int i = 0; i < n_vioTargets; i++)
00167 if (vioTargets[i])
00168 vioTargets[i]->reenable();
00169 ret = VC_EVENT_CONT;
00170 break;
00171 }
00172
00173 case VC_EVENT_WRITE_READY:
00174 if (vioSource)
00175 vioSource->reenable();
00176 ret = VC_EVENT_CONT;
00177 break;
00178
00179 case VC_EVENT_EOS:
00180 if (!tunnel_till_done && vio->ntodo())
00181 goto Lerror;
00182 if (vio == vioSource) {
00183 transform(vioSource->buffer, topOutBuffer);
00184 goto Lread_complete;
00185 } else
00186 goto Lwrite_complete;
00187
00188 Lread_complete:
00189 case VC_EVENT_READ_COMPLETE:{
00190
00191
00192 for (int i = 0; i < n_vioTargets; i++)
00193 if (vioTargets[i]) {
00194 vioTargets[i]->nbytes = vioTargets[i]->ndone + vioTargets[i]->buffer.reader()->read_avail();
00195 vioTargets[i]->reenable();
00196 }
00197 close_source_vio(0);
00198 ret = VC_EVENT_DONE;
00199 break;
00200 }
00201
00202 Lwrite_complete:
00203 case VC_EVENT_WRITE_COMPLETE:
00204 close_target_vio(0, (VIO *) data);
00205 if ((n_connections == 0) || (n_connections == 1 && source_read_previously_completed))
00206 goto Ldone;
00207 else if (vioSource)
00208 vioSource->reenable();
00209 break;
00210
00211 Lerror:
00212 case VC_EVENT_ERROR:
00213 case VC_EVENT_INACTIVITY_TIMEOUT:
00214 case VC_EVENT_ACTIVE_TIMEOUT:
00215 result = -1;
00216 Ldone:
00217 close_source_vio(result);
00218 close_target_vio(result);
00219 connection_closed(result);
00220 break;
00221
00222 default:
00223 ret = VC_EVENT_CONT;
00224 break;
00225 }
00226 #ifdef TEST
00227 printf(" (OneWayMultiTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
00228 #endif
00229 return (ret);
00230 }
00231
00232 void
00233 OneWayMultiTunnel::close_target_vio(int result, VIO * vio)
00234 {
00235 for (int i = 0; i < n_vioTargets; i++) {
00236 VIO *v = vioTargets[i];
00237 if (v && (!vio || v == vio)) {
00238 if (last_connection() || !single_buffer)
00239 free_MIOBuffer(v->buffer.writer());
00240 if (close_target)
00241 v->vc_server->do_io(result ? VIO::ABORT : VIO::CLOSE);
00242 vioTargets[i] = NULL;
00243 n_connections--;
00244 }
00245 }
00246 }
00247
00248 void
00249 OneWayMultiTunnel::reenable_all()
00250 {
00251 for (int i = 0; i < n_vioTargets; i++)
00252 if (vioTargets[i])
00253 vioTargets[i]->reenable();
00254 if (vioSource)
00255 vioSource->reenable();
00256 }