• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ClusterLib.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026   ClusterLib.cc
00027 ****************************************************************************/
00028 
00029 #include "P_Cluster.h"
00030 //
00031 // cluster_xxx() functions dealing with scheduling of Virtual Connections
00032 // in the read and write data buckets (read_vcs, write_vcs).
00033 //
00034 // In contrast to the net versions, these versions simply change the priority
00035 // scheduling only occurs after they move into the data_bucket.
00036 //
00037 void
00038 cluster_schedule(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState * ns)
00039 {
00040   //
00041   // actually schedule into new bucket
00042   //
00043   int new_bucket = ch->cur_vcs;
00044 
00045   if (vc->type == VC_NULL)
00046     vc->type = VC_CLUSTER;
00047   if (ns == &vc->read) {
00048     ClusterVC_enqueue_read(ch->read_vcs[new_bucket], vc);
00049   } else {
00050     ClusterVC_enqueue_write(ch->write_vcs[new_bucket], vc);
00051   }
00052 }
00053 
00054 void
00055 cluster_reschedule_offset(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState * ns, int offset)
00056 {
00057   if (ns == &vc->read) {
00058     if (vc->read.queue)
00059       ClusterVC_remove_read(vc);
00060     ClusterVC_enqueue_read(ch->read_vcs[(ch->cur_vcs + offset) % CLUSTER_BUCKETS], vc);
00061   } else {
00062     if (vc->write.queue)
00063       ClusterVC_remove_write(vc);
00064     ClusterVC_enqueue_write(ch->write_vcs[(ch->cur_vcs + offset) % CLUSTER_BUCKETS], vc);
00065   }
00066 }
00067 /*************************************************************************/
00068 // ClusterVCToken member functions (Public Class)
00069 /*************************************************************************/
00070 
00071 // global sequence number for building tokens
00072 unsigned int cluster_sequence_number = 0;
00073 
00074 void
00075 ClusterVCToken::alloc()
00076 {
00077 #ifdef LOCAL_CLUSTER_TEST_MODE
00078   ip_created = this_cluster_machine()->cluster_port;
00079 #else
00080   ip_created = this_cluster_machine()->ip;
00081 #endif
00082   sequence_number = ink_atomic_increment((int *) &cluster_sequence_number, 1);
00083 }
00084 
00085 ///////////////////////////////////////////
00086 // IOBufferBlock manipulation routines
00087 ///////////////////////////////////////////
00088 
00089 IOBufferBlock *
00090 clone_IOBufferBlockList(IOBufferBlock * b, int start_off, int n, IOBufferBlock ** b_tail)
00091 {
00092   ////////////////////////////////////////////////////////////////
00093   // Create a clone list of IOBufferBlock(s) where the sum
00094   // of all block read_avail is 'n'.  The given source list
00095   // must contain at least 'n' read avail bytes.
00096   ////////////////////////////////////////////////////////////////
00097   int64_t nbytes = n;
00098   int64_t block_read_avail;
00099   int64_t bytes_to_skip = start_off;
00100   IOBufferBlock *bsrc = b;
00101   IOBufferBlock *bclone = 0;
00102   IOBufferBlock *bclone_head = 0;
00103 
00104   while (bsrc && nbytes) {
00105     // Skip zero length blocks
00106     if (!bsrc->read_avail()) {
00107       bsrc = bsrc->next;
00108       continue;
00109     }
00110 
00111     if (bclone_head) {
00112       bclone->next = bsrc->clone();
00113       bclone = bclone->next;
00114     } else {
00115 
00116       // Skip bytes already processed
00117       if (bytes_to_skip) {
00118         bytes_to_skip -= bsrc->read_avail();
00119 
00120         if (bytes_to_skip < 0) {
00121           // Skip bytes in current block
00122           bclone_head = bsrc->clone();
00123           bclone_head->consume(bsrc->read_avail() + bytes_to_skip);
00124           bclone = bclone_head;
00125           bytes_to_skip = 0;
00126 
00127         } else {
00128           // Skip entire block
00129           bsrc = bsrc->next;
00130           continue;
00131         }
00132       } else {
00133         bclone_head = bsrc->clone();
00134         bclone = bclone_head;
00135       }
00136     }
00137     block_read_avail = bclone->read_avail();
00138     nbytes -= block_read_avail;
00139     if (nbytes < 0) {
00140       // Adjust read_avail in clone to match nbytes
00141       bclone->fill(nbytes);
00142       nbytes = 0;
00143     }
00144     bsrc = bsrc->next;
00145   }
00146   ink_release_assert(!nbytes);
00147   *b_tail = bclone;
00148   return bclone_head;
00149 }
00150 
00151 IOBufferBlock *
00152 consume_IOBufferBlockList(IOBufferBlock * b, int64_t n)
00153 {
00154   IOBufferBlock *b_remainder = 0;
00155   int64_t nbytes = n;
00156 
00157   while (b) {
00158     nbytes -= b->read_avail();
00159     if (nbytes <= 0) {
00160       if (nbytes < 0) {
00161         // Consumed a partial block, clone remainder
00162         b_remainder = b->clone();
00163         b->fill(nbytes);        // make read_avail match nbytes
00164         b_remainder->consume(b->read_avail());  // clone for remaining bytes
00165         b_remainder->next = b->next;
00166         b->next = 0;
00167         nbytes = 0;
00168 
00169       } else {
00170         // Consumed entire block
00171         b_remainder = b->next;
00172       }
00173       break;
00174 
00175     } else {
00176       b = b->next;
00177     }
00178   }
00179   ink_release_assert(nbytes == 0);
00180   return b_remainder;           // return remaining blocks
00181 }
00182 
00183 int64_t
00184 bytes_IOBufferBlockList(IOBufferBlock * b, int64_t read_avail_bytes)
00185 {
00186   int64_t n = 0;;
00187 
00188   while (b) {
00189     if (read_avail_bytes) {
00190       n += b->read_avail();
00191     } else {
00192       n += b->write_avail();
00193     }
00194     b = b->next;
00195   }
00196   return n;
00197 }
00198 
00199 
00200 //////////////////////////////////////////////////////
00201 // Miscellaneous test code
00202 //////////////////////////////////////////////////////
#if TEST_PARTIAL_READS
//
// Test code which mimics a network slowdown: with a 1-in-3 chance (driven
// by a rand_r() stream seeded from 'seq'), the iovec list is truncated at
// a random element and byte offset before the vector read is issued.
//
int
partial_readv(int fd, IOVec * iov, int n_iov, int seq)
{
  IOVec tiov[16];
  const int max_iov = (int) (sizeof(tiov) / sizeof(tiov[0]));

  // The local copy has a fixed capacity, and n_iov is used as a modulus
  // below, so it must be in [1, max_iov].  Previously a larger n_iov
  // silently overflowed tiov on the stack.
  ink_release_assert(n_iov > 0 && n_iov <= max_iov);

  for (int i = 0; i < n_iov; i++)
    tiov[i] = iov[i];
  int tn_iov = n_iov;

  // rand_r() takes an unsigned seed; store it as such instead of
  // type-punning an int.
  unsigned int rnd = seq;
  int element = rand_r(&rnd) % n_iov;
  int byte = rand_r(&rnd);
  // A zero-length entry would make "% iov_len" a division by zero.
  byte = iov[element].iov_len ? (int) (byte % iov[element].iov_len) : 0;
  int stop = rand_r(&rnd);
  if (!(stop % 3)) {            // 33% chance
    tn_iov = element + 1;
    tiov[element].iov_len = byte;
    if (!byte)
      tn_iov--;
    if (!tn_iov) {
      // Never issue an empty read; ask for at least one byte.
      tiov[element].iov_len = 1;
      tn_iov++;
    }
    // printf("partial read %d [%d]\n",tn_iov,tiov[element].iov_len);
  }
  return socketManager.read_vector(fd, &tiov[0], tn_iov);
}
#endif // TEST_PARTIAL_READS
00234 
#if TEST_PARTIAL_WRITES
//
// Test code which mimics the network backing up (too little buffering):
// each iovec entry is written with its own write() call, and randomly
// chosen entries (rand_r() stream seeded from 'seq') are truncated so a
// short write occurs.  Returns the first negative write() result, or the
// total number of bytes written up to and including the first short write.
//
int
partial_writev(int fd, IOVec * iov, int n_iov, int seq)
{
  unsigned int rnd = seq;       // rand_r() takes an unsigned seed
  int sum = 0;

  for (int i = 0; i < n_iov; i++) {
    size_t len = iov[i].iov_len;
    int l = (int) len;
    int r = rand_r(&rnd);
    // Randomly shorten this write.  Zero-length entries are skipped here
    // because "% len" would be a division by zero.
    if (((r >> 4) & 1) && len) {
      l = (int) (((unsigned int) rand_r(&rnd)) % len);
      if (!l) {
        l = (int) len;
      }
    }
    ink_assert((size_t) l <= len);
    // %p for the pointer and an explicit int cast for the size_t length:
    // passing void* to %X and size_t to %d was undefined behavior.
    fprintf(stderr, "writing %d: [%d] &%p %d of %d\n", seq, i, (void *) iov[i].iov_base, l, (int) len);
    int res = socketManager.write(fd, iov[i].iov_base, l);
    if (res < 0) {
      return res;
    }
    sum += res;
    // A short write ends the simulated writev, as it would for a real one.
    if ((size_t) res != len) {
      return sum;
    }
  }
  return sum;
}
#endif // TEST_PARTIAL_WRITES
00268 
00269 ////////////////////////////////////////////////////////////////////////
00270 // Global periodic system dump functions
00271 ////////////////////////////////////////////////////////////////////////
#ifdef ENABLE_TIME_TRACE
// Time-distribution histograms owned by this file (bucketed counts,
// TIME_DIST_BUCKETS_SIZE entries each) plus their event counters.
int inmsg_time_dist[TIME_DIST_BUCKETS_SIZE];
int inmsg_events = 0;

int cluster_send_time_dist[TIME_DIST_BUCKETS_SIZE];
int cluster_send_events = 0;
#endif // ENABLE_TIME_TRACE

// When non-zero, GlobalClusterPeriodicEvent::calloutEvent() calls
// dump_time_buckets() on every callout.
int time_trace = 0;

#ifdef ENABLE_TIME_TRACE
// Print 'title' followed by every bucket of 'dist' (each value followed by
// a space), zeroing each bucket after it is printed.
static void
dump_and_reset_time_dist(const char *title, int *dist)
{
  printf("%s", title);
  for (int i = 0; i < TIME_DIST_BUCKETS_SIZE; ++i) {
    printf("%d ", dist[i]);
    dist[i] = 0;
  }
}
#endif // ENABLE_TIME_TRACE

// Print and reset all time-distribution histograms to stdout.  This is a
// no-op unless built with ENABLE_TIME_TRACE.  The six histograms used to be
// dumped by six copy-pasted loops; they now share one helper, and the output
// is unchanged.
void
dump_time_buckets()
{
#ifdef ENABLE_TIME_TRACE
  dump_and_reset_time_dist("\nremote ops:\n", rmt_callback_time_dist);
  dump_and_reset_time_dist("\nremote lookup ops:\n", lkrmt_callback_time_dist);
  dump_and_reset_time_dist("\nlocal cache ops:\n", callback_time_dist);
  dump_and_reset_time_dist("\nphysical cache ops:\n", cdb_callback_time_dist);
  dump_and_reset_time_dist("\nin message ops:\n", inmsg_time_dist);
  dump_and_reset_time_dist("\ncluster send time:\n", cluster_send_time_dist);
#endif // ENABLE_TIME_TRACE
}
00318 
00319 GlobalClusterPeriodicEvent::GlobalClusterPeriodicEvent():Continuation(new_ProxyMutex())
00320 {
00321   SET_HANDLER((GClusterPEHandler) & GlobalClusterPeriodicEvent::calloutEvent);
00322 }
00323 
00324 GlobalClusterPeriodicEvent::~GlobalClusterPeriodicEvent()
00325 {
00326   _thisCallout->cancel(this);
00327 }
00328 
00329 void
00330 GlobalClusterPeriodicEvent::init()
00331 {
00332   _thisCallout = eventProcessor.schedule_every(this, HRTIME_SECONDS(10), ET_CALL);
00333 }
00334 
00335 int
00336 GlobalClusterPeriodicEvent::calloutEvent(Event * /* e ATS_UNUSED */, void * /* data ATS_UNUSED */)
00337 {
00338   if (time_trace) {
00339     dump_time_buckets();
00340   }
00341   clusterProcessor.compute_cluster_mode();
00342   return EVENT_CONT;
00343 }
00344 
00345 // End of ClusterLib.cc

Generated by  doxygen 1.7.1