00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "P_Cluster.h"
00031 int
00032   ClusterLoadMonitor::cf_monitor_enabled;
00033 int
00034   ClusterLoadMonitor::cf_ping_message_send_msec_interval;
00035 int
00036   ClusterLoadMonitor::cf_num_ping_response_buckets;
00037 int
00038   ClusterLoadMonitor::cf_msecs_per_ping_response_bucket;
00039 int
00040   ClusterLoadMonitor::cf_ping_latency_threshold_msecs;
00041 int
00042   ClusterLoadMonitor::cf_cluster_load_compute_msec_interval;
00043 int
00044   ClusterLoadMonitor::cf_cluster_periodic_msec_interval;
00045 int
00046   ClusterLoadMonitor::cf_ping_history_buf_length;
00047 int
00048   ClusterLoadMonitor::cf_cluster_load_clear_duration;
00049 int
00050   ClusterLoadMonitor::cf_cluster_load_exceed_duration;
00051 
00052 ClusterLoadMonitor::ClusterLoadMonitor(ClusterHandler * ch)
00053 :Continuation(0), ch(ch), ping_history_buf_head(0),
00054 periodic_action(0), cluster_overloaded(0), cancel_periodic(0),
00055 cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0)
00056 {
00057   mutex = this->ch->mutex;
00058   SET_HANDLER(&ClusterLoadMonitor::cluster_load_periodic);
00059 
00060   ping_message_send_msec_interval = cf_ping_message_send_msec_interval ? cf_ping_message_send_msec_interval : 100;
00061   Debug("cluster_monitor", "ping_message_send_msec_interval=%d", ping_message_send_msec_interval);
00062 
00063   num_ping_response_buckets = cf_num_ping_response_buckets ? cf_num_ping_response_buckets : 100;
00064   Debug("cluster_monitor", "num_ping_response_buckets=%d", num_ping_response_buckets);
00065 
00066   msecs_per_ping_response_bucket = cf_msecs_per_ping_response_bucket ? cf_msecs_per_ping_response_bucket : 50;
00067   Debug("cluster_monitor", "msecs_per_ping_response_bucket=%d", msecs_per_ping_response_bucket);
00068 
00069   ping_latency_threshold_msecs = cf_ping_latency_threshold_msecs ? cf_ping_latency_threshold_msecs : 500;
00070   Debug("cluster_monitor", "ping_latency_threshold_msecs=%d", ping_latency_threshold_msecs);
00071 
00072   cluster_load_compute_msec_interval =
00073     cf_cluster_load_compute_msec_interval ? cf_cluster_load_compute_msec_interval : 5000;
00074   Debug("cluster_monitor", "cluster_load_compute_msec_interval=%d", cluster_load_compute_msec_interval);
00075 
00076   cluster_periodic_msec_interval = cf_cluster_periodic_msec_interval ? cf_cluster_periodic_msec_interval : 100;
00077   Debug("cluster_monitor", "cluster_periodic_msec_interval=%d", cluster_periodic_msec_interval);
00078 
00079   ping_history_buf_length = cf_ping_history_buf_length ? cf_ping_history_buf_length : 120;
00080   Debug("cluster_monitor", "ping_history_buf_length=%d", ping_history_buf_length);
00081 
00082   cluster_load_clear_duration = cf_cluster_load_clear_duration ? cf_cluster_load_clear_duration : 24;
00083   Debug("cluster_monitor", "cluster_load_clear_duration=%d", cluster_load_clear_duration);
00084 
00085   cluster_load_exceed_duration = cf_cluster_load_exceed_duration ? cf_cluster_load_exceed_duration : 4;
00086   Debug("cluster_monitor", "cluster_load_exceed_duration=%d", cluster_load_exceed_duration);
00087 
00088   int nbytes = sizeof(int) * num_ping_response_buckets;
00089   ping_response_buckets = (int *)ats_malloc(nbytes);
00090   memset((char *) ping_response_buckets, 0, nbytes);
00091 
00092   nbytes = sizeof(ink_hrtime) * ping_history_buf_length;
00093   ping_response_history_buf = (ink_hrtime *)ats_malloc(nbytes);
00094   memset((char *) ping_response_history_buf, 0, nbytes);
00095 
00096   last_ping_message_sent = HRTIME_SECONDS(0);
00097   last_cluster_load_compute = HRTIME_SECONDS(0);
00098 }
00099 
00100 void
00101 ClusterLoadMonitor::init()
00102 {
00103   periodic_action = eventProcessor.schedule_every(this, HRTIME_MSECONDS(cluster_periodic_msec_interval), ET_CALL);
00104 }
00105 
00106 ClusterLoadMonitor::~ClusterLoadMonitor()
00107 {
00108   
00109   
00110   
00111   
00112   
00113   
00114   
00115   
00116   ink_release_assert(!periodic_action);
00117   if (ping_response_buckets) {
00118     ats_free(ping_response_buckets);
00119     ping_response_buckets = 0;
00120   }
00121   if (ping_response_history_buf) {
00122     ats_free(ping_response_history_buf);
00123     ping_response_history_buf = 0;
00124   }
00125 }
00126 
00127 void
00128 ClusterLoadMonitor::cancel_monitor()
00129 {
00130   if (!cancel_periodic)
00131     cancel_periodic = 1;
00132 }
00133 
00134 bool ClusterLoadMonitor::is_cluster_overloaded()
00135 {
00136   return (cluster_overloaded ? true : false);
00137 }
00138 
00139 void
00140 ClusterLoadMonitor::compute_cluster_load()
00141 {
00142   
00143   
00144 
00145   int n;
00146   int sum = 0;
00147   int entries = 0;
00148   int n_bucket = 0;
00149 
00150   for (n = 0; n < num_ping_response_buckets; ++n) {
00151     if (ping_response_buckets[n]) {
00152       entries += ping_response_buckets[n];
00153       sum += (ping_response_buckets[n] * (n + 1));
00154     }
00155     ping_response_buckets[n] = 0;
00156   }
00157   if (entries) {
00158     n_bucket = sum / entries;
00159   } else {
00160     n_bucket = 1;
00161   }
00162   ink_hrtime current_ping_latency = HRTIME_MSECONDS(n_bucket * msecs_per_ping_response_bucket);
00163 
00164   
00165   cluster_load_msg_start_sequence_number = cluster_load_msg_sequence_number;
00166 
00167   
00168 
00169   ping_response_history_buf[ping_history_buf_head++] = current_ping_latency;
00170   ping_history_buf_head = ping_history_buf_head % ping_history_buf_length;
00171 
00172   
00173   
00174   
00175   
00176   
00177   
00178   
00179   
00180   
00181   
00182   
00183   
00184 
00185   int start, end;
00186   ink_hrtime ping_latency_threshold = HRTIME_MSECONDS(ping_latency_threshold_msecs);
00187 
00188   start = ping_history_buf_head - 1;
00189   if (start < 0)
00190     start += ping_history_buf_length;
00191   end = start;
00192 
00193   if (cluster_overloaded) {
00194     end -= (cluster_load_clear_duration <= ping_history_buf_length ?
00195             cluster_load_clear_duration : ping_history_buf_length);
00196   } else {
00197     end -= (cluster_load_exceed_duration <= ping_history_buf_length ?
00198             cluster_load_exceed_duration : ping_history_buf_length);
00199   }
00200   if (end < 0)
00201     end += ping_history_buf_length;
00202 
00203   int threshold_clear = 0;
00204   int threshold_exceeded = 0;
00205   do {
00206     if (ping_response_history_buf[start] >= ping_latency_threshold)
00207       ++threshold_exceeded;
00208     else
00209       ++threshold_clear;
00210     if (--start < 0)
00211       start = start + ping_history_buf_length;
00212   } while (start != end);
00213 
00214   if (cluster_overloaded) {
00215     if (threshold_exceeded == 0)
00216       cluster_overloaded = 0;
00217   } else {
00218     if (threshold_exceeded && (threshold_clear == 0))
00219       cluster_overloaded = 1;
00220   }
00221   Debug("cluster_monitor",
00222         "[%u.%u.%u.%u] overload=%d, clear=%d, exceed=%d, latency=%d",
00223         DOT_SEPARATED(this->ch->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket);
00224 }
00225 
00226 void
00227 ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number)
00228 {
00229 #ifdef CLUSTER_TOMCAT
00230   ProxyMutex *mutex = this->ch->mutex;     
00231 #endif
00232 
00233   CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time);
00234   int bucket = (int)
00235     (response_time / HRTIME_MSECONDS(msecs_per_ping_response_bucket));
00236   Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->ch->machine->ip), bucket, sequence_number);
00237 
00238   if (bucket >= num_ping_response_buckets)
00239     bucket = num_ping_response_buckets - 1;
00240   ink_atomic_increment(&ping_response_buckets[bucket], 1);
00241 }
00242 
00243 void
00244 ClusterLoadMonitor::recv_cluster_load_msg(cluster_load_ping_msg * m)
00245 {
00246   
00247   ink_hrtime now = ink_get_hrtime();
00248 
00249   if ((now >= m->send_time)
00250       && ((m->sequence_number >= cluster_load_msg_start_sequence_number)
00251           && (m->sequence_number < cluster_load_msg_sequence_number))) {
00252     
00253     note_ping_response_time(now - m->send_time, m->sequence_number);
00254   }
00255 }
00256 
00257 void
00258 ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterHandler *ch, void *data, int len)
00259 {
00260   
00261   
00262 
00263   if (ch) {
00264     if (len == sizeof(struct cluster_load_ping_msg)) {
00265       struct cluster_load_ping_msg m;
00266       memcpy((void *) &m, data, len);   
00267 
00268       if (m.monitor && (m.magicno == cluster_load_ping_msg::CL_MSG_MAGICNO)
00269           && (m.version == cluster_load_ping_msg::CL_MSG_VERSION)) {
00270         m.monitor->recv_cluster_load_msg(&m);
00271       }
00272     }
00273   }
00274 }
00275 
00276 void
00277 ClusterLoadMonitor::send_cluster_load_msg(ink_hrtime current_time)
00278 {
00279   
00280 
00281   struct cluster_load_ping_msg m(this);
00282 
00283   m.sequence_number = cluster_load_msg_sequence_number++;
00284   m.send_time = current_time;
00285   cluster_ping(ch, cluster_load_ping_rethandler, (void *) &m, sizeof(m));
00286 }
00287 
00288 int
00289 ClusterLoadMonitor::cluster_load_periodic(int , Event * )
00290 {
00291   
00292 
00293   if (cancel_periodic) {
00294     periodic_action->cancel();
00295     periodic_action = 0;
00296     return EVENT_DONE;
00297   }
00298 
00299   if (!cf_monitor_enabled) {
00300     return EVENT_CONT;
00301   }
00302   
00303 
00304   ink_hrtime current_time = ink_get_hrtime();
00305   if ((current_time - last_ping_message_sent) > HRTIME_MSECONDS(ping_message_send_msec_interval)) {
00306     send_cluster_load_msg(current_time);
00307     last_ping_message_sent = current_time;
00308   }
00309   
00310 
00311   if ((current_time - last_cluster_load_compute) > HRTIME_MSECONDS(cluster_load_compute_msec_interval)) {
00312     compute_cluster_load();
00313     last_cluster_load_compute = current_time;
00314   }
00315   return EVENT_CONT;
00316 }
00317 
00318