00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "libts.h"
00031 #include "P_EventSystem.h"
00032 #include "P_Net.h"
00033 #include "Main.h"
00034 #include "CongestionDB.h"
00035 #include "Congestion.h"
00036 #include "ProcessManager.h"
00037
00038 #define SCHEDULE_CONGEST_CONT_INTERVAL HRTIME_MSECONDS(5)
00039 int CONGESTION_DB_SIZE = 1024;
00040
00041 CongestionDB *theCongestionDB = NULL;
00042
00043
00044
00045
00046
00047
00048
00049
00050 class CongestionDBCont:public Continuation
00051 {
00052 public:
00053 CongestionDBCont();
00054 int GC(int event, Event * e);
00055
00056 int get_congest_list(int event, Event * e);
00057
00058 int get_congest_entry(int event, Event * e);
00059
00060
00061 Action m_action;
00062
00063
00064 union
00065 {
00066 struct
00067 {
00068 MIOBuffer *m_iobuf;
00069 int m_CurPartitionID;
00070 int m_list_format;
00071 } list_info;
00072 struct
00073 {
00074 uint64_t m_key;
00075 char *m_hostname;
00076 IpEndpoint m_ip;
00077 CongestionControlRecord *m_rule;
00078 CongestionEntry **m_ppEntry;
00079 } entry_info;
00080 } data;
00081 };
00082
00083
// Shorthand accessors into CongestionDBCont::data.
// -- list_info (partition walks) --
#define CDBC_buf  data.list_info.m_iobuf
#define CDBC_pid  data.list_info.m_CurPartitionID
#define CDBC_lf   data.list_info.m_list_format
// -- entry_info (single-entry lookup) --
#define CDBC_key  data.entry_info.m_key
#define CDBC_host data.entry_info.m_hostname
#define CDBC_ip   data.entry_info.m_ip
#define CDBC_rule data.entry_info.m_rule
#define CDBC_ppE  data.entry_info.m_ppEntry
00092
00093 inline CongestionDBCont::CongestionDBCont()
00094 :Continuation(NULL)
00095 {
00096 memset(&data, 0, sizeof(data));
00097 }
00098
00099
00100
00101 static
00102 ClassAllocator <
00103 CongestionDBCont >
00104 CongestionDBContAllocator("CongestionDBContAllocator");
00105
00106 inline void
00107 Free_CongestionDBCont(CongestionDBCont * cont)
00108 {
00109 cont->m_action = NULL;
00110 cont->mutex = NULL;
00111 CongestionDBContAllocator.free(cont);
00112 }
00113
00114 ClassAllocator<CongestRequestParam> CongestRequestParamAllocator("CongestRequestParamAllocator");
00115
00116 inline void
00117 Free_CongestRequestParam(CongestRequestParam * param)
00118 {
00119 CongestRequestParamAllocator.free(param);
00120 }
00121
00122
00123
00124
00125
00126
00127
00128
00129 static long
00130 congestEntryGCTime = 0;
00131
00132
00133
00134
00135
00136 void
00137 preCongestEntryGC(void)
00138 {
00139 congestEntryGCTime = (long) ink_hrtime_to_sec(ink_get_hrtime());
00140 }
00141
00142
00143
00144 bool
00145 congestEntryGC(CongestionEntry * p)
00146 {
00147 if (!p->usefulInfo(congestEntryGCTime)) {
00148 p->put();
00149 return true;
00150 }
00151 return false;
00152 }
00153
00154 CongestionDB::CongestionDB(int tablesize)
00155 :CongestionTable(tablesize, &congestEntryGC, &preCongestEntryGC)
00156 {
00157 ink_assert(tablesize > 0);
00158 todo_lists = new InkAtomicList[MT_HASHTABLE_PARTITIONS];
00159 for (int i = 0; i < MT_HASHTABLE_PARTITIONS; i++) {
00160 ink_atomiclist_init(&todo_lists[i], "cong_todo_list", (uintptr_t) &((CongestRequestParam *) 0)->link);
00161 }
00162 }
00163
00164
00165
00166
00167
00168 CongestionDB::~CongestionDB()
00169 {
00170 delete[]todo_lists;
00171 }
00172
00173 void
00174 CongestionDB::addRecord(uint64_t key, CongestionEntry * pEntry)
00175 {
00176 ink_assert(key == pEntry->m_key);
00177 pEntry->get();
00178 ProxyMutex *bucket_mutex = lock_for_key(key);
00179 MUTEX_TRY_LOCK(lock, bucket_mutex, this_ethread());
00180 if (lock) {
00181 RunTodoList(part_num(key));
00182 CongestionEntry *tmp = insert_entry(key, pEntry);
00183 if (tmp)
00184 tmp->put();
00185 } else {
00186 CongestRequestParam *param = CongestRequestParamAllocator.alloc();
00187 param->m_op = CongestRequestParam::ADD_RECORD;
00188 param->m_key = key;
00189 param->m_pEntry = pEntry;
00190 ink_atomiclist_push(&todo_lists[part_num(key)], param);
00191 }
00192 }
00193
00194 void
00195 CongestionDB::removeAllRecords()
00196 {
00197 CongestionEntry *tmp;
00198 Iter it;
00199 for (int part = 0; part < MT_HASHTABLE_PARTITIONS; part++) {
00200 ProxyMutex *bucket_mutex = lock_for_key(part);
00201 MUTEX_TRY_LOCK(lock, bucket_mutex, this_ethread());
00202 if (lock) {
00203 RunTodoList(part);
00204 tmp = first_entry(part, &it);
00205 while (tmp) {
00206 remove_entry(part, &it);
00207 tmp->put();
00208 tmp = cur_entry(part, &it);
00209 }
00210 } else {
00211 CongestRequestParam *param = CongestRequestParamAllocator.alloc();
00212 param->m_op = CongestRequestParam::REMOVE_ALL_RECORDS;
00213 param->m_key = part;
00214 ink_atomiclist_push(&todo_lists[part], param);
00215 }
00216 }
00217 }
00218
00219 void
00220 CongestionDB::removeRecord(uint64_t key)
00221 {
00222 CongestionEntry *tmp;
00223 ProxyMutex *bucket_mutex = lock_for_key(key);
00224 MUTEX_TRY_LOCK(lock, bucket_mutex, this_ethread());
00225 if (lock) {
00226 RunTodoList(part_num(key));
00227 tmp = remove_entry(key);
00228 if (tmp)
00229 tmp->put();
00230 } else {
00231 CongestRequestParam *param = CongestRequestParamAllocator.alloc();
00232 param->m_op = CongestRequestParam::REMOVE_RECORD;
00233 param->m_key = key;
00234 ink_atomiclist_push(&todo_lists[part_num(key)], param);
00235 }
00236 }
00237
00238
00239 void
00240 CongestionDB::process(int buckId, CongestRequestParam * param)
00241 {
00242 CongestionEntry *pEntry = NULL;
00243 switch (param->m_op) {
00244 case CongestRequestParam::ADD_RECORD:
00245 pEntry = insert_entry(param->m_key, param->m_pEntry);
00246 if (pEntry) {
00247 pEntry->put();
00248 }
00249 break;
00250 case CongestRequestParam::REMOVE_ALL_RECORDS:
00251 {
00252 CongestionEntry *tmp;
00253 Iter it;
00254 tmp = first_entry(param->m_key, &it);
00255 while (tmp) {
00256 remove_entry(param->m_key, &it);
00257 tmp->put();
00258 tmp = cur_entry(param->m_key, &it);
00259 }
00260 break;
00261 }
00262 case CongestRequestParam::REMOVE_RECORD:
00263 pEntry = remove_entry(param->m_key);
00264 if (pEntry)
00265 pEntry->put();
00266 break;
00267 case CongestRequestParam::REVALIDATE_BUCKET:
00268 revalidateBucket(buckId);
00269 break;
00270 default:
00271 ink_assert(!"CongestionDB::process unrecognized op");
00272 }
00273 }
00274
00275 void
00276 CongestionDB::RunTodoList(int buckId)
00277 {
00278 CongestRequestParam *param = NULL, *cur = NULL;
00279 if ((param = (CongestRequestParam *)
00280 ink_atomiclist_popall(&todo_lists[buckId])) != NULL) {
00281
00282 param->link.prev = NULL;
00283 while (param->link.next) {
00284 param->link.next->link.prev = param;
00285 param = param->link.next;
00286 };
00287 while (param) {
00288 process(buckId, param);
00289 cur = param;
00290 param = param->link.prev;
00291 Free_CongestRequestParam(cur);
00292 }
00293 }
00294 }
00295
00296 void
00297 CongestionDB::revalidateBucket(int buckId)
00298 {
00299 Iter it;
00300 CongestionEntry *cur = NULL;
00301 cur = first_entry(buckId, &it);
00302 while (cur != NULL) {
00303 if (!cur->validate()) {
00304 remove_entry(buckId, &it);
00305 cur->put();
00306
00307
00308 cur = cur_entry(buckId, &it);
00309 } else
00310 cur = next_entry(buckId, &it);
00311 }
00312 }
00313
00314
00315
00316
00317
00318 int
00319 CongestionDBCont::GC(int , Event * )
00320 {
00321 if (congestionControlEnabled == 1 || congestionControlEnabled == 2) {
00322 if (theCongestionDB == NULL)
00323 goto Ldone;
00324 for (; CDBC_pid < theCongestionDB->getSize(); CDBC_pid++) {
00325 ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(CDBC_pid);
00326 {
00327 MUTEX_TRY_LOCK(lock, bucket_mutex, this_ethread());
00328 if (lock) {
00329 ink_hrtime now = ink_get_hrtime();
00330 now = ink_hrtime_to_sec(now);
00331 theCongestionDB->RunTodoList(CDBC_pid);
00332 Iter it;
00333 CongestionEntry *pEntry = theCongestionDB->first_entry(CDBC_pid, &it);
00334 while (pEntry) {
00335 if (!pEntry->usefulInfo(now)) {
00336 theCongestionDB->remove_entry(CDBC_pid, &it);
00337 pEntry->put();
00338 pEntry = theCongestionDB->cur_entry(CDBC_pid, &it);
00339 }
00340 }
00341 } else {
00342 Debug("congestion_db", "flush gc missed the lock [%d], retry", CDBC_pid);
00343 return EVENT_CONT;
00344 }
00345 }
00346 }
00347 }
00348 Ldone:
00349 CDBC_pid = 0;
00350 return EVENT_DONE;
00351 }
00352
00353 int
00354 CongestionDBCont::get_congest_list(int , Event * e)
00355 {
00356 if (m_action.cancelled) {
00357 Free_CongestionDBCont(this);
00358 return EVENT_DONE;
00359 }
00360 for (; CDBC_pid < theCongestionDB->getSize(); CDBC_pid++) {
00361 ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(CDBC_pid);
00362 {
00363 MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00364 if (!lock_bucket) {
00365 e->schedule_in(SCHEDULE_CONGEST_CONT_INTERVAL);
00366 return EVENT_CONT;
00367 } else {
00368 theCongestionDB->RunTodoList(CDBC_pid);
00369 char buf[1024];
00370 Iter it;
00371 int len;
00372 CongestionEntry *pEntry = theCongestionDB->first_entry(CDBC_pid, &it);
00373 while (pEntry) {
00374 if ((pEntry->congested() && pEntry->pRecord->max_connection != 0) || CDBC_lf > 10) {
00375 len = pEntry->sprint(buf, 1024, CDBC_lf);
00376 CDBC_buf->write(buf, len);
00377 }
00378 pEntry = theCongestionDB->next_entry(CDBC_pid, &it);
00379 }
00380 }
00381 }
00382 }
00383
00384
00385 m_action.continuation->handleEvent(CONGESTION_EVENT_CONGESTED_LIST_DONE, NULL);
00386 Free_CongestionDBCont(this);
00387 return EVENT_DONE;
00388 }
00389
00390 int
00391 CongestionDBCont::get_congest_entry(int , Event * e)
00392 {
00393 Debug("congestion_control", "cont::get_congest_entry started");
00394
00395 if (m_action.cancelled) {
00396 Debug("congestion_cont", "action cancelled for %p", this);
00397 Free_CongestionDBCont(this);
00398 Debug("congestion_control", "cont::get_congest_entry state machine canceled");
00399 return EVENT_DONE;
00400 }
00401 ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(CDBC_key);
00402 MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00403 if (lock_bucket) {
00404 theCongestionDB->RunTodoList(theCongestionDB->part_num(CDBC_key));
00405 *CDBC_ppE = theCongestionDB->lookup_entry(CDBC_key);
00406 if (*CDBC_ppE != NULL) {
00407 CDBC_rule->put();
00408 (*CDBC_ppE)->get();
00409 Debug("congestion_control", "cont::get_congest_entry entry found");
00410 m_action.continuation->handleEvent(CONGESTION_EVENT_CONTROL_LOOKUP_DONE, NULL);
00411 } else {
00412
00413 *CDBC_ppE = new CongestionEntry(CDBC_host, &CDBC_ip.sa, CDBC_rule, CDBC_key);
00414 CDBC_rule->put();
00415 (*CDBC_ppE)->get();
00416 theCongestionDB->insert_entry(CDBC_key, *CDBC_ppE);
00417 Debug("congestion_control", "cont::get_congest_entry new entry created");
00418 m_action.continuation->handleEvent(CONGESTION_EVENT_CONTROL_LOOKUP_DONE, NULL);
00419 }
00420 Free_CongestionDBCont(this);
00421 return EVENT_DONE;
00422 } else {
00423 Debug("congestion_control", "cont::get_congest_entry MUTEX_TRY_LOCK failed");
00424 e->schedule_in(SCHEDULE_CONGEST_CONT_INTERVAL);
00425 return EVENT_CONT;
00426 }
00427 }
00428
00429
00430
00431
00432
00433 void
00434 initCongestionDB()
00435 {
00436 if (theCongestionDB == NULL) {
00437 theCongestionDB = new CongestionDB(CONGESTION_DB_SIZE / MT_HASHTABLE_PARTITIONS);
00438 }
00439 }
00440
00441 void
00442 revalidateCongestionDB()
00443 {
00444 ProxyMutex *bucket_mutex;
00445 if (theCongestionDB == NULL) {
00446 theCongestionDB = new CongestionDB(CONGESTION_DB_SIZE / MT_HASHTABLE_PARTITIONS);
00447 return;
00448 }
00449 Debug("congestion_config", "congestion control revalidating CongestionDB");
00450 for (int i = 0; i < theCongestionDB->getSize(); i++) {
00451 bucket_mutex = theCongestionDB->lock_for_key(i);
00452 {
00453 MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00454 if (lock_bucket) {
00455 theCongestionDB->RunTodoList(i);
00456 theCongestionDB->revalidateBucket(i);
00457 } else {
00458 CongestRequestParam *param = CongestRequestParamAllocator.alloc();
00459 param->m_op = CongestRequestParam::REVALIDATE_BUCKET;
00460 ink_atomiclist_push(&theCongestionDB->todo_lists[i], param);
00461 }
00462 }
00463 }
00464 Debug("congestion_config", "congestion control revalidating CongestionDB Done");
00465 }
00466
00467 Action *
00468 get_congest_entry(Continuation * cont, HttpRequestData * data, CongestionEntry ** ppEntry)
00469 {
00470 if (congestionControlEnabled != 1 && congestionControlEnabled != 2)
00471 return ACTION_RESULT_DONE;
00472 Debug("congestion_control", "congestion control get_congest_entry start");
00473
00474 CongestionControlRecord *p = CongestionControlled(data);
00475 Debug("congestion_control", "Control Matcher matched rule_num %d", p == NULL ? -1 : p->line_num);
00476 if (p == NULL)
00477 return ACTION_RESULT_DONE;
00478
00479 if (p->max_connection_failures <= 0 && p->max_connection < 0) {
00480 return ACTION_RESULT_DONE;
00481 }
00482 uint64_t key = make_key((char *) data->get_host(), data->get_ip(), p);
00483 Debug("congestion_control", "Key = %" PRIu64 "", key);
00484
00485 ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(key);
00486 MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00487 if (lock_bucket) {
00488 theCongestionDB->RunTodoList(theCongestionDB->part_num(key));
00489 *ppEntry = theCongestionDB->lookup_entry(key);
00490 if (*ppEntry != NULL) {
00491 (*ppEntry)->get();
00492 Debug("congestion_control", "get_congest_entry, found entry %p done", (void *) *ppEntry);
00493 return ACTION_RESULT_DONE;
00494 } else {
00495
00496 *ppEntry = new CongestionEntry(data->get_host(), data->get_ip(), p, key);
00497 (*ppEntry)->get();
00498 theCongestionDB->insert_entry(key, *ppEntry);
00499 Debug("congestion_control", "get_congest_entry, new entry %p done", (void *) *ppEntry);
00500 return ACTION_RESULT_DONE;
00501 }
00502 } else {
00503 Debug("congestion_control", "get_congest_entry, trylock failed, schedule cont");
00504 CongestionDBCont *Ccont = CongestionDBContAllocator.alloc();
00505 Ccont->m_action = cont;
00506 Ccont->mutex = cont->mutex;
00507 Ccont->CDBC_key = key;
00508 Ccont->CDBC_host = (char *) data->get_host();
00509 ats_ip_copy(&Ccont->CDBC_ip.sa, data->get_ip());
00510 p->get();
00511 Ccont->CDBC_rule = p;
00512 Ccont->CDBC_ppE = ppEntry;
00513
00514 SET_CONTINUATION_HANDLER(Ccont, &CongestionDBCont::get_congest_entry);
00515 eventProcessor.schedule_in(Ccont, SCHEDULE_CONGEST_CONT_INTERVAL, ET_NET);
00516 return &Ccont->m_action;
00517 }
00518 }
00519
00520 Action *
00521 get_congest_list(Continuation * cont, MIOBuffer * buffer, int format)
00522 {
00523 if (theCongestionDB == NULL || (congestionControlEnabled != 1 && congestionControlEnabled != 2))
00524 return ACTION_RESULT_DONE;
00525 for (int i = 0; i < theCongestionDB->getSize(); i++) {
00526 ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(i);
00527 {
00528 MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00529 if (lock_bucket) {
00530 theCongestionDB->RunTodoList(i);
00531 char buf[1024];
00532 Iter it;
00533 int len;
00534 CongestionEntry *pEntry = theCongestionDB->first_entry(i, &it);
00535 while (pEntry) {
00536 if ((pEntry->congested() && pEntry->pRecord->max_connection != 0) || format > 10) {
00537 len = pEntry->sprint(buf, 1024, format);
00538 buffer->write(buf, len);
00539 }
00540 pEntry = theCongestionDB->next_entry(i, &it);
00541 }
00542 } else {
00543
00544 CongestionDBCont *CCcont = CongestionDBContAllocator.alloc();
00545 CCcont->CDBC_pid = i;
00546 CCcont->CDBC_buf = buffer;
00547 CCcont->m_action = cont;
00548 CCcont->mutex = cont->mutex;
00549 CCcont->CDBC_lf = format;
00550 SET_CONTINUATION_HANDLER(CCcont, &CongestionDBCont::get_congest_list);
00551 eventProcessor.schedule_in(CCcont, SCHEDULE_CONGEST_CONT_INTERVAL, ET_NET);
00552 return &CCcont->m_action;
00553 }
00554 }
00555 }
00556 return ACTION_RESULT_DONE;
00557 }
00558
00559
00560
00561
00562
00563
00564 void
00565 remove_all_congested_entry()
00566 {
00567 if (theCongestionDB != NULL) {
00568 theCongestionDB->removeAllRecords();
00569 }
00570 }
00571
00572 void
00573 remove_congested_entry(uint64_t key)
00574 {
00575 if (theCongestionDB != NULL) {
00576 theCongestionDB->removeRecord(key);
00577 }
00578 }
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590 void
00591 remove_congested_entry(char *buf, MIOBuffer * out_buffer)
00592 {
00593 const int MSG_LEN = 512;
00594 char msg[MSG_LEN + 1];
00595 int len = 0;
00596 uint64_t key;
00597 if (strcasecmp(buf, "all") == 0) {
00598 remove_all_congested_entry();
00599 len = snprintf(msg, MSG_LEN, "all entries in congestion control table removed\n");
00600
00601 } else if (sscanf(buf, "key=%" PRIu64 "", &key) == 1) {
00602 remove_congested_entry(key);
00603 len = snprintf(msg, MSG_LEN, "key %" PRIu64 " removed\n", key);
00604 } else if (strncasecmp(buf, "host=", 5) == 0) {
00605 char *p = buf + 5;
00606 char *prefix = strchr(p, '/');
00607 int prelen = 0;
00608 if (prefix) {
00609 *prefix = '\0';
00610 prefix++;
00611 prelen = strlen(prefix);
00612 }
00613 key = make_key(p, strlen(p), 0, prefix, prelen);
00614 remove_congested_entry(key);
00615 len = snprintf(msg, MSG_LEN, "host=%s prefix=%s removed\n", p, prefix ? prefix : "(nil)");
00616 } else if (strncasecmp(buf, "ip=", 3) == 0) {
00617 IpEndpoint ip;
00618 memset(&ip, 0, sizeof(ip));
00619
00620 char *p = buf + 3;
00621 char *prefix = strchr(p, '/');
00622 int prelen = 0;
00623 if (prefix) {
00624 *prefix = '\0';
00625 prefix++;
00626 prelen = strlen(prefix);
00627 }
00628 ats_ip_pton(p, &ip);
00629 if (!ats_is_ip(&ip)) {
00630 len = snprintf(msg, MSG_LEN, "invalid ip: %s\n", buf);
00631 } else {
00632 key = make_key(NULL, 0, &ip.sa, prefix, prelen);
00633 remove_congested_entry(key);
00634 len = snprintf(msg, MSG_LEN, "ip=%s prefix=%s removed\n", p, prefix ? prefix : "(nil)");
00635 }
00636 }
00637 out_buffer->write(msg, len);
00638 }