00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "P_Cluster.h"
00031 int
00032 ClusterLoadMonitor::cf_monitor_enabled;
00033 int
00034 ClusterLoadMonitor::cf_ping_message_send_msec_interval;
00035 int
00036 ClusterLoadMonitor::cf_num_ping_response_buckets;
00037 int
00038 ClusterLoadMonitor::cf_msecs_per_ping_response_bucket;
00039 int
00040 ClusterLoadMonitor::cf_ping_latency_threshold_msecs;
00041 int
00042 ClusterLoadMonitor::cf_cluster_load_compute_msec_interval;
00043 int
00044 ClusterLoadMonitor::cf_cluster_periodic_msec_interval;
00045 int
00046 ClusterLoadMonitor::cf_ping_history_buf_length;
00047 int
00048 ClusterLoadMonitor::cf_cluster_load_clear_duration;
00049 int
00050 ClusterLoadMonitor::cf_cluster_load_exceed_duration;
00051
00052 ClusterLoadMonitor::ClusterLoadMonitor(ClusterHandler * ch)
00053 :Continuation(0), ch(ch), ping_history_buf_head(0),
00054 periodic_action(0), cluster_overloaded(0), cancel_periodic(0),
00055 cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0)
00056 {
00057 mutex = this->ch->mutex;
00058 SET_HANDLER(&ClusterLoadMonitor::cluster_load_periodic);
00059
00060 ping_message_send_msec_interval = cf_ping_message_send_msec_interval ? cf_ping_message_send_msec_interval : 100;
00061 Debug("cluster_monitor", "ping_message_send_msec_interval=%d", ping_message_send_msec_interval);
00062
00063 num_ping_response_buckets = cf_num_ping_response_buckets ? cf_num_ping_response_buckets : 100;
00064 Debug("cluster_monitor", "num_ping_response_buckets=%d", num_ping_response_buckets);
00065
00066 msecs_per_ping_response_bucket = cf_msecs_per_ping_response_bucket ? cf_msecs_per_ping_response_bucket : 50;
00067 Debug("cluster_monitor", "msecs_per_ping_response_bucket=%d", msecs_per_ping_response_bucket);
00068
00069 ping_latency_threshold_msecs = cf_ping_latency_threshold_msecs ? cf_ping_latency_threshold_msecs : 500;
00070 Debug("cluster_monitor", "ping_latency_threshold_msecs=%d", ping_latency_threshold_msecs);
00071
00072 cluster_load_compute_msec_interval =
00073 cf_cluster_load_compute_msec_interval ? cf_cluster_load_compute_msec_interval : 5000;
00074 Debug("cluster_monitor", "cluster_load_compute_msec_interval=%d", cluster_load_compute_msec_interval);
00075
00076 cluster_periodic_msec_interval = cf_cluster_periodic_msec_interval ? cf_cluster_periodic_msec_interval : 100;
00077 Debug("cluster_monitor", "cluster_periodic_msec_interval=%d", cluster_periodic_msec_interval);
00078
00079 ping_history_buf_length = cf_ping_history_buf_length ? cf_ping_history_buf_length : 120;
00080 Debug("cluster_monitor", "ping_history_buf_length=%d", ping_history_buf_length);
00081
00082 cluster_load_clear_duration = cf_cluster_load_clear_duration ? cf_cluster_load_clear_duration : 24;
00083 Debug("cluster_monitor", "cluster_load_clear_duration=%d", cluster_load_clear_duration);
00084
00085 cluster_load_exceed_duration = cf_cluster_load_exceed_duration ? cf_cluster_load_exceed_duration : 4;
00086 Debug("cluster_monitor", "cluster_load_exceed_duration=%d", cluster_load_exceed_duration);
00087
00088 int nbytes = sizeof(int) * num_ping_response_buckets;
00089 ping_response_buckets = (int *)ats_malloc(nbytes);
00090 memset((char *) ping_response_buckets, 0, nbytes);
00091
00092 nbytes = sizeof(ink_hrtime) * ping_history_buf_length;
00093 ping_response_history_buf = (ink_hrtime *)ats_malloc(nbytes);
00094 memset((char *) ping_response_history_buf, 0, nbytes);
00095
00096 last_ping_message_sent = HRTIME_SECONDS(0);
00097 last_cluster_load_compute = HRTIME_SECONDS(0);
00098 }
00099
00100 void
00101 ClusterLoadMonitor::init()
00102 {
00103 periodic_action = eventProcessor.schedule_every(this, HRTIME_MSECONDS(cluster_periodic_msec_interval), ET_CALL);
00104 }
00105
00106 ClusterLoadMonitor::~ClusterLoadMonitor()
00107 {
00108
00109
00110
00111
00112
00113
00114
00115
00116 ink_release_assert(!periodic_action);
00117 if (ping_response_buckets) {
00118 ats_free(ping_response_buckets);
00119 ping_response_buckets = 0;
00120 }
00121 if (ping_response_history_buf) {
00122 ats_free(ping_response_history_buf);
00123 ping_response_history_buf = 0;
00124 }
00125 }
00126
00127 void
00128 ClusterLoadMonitor::cancel_monitor()
00129 {
00130 if (!cancel_periodic)
00131 cancel_periodic = 1;
00132 }
00133
00134 bool ClusterLoadMonitor::is_cluster_overloaded()
00135 {
00136 return (cluster_overloaded ? true : false);
00137 }
00138
00139 void
00140 ClusterLoadMonitor::compute_cluster_load()
00141 {
00142
00143
00144
00145 int n;
00146 int sum = 0;
00147 int entries = 0;
00148 int n_bucket = 0;
00149
00150 for (n = 0; n < num_ping_response_buckets; ++n) {
00151 if (ping_response_buckets[n]) {
00152 entries += ping_response_buckets[n];
00153 sum += (ping_response_buckets[n] * (n + 1));
00154 }
00155 ping_response_buckets[n] = 0;
00156 }
00157 if (entries) {
00158 n_bucket = sum / entries;
00159 } else {
00160 n_bucket = 1;
00161 }
00162 ink_hrtime current_ping_latency = HRTIME_MSECONDS(n_bucket * msecs_per_ping_response_bucket);
00163
00164
00165 cluster_load_msg_start_sequence_number = cluster_load_msg_sequence_number;
00166
00167
00168
00169 ping_response_history_buf[ping_history_buf_head++] = current_ping_latency;
00170 ping_history_buf_head = ping_history_buf_head % ping_history_buf_length;
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183
00184
00185 int start, end;
00186 ink_hrtime ping_latency_threshold = HRTIME_MSECONDS(ping_latency_threshold_msecs);
00187
00188 start = ping_history_buf_head - 1;
00189 if (start < 0)
00190 start += ping_history_buf_length;
00191 end = start;
00192
00193 if (cluster_overloaded) {
00194 end -= (cluster_load_clear_duration <= ping_history_buf_length ?
00195 cluster_load_clear_duration : ping_history_buf_length);
00196 } else {
00197 end -= (cluster_load_exceed_duration <= ping_history_buf_length ?
00198 cluster_load_exceed_duration : ping_history_buf_length);
00199 }
00200 if (end < 0)
00201 end += ping_history_buf_length;
00202
00203 int threshold_clear = 0;
00204 int threshold_exceeded = 0;
00205 do {
00206 if (ping_response_history_buf[start] >= ping_latency_threshold)
00207 ++threshold_exceeded;
00208 else
00209 ++threshold_clear;
00210 if (--start < 0)
00211 start = start + ping_history_buf_length;
00212 } while (start != end);
00213
00214 if (cluster_overloaded) {
00215 if (threshold_exceeded == 0)
00216 cluster_overloaded = 0;
00217 } else {
00218 if (threshold_exceeded && (threshold_clear == 0))
00219 cluster_overloaded = 1;
00220 }
00221 Debug("cluster_monitor",
00222 "[%u.%u.%u.%u] overload=%d, clear=%d, exceed=%d, latency=%d",
00223 DOT_SEPARATED(this->ch->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket);
00224 }
00225
00226 void
00227 ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number)
00228 {
00229 #ifdef CLUSTER_TOMCAT
00230 ProxyMutex *mutex = this->ch->mutex;
00231 #endif
00232
00233 CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time);
00234 int bucket = (int)
00235 (response_time / HRTIME_MSECONDS(msecs_per_ping_response_bucket));
00236 Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->ch->machine->ip), bucket, sequence_number);
00237
00238 if (bucket >= num_ping_response_buckets)
00239 bucket = num_ping_response_buckets - 1;
00240 ink_atomic_increment(&ping_response_buckets[bucket], 1);
00241 }
00242
00243 void
00244 ClusterLoadMonitor::recv_cluster_load_msg(cluster_load_ping_msg * m)
00245 {
00246
00247 ink_hrtime now = ink_get_hrtime();
00248
00249 if ((now >= m->send_time)
00250 && ((m->sequence_number >= cluster_load_msg_start_sequence_number)
00251 && (m->sequence_number < cluster_load_msg_sequence_number))) {
00252
00253 note_ping_response_time(now - m->send_time, m->sequence_number);
00254 }
00255 }
00256
00257 void
00258 ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterHandler *ch, void *data, int len)
00259 {
00260
00261
00262
00263 if (ch) {
00264 if (len == sizeof(struct cluster_load_ping_msg)) {
00265 struct cluster_load_ping_msg m;
00266 memcpy((void *) &m, data, len);
00267
00268 if (m.monitor && (m.magicno == cluster_load_ping_msg::CL_MSG_MAGICNO)
00269 && (m.version == cluster_load_ping_msg::CL_MSG_VERSION)) {
00270 m.monitor->recv_cluster_load_msg(&m);
00271 }
00272 }
00273 }
00274 }
00275
00276 void
00277 ClusterLoadMonitor::send_cluster_load_msg(ink_hrtime current_time)
00278 {
00279
00280
00281 struct cluster_load_ping_msg m(this);
00282
00283 m.sequence_number = cluster_load_msg_sequence_number++;
00284 m.send_time = current_time;
00285 cluster_ping(ch, cluster_load_ping_rethandler, (void *) &m, sizeof(m));
00286 }
00287
00288 int
00289 ClusterLoadMonitor::cluster_load_periodic(int , Event * )
00290 {
00291
00292
00293 if (cancel_periodic) {
00294 periodic_action->cancel();
00295 periodic_action = 0;
00296 return EVENT_DONE;
00297 }
00298
00299 if (!cf_monitor_enabled) {
00300 return EVENT_CONT;
00301 }
00302
00303
00304 ink_hrtime current_time = ink_get_hrtime();
00305 if ((current_time - last_ping_message_sent) > HRTIME_MSECONDS(ping_message_send_msec_interval)) {
00306 send_cluster_load_msg(current_time);
00307 last_ping_message_sent = current_time;
00308 }
00309
00310
00311 if ((current_time - last_cluster_load_compute) > HRTIME_MSECONDS(cluster_load_compute_msec_interval)) {
00312 compute_cluster_load();
00313 last_cluster_load_compute = current_time;
00314 }
00315 return EVENT_CONT;
00316 }
00317
00318