00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 #include "libts.h"
00031 #include "P_EventSystem.h"
00032 #include "P_Net.h"
00033 #include "Main.h"
00034 #include "CongestionDB.h"
00035 #include "Congestion.h"
00036 #include "ProcessManager.h"
00037 
00038 #define SCHEDULE_CONGEST_CONT_INTERVAL HRTIME_MSECONDS(5)
00039 int CONGESTION_DB_SIZE = 1024;
00040 
00041 CongestionDB *theCongestionDB = NULL;
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 class CongestionDBCont:public Continuation
00051 {
00052 public:
00053   CongestionDBCont();
00054   int GC(int event, Event * e);
00055 
00056   int get_congest_list(int event, Event * e);
00057 
00058   int get_congest_entry(int event, Event * e);
00059 
00060 
00061   Action m_action;
00062 
00063   
00064   union
00065   {
00066     struct
00067     {
00068       MIOBuffer *m_iobuf;
00069       int m_CurPartitionID;
00070       int m_list_format;        
00071     } list_info;
00072     struct
00073     {
00074       uint64_t m_key;
00075       char *m_hostname;
00076       IpEndpoint m_ip;
00077       CongestionControlRecord *m_rule;
00078       CongestionEntry **m_ppEntry;
00079     } entry_info;
00080   } data;
00081 };
00082 
00083 
// Shorthand accessors for the CongestionDBCont::data union members.
// list_info fields:
#define CDBC_buf data.list_info.m_iobuf
#define CDBC_pid data.list_info.m_CurPartitionID
#define CDBC_lf data.list_info.m_list_format
// entry_info fields:
#define CDBC_key data.entry_info.m_key
#define CDBC_host data.entry_info.m_hostname
#define CDBC_ip data.entry_info.m_ip
#define CDBC_rule data.entry_info.m_rule
#define CDBC_ppE data.entry_info.m_ppEntry
00092 
00093 inline CongestionDBCont::CongestionDBCont()
00094 :Continuation(NULL)
00095 {
00096   memset(&data, 0, sizeof(data));
00097 }
00098 
00099 
00100 
00101 static
00102   ClassAllocator <
00103   CongestionDBCont >
00104 CongestionDBContAllocator("CongestionDBContAllocator");
00105 
00106 inline void
00107 Free_CongestionDBCont(CongestionDBCont * cont)
00108 {
00109   cont->m_action = NULL;
00110   cont->mutex = NULL;
00111   CongestionDBContAllocator.free(cont);
00112 }
00113 
00114 ClassAllocator<CongestRequestParam> CongestRequestParamAllocator("CongestRequestParamAllocator");
00115 
00116 inline void
00117 Free_CongestRequestParam(CongestRequestParam * param)
00118 {
00119   CongestRequestParamAllocator.free(param);
00120 }
00121 
00122 
00123 
00124 
00125 
00126 
00127 
00128 
00129 static long
00130   congestEntryGCTime = 0;
00131 
00132 
00133 
00134 
00135 
00136 void
00137 preCongestEntryGC(void)
00138 {
00139   congestEntryGCTime = (long) ink_hrtime_to_sec(ink_get_hrtime());
00140 }
00141 
00142 
00143 
00144 bool
00145 congestEntryGC(CongestionEntry * p)
00146 {
00147   if (!p->usefulInfo(congestEntryGCTime)) {
00148     p->put();
00149     return true;
00150   }
00151   return false;
00152 }
00153 
00154 CongestionDB::CongestionDB(int tablesize)
00155 :CongestionTable(tablesize, &congestEntryGC, &preCongestEntryGC)
00156 {
00157   ink_assert(tablesize > 0);
00158   todo_lists = new InkAtomicList[MT_HASHTABLE_PARTITIONS];
00159   for (int i = 0; i < MT_HASHTABLE_PARTITIONS; i++) {
00160     ink_atomiclist_init(&todo_lists[i], "cong_todo_list", (uintptr_t) &((CongestRequestParam *) 0)->link);
00161   }
00162 }
00163 
00164 
00165 
00166 
00167 
00168 CongestionDB::~CongestionDB()
00169 {
00170   delete[]todo_lists;
00171 }
00172 
00173 void
00174 CongestionDB::addRecord(uint64_t key, CongestionEntry * pEntry)
00175 {
00176   ink_assert(key == pEntry->m_key);
00177   pEntry->get();
00178   ProxyMutex *bucket_mutex = lock_for_key(key);
00179   MUTEX_TRY_LOCK(lock, bucket_mutex, this_ethread());
00180   if (lock) {
00181     RunTodoList(part_num(key));
00182     CongestionEntry *tmp = insert_entry(key, pEntry);
00183     if (tmp)
00184       tmp->put();
00185   } else {
00186     CongestRequestParam *param = CongestRequestParamAllocator.alloc();
00187     param->m_op = CongestRequestParam::ADD_RECORD;
00188     param->m_key = key;
00189     param->m_pEntry = pEntry;
00190     ink_atomiclist_push(&todo_lists[part_num(key)], param);
00191   }
00192 }
00193 
00194 void
00195 CongestionDB::removeAllRecords()
00196 {
00197   CongestionEntry *tmp;
00198   Iter it;
00199   for (int part = 0; part < MT_HASHTABLE_PARTITIONS; part++) {
00200     ProxyMutex *bucket_mutex = lock_for_key(part);
00201     MUTEX_TRY_LOCK(lock, bucket_mutex, this_ethread());
00202     if (lock) {
00203       RunTodoList(part);
00204       tmp = first_entry(part, &it);
00205       while (tmp) {
00206         remove_entry(part, &it);
00207         tmp->put();
00208         tmp = cur_entry(part, &it);
00209       }
00210     } else {
00211       CongestRequestParam *param = CongestRequestParamAllocator.alloc();
00212       param->m_op = CongestRequestParam::REMOVE_ALL_RECORDS;
00213       param->m_key = part;
00214       ink_atomiclist_push(&todo_lists[part], param);
00215     }
00216   }
00217 }
00218 
00219 void
00220 CongestionDB::removeRecord(uint64_t key)
00221 {
00222   CongestionEntry *tmp;
00223   ProxyMutex *bucket_mutex = lock_for_key(key);
00224   MUTEX_TRY_LOCK(lock, bucket_mutex, this_ethread());
00225   if (lock) {
00226     RunTodoList(part_num(key));
00227     tmp = remove_entry(key);
00228     if (tmp)
00229       tmp->put();
00230   } else {
00231     CongestRequestParam *param = CongestRequestParamAllocator.alloc();
00232     param->m_op = CongestRequestParam::REMOVE_RECORD;
00233     param->m_key = key;
00234     ink_atomiclist_push(&todo_lists[part_num(key)], param);
00235   }
00236 }
00237 
00238 
00239 void
00240 CongestionDB::process(int buckId, CongestRequestParam * param)
00241 {
00242   CongestionEntry *pEntry = NULL;
00243   switch (param->m_op) {
00244   case CongestRequestParam::ADD_RECORD:
00245     pEntry = insert_entry(param->m_key, param->m_pEntry);
00246     if (pEntry) {
00247       pEntry->put();
00248     }
00249     break;
00250   case CongestRequestParam::REMOVE_ALL_RECORDS:
00251     {
00252       CongestionEntry *tmp;
00253       Iter it;
00254       tmp = first_entry(param->m_key, &it);
00255       while (tmp) {
00256         remove_entry(param->m_key, &it);
00257         tmp->put();
00258         tmp = cur_entry(param->m_key, &it);
00259       }
00260       break;
00261     }
00262   case CongestRequestParam::REMOVE_RECORD:
00263     pEntry = remove_entry(param->m_key);
00264     if (pEntry)
00265       pEntry->put();
00266     break;
00267   case CongestRequestParam::REVALIDATE_BUCKET:
00268     revalidateBucket(buckId);
00269     break;
00270   default:
00271     ink_assert(!"CongestionDB::process unrecognized op");
00272   }
00273 }
00274 
00275 void
00276 CongestionDB::RunTodoList(int buckId)
00277 {
00278   CongestRequestParam *param = NULL, *cur = NULL;
00279   if ((param = (CongestRequestParam *)
00280        ink_atomiclist_popall(&todo_lists[buckId])) != NULL) {
00281     
00282     param->link.prev = NULL;
00283     while (param->link.next) {
00284       param->link.next->link.prev = param;
00285       param = param->link.next;
00286     };
00287     while (param) {
00288       process(buckId, param);
00289       cur = param;
00290       param = param->link.prev;
00291       Free_CongestRequestParam(cur);
00292     }
00293   }
00294 }
00295 
00296 void
00297 CongestionDB::revalidateBucket(int buckId)
00298 {
00299   Iter it;
00300   CongestionEntry *cur = NULL;
00301   cur = first_entry(buckId, &it);
00302   while (cur != NULL) {
00303     if (!cur->validate()) {
00304       remove_entry(buckId, &it);
00305       cur->put();
00306       
00307       
00308       cur = cur_entry(buckId, &it);
00309     } else
00310       cur = next_entry(buckId, &it);
00311   }
00312 }
00313 
00314 
00315 
00316 
00317 
00318 int
00319 CongestionDBCont::GC(int , Event * )
00320 {
00321   if (congestionControlEnabled == 1 || congestionControlEnabled == 2) {
00322     if (theCongestionDB == NULL)
00323       goto Ldone;
00324     for (; CDBC_pid < theCongestionDB->getSize(); CDBC_pid++) {
00325       ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(CDBC_pid);
00326       {
00327         MUTEX_TRY_LOCK(lock, bucket_mutex, this_ethread());
00328         if (lock) {
00329           ink_hrtime now = ink_get_hrtime();
00330           now = ink_hrtime_to_sec(now);
00331           theCongestionDB->RunTodoList(CDBC_pid);
00332           Iter it;
00333           CongestionEntry *pEntry = theCongestionDB->first_entry(CDBC_pid, &it);
00334           while (pEntry) {
00335             if (!pEntry->usefulInfo(now)) {
00336               theCongestionDB->remove_entry(CDBC_pid, &it);
00337               pEntry->put();
00338               pEntry = theCongestionDB->cur_entry(CDBC_pid, &it);
00339             }
00340           }
00341         } else {
00342           Debug("congestion_db", "flush gc missed the lock [%d], retry", CDBC_pid);
00343           return EVENT_CONT;
00344         }
00345       }
00346     }
00347   }
00348 Ldone:
00349   CDBC_pid = 0;
00350   return EVENT_DONE;
00351 }
00352 
00353 int
00354 CongestionDBCont::get_congest_list(int , Event * e)
00355 {
00356   if (m_action.cancelled) {
00357     Free_CongestionDBCont(this);
00358     return EVENT_DONE;
00359   }
00360   for (; CDBC_pid < theCongestionDB->getSize(); CDBC_pid++) {
00361     ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(CDBC_pid);
00362     {
00363       MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00364       if (!lock_bucket) {
00365         e->schedule_in(SCHEDULE_CONGEST_CONT_INTERVAL);
00366         return EVENT_CONT;
00367       } else {
00368         theCongestionDB->RunTodoList(CDBC_pid);
00369         char buf[1024];
00370         Iter it;
00371         int len;
00372         CongestionEntry *pEntry = theCongestionDB->first_entry(CDBC_pid, &it);
00373         while (pEntry) {
00374           if ((pEntry->congested() && pEntry->pRecord->max_connection != 0) || CDBC_lf > 10) {
00375             len = pEntry->sprint(buf, 1024, CDBC_lf);
00376             CDBC_buf->write(buf, len);
00377           }
00378           pEntry = theCongestionDB->next_entry(CDBC_pid, &it);
00379         }
00380       }
00381     }
00382   }
00383 
00384   
00385   m_action.continuation->handleEvent(CONGESTION_EVENT_CONGESTED_LIST_DONE, NULL);
00386   Free_CongestionDBCont(this);
00387   return EVENT_DONE;
00388 }
00389 
00390 int
00391 CongestionDBCont::get_congest_entry(int , Event * e)
00392 {
00393   Debug("congestion_control", "cont::get_congest_entry started");
00394 
00395   if (m_action.cancelled) {
00396     Debug("congestion_cont", "action cancelled for %p", this);
00397     Free_CongestionDBCont(this);
00398     Debug("congestion_control", "cont::get_congest_entry state machine canceled");
00399     return EVENT_DONE;
00400   }
00401   ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(CDBC_key);
00402   MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00403   if (lock_bucket) {
00404     theCongestionDB->RunTodoList(theCongestionDB->part_num(CDBC_key));
00405     *CDBC_ppE = theCongestionDB->lookup_entry(CDBC_key);
00406     if (*CDBC_ppE != NULL) {
00407       CDBC_rule->put();
00408       (*CDBC_ppE)->get();
00409       Debug("congestion_control", "cont::get_congest_entry entry found");
00410       m_action.continuation->handleEvent(CONGESTION_EVENT_CONTROL_LOOKUP_DONE, NULL);
00411     } else {
00412       
00413       *CDBC_ppE = new CongestionEntry(CDBC_host, &CDBC_ip.sa, CDBC_rule, CDBC_key);
00414       CDBC_rule->put();
00415       (*CDBC_ppE)->get();
00416       theCongestionDB->insert_entry(CDBC_key, *CDBC_ppE);
00417       Debug("congestion_control", "cont::get_congest_entry new entry created");
00418       m_action.continuation->handleEvent(CONGESTION_EVENT_CONTROL_LOOKUP_DONE, NULL);
00419     }
00420     Free_CongestionDBCont(this);
00421     return EVENT_DONE;
00422   } else {
00423     Debug("congestion_control", "cont::get_congest_entry MUTEX_TRY_LOCK failed");
00424     e->schedule_in(SCHEDULE_CONGEST_CONT_INTERVAL);
00425     return EVENT_CONT;
00426   }
00427 }
00428 
00429 
00430 
00431 
00432 
00433 void
00434 initCongestionDB()
00435 {
00436   if (theCongestionDB == NULL) {
00437     theCongestionDB = new CongestionDB(CONGESTION_DB_SIZE / MT_HASHTABLE_PARTITIONS);
00438   }
00439 }
00440 
00441 void
00442 revalidateCongestionDB()
00443 {
00444   ProxyMutex *bucket_mutex;
00445   if (theCongestionDB == NULL) {
00446     theCongestionDB = new CongestionDB(CONGESTION_DB_SIZE / MT_HASHTABLE_PARTITIONS);
00447     return;
00448   }
00449   Debug("congestion_config", "congestion control revalidating CongestionDB");
00450   for (int i = 0; i < theCongestionDB->getSize(); i++) {
00451     bucket_mutex = theCongestionDB->lock_for_key(i);
00452     {
00453       MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00454       if (lock_bucket) {
00455         theCongestionDB->RunTodoList(i);
00456         theCongestionDB->revalidateBucket(i);
00457       } else {
00458         CongestRequestParam *param = CongestRequestParamAllocator.alloc();
00459         param->m_op = CongestRequestParam::REVALIDATE_BUCKET;
00460         ink_atomiclist_push(&theCongestionDB->todo_lists[i], param);
00461       }
00462     }
00463   }
00464   Debug("congestion_config", "congestion control revalidating CongestionDB Done");
00465 }
00466 
00467 Action *
00468 get_congest_entry(Continuation * cont, HttpRequestData * data, CongestionEntry ** ppEntry)
00469 {
00470   if (congestionControlEnabled != 1 && congestionControlEnabled != 2)
00471     return ACTION_RESULT_DONE;
00472   Debug("congestion_control", "congestion control get_congest_entry start");
00473 
00474   CongestionControlRecord *p = CongestionControlled(data);
00475   Debug("congestion_control", "Control Matcher matched rule_num %d", p == NULL ? -1 : p->line_num);
00476   if (p == NULL)
00477     return ACTION_RESULT_DONE;
00478 
00479   if (p->max_connection_failures <= 0 && p->max_connection < 0) {
00480     return ACTION_RESULT_DONE;
00481   }
00482   uint64_t key = make_key((char *) data->get_host(), data->get_ip(), p);
00483   Debug("congestion_control", "Key = %" PRIu64 "", key);
00484 
00485   ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(key);
00486   MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00487   if (lock_bucket) {
00488     theCongestionDB->RunTodoList(theCongestionDB->part_num(key));
00489     *ppEntry = theCongestionDB->lookup_entry(key);
00490     if (*ppEntry != NULL) {
00491       (*ppEntry)->get();
00492       Debug("congestion_control", "get_congest_entry, found entry %p done", (void *) *ppEntry);
00493       return ACTION_RESULT_DONE;
00494     } else {
00495       
00496       *ppEntry = new CongestionEntry(data->get_host(), data->get_ip(), p, key);
00497       (*ppEntry)->get();
00498       theCongestionDB->insert_entry(key, *ppEntry);
00499       Debug("congestion_control", "get_congest_entry, new entry %p done", (void *) *ppEntry);
00500       return ACTION_RESULT_DONE;
00501     }
00502   } else {
00503     Debug("congestion_control", "get_congest_entry, trylock failed, schedule cont");
00504     CongestionDBCont *Ccont = CongestionDBContAllocator.alloc();
00505     Ccont->m_action = cont;
00506     Ccont->mutex = cont->mutex;
00507     Ccont->CDBC_key = key;
00508     Ccont->CDBC_host = (char *) data->get_host();
00509     ats_ip_copy(&Ccont->CDBC_ip.sa, data->get_ip());
00510     p->get();
00511     Ccont->CDBC_rule = p;
00512     Ccont->CDBC_ppE = ppEntry;
00513 
00514     SET_CONTINUATION_HANDLER(Ccont, &CongestionDBCont::get_congest_entry);
00515     eventProcessor.schedule_in(Ccont, SCHEDULE_CONGEST_CONT_INTERVAL, ET_NET);
00516     return &Ccont->m_action;
00517   }
00518 }
00519 
00520 Action *
00521 get_congest_list(Continuation * cont, MIOBuffer * buffer, int format)
00522 {
00523   if (theCongestionDB == NULL || (congestionControlEnabled != 1 && congestionControlEnabled != 2))
00524     return ACTION_RESULT_DONE;
00525   for (int i = 0; i < theCongestionDB->getSize(); i++) {
00526     ProxyMutex *bucket_mutex = theCongestionDB->lock_for_key(i);
00527     {
00528       MUTEX_TRY_LOCK(lock_bucket, bucket_mutex, this_ethread());
00529       if (lock_bucket) {
00530         theCongestionDB->RunTodoList(i);
00531         char buf[1024];
00532         Iter it;
00533         int len;
00534         CongestionEntry *pEntry = theCongestionDB->first_entry(i, &it);
00535         while (pEntry) {
00536           if ((pEntry->congested() && pEntry->pRecord->max_connection != 0) || format > 10) {
00537             len = pEntry->sprint(buf, 1024, format);
00538             buffer->write(buf, len);
00539           }
00540           pEntry = theCongestionDB->next_entry(i, &it);
00541         }
00542       } else {
00543         
00544         CongestionDBCont *CCcont = CongestionDBContAllocator.alloc();
00545         CCcont->CDBC_pid = i;
00546         CCcont->CDBC_buf = buffer;
00547         CCcont->m_action = cont;
00548         CCcont->mutex = cont->mutex;
00549         CCcont->CDBC_lf = format;
00550         SET_CONTINUATION_HANDLER(CCcont, &CongestionDBCont::get_congest_list);
00551         eventProcessor.schedule_in(CCcont, SCHEDULE_CONGEST_CONT_INTERVAL, ET_NET);
00552         return &CCcont->m_action;
00553       }
00554     }
00555   }
00556   return ACTION_RESULT_DONE;
00557 }
00558 
00559 
00560 
00561 
00562 
00563 
00564 void
00565 remove_all_congested_entry()
00566 {
00567   if (theCongestionDB != NULL) {
00568     theCongestionDB->removeAllRecords();
00569   }
00570 }
00571 
00572 void
00573 remove_congested_entry(uint64_t key)
00574 {
00575   if (theCongestionDB != NULL) {
00576     theCongestionDB->removeRecord(key);
00577   }
00578 }
00579 
00580 
00581 
00582 
00583 
00584 
00585 
00586 
00587 
00588 
00589 
00590 void
00591 remove_congested_entry(char *buf, MIOBuffer * out_buffer)
00592 {
00593   const int MSG_LEN = 512;
00594   char msg[MSG_LEN + 1];
00595   int len = 0;
00596   uint64_t key;
00597   if (strcasecmp(buf, "all") == 0) {
00598     remove_all_congested_entry();
00599     len = snprintf(msg, MSG_LEN, "all entries in congestion control table removed\n");
00600     
00601   } else if (sscanf(buf, "key=%" PRIu64 "", &key) == 1) {
00602     remove_congested_entry(key);
00603     len = snprintf(msg, MSG_LEN, "key %" PRIu64 " removed\n", key);
00604   } else if (strncasecmp(buf, "host=", 5) == 0) {
00605     char *p = buf + 5;
00606     char *prefix = strchr(p, '/');
00607     int prelen = 0;
00608     if (prefix) {
00609       *prefix = '\0';
00610       prefix++;
00611       prelen = strlen(prefix);
00612     }
00613     key = make_key(p, strlen(p), 0, prefix, prelen);
00614     remove_congested_entry(key);
00615     len = snprintf(msg, MSG_LEN, "host=%s prefix=%s removed\n", p, prefix ? prefix : "(nil)");
00616   } else if (strncasecmp(buf, "ip=", 3) == 0) {
00617     IpEndpoint ip;
00618     memset(&ip, 0, sizeof(ip));
00619     
00620     char *p = buf + 3;
00621     char *prefix = strchr(p, '/');
00622     int prelen = 0;
00623     if (prefix) {
00624       *prefix = '\0';
00625       prefix++;
00626       prelen = strlen(prefix);
00627     }
00628     ats_ip_pton(p, &ip);
00629     if (!ats_is_ip(&ip)) {
00630       len = snprintf(msg, MSG_LEN, "invalid ip: %s\n", buf);
00631     } else {
00632       key = make_key(NULL, 0, &ip.sa, prefix, prelen);
00633       remove_congested_entry(key);
00634       len = snprintf(msg, MSG_LEN, "ip=%s prefix=%s removed\n", p, prefix ? prefix : "(nil)");
00635     }
00636   }
00637   out_buffer->write(msg, len);
00638 }