• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ClusterLoadMonitor.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   Monitors cluster interconnect load by timing periodic ping messages sent to a cluster peer.
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 
00025 /****************************************************************************
00026 
00027   ClusterLoadMonitor.cc
00028 ****************************************************************************/
00029 
00030 #include "P_Cluster.h"
00031 int
00032   ClusterLoadMonitor::cf_monitor_enabled;
00033 int
00034   ClusterLoadMonitor::cf_ping_message_send_msec_interval;
00035 int
00036   ClusterLoadMonitor::cf_num_ping_response_buckets;
00037 int
00038   ClusterLoadMonitor::cf_msecs_per_ping_response_bucket;
00039 int
00040   ClusterLoadMonitor::cf_ping_latency_threshold_msecs;
00041 int
00042   ClusterLoadMonitor::cf_cluster_load_compute_msec_interval;
00043 int
00044   ClusterLoadMonitor::cf_cluster_periodic_msec_interval;
00045 int
00046   ClusterLoadMonitor::cf_ping_history_buf_length;
00047 int
00048   ClusterLoadMonitor::cf_cluster_load_clear_duration;
00049 int
00050   ClusterLoadMonitor::cf_cluster_load_exceed_duration;
00051 
00052 ClusterLoadMonitor::ClusterLoadMonitor(ClusterHandler * ch)
00053 :Continuation(0), ch(ch), ping_history_buf_head(0),
00054 periodic_action(0), cluster_overloaded(0), cancel_periodic(0),
00055 cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0)
00056 {
00057   mutex = this->ch->mutex;
00058   SET_HANDLER(&ClusterLoadMonitor::cluster_load_periodic);
00059 
00060   ping_message_send_msec_interval = cf_ping_message_send_msec_interval ? cf_ping_message_send_msec_interval : 100;
00061   Debug("cluster_monitor", "ping_message_send_msec_interval=%d", ping_message_send_msec_interval);
00062 
00063   num_ping_response_buckets = cf_num_ping_response_buckets ? cf_num_ping_response_buckets : 100;
00064   Debug("cluster_monitor", "num_ping_response_buckets=%d", num_ping_response_buckets);
00065 
00066   msecs_per_ping_response_bucket = cf_msecs_per_ping_response_bucket ? cf_msecs_per_ping_response_bucket : 50;
00067   Debug("cluster_monitor", "msecs_per_ping_response_bucket=%d", msecs_per_ping_response_bucket);
00068 
00069   ping_latency_threshold_msecs = cf_ping_latency_threshold_msecs ? cf_ping_latency_threshold_msecs : 500;
00070   Debug("cluster_monitor", "ping_latency_threshold_msecs=%d", ping_latency_threshold_msecs);
00071 
00072   cluster_load_compute_msec_interval =
00073     cf_cluster_load_compute_msec_interval ? cf_cluster_load_compute_msec_interval : 5000;
00074   Debug("cluster_monitor", "cluster_load_compute_msec_interval=%d", cluster_load_compute_msec_interval);
00075 
00076   cluster_periodic_msec_interval = cf_cluster_periodic_msec_interval ? cf_cluster_periodic_msec_interval : 100;
00077   Debug("cluster_monitor", "cluster_periodic_msec_interval=%d", cluster_periodic_msec_interval);
00078 
00079   ping_history_buf_length = cf_ping_history_buf_length ? cf_ping_history_buf_length : 120;
00080   Debug("cluster_monitor", "ping_history_buf_length=%d", ping_history_buf_length);
00081 
00082   cluster_load_clear_duration = cf_cluster_load_clear_duration ? cf_cluster_load_clear_duration : 24;
00083   Debug("cluster_monitor", "cluster_load_clear_duration=%d", cluster_load_clear_duration);
00084 
00085   cluster_load_exceed_duration = cf_cluster_load_exceed_duration ? cf_cluster_load_exceed_duration : 4;
00086   Debug("cluster_monitor", "cluster_load_exceed_duration=%d", cluster_load_exceed_duration);
00087 
00088   int nbytes = sizeof(int) * num_ping_response_buckets;
00089   ping_response_buckets = (int *)ats_malloc(nbytes);
00090   memset((char *) ping_response_buckets, 0, nbytes);
00091 
00092   nbytes = sizeof(ink_hrtime) * ping_history_buf_length;
00093   ping_response_history_buf = (ink_hrtime *)ats_malloc(nbytes);
00094   memset((char *) ping_response_history_buf, 0, nbytes);
00095 
00096   last_ping_message_sent = HRTIME_SECONDS(0);
00097   last_cluster_load_compute = HRTIME_SECONDS(0);
00098 }
00099 
00100 void
00101 ClusterLoadMonitor::init()
00102 {
00103   periodic_action = eventProcessor.schedule_every(this, HRTIME_MSECONDS(cluster_periodic_msec_interval), ET_CALL);
00104 }
00105 
00106 ClusterLoadMonitor::~ClusterLoadMonitor()
00107 {
00108   //
00109   // Note: Since the ClusterLoadMonitor is only associated
00110   //       with the ClusterHandler, a periodic callback operating
00111   //       on a freed ClusterLoadMonitor is not possible, since the
00112   //       ClusterHandler is only deleted after several minutes.  Allowing
00113   //       plenty of time for the periodic to cancel itself via the
00114   //       "cancel_periodic" flag.
00115   //
00116   ink_release_assert(!periodic_action);
00117   if (ping_response_buckets) {
00118     ats_free(ping_response_buckets);
00119     ping_response_buckets = 0;
00120   }
00121   if (ping_response_history_buf) {
00122     ats_free(ping_response_history_buf);
00123     ping_response_history_buf = 0;
00124   }
00125 }
00126 
00127 void
00128 ClusterLoadMonitor::cancel_monitor()
00129 {
00130   if (!cancel_periodic)
00131     cancel_periodic = 1;
00132 }
00133 
00134 bool ClusterLoadMonitor::is_cluster_overloaded()
00135 {
00136   return (cluster_overloaded ? true : false);
00137 }
00138 
00139 void
00140 ClusterLoadMonitor::compute_cluster_load()
00141 {
00142   // Compute ping message latency by scanning the response time
00143   // buckets and averaging the results.
00144 
00145   int n;
00146   int sum = 0;
00147   int entries = 0;
00148   int n_bucket = 0;
00149 
00150   for (n = 0; n < num_ping_response_buckets; ++n) {
00151     if (ping_response_buckets[n]) {
00152       entries += ping_response_buckets[n];
00153       sum += (ping_response_buckets[n] * (n + 1));
00154     }
00155     ping_response_buckets[n] = 0;
00156   }
00157   if (entries) {
00158     n_bucket = sum / entries;
00159   } else {
00160     n_bucket = 1;
00161   }
00162   ink_hrtime current_ping_latency = HRTIME_MSECONDS(n_bucket * msecs_per_ping_response_bucket);
00163 
00164   // Invalidate messages associated with this sample interval
00165   cluster_load_msg_start_sequence_number = cluster_load_msg_sequence_number;
00166 
00167   // Log ping latency in history buffer.
00168 
00169   ping_response_history_buf[ping_history_buf_head++] = current_ping_latency;
00170   ping_history_buf_head = ping_history_buf_head % ping_history_buf_length;
00171 
00172   // Determine the current state of the cluster interconnect using
00173   // the configured limits.  We determine the state as follows.
00174   //   if (cluster overloaded)
00175   //     Determine if it is still in the overload state by examining
00176   //     the last 'cluster_load_clear_duration' entries in the history
00177   //     buffer and declaring it not overloaded if none of the entries
00178   //     exceed the threshold.
00179   //   else
00180   //     Determine if it is now in the overload state by examining
00181   //     the last 'cluster_load_exceed_duration' entries in the history
00182   //     buffer and declaring it overloaded if all of the entries
00183   //     exceed the threshold.
00184 
00185   int start, end;
00186   ink_hrtime ping_latency_threshold = HRTIME_MSECONDS(ping_latency_threshold_msecs);
00187 
00188   start = ping_history_buf_head - 1;
00189   if (start < 0)
00190     start += ping_history_buf_length;
00191   end = start;
00192 
00193   if (cluster_overloaded) {
00194     end -= (cluster_load_clear_duration <= ping_history_buf_length ?
00195             cluster_load_clear_duration : ping_history_buf_length);
00196   } else {
00197     end -= (cluster_load_exceed_duration <= ping_history_buf_length ?
00198             cluster_load_exceed_duration : ping_history_buf_length);
00199   }
00200   if (end < 0)
00201     end += ping_history_buf_length;
00202 
00203   int threshold_clear = 0;
00204   int threshold_exceeded = 0;
00205   do {
00206     if (ping_response_history_buf[start] >= ping_latency_threshold)
00207       ++threshold_exceeded;
00208     else
00209       ++threshold_clear;
00210     if (--start < 0)
00211       start = start + ping_history_buf_length;
00212   } while (start != end);
00213 
00214   if (cluster_overloaded) {
00215     if (threshold_exceeded == 0)
00216       cluster_overloaded = 0;
00217   } else {
00218     if (threshold_exceeded && (threshold_clear == 0))
00219       cluster_overloaded = 1;
00220   }
00221   Debug("cluster_monitor",
00222         "[%u.%u.%u.%u] overload=%d, clear=%d, exceed=%d, latency=%d",
00223         DOT_SEPARATED(this->ch->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket);
00224 }
00225 
00226 void
00227 ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number)
00228 {
00229 #ifdef CLUSTER_TOMCAT
00230   ProxyMutex *mutex = this->ch->mutex;     // hack for stats
00231 #endif
00232 
00233   CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time);
00234   int bucket = (int)
00235     (response_time / HRTIME_MSECONDS(msecs_per_ping_response_bucket));
00236   Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->ch->machine->ip), bucket, sequence_number);
00237 
00238   if (bucket >= num_ping_response_buckets)
00239     bucket = num_ping_response_buckets - 1;
00240   ink_atomic_increment(&ping_response_buckets[bucket], 1);
00241 }
00242 
00243 void
00244 ClusterLoadMonitor::recv_cluster_load_msg(cluster_load_ping_msg * m)
00245 {
00246   // We have received back our ping message.
00247   ink_hrtime now = ink_get_hrtime();
00248 
00249   if ((now >= m->send_time)
00250       && ((m->sequence_number >= cluster_load_msg_start_sequence_number)
00251           && (m->sequence_number < cluster_load_msg_sequence_number))) {
00252     // Valid message, note response time.
00253     note_ping_response_time(now - m->send_time, m->sequence_number);
00254   }
00255 }
00256 
00257 void
00258 ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterHandler *ch, void *data, int len)
00259 {
00260   // Global cluster load ping message return handler which
00261   // dispatches the result to the class specific handler.
00262 
00263   if (ch) {
00264     if (len == sizeof(struct cluster_load_ping_msg)) {
00265       struct cluster_load_ping_msg m;
00266       memcpy((void *) &m, data, len);   // unmarshal
00267 
00268       if (m.monitor && (m.magicno == cluster_load_ping_msg::CL_MSG_MAGICNO)
00269           && (m.version == cluster_load_ping_msg::CL_MSG_VERSION)) {
00270         m.monitor->recv_cluster_load_msg(&m);
00271       }
00272     }
00273   }
00274 }
00275 
00276 void
00277 ClusterLoadMonitor::send_cluster_load_msg(ink_hrtime current_time)
00278 {
00279   // Build and send cluster load ping message.
00280 
00281   struct cluster_load_ping_msg m(this);
00282 
00283   m.sequence_number = cluster_load_msg_sequence_number++;
00284   m.send_time = current_time;
00285   cluster_ping(ch, cluster_load_ping_rethandler, (void *) &m, sizeof(m));
00286 }
00287 
00288 int
00289 ClusterLoadMonitor::cluster_load_periodic(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00290 {
00291   // Perform periodic cluster load computation actions.
00292 
00293   if (cancel_periodic) {
00294     periodic_action->cancel();
00295     periodic_action = 0;
00296     return EVENT_DONE;
00297   }
00298 
00299   if (!cf_monitor_enabled) {
00300     return EVENT_CONT;
00301   }
00302   // Generate periodic ping messages.
00303 
00304   ink_hrtime current_time = ink_get_hrtime();
00305   if ((current_time - last_ping_message_sent) > HRTIME_MSECONDS(ping_message_send_msec_interval)) {
00306     send_cluster_load_msg(current_time);
00307     last_ping_message_sent = current_time;
00308   }
00309   // Compute cluster load.
00310 
00311   if ((current_time - last_cluster_load_compute) > HRTIME_MSECONDS(cluster_load_compute_msec_interval)) {
00312     compute_cluster_load();
00313     last_cluster_load_compute = current_time;
00314   }
00315   return EVENT_CONT;
00316 }
00317 
00318 // End of ClusterLoadMonitor.cc

Generated by  doxygen 1.7.1