• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ClusterVConnection.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 /****************************************************************************
00026 
00027   ClusterVConnection.cc
00028 ****************************************************************************/
00029 
00030 #include "P_Cluster.h"
00031 ClassAllocator<ClusterVConnection> clusterVCAllocator("clusterVCAllocator");
00032 ClassAllocator<ByteBankDescriptor> byteBankAllocator("byteBankAllocator");
00033 
00034 ByteBankDescriptor *
00035 ByteBankDescriptor::ByteBankDescriptor_alloc(IOBufferBlock * iob)
00036 {
00037   ByteBankDescriptor *b = byteBankAllocator.alloc();
00038   b->block = iob;
00039   return b;
00040 }
00041 
00042 void
00043 ByteBankDescriptor::ByteBankDescriptor_free(ByteBankDescriptor * b)
00044 {
00045   b->block = 0;
00046   byteBankAllocator.free(b);
00047 }
00048 
00049 void
00050 clusterVCAllocator_free(ClusterVConnection * vc)
00051 {
00052   vc->mutex = 0;
00053   vc->action_ = 0;
00054   vc->free();
00055   if (vc->in_vcs) {
00056     vc->type = VC_CLUSTER_CLOSED;
00057     return;
00058   }
00059   clusterVCAllocator.free(vc);
00060 }
00061 
00062 ClusterVConnState::ClusterVConnState():enabled(0), priority(1), vio(VIO::NONE), queue(0), ifd(-1), delay_timeout(NULL)
00063 {
00064 }
00065 
00066 ClusterVConnectionBase::ClusterVConnectionBase():
00067 thread(0), closed(0), inactivity_timeout_in(0), active_timeout_in(0), inactivity_timeout(NULL), active_timeout(NULL)
00068 {
00069 }
00070 
00071 #ifdef DEBUG
00072 int
00073   ClusterVConnectionBase::enable_debug_trace = 0;
00074 #endif
00075 
00076 VIO *
00077 ClusterVConnectionBase::do_io_read(Continuation * acont, int64_t anbytes, MIOBuffer * abuffer)
00078 {
00079   ink_assert(!closed);
00080   read.vio.buffer.writer_for(abuffer);
00081   read.vio.op = VIO::READ;
00082   read.vio.set_continuation(acont);
00083   read.vio.nbytes = anbytes;
00084   read.vio.ndone = 0;
00085   read.vio.vc_server = (VConnection *) this;
00086   read.enabled = 1;
00087 
00088   ClusterVConnection *cvc = (ClusterVConnection *) this;
00089   Debug("cluster_vc_xfer", "do_io_read [%s] chan %d", "", cvc->channel);
00090   return &read.vio;
00091 }
00092 
00093 VIO *
00094 ClusterVConnectionBase::do_io_pread(Continuation * /* acont ATS_UNUSED */, int64_t /* anbytes ATS_UNUSED */,
00095                                     MIOBuffer * /* abuffer ATS_UNUSED */, int64_t /* off ATS_UNUSED */)
00096 {
00097   return 0;
00098 }
00099 
00100 int
00101 ClusterVConnection::get_header(void ** /* ptr ATS_UNUSED */, int * /*len ATS_UNUSED */)
00102 {
00103   ink_assert(!"implemented");
00104   return -1;
00105 }
00106 
00107 int
00108 ClusterVConnection::set_header(void * /* ptr ATS_UNUSED */, int /* len ATS_UNUSED */)
00109 {
00110   ink_assert(!"implemented");
00111   return -1;
00112 }
00113 
00114 int
00115 ClusterVConnection::get_single_data(void ** /* ptr ATS_UNUSED */, int * /* len ATS_UNUSED */)
00116 {
00117   ink_assert(!"implemented");
00118   return -1;
00119 }
00120 
00121 VIO *
00122 ClusterVConnectionBase::do_io_write(Continuation * acont, int64_t anbytes, IOBufferReader * abuffer, bool owner)
00123 {
00124   ink_assert(!closed);
00125   ink_assert(!owner);
00126   write.vio.buffer.reader_for(abuffer);
00127   write.vio.op = VIO::WRITE;
00128   write.vio.set_continuation(acont);
00129   write.vio.nbytes = anbytes;
00130   write.vio.ndone = 0;
00131   write.vio.vc_server = (VConnection *) this;
00132   write.enabled = 1;
00133 
00134   return &write.vio;
00135 }
00136 
00137 void
00138 ClusterVConnectionBase::do_io_close(int alerrno)
00139 {
00140   read.enabled = 0;
00141   write.enabled = 0;
00142   read.vio.buffer.clear();
00143   write.vio.buffer.clear();
00144   INK_WRITE_MEMORY_BARRIER;
00145   if (alerrno && alerrno != -1)
00146     this->lerrno = alerrno;
00147 
00148   if (alerrno == -1) {
00149     closed = 1;
00150   } else {
00151     closed = -1;
00152   }
00153 }
00154 
00155 void
00156 ClusterVConnection::reenable(VIO *vio)
00157 {
00158   if (type == VC_CLUSTER_WRITE)
00159     ch->vcs_push(this, VC_CLUSTER_WRITE);
00160 
00161   ClusterVConnectionBase::reenable(vio);
00162 }
00163 
00164 void
00165 ClusterVConnectionBase::reenable(VIO * vio)
00166 {
00167   ink_assert(!closed);
00168   if (vio == &read.vio) {
00169     read.enabled = 1;
00170 #ifdef DEBUG
00171     if (enable_debug_trace && (vio->buffer.writer() && !vio->buffer.writer()->write_avail()))
00172       printf("NetVConnection re-enabled for read when full\n");
00173 #endif
00174   } else if (vio == &write.vio) {
00175     write.enabled = 1;
00176 #ifdef DEBUG
00177     if (enable_debug_trace && (vio->buffer.writer() && !vio->buffer.reader()->read_avail()))
00178       printf("NetVConnection re-enabled for write when empty\n");
00179 #endif
00180   } else {
00181     ink_assert(!"bad vio");
00182   }
00183 }
00184 
00185 void
00186 ClusterVConnectionBase::reenable_re(VIO * vio)
00187 {
00188   reenable(vio);
00189 }
00190 
00191 ClusterVConnection::ClusterVConnection(int is_new_connect_read)
00192   :  ch(NULL),
00193      new_connect_read(is_new_connect_read),
00194      remote_free(0),
00195      last_local_free(0),
00196      channel(0),
00197      close_disabled(0),
00198      remote_closed(0),
00199      remote_close_disabled(1),
00200      remote_lerrno(0),
00201      in_vcs(0),
00202      type(0),
00203      start_time(0),
00204      last_activity_time(0),
00205      n_set_data_msgs(0),
00206      n_recv_set_data_msgs(0),
00207      pending_remote_fill(0),
00208      remote_ram_cache_hit(0),
00209      have_all_data(0),
00210      initial_data_bytes(0),
00211      current_cont(0),
00212      iov_map(CLUSTER_IOV_NOT_OPEN),
00213      write_list_tail(0),
00214      write_list_bytes(0),
00215      write_bytes_in_transit(0),
00216      alternate(),
00217      time_pin(0),
00218      disk_io_priority(0)
00219 {
00220 #ifdef DEBUG
00221   read.vio.buffer.name = "ClusterVConnection.read";
00222   write.vio.buffer.name = "ClusterVConnection.write";
00223 #endif
00224   SET_HANDLER((ClusterVConnHandler) & ClusterVConnection::startEvent);
00225 }
00226 
00227 ClusterVConnection::~ClusterVConnection()
00228 {
00229   free();
00230 }
00231 
00232 void
00233 ClusterVConnection::free()
00234 {
00235   if (alternate.valid()) {
00236     alternate.destroy();
00237   }
00238   ByteBankDescriptor *d;
00239   while ((d = byte_bank_q.dequeue())) {
00240     ByteBankDescriptor::ByteBankDescriptor_free(d);
00241   }
00242   read_block = 0;
00243   remote_write_block = 0;
00244   marshal_buf = 0;
00245   write_list = 0;
00246   write_list_tail = 0;
00247   write_list_bytes = 0;
00248   write_bytes_in_transit = 0;
00249 }
00250 
00251 VIO *
00252 ClusterVConnection::do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf)
00253 {
00254   if (type == VC_CLUSTER)
00255     type = VC_CLUSTER_READ;
00256   ch->vcs_push(this, VC_CLUSTER_READ);
00257 
00258   return ClusterVConnectionBase::do_io_read(c, nbytes, buf);
00259 }
00260 
00261 VIO *
00262 ClusterVConnection::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * buf, bool owner)
00263 {
00264   if (type == VC_CLUSTER)
00265     type = VC_CLUSTER_WRITE;
00266   ch->vcs_push(this, VC_CLUSTER_WRITE);
00267 
00268   return ClusterVConnectionBase::do_io_write(c, nbytes, buf, owner);
00269 }
00270 
00271 void
00272 ClusterVConnection::do_io_close(int alerrno)
00273 {
00274   if ((type == VC_CLUSTER) && current_cont) {
00275     if (((CacheContinuation *)current_cont)->read_cluster_vc == this)
00276       type = VC_CLUSTER_READ;
00277     else if (((CacheContinuation *)current_cont)->write_cluster_vc == this)
00278       type = VC_CLUSTER_WRITE;
00279   }
00280   ch->vcs_push(this, type);
00281 
00282   ClusterVConnectionBase::do_io_close(alerrno);
00283 }
00284 
00285 int
00286 ClusterVConnection::startEvent(int event, Event * e)
00287 {
00288   //
00289   // Safe to call with e == NULL from the same thread.
00290   //
00291   (void) event;
00292   start(e ? e->ethread : (EThread *) NULL);
00293   return EVENT_DONE;
00294 }
00295 
00296 int
00297 ClusterVConnection::mainEvent(int event, Event * e)
00298 {
00299   (void) event;
00300   (void) e;
00301   ink_assert(!"unexpected event");
00302   return EVENT_DONE;
00303 }
00304 
00305 int
00306 ClusterVConnection::start(EThread * t)
00307 {
00308   //
00309   //  New channel connect protocol.  Establish VC locally and send the
00310   //  channel id to the target.  Reverse of existing connect protocol
00311   //
00312   //////////////////////////////////////////////////////////////////////////
00313   // In the new VC connect protocol, we always establish the local side
00314   // of the connection followed by the remote side.
00315   //
00316   // Read connection notes:
00317   // ----------------------
00318   // The response message now consists of the standard reply message
00319   // along with a portion of the  object data.  This data is always
00320   // transferred in the same Cluster transfer message as channel data.
00321   // In order to transfer data into a partially connected VC, we introduced
00322   // a VC "pending_remote_fill" state allowing us to move the initial data
00323   // using the existing user channel mechanism.
00324   // Initially, both sides of the connection set "pending_remote_fill".
00325   //
00326   // "pending_remote_fill" allows us to make the following assumptions.
00327   //   1) No free space messages are sent for VC(s) in this state.
00328   //   2) Writer side, the initial write data is described by
00329   //      vc->remote_write_block NOT by vc->write.vio.buffer, since
00330   //      vc->write.vio is reserved for use in the OneWayTunnel.
00331   //      OneWayTunnel is used when all the object data cannot be
00332   //      contained in the initial send buffer.
00333   //   3) Writer side, write vio mutex not acquired for initial data write.
00334   ///////////////////////////////////////////////////////////////////////////
00335 
00336   int status;
00337   if (!channel) {
00338 #ifdef CLUSTER_TOMCAT
00339     Ptr<ProxyMutex> m = action_.mutex;
00340     if (!m) {
00341       m = new_ProxyMutex();
00342     }
00343 #else
00344     Ptr<ProxyMutex> m = action_.mutex;
00345 #endif
00346 
00347     // Establish the local side of the VC connection
00348     MUTEX_TRY_LOCK(lock, m, t);
00349     if (!lock) {
00350       t->schedule_in(this, CLUSTER_CONNECT_RETRY);
00351       return EVENT_DONE;
00352     }
00353     if (!ch) {
00354       if (action_.continuation) {
00355         action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, (void *) -ECLUSTER_NO_MACHINE);
00356         clusterVCAllocator_free(this);
00357         return EVENT_DONE;
00358       } else {
00359         // if we have been invoked immediately
00360         clusterVCAllocator_free(this);
00361         return -1;
00362       }
00363     }
00364 
00365     channel = ch->alloc_channel(this);
00366     if (channel < 0) {
00367       if (action_.continuation) {
00368         action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, (void *) -ECLUSTER_NOMORE_CHANNELS);
00369         clusterVCAllocator_free(this);
00370         return EVENT_DONE;
00371       } else {
00372         // if we have been invoked immediately
00373         clusterVCAllocator_free(this);
00374         return -1;
00375       }
00376 
00377     } else {
00378       Debug(CL_TRACE, "VC start alloc local chan=%d VC=%p", channel, this);
00379       if (new_connect_read)
00380         this->pending_remote_fill = 1;
00381     }
00382 
00383   } else {
00384     // Establish the remote side of the VC connection
00385     if ((status = ch->alloc_channel(this, channel)) < 0) {
00386       Debug(CL_TRACE, "VC start alloc remote failed chan=%d VC=%p", channel, this);
00387       clusterVCAllocator_free(this);
00388       return status;            // Channel active or no more channels
00389     } else {
00390       Debug(CL_TRACE, "VC start alloc remote chan=%d VC=%p", channel, this);
00391       if (new_connect_read)
00392         this->pending_remote_fill = 1;
00393       this->iov_map = CLUSTER_IOV_NONE; // disable connect timeout
00394     }
00395   }
00396   cluster_schedule(ch, this, &read);
00397   cluster_schedule(ch, this, &write);
00398   if (action_.continuation) {
00399     action_.continuation->handleEvent(CLUSTER_EVENT_OPEN, this);
00400   }
00401   mutex = NULL;
00402   return EVENT_DONE;
00403 }
00404 
00405 int
00406 ClusterVConnection::was_closed()
00407 {
00408   return (closed && !close_disabled);
00409 }
00410 
00411 void
00412 ClusterVConnection::allow_close()
00413 {
00414   close_disabled = 0;
00415 }
00416 
00417 void
00418 ClusterVConnection::disable_close()
00419 {
00420   close_disabled = 1;
00421 }
00422 
00423 int
00424 ClusterVConnection::was_remote_closed()
00425 {
00426   if (!byte_bank_q.head && !remote_close_disabled)
00427     return remote_closed;
00428   else
00429     return 0;
00430 }
00431 
00432 void
00433 ClusterVConnection::allow_remote_close()
00434 {
00435   remote_close_disabled = 0;
00436 }
00437 
00438 bool ClusterVConnection::schedule_write()
00439 {
00440   //
00441   // Schedule write if we have all data or current write data is
00442   // at least DEFAULT_MAX_BUFFER_SIZE.
00443   //
00444   if (write_list) {
00445     if ((closed < 0) || remote_closed) {
00446       // User aborted connection, dump data.
00447 
00448       write_list = 0;
00449       write_list_tail = 0;
00450       write_list_bytes = 0;
00451 
00452       return false;
00453     }
00454 
00455     if (closed || (write_list_bytes >= DEFAULT_MAX_BUFFER_SIZE)) {
00456       // No more data to write or buffer list is full, start write
00457       return true;
00458     } else {
00459       // Buffer list is not full, defer write
00460       return false;
00461     }
00462   } else {
00463     return false;
00464   }
00465 }
00466 
00467 void
00468 ClusterVConnection::set_type(int options)
00469 {
00470   new_connect_read = (options & CLUSTER_OPT_CONN_READ) ? 1 : 0;
00471   if (new_connect_read) {
00472     pending_remote_fill = 1;
00473   } else {
00474     pending_remote_fill = 0;
00475   }
00476 }
00477 
00478 // Overide functions in base class VConnection.
00479 bool ClusterVConnection::get_data(int id, void * /* data ATS_UNUSED */)
00480 {
00481   switch (id) {
00482   case CACHE_DATA_HTTP_INFO:
00483     {
00484       ink_release_assert(!"ClusterVConnection::get_data CACHE_DATA_HTTP_INFO not supported");
00485     }
00486   case CACHE_DATA_KEY:
00487     {
00488       ink_release_assert(!"ClusterVConnection::get_data CACHE_DATA_KEY not supported");
00489     }
00490   default:
00491     {
00492       ink_release_assert(!"ClusterVConnection::get_data invalid id");
00493     }
00494   }
00495   return false;
00496 }
00497 
00498 void
00499 ClusterVConnection::get_http_info(CacheHTTPInfo ** info)
00500 {
00501   *info = &alternate;
00502 }
00503 
00504 int64_t
00505 ClusterVConnection::get_object_size()
00506 {
00507   return alternate.object_size_get();
00508 }
00509 
00510 bool
00511 ClusterVConnection::is_pread_capable()
00512 {
00513   return false;
00514 }
00515 
00516 void
00517 ClusterVConnection::set_http_info(CacheHTTPInfo * d)
00518 {
00519   int flen, len;
00520   void *data;
00521   int res;
00522   SetChanDataMessage *m;
00523   SetChanDataMessage msg;
00524 
00525   //
00526   // set_http_info() is a mechanism to associate additional data with a
00527   // open_write() ClusterVConnection.  It is only allowed after a
00528   // successful open_write() and prior to issuing the do_io(VIO::WRITE).
00529   // Cache semantics dictate that set_http_info() be established prior
00530   // to transferring any data on the ClusterVConnection.
00531   //
00532   ink_release_assert(this->write.vio.op == VIO::NONE);  // not true if do_io()
00533   //   already done
00534   ink_release_assert(this->read.vio.op == VIO::NONE);   // should always be true
00535 
00536   int vers = SetChanDataMessage::protoToVersion(ch->machine->msg_proto_major);
00537   if (vers == SetChanDataMessage::SET_CHANNEL_DATA_MESSAGE_VERSION) {
00538     flen = SetChanDataMessage::sizeof_fixedlen_msg();
00539   } else {
00540     //////////////////////////////////////////////////////////////
00541     // Create the specified down rev version of this message
00542     //////////////////////////////////////////////////////////////
00543     ink_release_assert(!"ClusterVConnection::set_http_info() bad msg version");
00544   }
00545 
00546   // Create message and marshal data.
00547 
00548   CacheHTTPInfo *r = d;
00549   len = r->marshal_length();
00550   data = (void *) ALLOCA_DOUBLE(flen + len);
00551   memcpy((char *) data, (char *) &msg, sizeof(msg));
00552   m = (SetChanDataMessage *) data;
00553   m->data_type = CACHE_DATA_HTTP_INFO;
00554 
00555   char *p = (char *) m + flen;
00556   res = r->marshal(p, len);
00557   if (res < 0) {
00558     r->destroy();
00559     return;
00560   }
00561   r->destroy();
00562 
00563   m->channel = channel;
00564   m->sequence_number = token.sequence_number;
00565 
00566   // note pending set_data() msgs on VC.
00567   ink_atomic_increment(&n_set_data_msgs, 1);
00568 
00569   clusterProcessor.invoke_remote(ch, SET_CHANNEL_DATA_CLUSTER_FUNCTION, data, flen + len);
00570 }
00571 
00572 bool ClusterVConnection::set_pin_in_cache(time_t t)
00573 {
00574   SetChanPinMessage msg;
00575 
00576   //
00577   // set_pin_in_cache() is a mechanism to set an attribute on a
00578   // open_write() ClusterVConnection.  It is only allowed after a
00579   // successful open_write() and prior to issuing the do_io(VIO::WRITE).
00580   //
00581   ink_release_assert(this->write.vio.op == VIO::NONE);  // not true if do_io()
00582   //   already done
00583   ink_release_assert(this->read.vio.op == VIO::NONE);   // should always be true
00584   time_pin = t;
00585 
00586   int vers = SetChanPinMessage::protoToVersion(ch->machine->msg_proto_major);
00587 
00588   if (vers == SetChanPinMessage::SET_CHANNEL_PIN_MESSAGE_VERSION) {
00589     SetChanPinMessage::sizeof_fixedlen_msg();
00590   } else {
00591     //////////////////////////////////////////////////////////////
00592     // Create the specified down rev version of this message
00593     //////////////////////////////////////////////////////////////
00594     ink_release_assert(!"ClusterVConnection::set_pin_in_cache() bad msg " "version");
00595   }
00596   msg.channel = channel;
00597   msg.sequence_number = token.sequence_number;
00598   msg.pin_time = time_pin;
00599 
00600   // note pending set_data() msgs on VC.
00601   ink_atomic_increment(&n_set_data_msgs, 1);
00602 
00603   clusterProcessor.invoke_remote(ch, SET_CHANNEL_PIN_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
00604   return true;
00605 }
00606 
00607 time_t ClusterVConnection::get_pin_in_cache()
00608 {
00609   return time_pin;
00610 }
00611 
00612 bool ClusterVConnection::set_disk_io_priority(int priority)
00613 {
00614   SetChanPriorityMessage msg;
00615 
00616   //
00617   // set_disk_io_priority() is a mechanism to set an attribute on a
00618   // open_write() ClusterVConnection.  It is only allowed after a
00619   // successful open_write() and prior to issuing the do_io(VIO::WRITE).
00620   //
00621   ink_release_assert(this->write.vio.op == VIO::NONE);  // not true if do_io()
00622   //   already done
00623   ink_release_assert(this->read.vio.op == VIO::NONE);   // should always be true
00624   disk_io_priority = priority;
00625 
00626   int vers = SetChanPriorityMessage::protoToVersion(ch->machine->msg_proto_major);
00627 
00628   if (vers == SetChanPriorityMessage::SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
00629     SetChanPriorityMessage::sizeof_fixedlen_msg();
00630   } else {
00631     //////////////////////////////////////////////////////////////
00632     // Create the specified down rev version of this message
00633     //////////////////////////////////////////////////////////////
00634     ink_release_assert(!"ClusterVConnection::set_disk_io_priority() bad msg " "version");
00635   }
00636   msg.channel = channel;
00637   msg.sequence_number = token.sequence_number;
00638   msg.disk_priority = priority;
00639 
00640   // note pending set_data() msgs on VC.
00641   ink_atomic_increment(&n_set_data_msgs, 1);
00642 
00643   clusterProcessor.invoke_remote(ch, SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
00644   return true;
00645 }
00646 
00647 int
00648 ClusterVConnection::get_disk_io_priority()
00649 {
00650   return disk_io_priority;
00651 }
00652 
00653 // End of ClusterVConnection.cc

Generated by  doxygen 1.7.1