00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 #include "libts.h"
00032 #include "P_EventSystem.h"
00033 #include "P_Cache.h"
00034 #include "P_Net.h"
00035 #include "P_RecProcess.h"
00036 #include "Main.h"
00037 #include "ICP.h"
00038 #include "ICPProcessor.h"
00039 #include "ICPlog.h"
00040 #include "BaseManager.h"
00041 #include "I_Layout.h"
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 
00051 
00052 
00053 
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 
00062 
00063 
00064 
00065 
00066 
00067 
00068 
00069 
00070 
00071 
00072 
00073 
00074 
00075 
00076 
00077 
00078 
00079 
00080 
00081 
00082 
00083 
00084 
00085 
00086 
00087 
00088 
00089 
00090 
00091 
00092 
00093 
00094 
00095 
00096 
00097 
00098 
00099 
00100 
00101 
00102 
00103 
00104 
00105 
00106 
00107 
00108 
00109 
00110 
00111 
00112 
00113 
00114 
00115 
00116 
00117 
00118 
00119 
00120 
00121 
00122 
00123 
00124 
00125 #if !defined(USE_CAS_FOR_ATOMICLOCK)
00126 AtomicLock::AtomicLock()
00127 {
00128   _mutex = new_ProxyMutex();
00129 }
00130 
00131 AtomicLock::~AtomicLock()
00132 {
00133 }
00134 
00135 int
00136 AtomicLock::Lock()
00137 {
00138   EThread *et = this_ethread();
00139   ink_assert(et != NULL);
00140   return MUTEX_TAKE_TRY_LOCK(_mutex, et);
00141 }
00142 
00143 int
00144 AtomicLock::HaveLock()
00145 {
00146   EThread *et = this_ethread();
00147   ink_assert(et != NULL);
00148   return (_mutex->thread_holding == et);
00149 }
00150 
00151 void
00152 AtomicLock::Unlock()
00153 {
00154   EThread *et = this_ethread();
00155   ink_assert(et != NULL);
00156   MUTEX_UNTAKE_LOCK(_mutex, et);
00157 }
00158 
00159 #else // USE_CAS_FOR_ATOMICLOCK
00160 AtomicLock::AtomicLock():lock_word(0)
00161 {
00162 }
00163 
00164 AtomicLock::~AtomicLock()
00165 {
00166 }
00167 
00168 int
00169 AtomicLock::Lock()
00170 {
00171   bool status = ink_atomic_cas(&_lock_word, AtomicLock::UNLOCKED,
00172                                AtomicLock::LOCKED);
00173   return status;
00174 }
00175 
00176 int
00177 AtomicLock::HaveLock()
00178 {
00179   return (_lock_word == LOCKED);
00180 }
00181 
00182 void
00183 AtomicLock::Unlock()
00184 {
00185   ink_assert(_lock_word == AtomicLock::LOCKED);
00186   _lock_word = AtomicLock::UNLOCKED;
00187 }
00188 #endif // USE_CAS_FOR_ATOMICLOCK
00189 
00190 
00191 
00192 
00193 
00194 
00195 BitMap::BitMap(int bitmap_maxsize)
00196 {
00197   if (bitmap_maxsize <= (int) (STATIC_BITMAP_BYTE_SIZE * BITS_PER_BYTE)) {
00198     _bitmap = _static_bitmap;
00199     _bitmap_size = bitmap_maxsize;
00200     _bitmap_byte_size = STATIC_BITMAP_BYTE_SIZE;
00201   } else {
00202     _bitmap_byte_size = (bitmap_maxsize + (BITS_PER_BYTE - 1)) / BITS_PER_BYTE;
00203     _bitmap = new char[_bitmap_byte_size];
00204     _bitmap_size = bitmap_maxsize;
00205   }
00206   memset((void *) _bitmap, 0, _bitmap_byte_size);
00207 }
00208 
00209 BitMap::~BitMap()
00210 {
00211   if (_bitmap_size > (int) (STATIC_BITMAP_BYTE_SIZE * BITS_PER_BYTE)) {
00212     delete[]_bitmap;
00213   }
00214 }
00215 
00216 void
00217 BitMap::SetBit(int bit)
00218 {
00219   if (bit >= _bitmap_size)
00220     return;
00221 
00222   char *pbyte = &_bitmap[bit / BITS_PER_BYTE];
00223   *pbyte |= (1 << (bit % BITS_PER_BYTE));
00224 }
00225 
00226 void
00227 BitMap::ClearBit(int bit)
00228 {
00229   if (bit >= _bitmap_size)
00230     return;
00231 
00232   char *pbyte = &_bitmap[bit / BITS_PER_BYTE];
00233   *pbyte &= ~(1 << (bit % BITS_PER_BYTE));
00234 }
00235 
00236 int
00237 BitMap::IsBitSet(int bit)
00238 {
00239   if (bit >= _bitmap_size)
00240     return 0;
00241 
00242   char *pbyte = &_bitmap[bit / BITS_PER_BYTE];
00243   if (*pbyte & (1 << (bit % BITS_PER_BYTE)))
00244     return 1;
00245   else
00246     return 0;
00247 }
00248 
00249 
00250 
00251 
00252 
00253 
00254 int
00255 ICPConfigData::operator==(ICPConfigData & ICPData)
00256 {
00257   if (ICPData._icp_enabled != _icp_enabled)
00258     return 0;
00259   if (ICPData._icp_port != _icp_port)
00260     return 0;
00261   if (ICPData._icp_interface != _icp_interface)
00262     return 0;
00263   if (ICPData._multicast_enabled != _multicast_enabled)
00264     return 0;
00265   if (ICPData._icp_query_timeout != _icp_query_timeout)
00266     return 0;
00267   if (ICPData._cache_lookup_local != _cache_lookup_local)
00268     return 0;
00269   if (ICPData._stale_lookup != _stale_lookup)
00270     return 0;
00271   if (ICPData._reply_to_unknown_peer != _reply_to_unknown_peer)
00272     return 0;
00273   if (ICPData._default_reply_port != _default_reply_port)
00274     return 0;
00275   return 1;
00276 }
00277 
00278 
00279 
00280 
00281 
00282 
00283 PeerConfigData::PeerConfigData():_ctype(CTYPE_NONE), _proxy_port(0), _icp_port(0), _mc_member(0), _mc_ttl(0)
00284 {
00285   memset(_hostname, 0, HOSTNAME_SIZE);
00286 }
00287 
00288 PeerType_t PeerConfigData::CTypeToPeerType_t(int ctype)
00289 {
00290   switch (ctype) {
00291   case (int) CTYPE_PARENT:
00292     return PEER_PARENT;
00293 
00294   case (int) CTYPE_SIBLING:
00295     return PEER_SIBLING;
00296 
00297   case (int) CTYPE_LOCAL:
00298     return PEER_LOCAL;
00299 
00300   default:
00301     return PEER_NONE;
00302   }
00303 }
00304 
00305 int
00306 PeerConfigData::GetHostIPByName(char *hostname, IpAddr& rip)
00307 {
00308   
00309   if (0 == hostname || 0 == *hostname)
00310     return 1;                   
00311 
00312   addrinfo hints;
00313   addrinfo *ai;
00314   sockaddr const* best = 0;
00315 
00316   ink_zero(hints);
00317   hints.ai_family = AF_UNSPEC;
00318   hints.ai_flags = AI_ADDRCONFIG;
00319   if (0 == getaddrinfo(hostname, 0, &hints, &ai)) {
00320     for ( addrinfo *spot = ai ; spot ; spot = spot->ai_next) {
00321       
00322       
00323       if (ats_is_ip(spot->ai_addr) &&
00324         (!best || -1 == ats_ip_addr_cmp(spot->ai_addr, best))
00325       ) {
00326         best = spot->ai_addr;
00327       }
00328     }
00329     if (best) rip.assign(best);
00330     freeaddrinfo(ai);
00331   }
00332   return best ? 0 : 1;
00333 }
00334 
00335 bool PeerConfigData::operator==(PeerConfigData & PeerData)
00336 {
00337   if (strncmp(PeerData._hostname, _hostname, PeerConfigData::HOSTNAME_SIZE) != 0)
00338     return false;
00339   if (PeerData._ctype != _ctype)
00340     return false;
00341   if (PeerData._ip_addr != _ip_addr)
00342     return false;
00343   if (PeerData._proxy_port != _proxy_port)
00344     return false;
00345   if (PeerData._icp_port != _icp_port)
00346     return false;
00347   if (PeerData._mc_member != _mc_member)
00348     return false;
00349   if (PeerData._mc_ip_addr != _mc_ip_addr)
00350     return false;
00351   if (PeerData._mc_ttl != _mc_ttl)
00352     return false;
00353   return true;
00354 }
00355 
00356 
00357 
00358 
00359 
00360 ICPConfigUpdateCont::ICPConfigUpdateCont(void *d, void *v):
00361 Continuation(new_ProxyMutex()), _data(d), _value(v)
00362 {
00363 }
00364 
00365 int
00366 ICPConfigUpdateCont::RetryICPconfigUpdate(int , Event * )
00367 {
00368   ICPConfiguration::icp_config_change_callback(_data, _value);
00369   delete this;
00370   return EVENT_DONE;
00371 }
00372 
00373 
00374 
00375 
00376 
00377 typedef int (ICPConfigUpdateCont::*ICPCfgContHandler) (int, void *);
00378 ICPConfiguration::ICPConfiguration():_icp_config_callouts(0)
00379 {
00380   
00381   
00382   
00383   _icp_cdata = new ICPConfigData();
00384   _icp_cdata_current = new ICPConfigData();
00385 
00386   
00387   
00388   
00389   ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_enabled, "proxy.config.icp.enabled");
00390   ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_port, "proxy.config.icp.icp_port");
00391   ICP_EstablishStaticConfigStringAlloc(_icp_cdata_current->_icp_interface, "proxy.config.icp.icp_interface");
00392   ICP_EstablishStaticConfigInteger(_icp_cdata_current->_multicast_enabled, "proxy.config.icp.multicast_enabled");
00393   ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_query_timeout, "proxy.config.icp.query_timeout");
00394   ICP_EstablishStaticConfigInteger(_icp_cdata_current->_cache_lookup_local, "proxy.config.icp.lookup_local");
00395   ICP_EstablishStaticConfigInteger(_icp_cdata_current->_stale_lookup, "proxy.config.icp.stale_icp_enabled");
00396   ICP_EstablishStaticConfigInteger(_icp_cdata_current->_reply_to_unknown_peer,
00397                                    "proxy.config.icp.reply_to_unknown_peer");
00398   ICP_EstablishStaticConfigInteger(_icp_cdata_current->_default_reply_port, "proxy.config.icp.default_reply_port");
00399   UpdateGlobalConfig();         
00400 
00401   
00402   
00403   
00404   for (int n = 0; n <= MAX_DEFINED_PEERS; ++n) {
00405     _peer_cdata[n] = new PeerConfigData;
00406     _peer_cdata_current[n] = new PeerConfigData;
00407   }
00408 
00409   
00410   
00411   
00412   char
00413     icp_config_filename[PATH_NAME_MAX] = "";
00414   ICP_ReadConfigString(icp_config_filename, "proxy.config.icp.icp_configuration", sizeof(icp_config_filename) - 1);
00415   (void) icp_config_change_callback((void *) this, (void *) icp_config_filename, 1);
00416   UpdatePeerConfig();           
00417 
00418   
00419   
00420   
00421   ICP_RegisterConfigUpdateFunc("proxy.config.icp.icp_configuration", mgr_icp_config_change_callback, (void *) this);
00422 }
00423 
00424 ICPConfiguration::~ICPConfiguration()
00425 {
00426   
00427   
00428 #ifdef OMIT
00429   if (_icp_cdata) {
00430     
00431     delete((void *) _icp_cdata);
00432   }
00433   if (_icp_cdata_current)
00434     delete((void *) _icp_cdata_current);
00435   if (_peer_cdata)
00436     delete((void *) _peer_cdata);
00437   if (_peer_cdata_current)
00438     delete((void *) _peer_cdata_current);
00439 #endif // OMIT
00440 }
00441 
00442 int
00443 ICPConfiguration::GlobalConfigChange()
00444 {
00445   return (!(*_icp_cdata == *_icp_cdata_current));
00446 }
00447 
00448 void
00449 ICPConfiguration::UpdateGlobalConfig()
00450 {
00451   *_icp_cdata = *_icp_cdata_current;
00452 }
00453 
00454 int
00455 ICPConfiguration::PeerConfigChange()
00456 {
00457   
00458   for (int i = 1; i <= MAX_DEFINED_PEERS; ++i) {
00459     if (!(*_peer_cdata[i] == *_peer_cdata_current[i]))
00460       return 1;
00461   }
00462   return 0;
00463 }
00464 
00465 void
00466 ICPConfiguration::UpdatePeerConfig()
00467 {
00468   
00469   for (int i = 1; i <= MAX_DEFINED_PEERS; ++i) {
00470     
00471     
00472     
00473     
00474     memcpy(_peer_cdata[i], _peer_cdata_current[i], sizeof(*_peer_cdata[i]));
00475     
00476     if ((_peer_cdata[i]->_ip_addr.isValid()) && _peer_cdata[i]->_hostname) {
00477       
00478       (void) PeerConfigData::GetHostIPByName(_peer_cdata[i]->_hostname, _peer_cdata[i]->_my_ip_addr);
00479     } else {
00480       
00481       _peer_cdata[i]->_my_ip_addr = _peer_cdata[i]->_ip_addr;
00482     }
00483   }
00484 }
00485 
00486 int
00487 ICPConfiguration::mgr_icp_config_change_callback(const char * ,
00488                                                  RecDataT , RecData data, void *cookie)
00489 {
00490   
00491   
00492   
00493   
00494 
00495   
00496 
00497   ICPConfigUpdateCont *rh = new ICPConfigUpdateCont(cookie, data.rec_string);
00498   SET_CONTINUATION_HANDLER(rh, (ICPCfgContHandler) & ICPConfigUpdateCont::RetryICPconfigUpdate);
00499   eventProcessor.schedule_imm(rh, ET_ICP);
00500   return EVENT_DONE;
00501 }
00502 
00503 namespace {
00504   inline char* next_field(char* text, char fs) {
00505     text = strchr(text, fs);
00506     
00507     if (text && *text == fs)
00508       while (text[1] == fs) ++text;
00509     return text;
00510   }
00511 }
00512 
00513 void *
00514 ICPConfiguration::icp_config_change_callback(void *data, void *value, int startup)
00515 {
00516   EThread *thread = this_ethread();
00517   ProxyMutex *mutex = thread->mutex;
00518 
00519   
00520   
00521   
00522   char *filename = (char *) value;
00523   ICPConfiguration *ICPconfig = (ICPConfiguration *) data;
00524 
00525   
00526   
00527   
00528   if (!startup && !ICPconfig->Lock()) {
00529     
00530     ICPConfigUpdateCont *rh = new ICPConfigUpdateCont(data, value);
00531     SET_CONTINUATION_HANDLER(rh, (ICPCfgContHandler) & ICPConfigUpdateCont::RetryICPconfigUpdate);
00532     eventProcessor.schedule_in(rh, HRTIME_MSECONDS(ICPConfigUpdateCont::RETRY_INTERVAL), ET_ICP);
00533     return EVENT_DONE;
00534   }
00535   ICP_INCREMENT_DYN_STAT(config_mgmt_callouts_stat);
00536   ICPconfig->_icp_config_callouts++;
00537 
00538   
00539   
00540   
00541   PeerConfigData *P = new PeerConfigData[MAX_DEFINED_PEERS + 1];
00542 
00543   
00544   
00545   
00546   ink_release_assert(filename != NULL);
00547 
00548   ats_scoped_str config_path(Layout::get()->relative_to(Layout::get()->sysconfdir, filename));
00549   int fd = open(config_path, O_RDONLY);
00550   if (fd < 0) {
00551     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, open failed");
00552     delete[]P;
00553     return EVENT_DONE;
00554   }
00555   
00556   
00557   
00558   
00559   
00560   
00561   
00562   
00563   
00564   
00565   
00566   
00567   const int colons_per_entry = 8;       
00568 
00569   int error = 0;
00570   int ln = 0;
00571   int n_colons;
00572   char line[512];
00573   char *cur;
00574   char *next;
00575   char *p;
00576   char fs = ':'; 
00577   int len; 
00578 
00579   int n = 1;                    
00580 
00581 
00582   
00583   
00584   
00585 
00586   while ((len = ink_file_fd_readline(fd, sizeof(line) - 1, line)) > 0) {
00587     ln++;
00588     cur = line;
00589     while (isspace(*cur)) ++cur, --len; 
00590     if (!*cur || *cur == '#')
00591       continue;
00592 
00593     if (n >= MAX_DEFINED_PEERS) {
00594       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, maximum peer entries exceeded");
00595       error = 1;
00596       break;
00597     }
00598     
00599     
00600     
00601     
00602 
00603 
00604 
00605 
00606     char* last = cur + len -1; 
00607     if ('\n' == *last) --last; 
00608     if (NULL == strchr(" ;:|,", *last)) {
00609       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, invalid separator [value %d]", *last);
00610       error = 1;
00611       break;
00612     }
00613     fs = *last;
00614 
00615     n_colons = 0;
00616     p = cur;
00617     while (0 != (p = next_field(p, fs))) {
00618       ++p;
00619       ++n_colons;
00620     }
00621     if (n_colons != colons_per_entry) {
00622       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, invalid syntax, line %d: expected %d fields, found %d", ln, colons_per_entry, n_colons);
00623       error = 1;
00624       break;
00625     }
00626     
00627     
00628     
00629     next = next_field(cur, fs);
00630     *next++ = 0;
00631     if (cur != (next - 1)) {
00632       ink_strlcpy(P[n]._hostname, cur, PeerConfigData::HOSTNAME_SIZE);
00633     } else {
00634       P[n]._hostname[0] = 0;
00635     }
00636     
00637     
00638     
00639     cur = next;
00640     next = next_field(next, fs);
00641     *next++ = 0;
00642     if (cur != (next - 1)) {
00643       if (0 != P[n]._ip_addr.load(cur)) {
00644         RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad host ip_addr, line %d", ln);
00645         error = 1;
00646         break;
00647       }
00648     } else {
00649       P[n]._ip_addr.invalidate();
00650     }
00651 
00652     if (!P[n]._hostname[0] && !P[n]._ip_addr.isValid()) {
00653       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad hostname, line %d", ln);
00654       error = 1;
00655       break;
00656     }
00657     
00658     
00659     
00660     cur = next;
00661     next = next_field(next, fs);
00662     *next++ = 0;
00663     if (cur != (next - 1)) {
00664       P[n]._ctype = atoi(cur);
00665       if ((P[n]._ctype != PeerConfigData::CTYPE_PARENT) && (P[n]._ctype != PeerConfigData::CTYPE_SIBLING)) {
00666         RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad ctype, line %d", ln);
00667         error = 1;
00668         break;
00669       }
00670     } else {
00671       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad ctype, line %d", ln);
00672       error = 1;
00673       break;
00674     }
00675     
00676     
00677     
00678     cur = next;
00679     next = next_field(next, fs);
00680     *next++ = 0;
00681     if (cur != (next - 1)) {
00682       if ((P[n]._proxy_port = atoi(cur)) <= 0) {
00683         RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad proxy_port, line %d", ln);
00684         error = 1;
00685         break;
00686       }
00687     } else {
00688       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad proxy_port, line %d", ln);
00689       error = 1;
00690       break;
00691     }
00692     
00693     
00694     
00695     cur = next;
00696     next = next_field(next, fs);
00697     *next++ = 0;
00698     if (cur != (next - 1)) {
00699       if ((P[n]._icp_port = atoi(cur)) <= 0) {
00700         RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad icp_port, line %d", ln);
00701         error = 1;
00702         break;
00703       }
00704     } else {
00705       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad icp_port, line %d", ln);
00706       error = 1;
00707       break;
00708     }
00709     
00710     
00711     
00712     cur = next;
00713     next = next_field(next, fs);
00714     *next++ = 0;
00715     if (cur != (next - 1)) {
00716       if ((P[n]._mc_member = atoi(cur)) < 0) {
00717         RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad mc_member, line %d", ln);
00718         error = 1;
00719         break;
00720       }
00721       if ((P[n]._mc_member != 0) && (P[n]._mc_member != 1)) {
00722         RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad mc_member (2), line %d", ln);
00723         error = 1;
00724         break;
00725       }
00726     } else {
00727       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad mc_member, line %d", ln);
00728       error = 1;
00729       break;
00730     }
00731     
00732     
00733     
00734     cur = next;
00735     next = next_field(next, fs);
00736     *next++ = 0;
00737     if (cur != (next - 1)) {
00738       P[n]._mc_ip_addr.load(cur);
00739       
00740       if (P[n]._mc_member != 0 && !P[n]._mc_ip_addr.isMulticast()) {
00741         RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad multicast ip_addr, line %d", ln);
00742         error = 1;
00743         break;
00744       }
00745     } else {
00746       P[n]._mc_ip_addr.invalidate();
00747     }
00748     
00749     
00750     
00751     
00752     cur = next;
00753     next = next_field(next, fs);
00754     *next++ = 0;
00755     if (cur != (next - 1)) {
00756       P[n]._mc_ttl = atoi(cur);
00757       if ((P[n]._mc_ttl = atoi(cur)) <= 0) {
00758         RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad mc_ttl, line %d", ln);
00759         error = 1;
00760         break;
00761       }
00762     } else {
00763       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad mc_ttl, line %d", ln);
00764       error = 1;
00765       break;
00766     }
00767     n++;                        
00768   }
00769   close(fd);
00770 
00771   if (!error) {
00772     for (int i = 0; i <= MAX_DEFINED_PEERS; i++)
00773       *ICPconfig->_peer_cdata_current[i] = P[i];
00774   }
00775   delete[]P;                    
00776   if (!startup)
00777     ICPconfig->Unlock();
00778   return EVENT_DONE;
00779 }
00780 
00781 
00782 
00783 
00784 Peer::Peer(PeerType_t t, ICPProcessor * icpPr, bool dynamic_peer):
00785 buf(NULL), notFirstRead(0), readAction(NULL), writeAction(NULL), _type(t), _next(0), _ICPpr(icpPr), _state(PEER_UP)
00786 {
00787   notFirstRead = 0;
00788   if (dynamic_peer) {
00789     _state |= PEER_DYNAMIC;
00790   }
00791   memset((void *) &this->_stats, 0, sizeof(this->_stats));
00792   ink_zero(fromaddr);
00793   fromaddrlen = sizeof(fromaddr);
00794   _id = 0;
00795 }
00796 
00797 void
00798 Peer::LogRecvMsg(ICPMsg_t * m, int valid)
00799 {
00800   
00801 
00802   
00803   _stats.last_receive = ink_get_hrtime();
00804   if ((m->h.opcode >= ICP_OP_QUERY) && (m->h.opcode <= ICP_OP_LAST)) {
00805     _stats.recv[m->h.opcode]++;
00806   } else {
00807     _stats.recv[ICP_OP_INVALID]++;
00808   }
00809   _stats.total_received++;
00810 
00811   if (!valid) {
00812     
00813     _stats.dropped_replies++;
00814   }
00815   if ((_state & PEER_UP) == 0) {
00816     ip_port_text_buffer ipb;
00817     
00818     
00819     _state |= PEER_UP;
00820     _stats.total_received = _stats.total_sent;  
00821 
00822     Debug("icp", "Peer [%s] now back online", ats_ip_nptop(this->GetIP(), ipb, sizeof(ipb)));
00823   }
00824 }
00825 
00826 
00827 
00828 
00829 
00830 ParentSiblingPeer::ParentSiblingPeer(PeerType_t t, PeerConfigData * p, ICPProcessor * icpPr, bool dynamic_peer)
00831 :Peer(t, icpPr, dynamic_peer), _pconfig(p)
00832 {
00833   ats_ip_set(&_ip.sa, _pconfig->GetIPAddr(), htons(_pconfig->GetICPPort()));
00834 }
00835 
00836 int
00837 ParentSiblingPeer::GetProxyPort()
00838 {
00839   return _pconfig->GetProxyPort();
00840 }
00841 
00842 int
00843 ParentSiblingPeer::GetICPPort()
00844 {
00845   return _pconfig->GetICPPort();
00846 }
00847 
00848 
00849 sockaddr*
00850 ParentSiblingPeer::GetIP()
00851 {
00852   
00853   
00854   return &_ip.sa;
00855 }
00856 
00857 Action *
00858 ParentSiblingPeer::SendMsg_re(Continuation * cont, void *token, struct msghdr * msg, sockaddr const* to)
00859 {
00860   
00861 
00862   Peer *lp = _ICPpr->GetLocalPeer();
00863 
00864   if (to) {
00865     
00866     Peer *p = _ICPpr->FindPeer(IpAddr(to), ntohs(ats_ip_port_cast(to)));
00867     ink_assert(p);
00868 
00869     msg->msg_name = &p->GetSendChan()->addr;
00870     msg->msg_namelen = ats_ip_size(&p->GetSendChan()->addr);
00871     Action *a = udpNet.sendmsg_re(cont, token, lp->GetSendFD(), msg);
00872     return a;
00873   } else {
00874     
00875     msg->msg_name = & _chan.addr;
00876     msg->msg_namelen = ats_ip_size(&_chan.addr.sa);
00877     Action *a = udpNet.sendmsg_re(cont, token, lp->GetSendFD(), msg);
00878     return a;
00879   }
00880 }
00881 
00882 Action *
00883 ParentSiblingPeer::RecvFrom_re(Continuation * cont, void *token,
00884                                IOBufferBlock * bufblock, int size, struct sockaddr * from, socklen_t *fromlen)
00885 {
00886   
00887 
00888   Peer *lp = _ICPpr->GetLocalPeer();
00889   Action *a = udpNet.recvfrom_re(cont, token,
00890                                  lp->GetRecvFD(), from, fromlen,
00891                                  bufblock, size, true, 0);
00892   return a;
00893 }
00894 
00895 int
00896 ParentSiblingPeer::GetRecvFD()
00897 {
00898   return _chan.fd;
00899 }
00900 
00901 int
00902 ParentSiblingPeer::GetSendFD()
00903 {
00904   return _chan.fd;
00905 }
00906 
00907 int
00908 ParentSiblingPeer::ExpectedReplies(BitMap * expected_replies_list)
00909 {
00910   if (((_state & PEER_UP) == 0) || ((_stats.total_sent - _stats.total_received) > Peer::OFFLINE_THRESHOLD)) {
00911     if (_state & PEER_UP) {
00912       ip_port_text_buffer ipb;
00913       _state &= ~PEER_UP;
00914       Debug("icp", "Peer [%s] marked offline", ats_ip_nptop(this->GetIP(), ipb, sizeof(ipb)));
00915     }
00916     
00917     
00918     
00919     
00920     return 0;
00921   } else {
00922     expected_replies_list->SetBit(this->GetPeerID());
00923     return 1;
00924   }
00925 }
00926 
00927 int
00928 ParentSiblingPeer::ValidSender(sockaddr* fr)
00929 {
00930   if (_type == PEER_LOCAL) {
00931     
00932     
00933     
00934     
00935     
00936     
00937     Peer *p = _ICPpr->FindPeer(fr);
00938     if (p) {
00939       return 1;                 
00940     } else {
00941       return 0;                 
00942     }
00943 
00944   } else {
00945     
00946     
00947     
00948     if (ats_ip_addr_eq(this->GetIP(), fr) &&
00949       (ats_ip_port_cast(this->GetIP()) == ats_ip_port_cast(fr))
00950     ) {
00951       return 1;                 
00952     } else {
00953       return 0;                 
00954     }
00955   }
00956 }
00957 
00958 void
00959 ParentSiblingPeer::LogSendMsg(ICPMsg_t * m, sockaddr const* )
00960 {
00961   
00962 
00963   
00964   _stats.last_send = ink_get_hrtime();
00965   _stats.sent[m->h.opcode]++;
00966   _stats.total_sent++;
00967 }
00968 
00969 int
00970 ParentSiblingPeer::ExtToIntRecvSockAddr(sockaddr const* in, sockaddr *out)
00971 {
00972   Peer *p = _ICPpr->FindPeer(IpAddr(in));
00973   if (p && (p->GetType() != PEER_LOCAL)) {
00974     
00975     ats_ip_copy(out, p->GetIP());
00976     return 1;
00977   } else {
00978     return 0;
00979   }
00980 }
00981 
00982 
00983 
00984 
00985 
00986 MultiCastPeer::MultiCastPeer(IpAddr const& addr, uint16_t mc_port, int ttl, ICPProcessor * icpPr)
00987 :Peer(PEER_MULTICAST, icpPr), _mc_ttl(ttl)
00988 {
00989   ats_ip_set(&_mc_ip.sa, addr, htons(mc_port));
00990   memset(&this->_mc, 0, sizeof(this->_mc));
00991 }
00992 
00993 int
00994 MultiCastPeer::GetTTL()
00995 {
00996   return _mc_ttl;
00997 }
00998 
00999 sockaddr *
01000 MultiCastPeer::GetIP()
01001 {
01002   return &_mc_ip.sa;
01003 }
01004 
01005 Action *
01006 MultiCastPeer::SendMsg_re(Continuation * cont, void *token, struct msghdr * msg, sockaddr const* to)
01007 {
01008   Action *a;
01009 
01010   if (to) {
01011     
01012     Peer *p = FindMultiCastChild(IpAddr(to), ats_ip_port_host_order(to));
01013     ink_assert(p);
01014     a = ((ParentSiblingPeer *) p)->SendMsg_re(cont, token, msg, 0);
01015   } else {
01016     
01017     msg->msg_name = (caddr_t) & _send_chan.addr;
01018     msg->msg_namelen = sizeof(_send_chan.addr);
01019     a = udpNet.sendmsg_re(cont, token, _send_chan.fd, msg);
01020   }
01021   return a;
01022 }
01023 
01024 Action *
01025 MultiCastPeer::RecvFrom_re(Continuation * cont, void *token, IOBufferBlock * ,
01026                            int len, struct sockaddr * from, socklen_t *fromlen)
01027 {
01028   Action *a = udpNet.recvfrom_re(cont, token, _recv_chan.fd, from, fromlen, buf, len, true, 0);
01029   return a;
01030 }
01031 
01032 int
01033 MultiCastPeer::GetRecvFD()
01034 {
01035   return _recv_chan.fd;
01036 }
01037 
01038 int
01039 MultiCastPeer::GetSendFD()
01040 {
01041   return _send_chan.fd;
01042 }
01043 
01044 int
01045 MultiCastPeer::ExpectedReplies(BitMap * expected_replies_list)
01046 {
01047   
01048   
01049 
01050   int replies = 0;
01051   ParentSiblingPeer *p = (ParentSiblingPeer *) this->_next;
01052   while (p) {
01053     replies += p->ExpectedReplies(expected_replies_list);
01054     p = (ParentSiblingPeer *) p->GetNext();
01055   }
01056   return replies;
01057 }
01058 
01059 int
01060 MultiCastPeer::ValidSender(sockaddr* sa)
01061 {
01062   
01063   
01064   
01065   Peer *P = _next;
01066   while (P) {
01067     if (ats_ip_addr_eq(P->GetIP(), sa) &&
01068       (ats_ip_port_cast(P->GetIP()) == ats_ip_port_cast(sa))
01069     ) {
01070       return 1;
01071     } else {
01072       P = P->GetNext();
01073     }
01074   }
01075   return 0;
01076 }
01077 
01078 void
01079 MultiCastPeer::LogSendMsg(ICPMsg_t * m, sockaddr const* sa)
01080 {
01081   
01082   if (sa) {
01083     
01084     
01085     
01086     Peer *p;
01087     p = FindMultiCastChild(IpAddr(sa), ats_ip_port_host_order(sa));
01088     if (p)
01089       ((ParentSiblingPeer *) p)->LogSendMsg(m, sa);
01090 
01091   } else {
01092     
01093     _stats.last_send = ink_get_hrtime();
01094     _stats.sent[m->h.opcode]++;
01095     _stats.total_sent++;
01096 
01097     Peer *p = _next;
01098     while (p) {
01099       ((ParentSiblingPeer *) p)->LogSendMsg(m, sa);
01100       p = p->GetNext();
01101     }
01102   }
01103 }
01104 
01105 int
01106 MultiCastPeer::IsOnline()
01107 {
01108   return (_ICPpr->GetConfig()->globalConfig()->ICPmulticastConfigured());
01109 }
01110 
01111 int
01112 MultiCastPeer::AddMultiCastChild(Peer * P)
01113 {
01114   
01115   
01116   sockaddr const* ip = P->GetIP();
01117   if (FindMultiCastChild(IpAddr(ip), ats_ip_port_host_order(ip))) {
01118     ip_text_buffer x;
01119     Warning("bad icp.config, multiple multicast child definitions for ip=%s", ats_ip_ntop(ip, x, sizeof(x)));
01120     return 0;                   
01121   } else {
01122     P->SetNext(this->_next);
01123     this->_next = P;
01124     ++_mc.defined_members;
01125     return 1;                   
01126   }
01127 }
01128 
01129 Peer *
01130 MultiCastPeer::FindMultiCastChild(IpAddr const& addr, uint16_t port)
01131 {
01132   
01133   
01134   
01135   Peer *curP = this->_next;
01136   while (curP) {
01137     sockaddr const* peer_ip = curP->GetIP();
01138     if (addr == peer_ip &&
01139       (!port || port == ats_ip_port_host_order(peer_ip))
01140     ) {
01141       return curP;
01142     } else {
01143       curP = curP->GetNext();
01144     }
01145   }
01146   return NULL;
01147 }
01148 
01149 
01150 
01151 
01152 
01153 typedef int (ICPPeriodicCont::*ICPPeriodicContHandler) (int, void *);
01154 PeriodicCont::PeriodicCont(ICPProcessor * icpP):Continuation(0), _ICPpr(icpP)
01155 {
01156   mutex = new_ProxyMutex();
01157 }
01158 
01159 PeriodicCont::~PeriodicCont()
01160 {
01161   mutex = 0;
01162 }
01163 
01164 
01165 
01166 
01167 ICPPeriodicCont::ICPPeriodicCont(ICPProcessor * icpP)
01168 :PeriodicCont(icpP), _last_icp_config_callouts(0), _global_config_changed(0), _peer_config_changed(0)
01169 {
01170 }
01171 
01172 int
01173 ICPPeriodicCont::PeriodicEvent(int , Event * )
01174 {
01175   int do_reconfig = 0;
01176   ICPConfiguration *C = _ICPpr->GetConfig();
01177 
01178   if (C->GlobalConfigChange())
01179     do_reconfig = 1;
01180 
01181   int configcallouts = C->ICPConfigCallouts();
01182   if (_last_icp_config_callouts != configcallouts) {
01183     
01184     
01185     _last_icp_config_callouts = configcallouts;
01186     do_reconfig = 1;
01187   }
01188 
01189   if (do_reconfig) {
01190     
01191     
01192     
01193     ICPPeriodicCont *rc = new ICPPeriodicCont(_ICPpr);
01194     SET_CONTINUATION_HANDLER(rc, (ICPPeriodicContHandler) & ICPPeriodicCont::DoReconfigAction);
01195     eventProcessor.schedule_imm(rc);
01196   }
01197   return EVENT_CONT;
01198 }
01199 
01200 int
01201 ICPPeriodicCont::DoReconfigAction(int event, Event * e)
01202 {
01203   
01204   
01205   
01206   
01207   ICPConfiguration *C = _ICPpr->GetConfig();
01208 
01209   for (;;) {
01210     switch (event) {
01211     case EVENT_IMMEDIATE:
01212     case EVENT_INTERVAL:
01213       {
01214         ink_assert(!_global_config_changed && !_peer_config_changed);
01215         if (C->Lock()) {
01216           ICP_INCREMENT_DYN_STAT(reconfig_polls_stat);
01217           if (C->GlobalConfigChange()) {
01218             _global_config_changed = 1;
01219           }
01220           
01221           
01222           
01223           
01224           
01225           if (C->PeerConfigChange()) {
01226             _peer_config_changed = 1;
01227           }
01228           if (_global_config_changed || _peer_config_changed) {
01229             
01230             
01231             
01232             ICP_INCREMENT_DYN_STAT(reconfig_events_stat);
01233             ICPProcessor::ReconfigState_t NextState;
01234 
01235             NextState =
01236               _ICPpr->ReconfigureStateMachine(ICPProcessor::RC_RECONFIG, _global_config_changed, _peer_config_changed);
01237             if (NextState == ICPProcessor::RC_DONE) {
01238               
01239               
01240               delete this;
01241               return EVENT_DONE;
01242             } else {
01243               
01244               _global_config_changed = 0;
01245               _peer_config_changed = 0;
01246               C->Unlock();
01247               e->schedule_in(HRTIME_MSECONDS(RETRY_INTERVAL_MSECS));
01248               return EVENT_CONT;
01249             }
01250 
01251           } else {
01252             
01253             C->Unlock();
01254           }
01255 
01256         } else {
01257           
01258           e->schedule_in(HRTIME_MSECONDS(RETRY_INTERVAL_MSECS));
01259           return EVENT_CONT;
01260         }
01261         delete this;
01262         return EVENT_DONE;
01263       }
01264     default:
01265       {
01266         ink_release_assert(!"ICPPeriodicCont::DoReconfigAction() bad event");
01267       }
01268     }                           
01269   }                             
01270 
01271   return EVENT_DONE;
01272 }
01273 
01274 
01275 
01276 
01277 
01278 
01279 ink_hrtime ICPlog::GetElapsedTime()
01280 {
01281   return (ink_get_hrtime() - _s->_start_time);
01282 }
01283 
01284 sockaddr const*
01285 ICPlog::GetClientIP()
01286 {
01287   return &_s->_sender.sa;
01288 }
01289 
01290 in_port_t
01291 ICPlog::GetClientPort()
01292 {
01293   return _s->_sender.port();
01294 }
01295 
01296 SquidLogCode ICPlog::GetAction()
01297 {
01298   if (_s->_queryResult == CACHE_EVENT_LOOKUP)
01299     return SQUID_LOG_UDP_HIT;
01300   else
01301     return SQUID_LOG_UDP_MISS;
01302 }
01303 
01304 const char *
01305 ICPlog::GetCode()
01306 {
01307   static const char *const ICPCodeStr = "000";
01308   return ICPCodeStr;
01309 }
01310 
01311 int
01312 ICPlog::GetSize()
01313 {
01314   return ntohs(_s->_rICPmsg->h.msglen);
01315 }
01316 
01317 const char *
01318 ICPlog::GetMethod()
01319 {
01320   return HTTP_METHOD_ICP_QUERY;
01321 }
01322 
01323 const char *
01324 ICPlog::GetURI()
01325 {
01326   return (const char *) _s->_rICPmsg->un.query.URL;
01327 }
01328 
01329 const char *
01330 ICPlog::GetIdent()
01331 {
01332   static const char *const ICPidentStr = "";
01333   return ICPidentStr;
01334 }
01335 
01336 SquidHierarchyCode ICPlog::GetHierarchy()
01337 {
01338   return SQUID_HIER_NONE;
01339 }
01340 
01341 const char *
01342 ICPlog::GetFromHost()
01343 {
01344   static const char *const FromHostStr = "";
01345   return FromHostStr;
01346 }
01347 
01348 const char *
01349 ICPlog::GetContentType()
01350 {
01351   static const char *const ICPcontentTypeStr = "";
01352   return ICPcontentTypeStr;
01353 }
01354 
01355 
01356 
01357 
01358 
01359 static const char *ICPstatNames[] = {
01360   "icp_stat_def",
01361   "config_mgmt_callouts_stat",
01362   "reconfig_polls_stat",
01363   "reconfig_events_stat",
01364   "invalid_poll_data_stat",
01365   "no_data_read_stat",
01366   "short_read_stat",
01367   "invalid_sender_stat",
01368   "read_not_v2_icp_stat",
01369   "icp_remote_query_requests_stat",
01370   "icp_remote_responses_stat",
01371   "icp_cache_lookup_success_stat",
01372   "icp_cache_lookup_fail_stat",
01373   "query_response_write_stat",
01374   "query_response_partial_write_stat",
01375   "no_icp_request_for_response_stat",
01376   "icp_response_request_nolock_stat",
01377   "icp_start_icpoff_stat",
01378   "send_query_partial_write_stat",
01379   "icp_queries_no_expected_replies_stat",
01380   "icp_query_hits_stat",
01381   "icp_query_misses_stat",
01382   "invalid_icp_query_response_stat",
01383   "icp_query_requests_stat",
01384   "total_icp_response_time_stat",
01385   "total_udp_send_queries_stat",
01386   "total_icp_request_time_stat",
01387   "icp_total_reloads",
01388   "icp_pending_reloads",
01389   "icp_reload_start_aborts",
01390   "icp_reload_connect_aborts",
01391   "icp_reload_read_aborts",
01392   "icp_reload_write_aborts",
01393   "icp_reload_successes",
01394   "icp_stat_count",
01395   ""
01396 };
01397 
01398 void
01399 dumpICPstatEntry(int i, const char *name)
01400 {
01401   int l = strlen(name);
01402   int64_t sval, cval;
01403 
01404   RecRawStat *p = RecGetGlobalRawStatPtr(icp_rsb, i);
01405   sval = p->sum;
01406   cval = p->count;
01407 
01408   printf("%-32s %12" PRId64 " %16" PRId64 " %17.4f\n", &name[l > 31 ? l - 31 : 0], cval, sval,
01409          cval ? (((double) sval) / ((double) cval)) : 0.0);
01410 }
01411 
01412 void
01413 dumpICPstats()
01414 {
01415   printf("\n");
01416   int i;
01417   for (i = 0; i < icp_stat_count; ++i) {
01418     dumpICPstatEntry(i, ICPstatNames[i]);
01419   }
01420 }
01421 
01422 
01423 void
01424 ICPProcessor::DumpICPConfig()
01425 {
01426   Peer *P;
01427   PeerType_t type;
01428   int id;
01429   ip_port_text_buffer ipb;
01430 
01431   Debug("icp", "On=%d, MultiCast=%d, Timeout=%d LocalCacheLookup=%d",
01432         GetConfig()->globalConfig()->ICPconfigured(),
01433         GetConfig()->globalConfig()->ICPmulticastConfigured(),
01434         GetConfig()->globalConfig()->ICPqueryTimeout(), GetConfig()->globalConfig()->ICPLocalCacheLookup());
01435   Debug("icp", "StaleLookup=%d, ReplyToUnknowPeer=%d, DefaultReplyPort=%d",
01436         GetConfig()->globalConfig()->ICPStaleLookup(),
01437         GetConfig()->globalConfig()->ICPReplyToUnknownPeer(), GetConfig()->globalConfig()->ICPDefaultReplyPort());
01438 
01439   for (int i = 0; i < (_nPeerList + 1); i++) {
01440     P = _PeerList[i];
01441     id = P->GetPeerID();
01442     type = P->GetType();
01443     const char *str_type;
01444 
01445     switch (type) {
01446     case PEER_PARENT:
01447       {
01448         str_type = "P";
01449         break;
01450       }
01451     case PEER_SIBLING:
01452       {
01453         str_type = "S";
01454         break;
01455       }
01456     case PEER_LOCAL:
01457       {
01458         str_type = "L";
01459         break;
01460       }
01461     case PEER_MULTICAST:
01462       {
01463         str_type = "M";
01464         break;
01465       }
01466     default:
01467       {
01468         str_type = "N";
01469         break;
01470       }
01471     }                           
01472 
01473     if (*str_type == 'M') {
01474       Debug("icp", "[%d]: Type=%s IP=%s", id, str_type, ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
01475     } else {
01476       ParentSiblingPeer *Pps = static_cast<ParentSiblingPeer *>(P);
01477       Debug("icp",
01478             "[%d]: Type=%s IP=%s PPort=%d Host=%s",
01479         id, str_type, ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)),
01480         Pps->GetConfig()->GetProxyPort(), Pps->GetConfig()->GetHostname());
01481 
01482       Debug("icp",
01483             "[%d]: MC ON=%d MC_IP=%s MC_TTL=%d",
01484             id, Pps->GetConfig()->MultiCastMember(),
01485         Pps->GetConfig()->GetMultiCastIPAddr().toString(ipb, sizeof(ipb)),
01486         Pps->GetConfig()->GetMultiCastTTL());
01487     }
01488   }
01489 }
01490 
01491 
01492 
01493