00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "libts.h"
00032 #include "P_EventSystem.h"
00033 #include "P_Cache.h"
00034 #include "P_Net.h"
00035 #include "P_RecProcess.h"
00036 #include "Main.h"
00037 #include "ICP.h"
00038 #include "ICPProcessor.h"
00039 #include "ICPlog.h"
00040 #include "BaseManager.h"
00041 #include "I_Layout.h"
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125 #if !defined(USE_CAS_FOR_ATOMICLOCK)
00126 AtomicLock::AtomicLock()
00127 {
00128 _mutex = new_ProxyMutex();
00129 }
00130
00131 AtomicLock::~AtomicLock()
00132 {
00133 }
00134
00135 int
00136 AtomicLock::Lock()
00137 {
00138 EThread *et = this_ethread();
00139 ink_assert(et != NULL);
00140 return MUTEX_TAKE_TRY_LOCK(_mutex, et);
00141 }
00142
00143 int
00144 AtomicLock::HaveLock()
00145 {
00146 EThread *et = this_ethread();
00147 ink_assert(et != NULL);
00148 return (_mutex->thread_holding == et);
00149 }
00150
00151 void
00152 AtomicLock::Unlock()
00153 {
00154 EThread *et = this_ethread();
00155 ink_assert(et != NULL);
00156 MUTEX_UNTAKE_LOCK(_mutex, et);
00157 }
00158
00159 #else // USE_CAS_FOR_ATOMICLOCK
00160 AtomicLock::AtomicLock():lock_word(0)
00161 {
00162 }
00163
00164 AtomicLock::~AtomicLock()
00165 {
00166 }
00167
00168 int
00169 AtomicLock::Lock()
00170 {
00171 bool status = ink_atomic_cas(&_lock_word, AtomicLock::UNLOCKED,
00172 AtomicLock::LOCKED);
00173 return status;
00174 }
00175
00176 int
00177 AtomicLock::HaveLock()
00178 {
00179 return (_lock_word == LOCKED);
00180 }
00181
00182 void
00183 AtomicLock::Unlock()
00184 {
00185 ink_assert(_lock_word == AtomicLock::LOCKED);
00186 _lock_word = AtomicLock::UNLOCKED;
00187 }
00188 #endif // USE_CAS_FOR_ATOMICLOCK
00189
00190
00191
00192
00193
00194
00195 BitMap::BitMap(int bitmap_maxsize)
00196 {
00197 if (bitmap_maxsize <= (int) (STATIC_BITMAP_BYTE_SIZE * BITS_PER_BYTE)) {
00198 _bitmap = _static_bitmap;
00199 _bitmap_size = bitmap_maxsize;
00200 _bitmap_byte_size = STATIC_BITMAP_BYTE_SIZE;
00201 } else {
00202 _bitmap_byte_size = (bitmap_maxsize + (BITS_PER_BYTE - 1)) / BITS_PER_BYTE;
00203 _bitmap = new char[_bitmap_byte_size];
00204 _bitmap_size = bitmap_maxsize;
00205 }
00206 memset((void *) _bitmap, 0, _bitmap_byte_size);
00207 }
00208
00209 BitMap::~BitMap()
00210 {
00211 if (_bitmap_size > (int) (STATIC_BITMAP_BYTE_SIZE * BITS_PER_BYTE)) {
00212 delete[]_bitmap;
00213 }
00214 }
00215
00216 void
00217 BitMap::SetBit(int bit)
00218 {
00219 if (bit >= _bitmap_size)
00220 return;
00221
00222 char *pbyte = &_bitmap[bit / BITS_PER_BYTE];
00223 *pbyte |= (1 << (bit % BITS_PER_BYTE));
00224 }
00225
00226 void
00227 BitMap::ClearBit(int bit)
00228 {
00229 if (bit >= _bitmap_size)
00230 return;
00231
00232 char *pbyte = &_bitmap[bit / BITS_PER_BYTE];
00233 *pbyte &= ~(1 << (bit % BITS_PER_BYTE));
00234 }
00235
00236 int
00237 BitMap::IsBitSet(int bit)
00238 {
00239 if (bit >= _bitmap_size)
00240 return 0;
00241
00242 char *pbyte = &_bitmap[bit / BITS_PER_BYTE];
00243 if (*pbyte & (1 << (bit % BITS_PER_BYTE)))
00244 return 1;
00245 else
00246 return 0;
00247 }
00248
00249
00250
00251
00252
00253
00254 int
00255 ICPConfigData::operator==(ICPConfigData & ICPData)
00256 {
00257 if (ICPData._icp_enabled != _icp_enabled)
00258 return 0;
00259 if (ICPData._icp_port != _icp_port)
00260 return 0;
00261 if (ICPData._icp_interface != _icp_interface)
00262 return 0;
00263 if (ICPData._multicast_enabled != _multicast_enabled)
00264 return 0;
00265 if (ICPData._icp_query_timeout != _icp_query_timeout)
00266 return 0;
00267 if (ICPData._cache_lookup_local != _cache_lookup_local)
00268 return 0;
00269 if (ICPData._stale_lookup != _stale_lookup)
00270 return 0;
00271 if (ICPData._reply_to_unknown_peer != _reply_to_unknown_peer)
00272 return 0;
00273 if (ICPData._default_reply_port != _default_reply_port)
00274 return 0;
00275 return 1;
00276 }
00277
00278
00279
00280
00281
00282
00283 PeerConfigData::PeerConfigData():_ctype(CTYPE_NONE), _proxy_port(0), _icp_port(0), _mc_member(0), _mc_ttl(0)
00284 {
00285 memset(_hostname, 0, HOSTNAME_SIZE);
00286 }
00287
00288 PeerType_t PeerConfigData::CTypeToPeerType_t(int ctype)
00289 {
00290 switch (ctype) {
00291 case (int) CTYPE_PARENT:
00292 return PEER_PARENT;
00293
00294 case (int) CTYPE_SIBLING:
00295 return PEER_SIBLING;
00296
00297 case (int) CTYPE_LOCAL:
00298 return PEER_LOCAL;
00299
00300 default:
00301 return PEER_NONE;
00302 }
00303 }
00304
00305 int
00306 PeerConfigData::GetHostIPByName(char *hostname, IpAddr& rip)
00307 {
00308
00309 if (0 == hostname || 0 == *hostname)
00310 return 1;
00311
00312 addrinfo hints;
00313 addrinfo *ai;
00314 sockaddr const* best = 0;
00315
00316 ink_zero(hints);
00317 hints.ai_family = AF_UNSPEC;
00318 hints.ai_flags = AI_ADDRCONFIG;
00319 if (0 == getaddrinfo(hostname, 0, &hints, &ai)) {
00320 for ( addrinfo *spot = ai ; spot ; spot = spot->ai_next) {
00321
00322
00323 if (ats_is_ip(spot->ai_addr) &&
00324 (!best || -1 == ats_ip_addr_cmp(spot->ai_addr, best))
00325 ) {
00326 best = spot->ai_addr;
00327 }
00328 }
00329 if (best) rip.assign(best);
00330 freeaddrinfo(ai);
00331 }
00332 return best ? 0 : 1;
00333 }
00334
00335 bool PeerConfigData::operator==(PeerConfigData & PeerData)
00336 {
00337 if (strncmp(PeerData._hostname, _hostname, PeerConfigData::HOSTNAME_SIZE) != 0)
00338 return false;
00339 if (PeerData._ctype != _ctype)
00340 return false;
00341 if (PeerData._ip_addr != _ip_addr)
00342 return false;
00343 if (PeerData._proxy_port != _proxy_port)
00344 return false;
00345 if (PeerData._icp_port != _icp_port)
00346 return false;
00347 if (PeerData._mc_member != _mc_member)
00348 return false;
00349 if (PeerData._mc_ip_addr != _mc_ip_addr)
00350 return false;
00351 if (PeerData._mc_ttl != _mc_ttl)
00352 return false;
00353 return true;
00354 }
00355
00356
00357
00358
00359
00360 ICPConfigUpdateCont::ICPConfigUpdateCont(void *d, void *v):
00361 Continuation(new_ProxyMutex()), _data(d), _value(v)
00362 {
00363 }
00364
00365 int
00366 ICPConfigUpdateCont::RetryICPconfigUpdate(int , Event * )
00367 {
00368 ICPConfiguration::icp_config_change_callback(_data, _value);
00369 delete this;
00370 return EVENT_DONE;
00371 }
00372
00373
00374
00375
00376
00377 typedef int (ICPConfigUpdateCont::*ICPCfgContHandler) (int, void *);
00378 ICPConfiguration::ICPConfiguration():_icp_config_callouts(0)
00379 {
00380
00381
00382
00383 _icp_cdata = new ICPConfigData();
00384 _icp_cdata_current = new ICPConfigData();
00385
00386
00387
00388
00389 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_enabled, "proxy.config.icp.enabled");
00390 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_port, "proxy.config.icp.icp_port");
00391 ICP_EstablishStaticConfigStringAlloc(_icp_cdata_current->_icp_interface, "proxy.config.icp.icp_interface");
00392 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_multicast_enabled, "proxy.config.icp.multicast_enabled");
00393 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_icp_query_timeout, "proxy.config.icp.query_timeout");
00394 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_cache_lookup_local, "proxy.config.icp.lookup_local");
00395 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_stale_lookup, "proxy.config.icp.stale_icp_enabled");
00396 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_reply_to_unknown_peer,
00397 "proxy.config.icp.reply_to_unknown_peer");
00398 ICP_EstablishStaticConfigInteger(_icp_cdata_current->_default_reply_port, "proxy.config.icp.default_reply_port");
00399 UpdateGlobalConfig();
00400
00401
00402
00403
00404 for (int n = 0; n <= MAX_DEFINED_PEERS; ++n) {
00405 _peer_cdata[n] = new PeerConfigData;
00406 _peer_cdata_current[n] = new PeerConfigData;
00407 }
00408
00409
00410
00411
00412 char
00413 icp_config_filename[PATH_NAME_MAX] = "";
00414 ICP_ReadConfigString(icp_config_filename, "proxy.config.icp.icp_configuration", sizeof(icp_config_filename) - 1);
00415 (void) icp_config_change_callback((void *) this, (void *) icp_config_filename, 1);
00416 UpdatePeerConfig();
00417
00418
00419
00420
00421 ICP_RegisterConfigUpdateFunc("proxy.config.icp.icp_configuration", mgr_icp_config_change_callback, (void *) this);
00422 }
00423
00424 ICPConfiguration::~ICPConfiguration()
00425 {
00426
00427
00428 #ifdef OMIT
00429 if (_icp_cdata) {
00430
00431 delete((void *) _icp_cdata);
00432 }
00433 if (_icp_cdata_current)
00434 delete((void *) _icp_cdata_current);
00435 if (_peer_cdata)
00436 delete((void *) _peer_cdata);
00437 if (_peer_cdata_current)
00438 delete((void *) _peer_cdata_current);
00439 #endif // OMIT
00440 }
00441
00442 int
00443 ICPConfiguration::GlobalConfigChange()
00444 {
00445 return (!(*_icp_cdata == *_icp_cdata_current));
00446 }
00447
00448 void
00449 ICPConfiguration::UpdateGlobalConfig()
00450 {
00451 *_icp_cdata = *_icp_cdata_current;
00452 }
00453
00454 int
00455 ICPConfiguration::PeerConfigChange()
00456 {
00457
00458 for (int i = 1; i <= MAX_DEFINED_PEERS; ++i) {
00459 if (!(*_peer_cdata[i] == *_peer_cdata_current[i]))
00460 return 1;
00461 }
00462 return 0;
00463 }
00464
00465 void
00466 ICPConfiguration::UpdatePeerConfig()
00467 {
00468
00469 for (int i = 1; i <= MAX_DEFINED_PEERS; ++i) {
00470
00471
00472
00473
00474 memcpy(_peer_cdata[i], _peer_cdata_current[i], sizeof(*_peer_cdata[i]));
00475
00476 if ((_peer_cdata[i]->_ip_addr.isValid()) && _peer_cdata[i]->_hostname) {
00477
00478 (void) PeerConfigData::GetHostIPByName(_peer_cdata[i]->_hostname, _peer_cdata[i]->_my_ip_addr);
00479 } else {
00480
00481 _peer_cdata[i]->_my_ip_addr = _peer_cdata[i]->_ip_addr;
00482 }
00483 }
00484 }
00485
00486 int
00487 ICPConfiguration::mgr_icp_config_change_callback(const char * ,
00488 RecDataT , RecData data, void *cookie)
00489 {
00490
00491
00492
00493
00494
00495
00496
00497 ICPConfigUpdateCont *rh = new ICPConfigUpdateCont(cookie, data.rec_string);
00498 SET_CONTINUATION_HANDLER(rh, (ICPCfgContHandler) & ICPConfigUpdateCont::RetryICPconfigUpdate);
00499 eventProcessor.schedule_imm(rh, ET_ICP);
00500 return EVENT_DONE;
00501 }
00502
00503 namespace {
00504 inline char* next_field(char* text, char fs) {
00505 text = strchr(text, fs);
00506
00507 if (text && *text == fs)
00508 while (text[1] == fs) ++text;
00509 return text;
00510 }
00511 }
00512
00513 void *
00514 ICPConfiguration::icp_config_change_callback(void *data, void *value, int startup)
00515 {
00516 EThread *thread = this_ethread();
00517 ProxyMutex *mutex = thread->mutex;
00518
00519
00520
00521
00522 char *filename = (char *) value;
00523 ICPConfiguration *ICPconfig = (ICPConfiguration *) data;
00524
00525
00526
00527
00528 if (!startup && !ICPconfig->Lock()) {
00529
00530 ICPConfigUpdateCont *rh = new ICPConfigUpdateCont(data, value);
00531 SET_CONTINUATION_HANDLER(rh, (ICPCfgContHandler) & ICPConfigUpdateCont::RetryICPconfigUpdate);
00532 eventProcessor.schedule_in(rh, HRTIME_MSECONDS(ICPConfigUpdateCont::RETRY_INTERVAL), ET_ICP);
00533 return EVENT_DONE;
00534 }
00535 ICP_INCREMENT_DYN_STAT(config_mgmt_callouts_stat);
00536 ICPconfig->_icp_config_callouts++;
00537
00538
00539
00540
00541 PeerConfigData *P = new PeerConfigData[MAX_DEFINED_PEERS + 1];
00542
00543
00544
00545
00546 ink_release_assert(filename != NULL);
00547
00548 ats_scoped_str config_path(Layout::get()->relative_to(Layout::get()->sysconfdir, filename));
00549 int fd = open(config_path, O_RDONLY);
00550 if (fd < 0) {
00551 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, open failed");
00552 delete[]P;
00553 return EVENT_DONE;
00554 }
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567 const int colons_per_entry = 8;
00568
00569 int error = 0;
00570 int ln = 0;
00571 int n_colons;
00572 char line[512];
00573 char *cur;
00574 char *next;
00575 char *p;
00576 char fs = ':';
00577 int len;
00578
00579 int n = 1;
00580
00581
00582
00583
00584
00585
00586 while ((len = ink_file_fd_readline(fd, sizeof(line) - 1, line)) > 0) {
00587 ln++;
00588 cur = line;
00589 while (isspace(*cur)) ++cur, --len;
00590 if (!*cur || *cur == '#')
00591 continue;
00592
00593 if (n >= MAX_DEFINED_PEERS) {
00594 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, maximum peer entries exceeded");
00595 error = 1;
00596 break;
00597 }
00598
00599
00600
00601
00602
00603
00604
00605
00606 char* last = cur + len -1;
00607 if ('\n' == *last) --last;
00608 if (NULL == strchr(" ;:|,", *last)) {
00609 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, invalid separator [value %d]", *last);
00610 error = 1;
00611 break;
00612 }
00613 fs = *last;
00614
00615 n_colons = 0;
00616 p = cur;
00617 while (0 != (p = next_field(p, fs))) {
00618 ++p;
00619 ++n_colons;
00620 }
00621 if (n_colons != colons_per_entry) {
00622 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, invalid syntax, line %d: expected %d fields, found %d", ln, colons_per_entry, n_colons);
00623 error = 1;
00624 break;
00625 }
00626
00627
00628
00629 next = next_field(cur, fs);
00630 *next++ = 0;
00631 if (cur != (next - 1)) {
00632 ink_strlcpy(P[n]._hostname, cur, PeerConfigData::HOSTNAME_SIZE);
00633 } else {
00634 P[n]._hostname[0] = 0;
00635 }
00636
00637
00638
00639 cur = next;
00640 next = next_field(next, fs);
00641 *next++ = 0;
00642 if (cur != (next - 1)) {
00643 if (0 != P[n]._ip_addr.load(cur)) {
00644 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad host ip_addr, line %d", ln);
00645 error = 1;
00646 break;
00647 }
00648 } else {
00649 P[n]._ip_addr.invalidate();
00650 }
00651
00652 if (!P[n]._hostname[0] && !P[n]._ip_addr.isValid()) {
00653 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad hostname, line %d", ln);
00654 error = 1;
00655 break;
00656 }
00657
00658
00659
00660 cur = next;
00661 next = next_field(next, fs);
00662 *next++ = 0;
00663 if (cur != (next - 1)) {
00664 P[n]._ctype = atoi(cur);
00665 if ((P[n]._ctype != PeerConfigData::CTYPE_PARENT) && (P[n]._ctype != PeerConfigData::CTYPE_SIBLING)) {
00666 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad ctype, line %d", ln);
00667 error = 1;
00668 break;
00669 }
00670 } else {
00671 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad ctype, line %d", ln);
00672 error = 1;
00673 break;
00674 }
00675
00676
00677
00678 cur = next;
00679 next = next_field(next, fs);
00680 *next++ = 0;
00681 if (cur != (next - 1)) {
00682 if ((P[n]._proxy_port = atoi(cur)) <= 0) {
00683 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad proxy_port, line %d", ln);
00684 error = 1;
00685 break;
00686 }
00687 } else {
00688 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad proxy_port, line %d", ln);
00689 error = 1;
00690 break;
00691 }
00692
00693
00694
00695 cur = next;
00696 next = next_field(next, fs);
00697 *next++ = 0;
00698 if (cur != (next - 1)) {
00699 if ((P[n]._icp_port = atoi(cur)) <= 0) {
00700 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad icp_port, line %d", ln);
00701 error = 1;
00702 break;
00703 }
00704 } else {
00705 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad icp_port, line %d", ln);
00706 error = 1;
00707 break;
00708 }
00709
00710
00711
00712 cur = next;
00713 next = next_field(next, fs);
00714 *next++ = 0;
00715 if (cur != (next - 1)) {
00716 if ((P[n]._mc_member = atoi(cur)) < 0) {
00717 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad mc_member, line %d", ln);
00718 error = 1;
00719 break;
00720 }
00721 if ((P[n]._mc_member != 0) && (P[n]._mc_member != 1)) {
00722 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad mc_member (2), line %d", ln);
00723 error = 1;
00724 break;
00725 }
00726 } else {
00727 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad mc_member, line %d", ln);
00728 error = 1;
00729 break;
00730 }
00731
00732
00733
00734 cur = next;
00735 next = next_field(next, fs);
00736 *next++ = 0;
00737 if (cur != (next - 1)) {
00738 P[n]._mc_ip_addr.load(cur);
00739
00740 if (P[n]._mc_member != 0 && !P[n]._mc_ip_addr.isMulticast()) {
00741 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad multicast ip_addr, line %d", ln);
00742 error = 1;
00743 break;
00744 }
00745 } else {
00746 P[n]._mc_ip_addr.invalidate();
00747 }
00748
00749
00750
00751
00752 cur = next;
00753 next = next_field(next, fs);
00754 *next++ = 0;
00755 if (cur != (next - 1)) {
00756 P[n]._mc_ttl = atoi(cur);
00757 if ((P[n]._mc_ttl = atoi(cur)) <= 0) {
00758 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, bad mc_ttl, line %d", ln);
00759 error = 1;
00760 break;
00761 }
00762 } else {
00763 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "read icp.config, 2bad mc_ttl, line %d", ln);
00764 error = 1;
00765 break;
00766 }
00767 n++;
00768 }
00769 close(fd);
00770
00771 if (!error) {
00772 for (int i = 0; i <= MAX_DEFINED_PEERS; i++)
00773 *ICPconfig->_peer_cdata_current[i] = P[i];
00774 }
00775 delete[]P;
00776 if (!startup)
00777 ICPconfig->Unlock();
00778 return EVENT_DONE;
00779 }
00780
00781
00782
00783
00784 Peer::Peer(PeerType_t t, ICPProcessor * icpPr, bool dynamic_peer):
00785 buf(NULL), notFirstRead(0), readAction(NULL), writeAction(NULL), _type(t), _next(0), _ICPpr(icpPr), _state(PEER_UP)
00786 {
00787 notFirstRead = 0;
00788 if (dynamic_peer) {
00789 _state |= PEER_DYNAMIC;
00790 }
00791 memset((void *) &this->_stats, 0, sizeof(this->_stats));
00792 ink_zero(fromaddr);
00793 fromaddrlen = sizeof(fromaddr);
00794 _id = 0;
00795 }
00796
00797 void
00798 Peer::LogRecvMsg(ICPMsg_t * m, int valid)
00799 {
00800
00801
00802
00803 _stats.last_receive = ink_get_hrtime();
00804 if ((m->h.opcode >= ICP_OP_QUERY) && (m->h.opcode <= ICP_OP_LAST)) {
00805 _stats.recv[m->h.opcode]++;
00806 } else {
00807 _stats.recv[ICP_OP_INVALID]++;
00808 }
00809 _stats.total_received++;
00810
00811 if (!valid) {
00812
00813 _stats.dropped_replies++;
00814 }
00815 if ((_state & PEER_UP) == 0) {
00816 ip_port_text_buffer ipb;
00817
00818
00819 _state |= PEER_UP;
00820 _stats.total_received = _stats.total_sent;
00821
00822 Debug("icp", "Peer [%s] now back online", ats_ip_nptop(this->GetIP(), ipb, sizeof(ipb)));
00823 }
00824 }
00825
00826
00827
00828
00829
00830 ParentSiblingPeer::ParentSiblingPeer(PeerType_t t, PeerConfigData * p, ICPProcessor * icpPr, bool dynamic_peer)
00831 :Peer(t, icpPr, dynamic_peer), _pconfig(p)
00832 {
00833 ats_ip_set(&_ip.sa, _pconfig->GetIPAddr(), htons(_pconfig->GetICPPort()));
00834 }
00835
00836 int
00837 ParentSiblingPeer::GetProxyPort()
00838 {
00839 return _pconfig->GetProxyPort();
00840 }
00841
00842 int
00843 ParentSiblingPeer::GetICPPort()
00844 {
00845 return _pconfig->GetICPPort();
00846 }
00847
00848
00849 sockaddr*
00850 ParentSiblingPeer::GetIP()
00851 {
00852
00853
00854 return &_ip.sa;
00855 }
00856
00857 Action *
00858 ParentSiblingPeer::SendMsg_re(Continuation * cont, void *token, struct msghdr * msg, sockaddr const* to)
00859 {
00860
00861
00862 Peer *lp = _ICPpr->GetLocalPeer();
00863
00864 if (to) {
00865
00866 Peer *p = _ICPpr->FindPeer(IpAddr(to), ntohs(ats_ip_port_cast(to)));
00867 ink_assert(p);
00868
00869 msg->msg_name = &p->GetSendChan()->addr;
00870 msg->msg_namelen = ats_ip_size(&p->GetSendChan()->addr);
00871 Action *a = udpNet.sendmsg_re(cont, token, lp->GetSendFD(), msg);
00872 return a;
00873 } else {
00874
00875 msg->msg_name = & _chan.addr;
00876 msg->msg_namelen = ats_ip_size(&_chan.addr.sa);
00877 Action *a = udpNet.sendmsg_re(cont, token, lp->GetSendFD(), msg);
00878 return a;
00879 }
00880 }
00881
00882 Action *
00883 ParentSiblingPeer::RecvFrom_re(Continuation * cont, void *token,
00884 IOBufferBlock * bufblock, int size, struct sockaddr * from, socklen_t *fromlen)
00885 {
00886
00887
00888 Peer *lp = _ICPpr->GetLocalPeer();
00889 Action *a = udpNet.recvfrom_re(cont, token,
00890 lp->GetRecvFD(), from, fromlen,
00891 bufblock, size, true, 0);
00892 return a;
00893 }
00894
00895 int
00896 ParentSiblingPeer::GetRecvFD()
00897 {
00898 return _chan.fd;
00899 }
00900
00901 int
00902 ParentSiblingPeer::GetSendFD()
00903 {
00904 return _chan.fd;
00905 }
00906
00907 int
00908 ParentSiblingPeer::ExpectedReplies(BitMap * expected_replies_list)
00909 {
00910 if (((_state & PEER_UP) == 0) || ((_stats.total_sent - _stats.total_received) > Peer::OFFLINE_THRESHOLD)) {
00911 if (_state & PEER_UP) {
00912 ip_port_text_buffer ipb;
00913 _state &= ~PEER_UP;
00914 Debug("icp", "Peer [%s] marked offline", ats_ip_nptop(this->GetIP(), ipb, sizeof(ipb)));
00915 }
00916
00917
00918
00919
00920 return 0;
00921 } else {
00922 expected_replies_list->SetBit(this->GetPeerID());
00923 return 1;
00924 }
00925 }
00926
00927 int
00928 ParentSiblingPeer::ValidSender(sockaddr* fr)
00929 {
00930 if (_type == PEER_LOCAL) {
00931
00932
00933
00934
00935
00936
00937 Peer *p = _ICPpr->FindPeer(fr);
00938 if (p) {
00939 return 1;
00940 } else {
00941 return 0;
00942 }
00943
00944 } else {
00945
00946
00947
00948 if (ats_ip_addr_eq(this->GetIP(), fr) &&
00949 (ats_ip_port_cast(this->GetIP()) == ats_ip_port_cast(fr))
00950 ) {
00951 return 1;
00952 } else {
00953 return 0;
00954 }
00955 }
00956 }
00957
00958 void
00959 ParentSiblingPeer::LogSendMsg(ICPMsg_t * m, sockaddr const* )
00960 {
00961
00962
00963
00964 _stats.last_send = ink_get_hrtime();
00965 _stats.sent[m->h.opcode]++;
00966 _stats.total_sent++;
00967 }
00968
00969 int
00970 ParentSiblingPeer::ExtToIntRecvSockAddr(sockaddr const* in, sockaddr *out)
00971 {
00972 Peer *p = _ICPpr->FindPeer(IpAddr(in));
00973 if (p && (p->GetType() != PEER_LOCAL)) {
00974
00975 ats_ip_copy(out, p->GetIP());
00976 return 1;
00977 } else {
00978 return 0;
00979 }
00980 }
00981
00982
00983
00984
00985
00986 MultiCastPeer::MultiCastPeer(IpAddr const& addr, uint16_t mc_port, int ttl, ICPProcessor * icpPr)
00987 :Peer(PEER_MULTICAST, icpPr), _mc_ttl(ttl)
00988 {
00989 ats_ip_set(&_mc_ip.sa, addr, htons(mc_port));
00990 memset(&this->_mc, 0, sizeof(this->_mc));
00991 }
00992
00993 int
00994 MultiCastPeer::GetTTL()
00995 {
00996 return _mc_ttl;
00997 }
00998
00999 sockaddr *
01000 MultiCastPeer::GetIP()
01001 {
01002 return &_mc_ip.sa;
01003 }
01004
01005 Action *
01006 MultiCastPeer::SendMsg_re(Continuation * cont, void *token, struct msghdr * msg, sockaddr const* to)
01007 {
01008 Action *a;
01009
01010 if (to) {
01011
01012 Peer *p = FindMultiCastChild(IpAddr(to), ats_ip_port_host_order(to));
01013 ink_assert(p);
01014 a = ((ParentSiblingPeer *) p)->SendMsg_re(cont, token, msg, 0);
01015 } else {
01016
01017 msg->msg_name = (caddr_t) & _send_chan.addr;
01018 msg->msg_namelen = sizeof(_send_chan.addr);
01019 a = udpNet.sendmsg_re(cont, token, _send_chan.fd, msg);
01020 }
01021 return a;
01022 }
01023
01024 Action *
01025 MultiCastPeer::RecvFrom_re(Continuation * cont, void *token, IOBufferBlock * ,
01026 int len, struct sockaddr * from, socklen_t *fromlen)
01027 {
01028 Action *a = udpNet.recvfrom_re(cont, token, _recv_chan.fd, from, fromlen, buf, len, true, 0);
01029 return a;
01030 }
01031
01032 int
01033 MultiCastPeer::GetRecvFD()
01034 {
01035 return _recv_chan.fd;
01036 }
01037
01038 int
01039 MultiCastPeer::GetSendFD()
01040 {
01041 return _send_chan.fd;
01042 }
01043
01044 int
01045 MultiCastPeer::ExpectedReplies(BitMap * expected_replies_list)
01046 {
01047
01048
01049
01050 int replies = 0;
01051 ParentSiblingPeer *p = (ParentSiblingPeer *) this->_next;
01052 while (p) {
01053 replies += p->ExpectedReplies(expected_replies_list);
01054 p = (ParentSiblingPeer *) p->GetNext();
01055 }
01056 return replies;
01057 }
01058
01059 int
01060 MultiCastPeer::ValidSender(sockaddr* sa)
01061 {
01062
01063
01064
01065 Peer *P = _next;
01066 while (P) {
01067 if (ats_ip_addr_eq(P->GetIP(), sa) &&
01068 (ats_ip_port_cast(P->GetIP()) == ats_ip_port_cast(sa))
01069 ) {
01070 return 1;
01071 } else {
01072 P = P->GetNext();
01073 }
01074 }
01075 return 0;
01076 }
01077
01078 void
01079 MultiCastPeer::LogSendMsg(ICPMsg_t * m, sockaddr const* sa)
01080 {
01081
01082 if (sa) {
01083
01084
01085
01086 Peer *p;
01087 p = FindMultiCastChild(IpAddr(sa), ats_ip_port_host_order(sa));
01088 if (p)
01089 ((ParentSiblingPeer *) p)->LogSendMsg(m, sa);
01090
01091 } else {
01092
01093 _stats.last_send = ink_get_hrtime();
01094 _stats.sent[m->h.opcode]++;
01095 _stats.total_sent++;
01096
01097 Peer *p = _next;
01098 while (p) {
01099 ((ParentSiblingPeer *) p)->LogSendMsg(m, sa);
01100 p = p->GetNext();
01101 }
01102 }
01103 }
01104
01105 int
01106 MultiCastPeer::IsOnline()
01107 {
01108 return (_ICPpr->GetConfig()->globalConfig()->ICPmulticastConfigured());
01109 }
01110
01111 int
01112 MultiCastPeer::AddMultiCastChild(Peer * P)
01113 {
01114
01115
01116 sockaddr const* ip = P->GetIP();
01117 if (FindMultiCastChild(IpAddr(ip), ats_ip_port_host_order(ip))) {
01118 ip_text_buffer x;
01119 Warning("bad icp.config, multiple multicast child definitions for ip=%s", ats_ip_ntop(ip, x, sizeof(x)));
01120 return 0;
01121 } else {
01122 P->SetNext(this->_next);
01123 this->_next = P;
01124 ++_mc.defined_members;
01125 return 1;
01126 }
01127 }
01128
01129 Peer *
01130 MultiCastPeer::FindMultiCastChild(IpAddr const& addr, uint16_t port)
01131 {
01132
01133
01134
01135 Peer *curP = this->_next;
01136 while (curP) {
01137 sockaddr const* peer_ip = curP->GetIP();
01138 if (addr == peer_ip &&
01139 (!port || port == ats_ip_port_host_order(peer_ip))
01140 ) {
01141 return curP;
01142 } else {
01143 curP = curP->GetNext();
01144 }
01145 }
01146 return NULL;
01147 }
01148
01149
01150
01151
01152
01153 typedef int (ICPPeriodicCont::*ICPPeriodicContHandler) (int, void *);
01154 PeriodicCont::PeriodicCont(ICPProcessor * icpP):Continuation(0), _ICPpr(icpP)
01155 {
01156 mutex = new_ProxyMutex();
01157 }
01158
01159 PeriodicCont::~PeriodicCont()
01160 {
01161 mutex = 0;
01162 }
01163
01164
01165
01166
01167 ICPPeriodicCont::ICPPeriodicCont(ICPProcessor * icpP)
01168 :PeriodicCont(icpP), _last_icp_config_callouts(0), _global_config_changed(0), _peer_config_changed(0)
01169 {
01170 }
01171
01172 int
01173 ICPPeriodicCont::PeriodicEvent(int , Event * )
01174 {
01175 int do_reconfig = 0;
01176 ICPConfiguration *C = _ICPpr->GetConfig();
01177
01178 if (C->GlobalConfigChange())
01179 do_reconfig = 1;
01180
01181 int configcallouts = C->ICPConfigCallouts();
01182 if (_last_icp_config_callouts != configcallouts) {
01183
01184
01185 _last_icp_config_callouts = configcallouts;
01186 do_reconfig = 1;
01187 }
01188
01189 if (do_reconfig) {
01190
01191
01192
01193 ICPPeriodicCont *rc = new ICPPeriodicCont(_ICPpr);
01194 SET_CONTINUATION_HANDLER(rc, (ICPPeriodicContHandler) & ICPPeriodicCont::DoReconfigAction);
01195 eventProcessor.schedule_imm(rc);
01196 }
01197 return EVENT_CONT;
01198 }
01199
01200 int
01201 ICPPeriodicCont::DoReconfigAction(int event, Event * e)
01202 {
01203
01204
01205
01206
01207 ICPConfiguration *C = _ICPpr->GetConfig();
01208
01209 for (;;) {
01210 switch (event) {
01211 case EVENT_IMMEDIATE:
01212 case EVENT_INTERVAL:
01213 {
01214 ink_assert(!_global_config_changed && !_peer_config_changed);
01215 if (C->Lock()) {
01216 ICP_INCREMENT_DYN_STAT(reconfig_polls_stat);
01217 if (C->GlobalConfigChange()) {
01218 _global_config_changed = 1;
01219 }
01220
01221
01222
01223
01224
01225 if (C->PeerConfigChange()) {
01226 _peer_config_changed = 1;
01227 }
01228 if (_global_config_changed || _peer_config_changed) {
01229
01230
01231
01232 ICP_INCREMENT_DYN_STAT(reconfig_events_stat);
01233 ICPProcessor::ReconfigState_t NextState;
01234
01235 NextState =
01236 _ICPpr->ReconfigureStateMachine(ICPProcessor::RC_RECONFIG, _global_config_changed, _peer_config_changed);
01237 if (NextState == ICPProcessor::RC_DONE) {
01238
01239
01240 delete this;
01241 return EVENT_DONE;
01242 } else {
01243
01244 _global_config_changed = 0;
01245 _peer_config_changed = 0;
01246 C->Unlock();
01247 e->schedule_in(HRTIME_MSECONDS(RETRY_INTERVAL_MSECS));
01248 return EVENT_CONT;
01249 }
01250
01251 } else {
01252
01253 C->Unlock();
01254 }
01255
01256 } else {
01257
01258 e->schedule_in(HRTIME_MSECONDS(RETRY_INTERVAL_MSECS));
01259 return EVENT_CONT;
01260 }
01261 delete this;
01262 return EVENT_DONE;
01263 }
01264 default:
01265 {
01266 ink_release_assert(!"ICPPeriodicCont::DoReconfigAction() bad event");
01267 }
01268 }
01269 }
01270
01271 return EVENT_DONE;
01272 }
01273
01274
01275
01276
01277
01278
01279 ink_hrtime ICPlog::GetElapsedTime()
01280 {
01281 return (ink_get_hrtime() - _s->_start_time);
01282 }
01283
01284 sockaddr const*
01285 ICPlog::GetClientIP()
01286 {
01287 return &_s->_sender.sa;
01288 }
01289
01290 in_port_t
01291 ICPlog::GetClientPort()
01292 {
01293 return _s->_sender.port();
01294 }
01295
01296 SquidLogCode ICPlog::GetAction()
01297 {
01298 if (_s->_queryResult == CACHE_EVENT_LOOKUP)
01299 return SQUID_LOG_UDP_HIT;
01300 else
01301 return SQUID_LOG_UDP_MISS;
01302 }
01303
01304 const char *
01305 ICPlog::GetCode()
01306 {
01307 static const char *const ICPCodeStr = "000";
01308 return ICPCodeStr;
01309 }
01310
01311 int
01312 ICPlog::GetSize()
01313 {
01314 return ntohs(_s->_rICPmsg->h.msglen);
01315 }
01316
01317 const char *
01318 ICPlog::GetMethod()
01319 {
01320 return HTTP_METHOD_ICP_QUERY;
01321 }
01322
01323 const char *
01324 ICPlog::GetURI()
01325 {
01326 return (const char *) _s->_rICPmsg->un.query.URL;
01327 }
01328
01329 const char *
01330 ICPlog::GetIdent()
01331 {
01332 static const char *const ICPidentStr = "";
01333 return ICPidentStr;
01334 }
01335
01336 SquidHierarchyCode ICPlog::GetHierarchy()
01337 {
01338 return SQUID_HIER_NONE;
01339 }
01340
01341 const char *
01342 ICPlog::GetFromHost()
01343 {
01344 static const char *const FromHostStr = "";
01345 return FromHostStr;
01346 }
01347
01348 const char *
01349 ICPlog::GetContentType()
01350 {
01351 static const char *const ICPcontentTypeStr = "";
01352 return ICPcontentTypeStr;
01353 }
01354
01355
01356
01357
01358
01359 static const char *ICPstatNames[] = {
01360 "icp_stat_def",
01361 "config_mgmt_callouts_stat",
01362 "reconfig_polls_stat",
01363 "reconfig_events_stat",
01364 "invalid_poll_data_stat",
01365 "no_data_read_stat",
01366 "short_read_stat",
01367 "invalid_sender_stat",
01368 "read_not_v2_icp_stat",
01369 "icp_remote_query_requests_stat",
01370 "icp_remote_responses_stat",
01371 "icp_cache_lookup_success_stat",
01372 "icp_cache_lookup_fail_stat",
01373 "query_response_write_stat",
01374 "query_response_partial_write_stat",
01375 "no_icp_request_for_response_stat",
01376 "icp_response_request_nolock_stat",
01377 "icp_start_icpoff_stat",
01378 "send_query_partial_write_stat",
01379 "icp_queries_no_expected_replies_stat",
01380 "icp_query_hits_stat",
01381 "icp_query_misses_stat",
01382 "invalid_icp_query_response_stat",
01383 "icp_query_requests_stat",
01384 "total_icp_response_time_stat",
01385 "total_udp_send_queries_stat",
01386 "total_icp_request_time_stat",
01387 "icp_total_reloads",
01388 "icp_pending_reloads",
01389 "icp_reload_start_aborts",
01390 "icp_reload_connect_aborts",
01391 "icp_reload_read_aborts",
01392 "icp_reload_write_aborts",
01393 "icp_reload_successes",
01394 "icp_stat_count",
01395 ""
01396 };
01397
01398 void
01399 dumpICPstatEntry(int i, const char *name)
01400 {
01401 int l = strlen(name);
01402 int64_t sval, cval;
01403
01404 RecRawStat *p = RecGetGlobalRawStatPtr(icp_rsb, i);
01405 sval = p->sum;
01406 cval = p->count;
01407
01408 printf("%-32s %12" PRId64 " %16" PRId64 " %17.4f\n", &name[l > 31 ? l - 31 : 0], cval, sval,
01409 cval ? (((double) sval) / ((double) cval)) : 0.0);
01410 }
01411
01412 void
01413 dumpICPstats()
01414 {
01415 printf("\n");
01416 int i;
01417 for (i = 0; i < icp_stat_count; ++i) {
01418 dumpICPstatEntry(i, ICPstatNames[i]);
01419 }
01420 }
01421
01422
01423 void
01424 ICPProcessor::DumpICPConfig()
01425 {
01426 Peer *P;
01427 PeerType_t type;
01428 int id;
01429 ip_port_text_buffer ipb;
01430
01431 Debug("icp", "On=%d, MultiCast=%d, Timeout=%d LocalCacheLookup=%d",
01432 GetConfig()->globalConfig()->ICPconfigured(),
01433 GetConfig()->globalConfig()->ICPmulticastConfigured(),
01434 GetConfig()->globalConfig()->ICPqueryTimeout(), GetConfig()->globalConfig()->ICPLocalCacheLookup());
01435 Debug("icp", "StaleLookup=%d, ReplyToUnknowPeer=%d, DefaultReplyPort=%d",
01436 GetConfig()->globalConfig()->ICPStaleLookup(),
01437 GetConfig()->globalConfig()->ICPReplyToUnknownPeer(), GetConfig()->globalConfig()->ICPDefaultReplyPort());
01438
01439 for (int i = 0; i < (_nPeerList + 1); i++) {
01440 P = _PeerList[i];
01441 id = P->GetPeerID();
01442 type = P->GetType();
01443 const char *str_type;
01444
01445 switch (type) {
01446 case PEER_PARENT:
01447 {
01448 str_type = "P";
01449 break;
01450 }
01451 case PEER_SIBLING:
01452 {
01453 str_type = "S";
01454 break;
01455 }
01456 case PEER_LOCAL:
01457 {
01458 str_type = "L";
01459 break;
01460 }
01461 case PEER_MULTICAST:
01462 {
01463 str_type = "M";
01464 break;
01465 }
01466 default:
01467 {
01468 str_type = "N";
01469 break;
01470 }
01471 }
01472
01473 if (*str_type == 'M') {
01474 Debug("icp", "[%d]: Type=%s IP=%s", id, str_type, ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
01475 } else {
01476 ParentSiblingPeer *Pps = static_cast<ParentSiblingPeer *>(P);
01477 Debug("icp",
01478 "[%d]: Type=%s IP=%s PPort=%d Host=%s",
01479 id, str_type, ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)),
01480 Pps->GetConfig()->GetProxyPort(), Pps->GetConfig()->GetHostname());
01481
01482 Debug("icp",
01483 "[%d]: MC ON=%d MC_IP=%s MC_TTL=%d",
01484 id, Pps->GetConfig()->MultiCastMember(),
01485 Pps->GetConfig()->GetMultiCastIPAddr().toString(ipb, sizeof(ipb)),
01486 Pps->GetConfig()->GetMultiCastTTL());
01487 }
01488 }
01489 }
01490
01491
01492
01493