Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 #include "P_Cluster.h"
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 void
00038 cluster_schedule(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState * ns)
00039 {
00040   
00041   
00042   
00043   int new_bucket = ch->cur_vcs;
00044 
00045   if (vc->type == VC_NULL)
00046     vc->type = VC_CLUSTER;
00047   if (ns == &vc->read) {
00048     ClusterVC_enqueue_read(ch->read_vcs[new_bucket], vc);
00049   } else {
00050     ClusterVC_enqueue_write(ch->write_vcs[new_bucket], vc);
00051   }
00052 }
00053 
00054 void
00055 cluster_reschedule_offset(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState * ns, int offset)
00056 {
00057   if (ns == &vc->read) {
00058     if (vc->read.queue)
00059       ClusterVC_remove_read(vc);
00060     ClusterVC_enqueue_read(ch->read_vcs[(ch->cur_vcs + offset) % CLUSTER_BUCKETS], vc);
00061   } else {
00062     if (vc->write.queue)
00063       ClusterVC_remove_write(vc);
00064     ClusterVC_enqueue_write(ch->write_vcs[(ch->cur_vcs + offset) % CLUSTER_BUCKETS], vc);
00065   }
00066 }
00067 
00068 
00069 
00070 
00071 
00072 unsigned int cluster_sequence_number = 0;
00073 
00074 void
00075 ClusterVCToken::alloc()
00076 {
00077 #ifdef LOCAL_CLUSTER_TEST_MODE
00078   ip_created = this_cluster_machine()->cluster_port;
00079 #else
00080   ip_created = this_cluster_machine()->ip;
00081 #endif
00082   sequence_number = ink_atomic_increment((int *) &cluster_sequence_number, 1);
00083 }
00084 
00085 
00086 
00087 
00088 
00089 IOBufferBlock *
00090 clone_IOBufferBlockList(IOBufferBlock * b, int start_off, int n, IOBufferBlock ** b_tail)
00091 {
00092 
00093   
00094   
00095   
00096 
00097   int64_t nbytes = n;
00098   int64_t block_read_avail;
00099   int64_t bytes_to_skip = start_off;
00100   IOBufferBlock *bsrc = b;
00101   IOBufferBlock *bclone = 0;
00102   IOBufferBlock *bclone_head = 0;
00103 
00104   while (bsrc && nbytes) {
00105     
00106     if (!bsrc->read_avail()) {
00107       bsrc = bsrc->next;
00108       continue;
00109     }
00110 
00111     if (bclone_head) {
00112       bclone->next = bsrc->clone();
00113       bclone = bclone->next;
00114     } else {
00115 
00116       
00117       if (bytes_to_skip) {
00118         bytes_to_skip -= bsrc->read_avail();
00119 
00120         if (bytes_to_skip < 0) {
00121           
00122           bclone_head = bsrc->clone();
00123           bclone_head->consume(bsrc->read_avail() + bytes_to_skip);
00124           bclone = bclone_head;
00125           bytes_to_skip = 0;
00126 
00127         } else {
00128           
00129           bsrc = bsrc->next;
00130           continue;
00131         }
00132       } else {
00133         bclone_head = bsrc->clone();
00134         bclone = bclone_head;
00135       }
00136     }
00137     block_read_avail = bclone->read_avail();
00138     nbytes -= block_read_avail;
00139     if (nbytes < 0) {
00140       
00141       bclone->fill(nbytes);
00142       nbytes = 0;
00143     }
00144     bsrc = bsrc->next;
00145   }
00146   ink_release_assert(!nbytes);
00147   *b_tail = bclone;
00148   return bclone_head;
00149 }
00150 
00151 IOBufferBlock *
00152 consume_IOBufferBlockList(IOBufferBlock * b, int64_t n)
00153 {
00154   IOBufferBlock *b_remainder = 0;
00155   int64_t nbytes = n;
00156 
00157   while (b) {
00158     nbytes -= b->read_avail();
00159     if (nbytes <= 0) {
00160       if (nbytes < 0) {
00161         
00162         b_remainder = b->clone();
00163         b->fill(nbytes);        
00164         b_remainder->consume(b->read_avail());  
00165         b_remainder->next = b->next;
00166         b->next = 0;
00167         nbytes = 0;
00168 
00169       } else {
00170         
00171         b_remainder = b->next;
00172       }
00173       break;
00174 
00175     } else {
00176       b = b->next;
00177     }
00178   }
00179   ink_release_assert(nbytes == 0);
00180   return b_remainder;           
00181 }
00182 
00183 int64_t
00184 bytes_IOBufferBlockList(IOBufferBlock * b, int64_t read_avail_bytes)
00185 {
00186   int64_t n = 0;;
00187 
00188   while (b) {
00189     if (read_avail_bytes) {
00190       n += b->read_avail();
00191     } else {
00192       n += b->write_avail();
00193     }
00194     b = b->next;
00195   }
00196   return n;
00197 }
00198 
00199 
00200 
00201 
00202 
00203 #if TEST_PARTIAL_READS
00204 
00205 
00206 
00207 int
00208 partial_readv(int fd, IOVec * iov, int n_iov, int seq)
00209 {
00210   IOVec tiov[16];
00211   for (int i = 0; i < n_iov; i++)
00212     tiov[i] = iov[i];
00213   int tn_iov = n_iov;
00214   int rnd = seq;
00215   int element = rand_r((unsigned int *) &rnd);
00216   element = element % n_iov;
00217   int byte = rand_r((unsigned int *) &rnd);
00218   byte = byte % iov[element].iov_len;
00219   int stop = rand_r((unsigned int *) &rnd);
00220   if (!(stop % 3)) {            
00221     tn_iov = element + 1;
00222     tiov[element].iov_len = byte;
00223     if (!byte)
00224       tn_iov--;
00225     if (!tn_iov) {
00226       tiov[element].iov_len = 1;
00227       tn_iov++;
00228     }
00229     
00230   }
00231   return socketManager.read_vector(fd, &tiov[0], tn_iov);
00232 }
00233 #endif // TEST_PARTIAL_READS
00234 
00235 #if TEST_PARTIAL_WRITES
00236 
00237 
00238 
00239 int
00240 partial_writev(int fd, IOVec * iov, int n_iov, int seq)
00241 {
00242   int rnd = seq;
00243   int sum = 0;
00244   int i = 0;
00245     for (i = 0; i < n_iov; i++) {
00246       int l = iov[i].iov_len;
00247       int r = rand_r((unsigned int *) &rnd);
00248       if ((r >> 4) & 1) {
00249         l = ((unsigned int) rand_r((unsigned int *) &rnd)) % iov[i].iov_len;
00250         if (!l) {
00251           l = iov[i].iov_len;
00252         }
00253       }
00254       ink_assert(l <= iov[i].iov_len);
00255       fprintf(stderr, "writing %d: [%d] &%X %d of %d\n", seq, i, iov[i].iov_base, l, iov[i].iov_len);
00256       int res = socketManager.write(fd, iov[i].iov_base, l);
00257       if (res < 0) {
00258         return res;
00259       }
00260       sum += res;
00261       if (res != iov[i].iov_len) {
00262         return sum;
00263       }
00264     }
00265     return sum;
00266 }
00267 #endif // TEST_PARTIAL_WRITES
00268 
00269 
00270 
00271 
#ifdef ENABLE_TIME_TRACE
// Event counters kept alongside the bucket arrays below.
int inmsg_events = 0;
int cluster_send_events = 0;

// Time-distribution buckets; dump_time_buckets() prints and zeroes them.
int inmsg_time_dist[TIME_DIST_BUCKETS_SIZE];
int cluster_send_time_dist[TIME_DIST_BUCKETS_SIZE];
#endif // ENABLE_TIME_TRACE

// When non-zero, GlobalClusterPeriodicEvent::calloutEvent() calls
// dump_time_buckets() on every callout.
int time_trace = 0;
00281 
#ifdef ENABLE_TIME_TRACE
// Print `heading`, then every bucket of `dist` followed by a space, zeroing
// each bucket right after it is printed.  Templated only so it accepts
// whatever element type the externally declared bucket arrays have.
template <typename Bucket>
static void
dump_and_clear_time_dist(const char *heading, Bucket * dist)
{
  printf("%s", heading);
  for (int idx = 0; idx < TIME_DIST_BUCKETS_SIZE; ++idx) {
    printf("%d ", dist[idx]);
    dist[idx] = 0;
  }
}
#endif // ENABLE_TIME_TRACE

// Print all time-distribution bucket arrays to stdout and reset them to
// zero.  Compiles to an empty function unless ENABLE_TIME_TRACE is defined.
void
dump_time_buckets()
{
#ifdef ENABLE_TIME_TRACE
  dump_and_clear_time_dist("\nremote ops:\n", rmt_callback_time_dist);
  dump_and_clear_time_dist("\nremote lookup ops:\n", lkrmt_callback_time_dist);
  dump_and_clear_time_dist("\nlocal cache ops:\n", callback_time_dist);
  dump_and_clear_time_dist("\nphysical cache ops:\n", cdb_callback_time_dist);
  dump_and_clear_time_dist("\nin message ops:\n", inmsg_time_dist);
  dump_and_clear_time_dist("\ncluster send time:\n", cluster_send_time_dist);
#endif // ENABLE_TIME_TRACE
}
00318 
00319 GlobalClusterPeriodicEvent::GlobalClusterPeriodicEvent():Continuation(new_ProxyMutex())
00320 {
00321   SET_HANDLER((GClusterPEHandler) & GlobalClusterPeriodicEvent::calloutEvent);
00322 }
00323 
00324 GlobalClusterPeriodicEvent::~GlobalClusterPeriodicEvent()
00325 {
00326   _thisCallout->cancel(this);
00327 }
00328 
00329 void
00330 GlobalClusterPeriodicEvent::init()
00331 {
00332   _thisCallout = eventProcessor.schedule_every(this, HRTIME_SECONDS(10), ET_CALL);
00333 }
00334 
00335 int
00336 GlobalClusterPeriodicEvent::calloutEvent(Event * , void * )
00337 {
00338   if (time_trace) {
00339     dump_time_buckets();
00340   }
00341   clusterProcessor.compute_cluster_mode();
00342   return EVENT_CONT;
00343 }
00344 
00345