00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #ifndef CONGESTION_H_
00032 #define CONGESTION_H_
00033
00034 #include "libts.h"
00035 #include "P_EventSystem.h"
00036 #include "ControlBase.h"
00037 #include "ControlMatcher.h"
00038 #include "CongestionStats.h"
00039
00040 #define CONGESTION_EVENT_CONGESTED_ON_M (CONGESTION_EVENT_EVENTS_START + 1)
00041 #define CONGESTION_EVENT_CONGESTED_ON_F (CONGESTION_EVENT_EVENTS_START + 2)
00042 #define CONGESTION_EVENT_CONGESTED_LIST_DONE (CONGESTION_EVENT_EVENTS_START + 3)
00043 #define CONGESTION_EVENT_CONTROL_LOOKUP_DONE (CONGESTION_EVENT_EVENTS_START + 4)
00044
00045 struct RequestData;
00046
00047 extern InkRand CongestionRand;
00048
00049 enum
00050 { PER_IP, PER_HOST };
00051
00052 class CongestionControlRecord;
00053
00054 struct CongestionControlRule
00055 {
00056 CongestionControlRule();
00057 ~CongestionControlRule();
00058 CongestionControlRecord *record;
00059 };
00060
00061 class CongestionControlRecord:public ControlBase
00062 {
00063 public:
00064 CongestionControlRecord();
00065 CongestionControlRecord(const CongestionControlRecord & rec);
00066 ~CongestionControlRecord();
00067 char *Init(matcher_line * line_info);
00068 void UpdateMatch(CongestionControlRule * pRule, RequestData * rdata);
00069 void Print();
00070
00071 void cleanup();
00072 void setdefault();
00073 char *validate();
00074
00075 int rank;
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085 char *prefix;
00086 int prefix_len;
00087 unsigned short port;
00088 int congestion_scheme;
00089 char *error_page;
00090
00091 int max_connection_failures;
00092 int fail_window;
00093 int proxy_retry_interval;
00094 int client_wait_interval;
00095 int wait_interval_alpha;
00096 int live_os_conn_timeout;
00097 int live_os_conn_retries;
00098 int dead_os_conn_timeout;
00099 int dead_os_conn_retries;
00100 int max_connection;
00101
00102 CongestionControlRecord *pRecord;
00103 int32_t ref_count;
00104
00105 void get()
00106 {
00107 ink_atomic_increment(&ref_count, 1);
00108 }
00109 void put()
00110 {
00111 if (ink_atomic_increment(&ref_count, -1) == 1)
00112 delete this;
00113 }
00114 };
00115
00116 inline
00117 CongestionControlRule::CongestionControlRule()
00118 :
00119 record(NULL)
00120 {
00121 }
00122
00123 inline
00124 CongestionControlRule::~
00125 CongestionControlRule()
00126 {
00127 record = NULL;
00128 }
00129
00130 inline
00131 CongestionControlRecord::CongestionControlRecord()
00132 :
00133 rank(0),
00134 prefix(NULL),
00135 prefix_len(0),
00136 port(0),
00137 congestion_scheme(PER_IP),
00138 error_page(NULL),
00139 max_connection_failures(5),
00140 fail_window(120),
00141 proxy_retry_interval(10),
00142 client_wait_interval(300),
00143 wait_interval_alpha(30),
00144 live_os_conn_timeout(60),
00145 live_os_conn_retries(2),
00146 dead_os_conn_timeout(15),
00147 dead_os_conn_retries(1),
00148 max_connection(-1),
00149 pRecord(NULL),
00150 ref_count(0)
00151 {
00152 }
00153
00154 inline
00155 CongestionControlRecord::~
00156 CongestionControlRecord()
00157 {
00158 cleanup();
00159 }
00160 inline void
00161 CongestionControlRecord::cleanup()
00162 {
00163 if (pRecord) {
00164 pRecord->put();
00165 pRecord = NULL;
00166 }
00167 ats_free(prefix), prefix = NULL;
00168 ats_free(error_page), error_page = NULL;
00169 }
00170
// Counter type of a single history bin.
typedef unsigned short cong_hist_t;
// Number of bins held by FailHistory.
#define CONG_HIST_ENTRIES 17


// Failure history kept per CongestionEntry: a fixed array of event counters
// addressed relative to cur_index, plus running totals.
struct FailHistory
{
  long start;
  int bin_len;
  int length;
  cong_hist_t bins[CONG_HIST_ENTRIES];
  int cur_index;   // get_bin_events() addresses bins relative to this slot
  long last_event; // CongestionEntry::proxy_retry() compares it with ink_hrtime_to_sec() values
  int events;      // CongestionEntry::compCongested() compares it with max_connection_failures

  // Start with every scalar and every bin zeroed.
  FailHistory() : start(0), bin_len(0), length(0), cur_index(0), last_event(0), events(0)
  {
    memset(bins, 0, sizeof(bins));
  }

  void init(int window);
  void init_event(long t, int n = 1);
  int regist_event(long t, int n = 1);

  // Return the count stored in the bin `index` positions after the one that
  // follows cur_index, wrapping around the array.
  //
  // The slot is normalised into [0, CONG_HIST_ENTRIES): C++ `%` keeps the sign
  // of the dividend, so without the adjustment a negative `index` (or
  // cur_index) would produce a negative subscript and read outside `bins`.
  int
  get_bin_events(int index)
  {
    int slot = (index + 1 + cur_index) % CONG_HIST_ENTRIES;
    if (slot < 0) {
      slot += CONG_HIST_ENTRIES;
    }
    return bins[slot];
  }
};
00197
00198
00199 struct CongestionEntry: public RequestData
00200 {
00201
00202 uint64_t m_key;
00203
00204 IpEndpoint m_ip;
00205 char *m_hostname;
00206
00207
00208
00209 CongestionControlRecord *pRecord;
00210
00211
00212 FailHistory m_history;
00213 Ptr<ProxyMutex> m_hist_lock;
00214 ink_hrtime m_last_congested;
00215 volatile int m_congested;
00216 int m_stat_congested_conn_failures;
00217
00218 volatile int m_M_congested;
00219 ink_hrtime m_last_M_congested;
00220
00221
00222 int m_num_connections;
00223 int m_stat_congested_max_conn;
00224
00225
00226 int m_ref_count;
00227
00228 CongestionEntry(const char *hostname, sockaddr const* ip, CongestionControlRecord * rule, uint64_t key);
00229 CongestionEntry();
00230 virtual ~ CongestionEntry();
00231
00232
00233 virtual char *get_string()
00234 {
00235 return pRecord->prefix;
00236 }
00237 virtual const char *get_host()
00238 {
00239 return m_hostname;
00240 }
00241 virtual sockaddr const* get_ip()
00242 {
00243 return &m_ip.sa;
00244 }
00245 virtual const sockaddr* get_client_ip()
00246 {
00247 return NULL;
00248 }
00249 virtual RD_Type data_type(void)
00250 {
00251 return RD_CONGEST_ENTRY;
00252 }
00253
00254
00255 int sprint(char *buf, int buflen, int format = 0);
00256
00257
00258 void get();
00259 void put();
00260
00261
00262
00263 bool F_congested();
00264 bool M_congested(ink_hrtime t);
00265 bool congested();
00266
00267
00268 void go_alive();
00269 void failed_at(ink_hrtime t);
00270 void connection_opened();
00271 void connection_closed();
00272
00273
00274 bool proxy_retry(ink_hrtime t);
00275 int client_retry_after();
00276 int connect_retries();
00277 int connect_timeout();
00278 char *getErrorPage()
00279 {
00280 return pRecord->error_page;
00281 }
00282
00283
00284 void stat_inc_F();
00285 void stat_inc_M();
00286
00287
00288 void clearFailHistory();
00289 bool compCongested();
00290
00291
00292 bool usefulInfo(ink_hrtime t);
00293 bool validate();
00294 void applyNewRule(CongestionControlRecord * rule);
00295 void init(CongestionControlRecord * rule);
00296
00297 };
00298
00299 inline bool CongestionEntry::usefulInfo(ink_hrtime t)
00300 {
00301 return (m_ref_count > 1 ||
00302 m_congested != 0 ||
00303 m_num_connections > 0 || (m_history.last_event + pRecord->fail_window > t && m_history.events > 0));
00304 }
00305
00306 inline int
00307 CongestionEntry::client_retry_after()
00308 {
00309 int prat = 0;
00310 if (F_congested()) {
00311 prat = pRecord->proxy_retry_interval + m_history.last_event - ink_hrtime_to_sec(ink_get_hrtime());
00312 if (prat < 0)
00313 prat = 0;
00314 }
00315 return (prat + pRecord->client_wait_interval + CongestionRand.random() % pRecord->wait_interval_alpha);
00316 }
00317
00318 inline bool CongestionEntry::proxy_retry(ink_hrtime t)
00319 {
00320 return ((ink_hrtime_to_sec(t) - m_history.last_event) >= pRecord->proxy_retry_interval);
00321 }
00322
00323 inline bool CongestionEntry::F_congested()
00324 {
00325 return m_congested == 1;
00326 }
00327
00328 inline bool CongestionEntry::M_congested(ink_hrtime t)
00329 {
00330 if (pRecord->max_connection >= 0 && m_num_connections >= pRecord->max_connection) {
00331 if (ink_atomic_swap(&m_M_congested, 1) == 0) {
00332 m_last_M_congested = t;
00333
00334 }
00335 return true;
00336 }
00337 return false;
00338 }
00339
00340 inline bool CongestionEntry::congested()
00341 {
00342 return (F_congested() || m_M_congested == 1);
00343 }
00344
00345 inline int
00346 CongestionEntry::connect_retries()
00347 {
00348 if (F_congested()) {
00349 return pRecord->dead_os_conn_retries;
00350 } else {
00351 return pRecord->live_os_conn_retries;
00352 }
00353 }
00354
00355 inline int
00356 CongestionEntry::connect_timeout()
00357 {
00358 if (F_congested()) {
00359 return pRecord->dead_os_conn_timeout;
00360 } else {
00361 return pRecord->live_os_conn_timeout;
00362 }
00363 }
00364
00365
00366 inline void
00367 CongestionEntry::stat_inc_F()
00368 {
00369 ink_atomic_increment(&m_stat_congested_conn_failures, 1);
00370 }
00371
00372 inline void
00373 CongestionEntry::stat_inc_M()
00374 {
00375 ink_atomic_increment(&m_stat_congested_max_conn, 1);
00376 }
00377
00378 inline bool CongestionEntry::compCongested()
00379 {
00380 if (m_congested)
00381 return true;
00382 if (pRecord->max_connection_failures == -1)
00383 return false;
00384 return pRecord->max_connection_failures <= m_history.events;
00385 }
00386
00387
00388 inline void
00389 CongestionEntry::connection_opened()
00390 {
00391 ink_atomic_increment(&m_num_connections, 1);
00392 }
00393
00394
00395 inline void
00396 CongestionEntry::connection_closed()
00397 {
00398 ink_atomic_increment(&m_num_connections, -1);
00399 if (ink_atomic_swap(&m_M_congested, 0) == 1) {
00400
00401 }
00402 }
00403
00404 inline void
00405 CongestionEntry::clearFailHistory()
00406 {
00407 m_history.init(pRecord->fail_window);
00408 m_congested = 0;
00409 }
00410
00411 inline CongestionEntry::CongestionEntry()
00412 :m_key(0), m_hostname(NULL), pRecord(NULL),
00413 m_last_congested(0), m_congested(0),
00414 m_stat_congested_conn_failures(0),
00415 m_M_congested(0), m_last_M_congested(0), m_num_connections(0), m_stat_congested_max_conn(0), m_ref_count(1)
00416 {
00417 memset(&m_ip, 0, sizeof(m_ip));
00418 m_hist_lock = new_ProxyMutex();
00419 }
00420
00421
00422 inline CongestionEntry::~CongestionEntry()
00423 {
00424 if (m_hostname)
00425 ats_free(m_hostname), m_hostname = NULL;
00426 m_hist_lock = NULL;
00427 if (pRecord)
00428 pRecord->put(), pRecord = NULL;
00429 }
00430
00431 inline void
00432 CongestionEntry::get()
00433 {
00434 ink_atomic_increment(&m_ref_count, 1);
00435 }
00436
00437 inline void
00438 CongestionEntry::put()
00439 {
00440 if (ink_atomic_increment(&m_ref_count, -1) == 1) {
00441 delete this;
00442 }
00443 }
00444
00445
00446
00447 extern int congestionControlEnabled;
00448 extern int congestionControlLocalTime;
00449
00450 void initCongestionControl();
00451 CongestionControlRecord *CongestionControlled(RequestData * rdata);
00452
00453 uint64_t make_key(char *hostname, int len, sockaddr const* ip, CongestionControlRecord * record);
00454 uint64_t make_key(char *hostname, sockaddr const* ip, CongestionControlRecord * record);
00455 uint64_t make_key(char *hostname, int len, sockaddr const* ip, char *prefix, int prelen, short port = 0);
00456
00457
00458
00459
00460
00461
00462
00463 extern Action *get_congest_entry(Continuation * cont, HttpRequestData * data, CongestionEntry ** ppEntry);
00464 extern Action *get_congest_list(Continuation * cont, MIOBuffer * buffer, int format);
00465
00466 extern void remove_congested_entry(uint64_t key);
00467 extern void remove_all_congested_entry(void);
00468 extern void remove_congested_entry(char *buf, MIOBuffer * out_buffer);
00469
00470 #endif