00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "P_Cluster.h"
00031 ClassAllocator<ClusterVConnection> clusterVCAllocator("clusterVCAllocator");
00032 ClassAllocator<ByteBankDescriptor> byteBankAllocator("byteBankAllocator");
00033
00034 ByteBankDescriptor *
00035 ByteBankDescriptor::ByteBankDescriptor_alloc(IOBufferBlock * iob)
00036 {
00037 ByteBankDescriptor *b = byteBankAllocator.alloc();
00038 b->block = iob;
00039 return b;
00040 }
00041
00042 void
00043 ByteBankDescriptor::ByteBankDescriptor_free(ByteBankDescriptor * b)
00044 {
00045 b->block = 0;
00046 byteBankAllocator.free(b);
00047 }
00048
00049 void
00050 clusterVCAllocator_free(ClusterVConnection * vc)
00051 {
00052 vc->mutex = 0;
00053 vc->action_ = 0;
00054 vc->free();
00055 if (vc->in_vcs) {
00056 vc->type = VC_CLUSTER_CLOSED;
00057 return;
00058 }
00059 clusterVCAllocator.free(vc);
00060 }
00061
00062 ClusterVConnState::ClusterVConnState():enabled(0), priority(1), vio(VIO::NONE), queue(0), ifd(-1), delay_timeout(NULL)
00063 {
00064 }
00065
00066 ClusterVConnectionBase::ClusterVConnectionBase():
00067 thread(0), closed(0), inactivity_timeout_in(0), active_timeout_in(0), inactivity_timeout(NULL), active_timeout(NULL)
00068 {
00069 }
00070
00071 #ifdef DEBUG
00072 int
00073 ClusterVConnectionBase::enable_debug_trace = 0;
00074 #endif
00075
00076 VIO *
00077 ClusterVConnectionBase::do_io_read(Continuation * acont, int64_t anbytes, MIOBuffer * abuffer)
00078 {
00079 ink_assert(!closed);
00080 read.vio.buffer.writer_for(abuffer);
00081 read.vio.op = VIO::READ;
00082 read.vio.set_continuation(acont);
00083 read.vio.nbytes = anbytes;
00084 read.vio.ndone = 0;
00085 read.vio.vc_server = (VConnection *) this;
00086 read.enabled = 1;
00087
00088 ClusterVConnection *cvc = (ClusterVConnection *) this;
00089 Debug("cluster_vc_xfer", "do_io_read [%s] chan %d", "", cvc->channel);
00090 return &read.vio;
00091 }
00092
00093 VIO *
00094 ClusterVConnectionBase::do_io_pread(Continuation * , int64_t ,
00095 MIOBuffer * , int64_t )
00096 {
00097 return 0;
00098 }
00099
00100 int
00101 ClusterVConnection::get_header(void ** , int * )
00102 {
00103 ink_assert(!"implemented");
00104 return -1;
00105 }
00106
00107 int
00108 ClusterVConnection::set_header(void * , int )
00109 {
00110 ink_assert(!"implemented");
00111 return -1;
00112 }
00113
00114 int
00115 ClusterVConnection::get_single_data(void ** , int * )
00116 {
00117 ink_assert(!"implemented");
00118 return -1;
00119 }
00120
00121 VIO *
00122 ClusterVConnectionBase::do_io_write(Continuation * acont, int64_t anbytes, IOBufferReader * abuffer, bool owner)
00123 {
00124 ink_assert(!closed);
00125 ink_assert(!owner);
00126 write.vio.buffer.reader_for(abuffer);
00127 write.vio.op = VIO::WRITE;
00128 write.vio.set_continuation(acont);
00129 write.vio.nbytes = anbytes;
00130 write.vio.ndone = 0;
00131 write.vio.vc_server = (VConnection *) this;
00132 write.enabled = 1;
00133
00134 return &write.vio;
00135 }
00136
00137 void
00138 ClusterVConnectionBase::do_io_close(int alerrno)
00139 {
00140 read.enabled = 0;
00141 write.enabled = 0;
00142 read.vio.buffer.clear();
00143 write.vio.buffer.clear();
00144 INK_WRITE_MEMORY_BARRIER;
00145 if (alerrno && alerrno != -1)
00146 this->lerrno = alerrno;
00147
00148 if (alerrno == -1) {
00149 closed = 1;
00150 } else {
00151 closed = -1;
00152 }
00153 }
00154
00155 void
00156 ClusterVConnection::reenable(VIO *vio)
00157 {
00158 if (type == VC_CLUSTER_WRITE)
00159 ch->vcs_push(this, VC_CLUSTER_WRITE);
00160
00161 ClusterVConnectionBase::reenable(vio);
00162 }
00163
00164 void
00165 ClusterVConnectionBase::reenable(VIO * vio)
00166 {
00167 ink_assert(!closed);
00168 if (vio == &read.vio) {
00169 read.enabled = 1;
00170 #ifdef DEBUG
00171 if (enable_debug_trace && (vio->buffer.writer() && !vio->buffer.writer()->write_avail()))
00172 printf("NetVConnection re-enabled for read when full\n");
00173 #endif
00174 } else if (vio == &write.vio) {
00175 write.enabled = 1;
00176 #ifdef DEBUG
00177 if (enable_debug_trace && (vio->buffer.writer() && !vio->buffer.reader()->read_avail()))
00178 printf("NetVConnection re-enabled for write when empty\n");
00179 #endif
00180 } else {
00181 ink_assert(!"bad vio");
00182 }
00183 }
00184
00185 void
00186 ClusterVConnectionBase::reenable_re(VIO * vio)
00187 {
00188 reenable(vio);
00189 }
00190
00191 ClusterVConnection::ClusterVConnection(int is_new_connect_read)
00192 : ch(NULL),
00193 new_connect_read(is_new_connect_read),
00194 remote_free(0),
00195 last_local_free(0),
00196 channel(0),
00197 close_disabled(0),
00198 remote_closed(0),
00199 remote_close_disabled(1),
00200 remote_lerrno(0),
00201 in_vcs(0),
00202 type(0),
00203 start_time(0),
00204 last_activity_time(0),
00205 n_set_data_msgs(0),
00206 n_recv_set_data_msgs(0),
00207 pending_remote_fill(0),
00208 remote_ram_cache_hit(0),
00209 have_all_data(0),
00210 initial_data_bytes(0),
00211 current_cont(0),
00212 iov_map(CLUSTER_IOV_NOT_OPEN),
00213 write_list_tail(0),
00214 write_list_bytes(0),
00215 write_bytes_in_transit(0),
00216 alternate(),
00217 time_pin(0),
00218 disk_io_priority(0)
00219 {
00220 #ifdef DEBUG
00221 read.vio.buffer.name = "ClusterVConnection.read";
00222 write.vio.buffer.name = "ClusterVConnection.write";
00223 #endif
00224 SET_HANDLER((ClusterVConnHandler) & ClusterVConnection::startEvent);
00225 }
00226
00227 ClusterVConnection::~ClusterVConnection()
00228 {
00229 free();
00230 }
00231
00232 void
00233 ClusterVConnection::free()
00234 {
00235 if (alternate.valid()) {
00236 alternate.destroy();
00237 }
00238 ByteBankDescriptor *d;
00239 while ((d = byte_bank_q.dequeue())) {
00240 ByteBankDescriptor::ByteBankDescriptor_free(d);
00241 }
00242 read_block = 0;
00243 remote_write_block = 0;
00244 marshal_buf = 0;
00245 write_list = 0;
00246 write_list_tail = 0;
00247 write_list_bytes = 0;
00248 write_bytes_in_transit = 0;
00249 }
00250
00251 VIO *
00252 ClusterVConnection::do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf)
00253 {
00254 if (type == VC_CLUSTER)
00255 type = VC_CLUSTER_READ;
00256 ch->vcs_push(this, VC_CLUSTER_READ);
00257
00258 return ClusterVConnectionBase::do_io_read(c, nbytes, buf);
00259 }
00260
00261 VIO *
00262 ClusterVConnection::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * buf, bool owner)
00263 {
00264 if (type == VC_CLUSTER)
00265 type = VC_CLUSTER_WRITE;
00266 ch->vcs_push(this, VC_CLUSTER_WRITE);
00267
00268 return ClusterVConnectionBase::do_io_write(c, nbytes, buf, owner);
00269 }
00270
00271 void
00272 ClusterVConnection::do_io_close(int alerrno)
00273 {
00274 if ((type == VC_CLUSTER) && current_cont) {
00275 if (((CacheContinuation *)current_cont)->read_cluster_vc == this)
00276 type = VC_CLUSTER_READ;
00277 else if (((CacheContinuation *)current_cont)->write_cluster_vc == this)
00278 type = VC_CLUSTER_WRITE;
00279 }
00280 ch->vcs_push(this, type);
00281
00282 ClusterVConnectionBase::do_io_close(alerrno);
00283 }
00284
00285 int
00286 ClusterVConnection::startEvent(int event, Event * e)
00287 {
00288
00289
00290
00291 (void) event;
00292 start(e ? e->ethread : (EThread *) NULL);
00293 return EVENT_DONE;
00294 }
00295
00296 int
00297 ClusterVConnection::mainEvent(int event, Event * e)
00298 {
00299 (void) event;
00300 (void) e;
00301 ink_assert(!"unexpected event");
00302 return EVENT_DONE;
00303 }
00304
00305 int
00306 ClusterVConnection::start(EThread * t)
00307 {
00308
00309
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336 int status;
00337 if (!channel) {
00338 #ifdef CLUSTER_TOMCAT
00339 Ptr<ProxyMutex> m = action_.mutex;
00340 if (!m) {
00341 m = new_ProxyMutex();
00342 }
00343 #else
00344 Ptr<ProxyMutex> m = action_.mutex;
00345 #endif
00346
00347
00348 MUTEX_TRY_LOCK(lock, m, t);
00349 if (!lock) {
00350 t->schedule_in(this, CLUSTER_CONNECT_RETRY);
00351 return EVENT_DONE;
00352 }
00353 if (!ch) {
00354 if (action_.continuation) {
00355 action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, (void *) -ECLUSTER_NO_MACHINE);
00356 clusterVCAllocator_free(this);
00357 return EVENT_DONE;
00358 } else {
00359
00360 clusterVCAllocator_free(this);
00361 return -1;
00362 }
00363 }
00364
00365 channel = ch->alloc_channel(this);
00366 if (channel < 0) {
00367 if (action_.continuation) {
00368 action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, (void *) -ECLUSTER_NOMORE_CHANNELS);
00369 clusterVCAllocator_free(this);
00370 return EVENT_DONE;
00371 } else {
00372
00373 clusterVCAllocator_free(this);
00374 return -1;
00375 }
00376
00377 } else {
00378 Debug(CL_TRACE, "VC start alloc local chan=%d VC=%p", channel, this);
00379 if (new_connect_read)
00380 this->pending_remote_fill = 1;
00381 }
00382
00383 } else {
00384
00385 if ((status = ch->alloc_channel(this, channel)) < 0) {
00386 Debug(CL_TRACE, "VC start alloc remote failed chan=%d VC=%p", channel, this);
00387 clusterVCAllocator_free(this);
00388 return status;
00389 } else {
00390 Debug(CL_TRACE, "VC start alloc remote chan=%d VC=%p", channel, this);
00391 if (new_connect_read)
00392 this->pending_remote_fill = 1;
00393 this->iov_map = CLUSTER_IOV_NONE;
00394 }
00395 }
00396 cluster_schedule(ch, this, &read);
00397 cluster_schedule(ch, this, &write);
00398 if (action_.continuation) {
00399 action_.continuation->handleEvent(CLUSTER_EVENT_OPEN, this);
00400 }
00401 mutex = NULL;
00402 return EVENT_DONE;
00403 }
00404
00405 int
00406 ClusterVConnection::was_closed()
00407 {
00408 return (closed && !close_disabled);
00409 }
00410
00411 void
00412 ClusterVConnection::allow_close()
00413 {
00414 close_disabled = 0;
00415 }
00416
00417 void
00418 ClusterVConnection::disable_close()
00419 {
00420 close_disabled = 1;
00421 }
00422
00423 int
00424 ClusterVConnection::was_remote_closed()
00425 {
00426 if (!byte_bank_q.head && !remote_close_disabled)
00427 return remote_closed;
00428 else
00429 return 0;
00430 }
00431
00432 void
00433 ClusterVConnection::allow_remote_close()
00434 {
00435 remote_close_disabled = 0;
00436 }
00437
00438 bool ClusterVConnection::schedule_write()
00439 {
00440
00441
00442
00443
00444 if (write_list) {
00445 if ((closed < 0) || remote_closed) {
00446
00447
00448 write_list = 0;
00449 write_list_tail = 0;
00450 write_list_bytes = 0;
00451
00452 return false;
00453 }
00454
00455 if (closed || (write_list_bytes >= DEFAULT_MAX_BUFFER_SIZE)) {
00456
00457 return true;
00458 } else {
00459
00460 return false;
00461 }
00462 } else {
00463 return false;
00464 }
00465 }
00466
00467 void
00468 ClusterVConnection::set_type(int options)
00469 {
00470 new_connect_read = (options & CLUSTER_OPT_CONN_READ) ? 1 : 0;
00471 if (new_connect_read) {
00472 pending_remote_fill = 1;
00473 } else {
00474 pending_remote_fill = 0;
00475 }
00476 }
00477
00478
00479 bool ClusterVConnection::get_data(int id, void * )
00480 {
00481 switch (id) {
00482 case CACHE_DATA_HTTP_INFO:
00483 {
00484 ink_release_assert(!"ClusterVConnection::get_data CACHE_DATA_HTTP_INFO not supported");
00485 }
00486 case CACHE_DATA_KEY:
00487 {
00488 ink_release_assert(!"ClusterVConnection::get_data CACHE_DATA_KEY not supported");
00489 }
00490 default:
00491 {
00492 ink_release_assert(!"ClusterVConnection::get_data invalid id");
00493 }
00494 }
00495 return false;
00496 }
00497
00498 void
00499 ClusterVConnection::get_http_info(CacheHTTPInfo ** info)
00500 {
00501 *info = &alternate;
00502 }
00503
00504 int64_t
00505 ClusterVConnection::get_object_size()
00506 {
00507 return alternate.object_size_get();
00508 }
00509
00510 bool
00511 ClusterVConnection::is_pread_capable()
00512 {
00513 return false;
00514 }
00515
00516 void
00517 ClusterVConnection::set_http_info(CacheHTTPInfo * d)
00518 {
00519 int flen, len;
00520 void *data;
00521 int res;
00522 SetChanDataMessage *m;
00523 SetChanDataMessage msg;
00524
00525
00526
00527
00528
00529
00530
00531
00532 ink_release_assert(this->write.vio.op == VIO::NONE);
00533
00534 ink_release_assert(this->read.vio.op == VIO::NONE);
00535
00536 int vers = SetChanDataMessage::protoToVersion(ch->machine->msg_proto_major);
00537 if (vers == SetChanDataMessage::SET_CHANNEL_DATA_MESSAGE_VERSION) {
00538 flen = SetChanDataMessage::sizeof_fixedlen_msg();
00539 } else {
00540
00541
00542
00543 ink_release_assert(!"ClusterVConnection::set_http_info() bad msg version");
00544 }
00545
00546
00547
00548 CacheHTTPInfo *r = d;
00549 len = r->marshal_length();
00550 data = (void *) ALLOCA_DOUBLE(flen + len);
00551 memcpy((char *) data, (char *) &msg, sizeof(msg));
00552 m = (SetChanDataMessage *) data;
00553 m->data_type = CACHE_DATA_HTTP_INFO;
00554
00555 char *p = (char *) m + flen;
00556 res = r->marshal(p, len);
00557 if (res < 0) {
00558 r->destroy();
00559 return;
00560 }
00561 r->destroy();
00562
00563 m->channel = channel;
00564 m->sequence_number = token.sequence_number;
00565
00566
00567 ink_atomic_increment(&n_set_data_msgs, 1);
00568
00569 clusterProcessor.invoke_remote(ch, SET_CHANNEL_DATA_CLUSTER_FUNCTION, data, flen + len);
00570 }
00571
00572 bool ClusterVConnection::set_pin_in_cache(time_t t)
00573 {
00574 SetChanPinMessage msg;
00575
00576
00577
00578
00579
00580
00581 ink_release_assert(this->write.vio.op == VIO::NONE);
00582
00583 ink_release_assert(this->read.vio.op == VIO::NONE);
00584 time_pin = t;
00585
00586 int vers = SetChanPinMessage::protoToVersion(ch->machine->msg_proto_major);
00587
00588 if (vers == SetChanPinMessage::SET_CHANNEL_PIN_MESSAGE_VERSION) {
00589 SetChanPinMessage::sizeof_fixedlen_msg();
00590 } else {
00591
00592
00593
00594 ink_release_assert(!"ClusterVConnection::set_pin_in_cache() bad msg " "version");
00595 }
00596 msg.channel = channel;
00597 msg.sequence_number = token.sequence_number;
00598 msg.pin_time = time_pin;
00599
00600
00601 ink_atomic_increment(&n_set_data_msgs, 1);
00602
00603 clusterProcessor.invoke_remote(ch, SET_CHANNEL_PIN_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
00604 return true;
00605 }
00606
00607 time_t ClusterVConnection::get_pin_in_cache()
00608 {
00609 return time_pin;
00610 }
00611
00612 bool ClusterVConnection::set_disk_io_priority(int priority)
00613 {
00614 SetChanPriorityMessage msg;
00615
00616
00617
00618
00619
00620
00621 ink_release_assert(this->write.vio.op == VIO::NONE);
00622
00623 ink_release_assert(this->read.vio.op == VIO::NONE);
00624 disk_io_priority = priority;
00625
00626 int vers = SetChanPriorityMessage::protoToVersion(ch->machine->msg_proto_major);
00627
00628 if (vers == SetChanPriorityMessage::SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
00629 SetChanPriorityMessage::sizeof_fixedlen_msg();
00630 } else {
00631
00632
00633
00634 ink_release_assert(!"ClusterVConnection::set_disk_io_priority() bad msg " "version");
00635 }
00636 msg.channel = channel;
00637 msg.sequence_number = token.sequence_number;
00638 msg.disk_priority = priority;
00639
00640
00641 ink_atomic_increment(&n_set_data_msgs, 1);
00642
00643 clusterProcessor.invoke_remote(ch, SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
00644 return true;
00645 }
00646
00647 int
00648 ClusterVConnection::get_disk_io_priority()
00649 {
00650 return disk_io_priority;
00651 }
00652
00653