Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "P_Cluster.h"
00030
00031
00032
00033
00034
00035
00036
00037 void
00038 cluster_schedule(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState * ns)
00039 {
00040
00041
00042
00043 int new_bucket = ch->cur_vcs;
00044
00045 if (vc->type == VC_NULL)
00046 vc->type = VC_CLUSTER;
00047 if (ns == &vc->read) {
00048 ClusterVC_enqueue_read(ch->read_vcs[new_bucket], vc);
00049 } else {
00050 ClusterVC_enqueue_write(ch->write_vcs[new_bucket], vc);
00051 }
00052 }
00053
00054 void
00055 cluster_reschedule_offset(ClusterHandler * ch, ClusterVConnection * vc, ClusterVConnState * ns, int offset)
00056 {
00057 if (ns == &vc->read) {
00058 if (vc->read.queue)
00059 ClusterVC_remove_read(vc);
00060 ClusterVC_enqueue_read(ch->read_vcs[(ch->cur_vcs + offset) % CLUSTER_BUCKETS], vc);
00061 } else {
00062 if (vc->write.queue)
00063 ClusterVC_remove_write(vc);
00064 ClusterVC_enqueue_write(ch->write_vcs[(ch->cur_vcs + offset) % CLUSTER_BUCKETS], vc);
00065 }
00066 }
00067
00068
00069
00070
00071
00072 unsigned int cluster_sequence_number = 0;
00073
00074 void
00075 ClusterVCToken::alloc()
00076 {
00077 #ifdef LOCAL_CLUSTER_TEST_MODE
00078 ip_created = this_cluster_machine()->cluster_port;
00079 #else
00080 ip_created = this_cluster_machine()->ip;
00081 #endif
00082 sequence_number = ink_atomic_increment((int *) &cluster_sequence_number, 1);
00083 }
00084
00085
00086
00087
00088
00089 IOBufferBlock *
00090 clone_IOBufferBlockList(IOBufferBlock * b, int start_off, int n, IOBufferBlock ** b_tail)
00091 {
00092
00093
00094
00095
00096
00097 int64_t nbytes = n;
00098 int64_t block_read_avail;
00099 int64_t bytes_to_skip = start_off;
00100 IOBufferBlock *bsrc = b;
00101 IOBufferBlock *bclone = 0;
00102 IOBufferBlock *bclone_head = 0;
00103
00104 while (bsrc && nbytes) {
00105
00106 if (!bsrc->read_avail()) {
00107 bsrc = bsrc->next;
00108 continue;
00109 }
00110
00111 if (bclone_head) {
00112 bclone->next = bsrc->clone();
00113 bclone = bclone->next;
00114 } else {
00115
00116
00117 if (bytes_to_skip) {
00118 bytes_to_skip -= bsrc->read_avail();
00119
00120 if (bytes_to_skip < 0) {
00121
00122 bclone_head = bsrc->clone();
00123 bclone_head->consume(bsrc->read_avail() + bytes_to_skip);
00124 bclone = bclone_head;
00125 bytes_to_skip = 0;
00126
00127 } else {
00128
00129 bsrc = bsrc->next;
00130 continue;
00131 }
00132 } else {
00133 bclone_head = bsrc->clone();
00134 bclone = bclone_head;
00135 }
00136 }
00137 block_read_avail = bclone->read_avail();
00138 nbytes -= block_read_avail;
00139 if (nbytes < 0) {
00140
00141 bclone->fill(nbytes);
00142 nbytes = 0;
00143 }
00144 bsrc = bsrc->next;
00145 }
00146 ink_release_assert(!nbytes);
00147 *b_tail = bclone;
00148 return bclone_head;
00149 }
00150
00151 IOBufferBlock *
00152 consume_IOBufferBlockList(IOBufferBlock * b, int64_t n)
00153 {
00154 IOBufferBlock *b_remainder = 0;
00155 int64_t nbytes = n;
00156
00157 while (b) {
00158 nbytes -= b->read_avail();
00159 if (nbytes <= 0) {
00160 if (nbytes < 0) {
00161
00162 b_remainder = b->clone();
00163 b->fill(nbytes);
00164 b_remainder->consume(b->read_avail());
00165 b_remainder->next = b->next;
00166 b->next = 0;
00167 nbytes = 0;
00168
00169 } else {
00170
00171 b_remainder = b->next;
00172 }
00173 break;
00174
00175 } else {
00176 b = b->next;
00177 }
00178 }
00179 ink_release_assert(nbytes == 0);
00180 return b_remainder;
00181 }
00182
00183 int64_t
00184 bytes_IOBufferBlockList(IOBufferBlock * b, int64_t read_avail_bytes)
00185 {
00186 int64_t n = 0;;
00187
00188 while (b) {
00189 if (read_avail_bytes) {
00190 n += b->read_avail();
00191 } else {
00192 n += b->write_avail();
00193 }
00194 b = b->next;
00195 }
00196 return n;
00197 }
00198
00199
00200
00201
00202
#if TEST_PARTIAL_READS

// Test-only replacement for a vectored read.
//
// With a pseudo-random choice seeded by `seq`, roughly one time in three the
// iovec array is truncated at a random byte of a random element. This
// exercises callers' handling of short reads.
//
// The request is passed through unmodified in two cases:
//  - n_iov exceeds the local scratch array. The old code overflowed a
//    16-entry stack buffer here.
//  - There is nothing to choose from (n_iov <= 0). The old code divided by
//    zero here.
int
partial_readv(int fd, IOVec * iov, int n_iov, int seq)
{
  const int max_iov = 16;
  if (n_iov <= 0 || n_iov > max_iov)
    return socketManager.read_vector(fd, iov, n_iov);

  IOVec tiov[max_iov];
  for (int i = 0; i < n_iov; i++)
    tiov[i] = iov[i];
  int tn_iov = n_iov;

  // Same three rand_r draws, in the same order, as before; only the modulo on
  // `byte` is deferred.
  int rnd = seq;
  int element = rand_r((unsigned int *) &rnd) % n_iov;
  int byte = rand_r((unsigned int *) &rnd);
  int stop = rand_r((unsigned int *) &rnd);

  // Zero-length elements are never chosen as the cut point. The modulo would
  // divide by zero, and the 1-byte fallback below would overrun an empty
  // buffer.
  if (!(stop % 3) && iov[element].iov_len) {
    byte = byte % iov[element].iov_len;
    tn_iov = element + 1;
    tiov[element].iov_len = byte;
    if (!byte)
      tn_iov--;
    if (!tn_iov) {
      // Never issue an empty read: request one byte of the first element.
      tiov[element].iov_len = 1;
      tn_iov++;
    }
  }
  return socketManager.read_vector(fd, &tiov[0], tn_iov);
}
#endif // TEST_PARTIAL_READS
00234
#if TEST_PARTIAL_WRITES

// Test-only replacement for a vectored write. Elements are written one at a
// time with separate write() calls.
//
// With a pseudo-random choice seeded by `seq`, each element may be shortened
// to a random nonzero prefix. Each attempt is logged to stderr.
//
// Returns:
//  - the first negative write() result, or
//  - the bytes written so far, as soon as a write falls short of its element.
int
partial_writev(int fd, IOVec * iov, int n_iov, int seq)
{
  int rnd = seq;
  int sum = 0;
  for (int i = 0; i < n_iov; i++) {
    int l = iov[i].iov_len;
    int r = rand_r((unsigned int *) &rnd);
    // Skip the shortening for empty elements: "% iov_len" would divide by
    // zero. Non-empty elements draw the same random numbers as before.
    if (((r >> 4) & 1) && iov[i].iov_len) {
      l = ((unsigned int) rand_r((unsigned int *) &rnd)) % iov[i].iov_len;
      if (!l) {
        l = iov[i].iov_len;
      }
    }
    ink_assert(l <= (int) iov[i].iov_len);
    // %p for the pointer and an explicit int for the size_t length. The old
    // %X / %d conversions did not match the argument types (undefined
    // behavior).
    fprintf(stderr, "writing %d: [%d] &%p %d of %d\n", seq, i, (void *) iov[i].iov_base, l, (int) iov[i].iov_len);
    int res = socketManager.write(fd, iov[i].iov_base, l);
    if (res < 0) {
      return res;
    }
    sum += res;
    if (res != (int) iov[i].iov_len) {
      return sum;
    }
  }
  return sum;
}
#endif // TEST_PARTIAL_WRITES
00268
00269
00270
00271
#ifdef ENABLE_TIME_TRACE
// Timing histograms owned by this file. dump_time_buckets() prints them under
// "in message ops" and "cluster send time" respectively.
int inmsg_time_dist[TIME_DIST_BUCKETS_SIZE];
int inmsg_events = 0;

int cluster_send_time_dist[TIME_DIST_BUCKETS_SIZE];
int cluster_send_events = 0;
#endif // ENABLE_TIME_TRACE

// When nonzero, GlobalClusterPeriodicEvent::calloutEvent() calls
// dump_time_buckets() on every tick.
int time_trace = 0;

#ifdef ENABLE_TIME_TRACE
// Print `header`, then every bucket of `dist` followed by a space, zeroing
// each bucket as it is printed. This is a template so that it works with
// whatever element type the externally declared histograms use.
template <class Bucket>
static void
print_and_clear_time_dist(const char *header, Bucket * dist)
{
  printf("%s", header);
  for (int slot = 0; slot < TIME_DIST_BUCKETS_SIZE; ++slot) {
    printf("%d ", dist[slot]);
    dist[slot] = 0;
  }
}
#endif // ENABLE_TIME_TRACE

// Print every timing histogram to stdout and reset it to zero.
// This is a no-op unless ENABLE_TIME_TRACE is defined.
void
dump_time_buckets()
{
#ifdef ENABLE_TIME_TRACE
  print_and_clear_time_dist("\nremote ops:\n", rmt_callback_time_dist);
  print_and_clear_time_dist("\nremote lookup ops:\n", lkrmt_callback_time_dist);
  print_and_clear_time_dist("\nlocal cache ops:\n", callback_time_dist);
  print_and_clear_time_dist("\nphysical cache ops:\n", cdb_callback_time_dist);
  print_and_clear_time_dist("\nin message ops:\n", inmsg_time_dist);
  print_and_clear_time_dist("\ncluster send time:\n", cluster_send_time_dist);
#endif // ENABLE_TIME_TRACE
}
00318
00319 GlobalClusterPeriodicEvent::GlobalClusterPeriodicEvent():Continuation(new_ProxyMutex())
00320 {
00321 SET_HANDLER((GClusterPEHandler) & GlobalClusterPeriodicEvent::calloutEvent);
00322 }
00323
00324 GlobalClusterPeriodicEvent::~GlobalClusterPeriodicEvent()
00325 {
00326 _thisCallout->cancel(this);
00327 }
00328
00329 void
00330 GlobalClusterPeriodicEvent::init()
00331 {
00332 _thisCallout = eventProcessor.schedule_every(this, HRTIME_SECONDS(10), ET_CALL);
00333 }
00334
00335 int
00336 GlobalClusterPeriodicEvent::calloutEvent(Event * , void * )
00337 {
00338 if (time_trace) {
00339 dump_time_buckets();
00340 }
00341 clusterProcessor.compute_cluster_mode();
00342 return EVENT_CONT;
00343 }
00344
00345