00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "P_EventSystem.h"
00036 #include "I_OneWayTunnel.h"
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046 ClassAllocator<OneWayTunnel> OneWayTunnelAllocator("OneWayTunnelAllocator");
00047
00048 inline void
00049 transfer_data(MIOBufferAccessor & in_buf, MIOBufferAccessor & out_buf)
00050 {
00051 ink_release_assert(!"Not Implemented.");
00052
00053 int64_t n = in_buf.reader()->read_avail();
00054 int64_t o = out_buf.writer()->write_avail();
00055
00056 if (n > o)
00057 n = o;
00058 if (!n)
00059 return;
00060 memcpy(in_buf.reader()->start(), out_buf.writer()->end(), n);
00061 in_buf.reader()->consume(n);
00062 out_buf.writer()->fill(n);
00063 }
00064
00065 OneWayTunnel::OneWayTunnel():Continuation(0),
00066 vioSource(0), vioTarget(0), cont(0), manipulate_fn(0),
00067 n_connections(0), lerrno(0), single_buffer(0),
00068 close_source(0), close_target(0), tunnel_till_done(0), tunnel_peer(0), free_vcs(true)
00069 {
00070 }
00071
00072 OneWayTunnel *
00073 OneWayTunnel::OneWayTunnel_alloc()
00074 {
00075 return OneWayTunnelAllocator.alloc();
00076 }
00077
00078 void
00079 OneWayTunnel::OneWayTunnel_free(OneWayTunnel * pOWT)
00080 {
00081
00082 pOWT->mutex = NULL;
00083 OneWayTunnelAllocator.free(pOWT);
00084 }
00085
00086 void
00087 OneWayTunnel::SetupTwoWayTunnel(OneWayTunnel * east, OneWayTunnel * west)
00088 {
00089
00090 ink_assert(east->mutex == west->mutex);
00091
00092 east->tunnel_peer = west;
00093 west->tunnel_peer = east;
00094 }
00095
00096 OneWayTunnel::~OneWayTunnel()
00097 {
00098 }
00099
00100 OneWayTunnel::OneWayTunnel(Continuation * aCont, Transform_fn aManipulate_fn, bool aclose_source, bool aclose_target)
00101 :
00102 Continuation(aCont
00103 ? (ProxyMutex *) aCont->mutex
00104 : new_ProxyMutex()),
00105 cont(aCont),
00106 manipulate_fn(aManipulate_fn),
00107 n_connections(2),
00108 lerrno(0),
00109 single_buffer(true), close_source(aclose_source), close_target(aclose_target), tunnel_till_done(false), free_vcs(false)
00110 {
00111 ink_assert(!"This form of OneWayTunnel() constructor not supported");
00112 }
00113
00114 void
00115 OneWayTunnel::init(VConnection * vcSource,
00116 VConnection * vcTarget,
00117 Continuation * aCont,
00118 int size_estimate,
00119 ProxyMutex * aMutex,
00120 int64_t nbytes,
00121 bool asingle_buffer,
00122 bool aclose_source, bool aclose_target, Transform_fn aManipulate_fn, int water_mark)
00123 {
00124 mutex = aCont ? (ProxyMutex *) aCont->mutex : (aMutex ? aMutex : new_ProxyMutex());
00125 cont = aMutex ? NULL : aCont;
00126 single_buffer = asingle_buffer;
00127 manipulate_fn = aManipulate_fn;
00128 n_connections = 2;
00129 close_source = aclose_source;
00130 close_target = aclose_target;
00131 lerrno = 0;
00132 tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
00133
00134 SET_HANDLER(&OneWayTunnel::startEvent);
00135
00136 int64_t size_index = 0;
00137
00138 if (size_estimate)
00139 size_index = buffer_size_to_index(size_estimate);
00140 else
00141 size_index = default_large_iobuffer_size;
00142
00143 Debug("one_way_tunnel", "buffer size index [%" PRId64"] [%d]\n", size_index, size_estimate);
00144
00145
00146 MIOBuffer *buf1 = new_MIOBuffer(size_index);
00147 MIOBuffer *buf2 = NULL;
00148 if (single_buffer)
00149 buf2 = buf1;
00150 else
00151 buf2 = new_MIOBuffer(size_index);
00152
00153 buf1->water_mark = water_mark;
00154
00155 MUTEX_LOCK(lock, mutex, this_ethread());
00156 vioSource = vcSource->do_io_read(this, nbytes, buf1);
00157 vioTarget = vcTarget->do_io_write(this, nbytes, buf2->alloc_reader(), 0);
00158 ink_assert(vioSource && vioTarget);
00159
00160 return;
00161 }
00162
00163 void
00164 OneWayTunnel::init(VConnection * vcSource,
00165 VConnection * vcTarget,
00166 Continuation * aCont,
00167 VIO * SourceVio, IOBufferReader * reader, bool aclose_source, bool aclose_target)
00168 {
00169 (void) vcSource;
00170 mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00171 cont = aCont;
00172 single_buffer = true;
00173 manipulate_fn = 0;
00174 n_connections = 2;
00175 close_source = aclose_source;
00176 close_target = aclose_target;
00177 tunnel_till_done = true;
00178
00179
00180
00181
00182
00183 SET_HANDLER(&OneWayTunnel::startEvent);
00184
00185 SourceVio->set_continuation(this);
00186 MUTEX_LOCK(lock, mutex, this_ethread());
00187 vioSource = SourceVio;
00188
00189 vioTarget = vcTarget->do_io_write(this, TUNNEL_TILL_DONE, reader, 0);
00190 ink_assert(vioSource && vioTarget);
00191 }
00192
00193 void
00194 OneWayTunnel::init(Continuation * aCont, VIO * SourceVio, VIO * TargetVio, bool aclose_source, bool aclose_target)
00195 {
00196 mutex = aCont ? (ProxyMutex *) aCont->mutex : new_ProxyMutex();
00197 cont = aCont;
00198 single_buffer = true;
00199 manipulate_fn = 0;
00200 n_connections = 2;
00201 close_source = aclose_source;
00202 close_target = aclose_target;
00203 tunnel_till_done = true;
00204
00205
00206
00207 SET_HANDLER(&OneWayTunnel::startEvent);
00208
00209 ink_assert(SourceVio && TargetVio);
00210
00211 SourceVio->set_continuation(this);
00212 TargetVio->set_continuation(this);
00213 vioSource = SourceVio;
00214 vioTarget = TargetVio;
00215 }
00216
00217
00218 void
00219 OneWayTunnel::transform(MIOBufferAccessor & in_buf, MIOBufferAccessor & out_buf)
00220 {
00221 if (manipulate_fn)
00222 manipulate_fn(in_buf, out_buf);
00223 else if (in_buf.writer() != out_buf.writer())
00224 transfer_data(in_buf, out_buf);
00225 }
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236 int
00237 OneWayTunnel::startEvent(int event, void *data)
00238 {
00239 VIO *vio = (VIO *) data;
00240 int ret = VC_EVENT_DONE;
00241 int result = 0;
00242
00243 #ifdef TEST
00244 const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
00245 printf("OneWayTunnel --- %s received from %s VC\n", event_name, event_origin);
00246 #endif
00247
00248 if (!vioTarget)
00249 goto Lerror;
00250
00251
00252
00253 switch (event) {
00254
00255 case ONE_WAY_TUNNEL_EVENT_PEER_CLOSE:
00256
00257 ink_assert(tunnel_peer);
00258 tunnel_peer = NULL;
00259 free_vcs = false;
00260 goto Ldone;
00261
00262 case VC_EVENT_READ_READY:
00263 transform(vioSource->buffer, vioTarget->buffer);
00264 vioTarget->reenable();
00265 ret = VC_EVENT_CONT;
00266 break;
00267
00268 case VC_EVENT_WRITE_READY:
00269 if (vioSource)
00270 vioSource->reenable();
00271 ret = VC_EVENT_CONT;
00272 break;
00273
00274 case VC_EVENT_EOS:
00275 if (!tunnel_till_done && vio->ntodo())
00276 goto Lerror;
00277 if (vio == vioSource) {
00278 transform(vioSource->buffer, vioTarget->buffer);
00279 goto Lread_complete;
00280 } else
00281 goto Ldone;
00282
00283 Lread_complete:
00284 case VC_EVENT_READ_COMPLETE:
00285
00286
00287 vioTarget->nbytes = vioTarget->ndone + vioTarget->buffer.reader()->read_avail();
00288 if (vioTarget->nbytes == vioTarget->ndone)
00289 goto Ldone;
00290 vioTarget->reenable();
00291 if (!tunnel_peer)
00292 close_source_vio(0);
00293 break;
00294
00295 Lerror:
00296 case VC_EVENT_ERROR:
00297 lerrno = ((VIO *) data)->vc_server->lerrno;
00298 case VC_EVENT_INACTIVITY_TIMEOUT:
00299 case VC_EVENT_ACTIVE_TIMEOUT:
00300 result = -1;
00301 Ldone:
00302 case VC_EVENT_WRITE_COMPLETE:
00303 if (tunnel_peer) {
00304
00305 tunnel_peer->startEvent(ONE_WAY_TUNNEL_EVENT_PEER_CLOSE, data);
00306 }
00307 close_source_vio(result);
00308 close_target_vio(result);
00309 connection_closed(result);
00310 break;
00311
00312 default:
00313 ink_assert(!"bad case");
00314 ret = VC_EVENT_CONT;
00315 break;
00316 }
00317 #ifdef TEST
00318 printf(" (OneWayTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
00319 #endif
00320 return ret;
00321 }
00322
00323
00324 void
00325 OneWayTunnel::close_source_vio(int result)
00326 {
00327
00328 if (vioSource) {
00329 if (last_connection() || !single_buffer) {
00330 free_MIOBuffer(vioSource->buffer.writer());
00331 vioSource->buffer.clear();
00332 }
00333 if (close_source && free_vcs) {
00334 vioSource->vc_server->do_io_close(result ? lerrno : -1);
00335 }
00336 vioSource = NULL;
00337 n_connections--;
00338 }
00339 }
00340
00341 void
00342 OneWayTunnel::close_target_vio(int result, VIO * vio)
00343 {
00344
00345 (void) vio;
00346 if (vioTarget) {
00347 if (last_connection() || !single_buffer) {
00348 free_MIOBuffer(vioTarget->buffer.writer());
00349 vioTarget->buffer.clear();
00350 }
00351 if (close_target && free_vcs) {
00352 vioTarget->vc_server->do_io_close(result ? lerrno : -1);
00353 }
00354 vioTarget = NULL;
00355 n_connections--;
00356 }
00357 }
00358
00359
00360
00361
00362
00363
00364 void
00365 OneWayTunnel::connection_closed(int result)
00366 {
00367 if (cont) {
00368 #ifdef TEST
00369 cout << "OneWayTunnel::connection_closed() ... calling cont" << endl;
00370 #endif
00371 cont->handleEvent(result ? VC_EVENT_ERROR : VC_EVENT_EOS, cont);
00372 } else {
00373 OneWayTunnel_free(this);
00374 }
00375 }
00376
00377 void
00378 OneWayTunnel::reenable_all()
00379 {
00380 if (vioSource)
00381 vioSource->reenable();
00382 if (vioTarget)
00383 vioTarget->reenable();
00384 }
00385
00386 bool
00387 OneWayTunnel::last_connection()
00388 {
00389 return n_connections == 1;
00390 }