00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "libts.h"
00032 #include "Main.h"
00033 #include "P_EventSystem.h"
00034 #include "P_Cache.h"
00035 #include "P_Net.h"
00036 #include "MgmtUtils.h"
00037 #include "P_RecProcess.h"
00038 #include "ICP.h"
00039 #include "ICPProcessor.h"
00040 #include "ICPlog.h"
00041 #include "logging/Log.h"
00042 #include "logging/LogAccessICP.h"
00043 #include "BaseManager.h"
00044 #include "HdrUtils.h"
00045
// Cache lookup configuration defined in another translation unit. Its address
// is handed to cacheProcessor.open_read() by ICPPeerReadCont::ICPPeerQueryCont().
00046 extern CacheLookupHttpConfig global_cache_lookup_config;
// File-level HTTP header object whose address is passed as the request-header
// argument of cacheProcessor.open_read() in ICPPeerReadCont::ICPPeerQueryCont().
// NOTE(review): nothing in the visible part of this file populates it -- confirm
// where (or whether) it is initialised.
00047 HTTPHdr gclient_request;
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
// Pointer-to-member-function types for the (int, void *) event handlers of the
// ICP continuation classes. ICPPeerReadContHandler is the type used for the
// SET_HANDLER() casts in the ICPPeerReadCont methods of this file.
00147 typedef int (ICPPeerReadCont::*ICPPeerReadContHandler) (int, void *);
00148 typedef int (ICPPeriodicCont::*ICPPeriodicContHandler) (int, void *);
00149 typedef int (ICPHandlerCont::*ICPHandlerContHandler) (int, void *);
00150 typedef int (ICPRequestCont::*ICPRequestContHandler) (int, void *);
00151
00152
// Optional freshness-calculation hook; NULL by default. When it is non-NULL,
// ICPPeerReadCont::ICPPeerQueryEvent() calls it with the ICPPeerReadCont after a
// successful cache open-read, and the hit/miss decision is then taken in
// ICPPeerReadCont::StaleCheck().
00153 PluginFreshnessCalcFunc pluginFreshnessCalcFunc = (PluginFreshnessCalcFunc) NULL;
00154
00155
00156
00157
00158
00159
00160
00161
// Definition of the static member used as the size index when the receive
// IOBufferBlock is allocated in ICPPeerReadCont::PeerReadStateMachine().
// NOTE(review): its value is assigned outside the visible part of this file.
00162 int64_t ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex;
// Allocator for the per-read PeerReadData state objects (see ICPPeerReadCont::init/reset).
00163 static ClassAllocator <ICPPeerReadCont::PeerReadData>PeerReadDataAllocator("PeerReadDataAllocator");
// Allocator for ICPPeerReadCont continuations (allocated in
// ICPHandlerCont::PeriodicEvent, freed in ICPPeerReadCont::ICPPeerReadEvent).
00164 static ClassAllocator<ICPPeerReadCont> ICPPeerReadContAllocator("ICPPeerReadContAllocator");
00165
// NULL Action pointer whose address is passed to the DynArray<Action *>
// constructor when ICPRequestCont creates its pendingActions array.
00166 static Action *default_action = NULL;
00167
00168
00169 ICPHandlerCont::ICPHandlerCont(ICPProcessor * icpP)
00170 : PeriodicCont(icpP)
00171 {
00172 }
00173
00174
00175 int
00176 ICPHandlerCont::TossEvent(int , Event * )
00177 {
00178 return EVENT_DONE;
00179 }
00180
00181 int
00182 ICPHandlerCont::PeriodicEvent(int event, Event * )
00183 {
00184 int n_peer, valid_peers;
00185 Peer *P;
00186
00187
00188
00189
00190 valid_peers = _ICPpr->GetRecvPeers();
00191
00192
00193 switch (event) {
00194 case EVENT_POLL:
00195 case EVENT_INTERVAL:
00196 {
00197
00198 for (n_peer = 0; n_peer < valid_peers; ++n_peer) {
00199 P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias());
00200 if (!P || (P && !P->IsOnline()))
00201 continue;
00202 if (P->shouldStartRead()) {
00203 P->startingRead();
00204
00205
00206
00207 ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc();
00208 int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup();
00209
00210 s->init(_ICPpr, P, local_lookup);
00211 RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE);
00212
00213
00214
00215
00216 s->handleEvent(EVENT_INTERVAL, (Event *) 0);
00217 }
00218 }
00219 break;
00220 }
00221 default:
00222 {
00223 ink_release_assert(!"unexpected event");
00224 break;
00225 }
00226 }
00227 return EVENT_CONT;
00228 }
00229
00230
00231
00232
00233
00234
00235 ICPPeerReadCont::PeerReadData::PeerReadData()
00236 {
00237 init();
00238 }
00239
00240 void
00241 ICPPeerReadCont::PeerReadData::init()
00242 {
00243 _start_time = 0;
00244 _mycont = 0;
00245 _peer = 0;
00246 _next_state = READ_ACTIVE;
00247 _cache_lookup_local = 0;
00248 _buf = 0;
00249 _rICPmsg = 0;
00250 _rICPmsg_len = 0;
00251 _cachelookupURL.clear();
00252 _queryResult = 0;
00253 _ICPReqCont = 0;
00254 _bytesReceived = 0;
00255 #ifdef DEBUG_ICP
00256 _nhistory = 0;
00257 #endif
00258 memset((void *) &_sender, 0, sizeof(_sender));
00259 }
00260
00261 ICPPeerReadCont::PeerReadData::~PeerReadData()
00262 {
00263 reset(1);
00264 }
00265
00266 void
00267 ICPPeerReadCont::PeerReadData::reset(int full_reset)
00268 {
00269 if (full_reset) {
00270 _peer = 0;
00271 _buf = 0;
00272 }
00273 if (_rICPmsg) {
00274 _rICPmsg = 0;
00275 _rICPmsg_len = 0;
00276 }
00277
00278 if (_cachelookupURL.valid()) {
00279 _cachelookupURL.destroy();
00280 }
00281 }
00282
00283
00284
00285
00286
00287
00288 ICPPeerReadCont::ICPPeerReadCont():Continuation(0), _object_vc(NULL), _object_read(NULL),
00289 _cache_req_hdr_heap_handle(NULL), _cache_resp_hdr_heap_handle(NULL), _ICPpr(NULL), _state(NULL),
00290 _start_time(0), _recursion_depth(0)
00291 {
00292 }
00293
00294 void
00295 ICPPeerReadCont::init(ICPProcessor * ICPpr, Peer * p, int lookup_local)
00296 {
00297 PeerReadData *s = PeerReadDataAllocator.alloc();
00298 s->init();
00299 s->_start_time = ink_get_hrtime();
00300 s->_peer = p;
00301 s->_next_state = READ_ACTIVE;
00302 s->_cache_lookup_local = lookup_local;
00303 SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00304 _ICPpr = ICPpr;
00305 _state = s;
00306 _recursion_depth = -1;
00307 _object_vc = NULL;
00308 _object_read = NULL;
00309 _cache_req_hdr_heap_handle = NULL;
00310 _cache_resp_hdr_heap_handle = NULL;
00311 mutex = new_ProxyMutex();
00312 }
00313
00314 ICPPeerReadCont::~ICPPeerReadCont()
00315 {
00316 reset(1);
00317 }
00318
00319 void
00320 ICPPeerReadCont::reset(int full_reset)
00321 {
00322 mutex = 0;
00323 if (this->_state) {
00324 this->_state->reset(full_reset);
00325 PeerReadDataAllocator.free(this->_state);
00326 }
00327 if (_cache_req_hdr_heap_handle) {
00328 ats_free(_cache_req_hdr_heap_handle);
00329 _cache_req_hdr_heap_handle = NULL;
00330 }
00331 if (_cache_resp_hdr_heap_handle) {
00332 ats_free(_cache_resp_hdr_heap_handle);
00333 _cache_resp_hdr_heap_handle = NULL;
00334 }
00335 }
00336
00337 int
00338 ICPPeerReadCont::ICPPeerReadEvent(int event, Event * e)
00339 {
00340 switch (event) {
00341 case EVENT_INTERVAL:
00342 case EVENT_IMMEDIATE:
00343 {
00344 break;
00345 }
00346 case NET_EVENT_DATAGRAM_WRITE_COMPLETE:
00347 case NET_EVENT_DATAGRAM_READ_COMPLETE:
00348 case NET_EVENT_DATAGRAM_READ_ERROR:
00349 case NET_EVENT_DATAGRAM_WRITE_ERROR:
00350 {
00351 ink_assert((event != NET_EVENT_DATAGRAM_READ_COMPLETE)
00352 || (_state->_next_state == READ_DATA_DONE));
00353 ink_assert((event != NET_EVENT_DATAGRAM_WRITE_COMPLETE)
00354 || (_state->_next_state == WRITE_DONE));
00355
00356 ink_release_assert(this == (ICPPeerReadCont *)
00357 completionUtil::getHandle(e));
00358 break;
00359 }
00360 case CACHE_EVENT_LOOKUP_FAILED:
00361 case CACHE_EVENT_LOOKUP:
00362 {
00363 ink_assert(_state->_next_state == AWAITING_CACHE_LOOKUP_RESPONSE);
00364 break;
00365 }
00366 default:
00367 {
00368 ink_release_assert(!"unexpected event");
00369 }
00370 }
00371
00372
00373 if (PeerReadStateMachine(_state, e) == EVENT_CONT) {
00374 eventProcessor.schedule_in(this, RETRY_INTERVAL, ET_ICP);
00375 return EVENT_DONE;
00376
00377 } else if (_state->_next_state == READ_PROCESSING_COMPLETE) {
00378 _state->_peer->cancelRead();
00379 this->reset(1);
00380 ICPPeerReadContAllocator.free(this);
00381 return EVENT_DONE;
00382
00383 } else {
00384 return EVENT_DONE;
00385 }
00386 }
00387
00388 int
00389 ICPPeerReadCont::StaleCheck(int event, Event * )
00390 {
00391 ip_port_text_buffer ipb;
00392
00393 ink_release_assert(mutex->thread_holding == this_ethread());
00394
00395 Debug("icp-stale", "Stale check res=%d for id=%d, [%s] from [%s]",
00396 event, _state->_rICPmsg->h.requestno,
00397 _state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
00398
00399 switch (event) {
00400 case ICP_STALE_OBJECT:
00401 {
00402 _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
00403 break;
00404 }
00405 case ICP_FRESH_OBJECT:
00406 {
00407 _state->_queryResult = CACHE_EVENT_LOOKUP;
00408 break;
00409 }
00410 default:
00411 {
00412 Debug("icp-stale", "ICPPeerReadCont::StaleCheck: Invalid Event %d\n", event);
00413 _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
00414 break;
00415 }
00416 }
00417 _object_vc->do_io(VIO::CLOSE);
00418 _object_vc = 0;
00419 SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00420 return handleEvent(_state->_queryResult, 0);
00421 }
00422
00423 int
00424 ICPPeerReadCont::ICPPeerQueryEvent(int event, Event * e)
00425 {
00426 ip_port_text_buffer ipb;
00427
00428 Debug("icp", "Remote Query lookup res=%d for id=%d, [%s] from [%s]",
00429 event, _state->_rICPmsg->h.requestno,
00430 _state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
00431 if (pluginFreshnessCalcFunc) {
00432 switch (event) {
00433 case CACHE_EVENT_OPEN_READ:
00434 {
00435 _object_vc = (CacheVConnection *) e;
00436 SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::StaleCheck);
00437 _object_vc->get_http_info(&_object_read);
00438 (*pluginFreshnessCalcFunc) ((void *) this);
00439 return EVENT_DONE;
00440 }
00441 case CACHE_EVENT_OPEN_READ_FAILED:
00442 {
00443 event = CACHE_EVENT_LOOKUP_FAILED;
00444 break;
00445 }
00446 default:
00447 break;
00448 }
00449 }
00450
00451 _state->_queryResult = event;
00452 SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00453 return handleEvent(event, e);
00454 }
00455
00456 int
00457 ICPPeerReadCont::ICPPeerQueryCont(int , Event * )
00458 {
00459 ip_port_text_buffer ipb;
00460 Action *a;
00461
00462
00463
00464 ((char *) _state->_rICPmsg)[MAX_ICP_MSGSIZE - 1] = 0;
00465 _state->_cachelookupURL.create(NULL);
00466 const char *qurl = (const char *) _state->_rICPmsg->un.query.URL;
00467 _state->_cachelookupURL.parse(qurl, strlen(qurl));
00468 Debug("icp", "Remote Query for id=%d, [%s] from [%s]",
00469 _state->_rICPmsg->h.requestno,
00470 _state->_rICPmsg->un.query.URL,
00471 ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb))
00472 );
00473
00474 SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerQueryEvent);
00475 if (_state->_rICPmsg->un.query.URL && *_state->_rICPmsg->un.query.URL) {
00476 _state->_queryResult = ~CACHE_EVENT_LOOKUP_FAILED;
00477 _start_time = ink_get_hrtime();
00478 if (pluginFreshnessCalcFunc && _ICPpr->GetConfig()->globalConfig()->ICPStaleLookup()) {
00479
00480
00481
00482
00483 a = cacheProcessor.open_read(this, &_state->_cachelookupURL, false,
00484 &gclient_request, &global_cache_lookup_config, (time_t) 0);
00485 } else {
00486 a = cacheProcessor.lookup(this, &_state->_cachelookupURL, false, _state->_cache_lookup_local);
00487 }
00488 if (!a) {
00489 a = ACTION_IO_ERROR;
00490 }
00491 if (a == ACTION_RESULT_DONE) {
00492 return EVENT_DONE;
00493 } else if (a == ACTION_IO_ERROR) {
00494 handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
00495 return EVENT_DONE;
00496 } else {
00497 return EVENT_CONT;
00498 }
00499 } else {
00500
00501 handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
00502 return EVENT_DONE;
00503 }
00504 }
00505
// Scope guard for a counter: increments *cnt on construction and decrements it
// again on destruction. The caller must keep the counter alive for the
// lifetime of the guard.
struct AutoReference
{
  AutoReference(int *cnt) : _cnt(cnt) { ++(*_cnt); }

  ~AutoReference() { --(*_cnt); }

  int *_cnt; // counter being tracked (not owned)
};
00519
00520 int
00521 ICPPeerReadCont::PeerReadStateMachine(PeerReadData * s, Event * e)
00522 {
00523 AutoReference l(&_recursion_depth);
00524 ip_port_text_buffer ipb;
00525
00526
00527
00528 MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00529 if (!lock) {
00530
00531
00532 return EVENT_CONT;
00533 }
00534
00535 while (1) {
00536
00537 switch (s->_next_state) {
00538 case READ_ACTIVE:
00539 {
00540 ink_release_assert(_recursion_depth == 0);
00541 if (!_ICPpr->Lock())
00542 return EVENT_CONT;
00543
00544 bool valid_peer = (_ICPpr->IdToPeer(s->_peer->GetPeerID()) == s->_peer);
00545
00546 if (valid_peer && _ICPpr->AllowICPQueries()
00547 && _ICPpr->GetConfig()->globalConfig()->ICPconfigured()) {
00548
00549
00550 _ICPpr->IncPendingQuery();
00551 _ICPpr->Unlock();
00552
00553 s->_next_state = READ_DATA;
00554 RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA);
00555 break;
00556
00557 } else {
00558 _ICPpr->Unlock();
00559
00560
00561 s->_next_state = READ_PROCESSING_COMPLETE;
00562 RECORD_ICP_STATE_CHANGE(s, 0, READ_PROCESSING_COMPLETE);
00563 return EVENT_DONE;
00564 }
00565 }
00566 ink_release_assert(0);
00567
00568 case READ_DATA:
00569 {
00570 ink_release_assert(_recursion_depth == 0);
00571
00572
00573
00574 ink_assert(s->_peer->buf == NULL);
00575 Ptr<IOBufferBlock> buf = s->_peer->buf = new_IOBufferBlock();
00576 buf->alloc(ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex);
00577 s->_peer->fromaddrlen = sizeof(s->_peer->fromaddr);
00578 buf->fill(sizeof(ICPMsg_t));
00579 char *be = buf->buf_end() - 1;
00580 be[0] = 0;
00581 s->_next_state = READ_DATA_DONE;
00582 RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA_DONE);
00583 ink_assert(s->_peer->readAction == NULL);
00584 Action *a = s->_peer->RecvFrom_re(this, this, buf,
00585 buf->write_avail() - 1,
00586 &s->_peer->fromaddr.sa,
00587 &s->_peer->fromaddrlen);
00588 if (!a) {
00589 a = ACTION_IO_ERROR;
00590 }
00591 if (a == ACTION_RESULT_DONE) {
00592
00593
00594
00595 ink_assert(s->_next_state == PROCESS_READ_DATA);
00596 break;
00597 } else if (a == ACTION_IO_ERROR) {
00598
00599
00600
00601
00602
00603
00604 ICP_INCREMENT_DYN_STAT(no_data_read_stat);
00605 s->_peer->buf = NULL;
00606 s->_next_state = READ_NOT_ACTIVE_EXIT;
00607 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
00608
00609 break;
00610 } else {
00611 s->_peer->readAction = a;
00612 return EVENT_DONE;
00613 }
00614 }
00615 ink_release_assert(0);
00616
00617 case READ_DATA_DONE:
00618 {
00619
00620 if (s->_peer->readAction != NULL) {
00621 ink_assert(s->_peer->readAction == e);
00622 s->_peer->readAction = NULL;
00623 }
00624 s->_bytesReceived = completionUtil::getBytesTransferred(e);
00625
00626 if (s->_bytesReceived >= 0) {
00627 s->_next_state = PROCESS_READ_DATA;
00628 RECORD_ICP_STATE_CHANGE(s, 0, PROCESS_READ_DATA);
00629 } else {
00630 ICP_INCREMENT_DYN_STAT(no_data_read_stat);
00631 s->_peer->buf = NULL;
00632 s->_next_state = READ_NOT_ACTIVE_EXIT;
00633 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
00634 }
00635 if (_recursion_depth > 0) {
00636 return EVENT_DONE;
00637 } else {
00638 break;
00639 }
00640 }
00641 ink_release_assert(0);
00642
00643 case PROCESS_READ_DATA:
00644 case ADD_PEER:
00645 {
00646 ink_release_assert(_recursion_depth == 0);
00647
00648 Ptr<IOBufferBlock> bufblock = s->_peer->buf;
00649 char *buf = bufblock->start();
00650
00651 if (s->_next_state == PROCESS_READ_DATA) {
00652 ICPRequestCont::NetToHostICPMsg((ICPMsg_t *)
00653 (buf + sizeof(ICPMsg_t)), (ICPMsg_t *) buf);
00654
00655
00656 bufblock->reset();
00657 bufblock->fill(s->_bytesReceived);
00658
00659
00660 if (s->_bytesReceived < ((ICPMsg_t *) buf)->h.msglen) {
00661
00662
00663
00664 ICP_INCREMENT_DYN_STAT(short_read_stat);
00665 s->_peer->buf = NULL;
00666 s->_next_state = READ_NOT_ACTIVE;
00667 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00668 break;
00669 }
00670 }
00671
00672
00673 IpEndpoint from;
00674 if (!s->_peer->ExtToIntRecvSockAddr(&s->_peer->fromaddr.sa, &from.sa)) {
00675 int status;
00676 ICPConfigData *cfg = _ICPpr->GetConfig()->globalConfig();
00677 ICPMsg_t *ICPmsg = (ICPMsg_t *) buf;
00678
00679 if ((cfg->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) &&
00680 cfg->ICPReplyToUnknownPeer() &&
00681 ((ICPmsg->h.version == ICP_VERSION_2) ||
00682 (ICPmsg->h.version == ICP_VERSION_3)) && (ICPmsg->h.opcode == ICP_OP_QUERY)) {
00683
00684
00685
00686
00687
00688 if (!_ICPpr->GetConfig()->Lock()) {
00689 s->_next_state = ADD_PEER;
00690 RECORD_ICP_STATE_CHANGE(s, 0, ADD_PEER);
00691 return EVENT_CONT;
00692 }
00693 if (!_ICPpr->GetFreePeers() || !_ICPpr->GetFreeSendPeers()) {
00694 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP Peer limit exceeded");
00695 _ICPpr->GetConfig()->Unlock();
00696 goto invalid_message;
00697 }
00698
00699 int icp_reply_port = cfg->ICPDefaultReplyPort();
00700 if (!icp_reply_port) {
00701 icp_reply_port = ntohs(ats_ip_port_cast(&s->_peer->fromaddr));
00702 }
00703 PeerConfigData *Pcfg = new PeerConfigData(PeerConfigData::CTYPE_SIBLING, IpAddr(s->_peer->fromaddr), 0,
00704 icp_reply_port);
00705 ParentSiblingPeer *P = new ParentSiblingPeer(PEER_SIBLING, Pcfg, _ICPpr, true);
00706 status = _ICPpr->AddPeer(P);
00707 ink_release_assert(status);
00708 status = _ICPpr->AddPeerToSendList(P);
00709 ink_release_assert(status);
00710
00711 P->GetChan()->setRemote(P->GetIP());
00712
00713
00714 Note("ICP Peer added ip=%s", ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
00715 from = s->_peer->fromaddr;
00716 } else {
00717 invalid_message:
00718
00719
00720
00721 ICP_INCREMENT_DYN_STAT(invalid_sender_stat);
00722 Debug("icp", "Received msg from invalid sender [%s]",
00723 ats_ip_nptop(&s->_peer->fromaddr, ipb, sizeof(ipb)));
00724
00725 s->_peer->buf = NULL;
00726 s->_next_state = READ_NOT_ACTIVE;
00727 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00728 break;
00729 }
00730 }
00731
00732 s->_sender = from;
00733 s->_rICPmsg_len = s->_bytesReceived;
00734 ink_assert(s->_buf == NULL);
00735 s->_buf = s->_peer->buf;
00736 s->_rICPmsg = (ICPMsg_t *) s->_buf->start();
00737 s->_peer->buf = NULL;
00738
00739
00740
00741
00742 if ((s->_rICPmsg->h.version != ICP_VERSION_2)
00743 && (s->_rICPmsg->h.version != ICP_VERSION_3)) {
00744 ICP_INCREMENT_DYN_STAT(read_not_v2_icp_stat);
00745 Debug("icp", "Received (v=%d) !v2 && !v3 msg from sender [%s]",
00746 (uint32_t) s->_rICPmsg->h.version, ats_ip_nptop(&from, ipb, sizeof(ipb)));
00747
00748 s->_rICPmsg = NULL;
00749 s->_buf = NULL;
00750 s->_next_state = READ_NOT_ACTIVE;
00751 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00752 break;
00753 }
00754
00755
00756
00757
00758 if (s->_rICPmsg->h.opcode == ICP_OP_QUERY) {
00759 ICP_INCREMENT_DYN_STAT(icp_remote_query_requests_stat);
00760 ink_assert(!s->_mycont);
00761 s->_next_state = AWAITING_CACHE_LOOKUP_RESPONSE;
00762 RECORD_ICP_STATE_CHANGE(s, 0, AWAITING_CACHE_LOOKUP_RESPONSE);
00763
00764 if (ICPPeerQueryCont(0, (Event *) 0) == EVENT_DONE) {
00765 break;
00766 } else {
00767 return EVENT_DONE;
00768 }
00769 } else {
00770
00771 Debug("icp", "Response for Id=%d, from [%s]",
00772 s->_rICPmsg->h.requestno, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00773 ICP_INCREMENT_DYN_STAT(icp_remote_responses_stat);
00774 s->_next_state = GET_ICP_REQUEST;
00775 RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
00776 break;
00777 }
00778 }
00779 ink_release_assert(0);
00780
00781 case AWAITING_CACHE_LOOKUP_RESPONSE:
00782 {
00783 int status = 0;
00784 void *data = s->_rICPmsg->un.query.URL;
00785 int datalen = strlen((const char *) data) + 1;
00786
00787 if (s->_queryResult == CACHE_EVENT_LOOKUP) {
00788
00789 Debug("icp", "Sending ICP_OP_HIT for id=%d, [%.*s] to [%s]",
00790 s->_rICPmsg->h.requestno, datalen, (const char *)data, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00791 ICP_INCREMENT_DYN_STAT(icp_cache_lookup_success_stat);
00792 status = ICPRequestCont::BuildICPMsg(ICP_OP_HIT,
00793 s->_rICPmsg->h.requestno, 0 , 0 ,
00794 0 ,
00795 data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
00796 } else if (s->_queryResult == CACHE_EVENT_LOOKUP_FAILED) {
00797
00798 Debug("icp", "Sending ICP_OP_MISS for id=%d, [%.*s] to [%s]",
00799 s->_rICPmsg->h.requestno, datalen, (const char *)data, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00800 ICP_INCREMENT_DYN_STAT(icp_cache_lookup_fail_stat);
00801 status = ICPRequestCont::BuildICPMsg(ICP_OP_MISS,
00802 s->_rICPmsg->h.requestno, 0 , 0 ,
00803 0 ,
00804 data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
00805 } else {
00806 Warning("Bad cache lookup event: %d", s->_queryResult);
00807 ink_release_assert(!"Invalid cache lookup event");
00808 }
00809 ink_assert(status == 0);
00810
00811
00812 ICPlog logentry(s);
00813 LogAccessICP accessor(&logentry);
00814 Log::access(&accessor);
00815
00816 s->_next_state = SEND_REPLY;
00817 RECORD_ICP_STATE_CHANGE(s, 0, SEND_REPLY);
00818
00819 if (_recursion_depth > 0) {
00820 return EVENT_DONE;
00821 } else {
00822 break;
00823 }
00824 }
00825 ink_release_assert(0);
00826
00827 case SEND_REPLY:
00828 {
00829 ink_release_assert(_recursion_depth == 0);
00830
00831
00832
00833 s->_next_state = WRITE_DONE;
00834 RECORD_ICP_STATE_CHANGE(s, 0, WRITE_DONE);
00835 ink_assert(s->_peer->writeAction == NULL);
00836 Action *a = s->_peer->SendMsg_re(this, this,
00837 &s->_mhdr, &s->_sender.sa);
00838 if (!a) {
00839 a = ACTION_IO_ERROR;
00840 }
00841 if (a == ACTION_RESULT_DONE) {
00842
00843
00844 break;
00845
00846 } else if (a == ACTION_IO_ERROR) {
00847
00848 ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
00849
00850 Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s",
00851 ntohs(s->_rICPmsg->h.msglen), -1, ats_ip_ntop(&s->_sender, ipb, sizeof(ipb)));
00852 s->_next_state = READ_NOT_ACTIVE;
00853 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00854 break;
00855 } else {
00856 s->_peer->writeAction = a;
00857 return EVENT_DONE;
00858 }
00859 }
00860 ink_release_assert(0);
00861
00862 case WRITE_DONE:
00863 {
00864 s->_peer->writeAction = NULL;
00865 int len = completionUtil::getBytesTransferred(e);
00866
00867 if (len == (int)ntohs(s->_rICPmsg->h.msglen)) {
00868 ICP_INCREMENT_DYN_STAT(query_response_write_stat);
00869 s->_peer->LogSendMsg(s->_rICPmsg, &s->_sender.sa);
00870 } else {
00871
00872 ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
00873
00874 Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s",
00875 ntohs(s->_rICPmsg->h.msglen), len, ats_ip_ntop(&s->_sender, ipb, sizeof(ipb)));
00876 }
00877
00878 s->_next_state = READ_NOT_ACTIVE;
00879 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00880 Debug("icp", "state->READ_NOT_ACTIVE");
00881
00882 if (_recursion_depth > 0) {
00883 return EVENT_DONE;
00884 } else {
00885 break;
00886 }
00887 }
00888 ink_release_assert(0);
00889
00890 case GET_ICP_REQUEST:
00891 {
00892 ink_release_assert(_recursion_depth == 0);
00893 ink_assert(s->_rICPmsg && s->_rICPmsg_len);
00894
00895
00896 s->_ICPReqCont = ICPRequestCont::FindICPRequest(s->_rICPmsg->h.requestno);
00897 if (s->_ICPReqCont) {
00898 s->_next_state = GET_ICP_REQUEST_MUTEX;
00899 RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST_MUTEX);
00900 break;
00901 }
00902
00903
00904
00905
00906 Debug("icp", "No ICP Request for Id=%d", s->_rICPmsg->h.requestno);
00907 ICP_INCREMENT_DYN_STAT(no_icp_request_for_response_stat);
00908 Peer *p = _ICPpr->FindPeer(s->_sender);
00909 p->LogRecvMsg(s->_rICPmsg, 0);
00910 s->_next_state = READ_NOT_ACTIVE;
00911 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00912 break;
00913 }
00914 ink_release_assert(0);
00915
00916 case GET_ICP_REQUEST_MUTEX:
00917 {
00918 ink_release_assert(_recursion_depth == 0);
00919 ink_assert(s->_ICPReqCont);
00920 Ptr<ProxyMutex> ICPReqContMutex(s->_ICPReqCont->mutex);
00921 EThread *ethread = this_ethread();
00922 ink_hrtime request_start_time;
00923
00924 if (!MUTEX_TAKE_TRY_LOCK(ICPReqContMutex, ethread)) {
00925 ICP_INCREMENT_DYN_STAT(icp_response_request_nolock_stat);
00926
00927
00928
00929
00930
00931
00932 s->_ICPReqCont = (ICPRequestCont *) 0;
00933 s->_next_state = GET_ICP_REQUEST;
00934 RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
00935 return EVENT_CONT;
00936 }
00937
00938 Peer *p = _ICPpr->FindPeer(s->_sender);
00939 p->LogRecvMsg(s->_rICPmsg, 1);
00940
00941
00942 ICPRequestCont::ICPRequestEventArgs_t args;
00943 args.rICPmsg = s->_rICPmsg;
00944 args.rICPmsg_len = s->_rICPmsg_len;
00945 args.peer = p;
00946 if (!s->_ICPReqCont->GetActionPtr()->cancelled) {
00947 request_start_time = s->_ICPReqCont->GetRequestStartTime();
00948 Debug("icp", "Passing Reply for ICP Id=%d", s->_rICPmsg->h.requestno);
00949 s->_ICPReqCont->handleEvent((int) ICP_RESPONSE_MESSAGE, (void *) &args);
00950 } else {
00951 request_start_time = 0;
00952 delete s->_ICPReqCont;
00953 Debug("icp", "User cancelled ICP request Id=%d", s->_rICPmsg->h.requestno);
00954 }
00955
00956
00957 s->_ICPReqCont = 0;
00958
00959 MUTEX_UNTAKE_LOCK(ICPReqContMutex, ethread);
00960 if (request_start_time) {
00961 ICP_SUM_DYN_STAT(total_icp_response_time_stat, (ink_get_hrtime() - request_start_time));
00962 }
00963 RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00964 s->_next_state = READ_NOT_ACTIVE;
00965 break;
00966 }
00967 ink_release_assert(0);
00968
00969 case READ_NOT_ACTIVE:
00970 case READ_NOT_ACTIVE_EXIT:
00971 {
00972 ink_release_assert(_recursion_depth == 0);
00973 if (!_ICPpr->Lock())
00974 return EVENT_CONT;
00975
00976
00977 _ICPpr->DecPendingQuery();
00978 _ICPpr->Unlock();
00979
00980 s->_buf = 0;
00981 if (s->_next_state == READ_NOT_ACTIVE_EXIT) {
00982 s->_next_state = READ_PROCESSING_COMPLETE;
00983 return EVENT_DONE;
00984 } else {
00985
00986 s->reset();
00987 s->_start_time = ink_get_hrtime();
00988 s->_next_state = READ_ACTIVE;
00989 RECORD_ICP_STATE_CHANGE(s, 0, READ_ACTIVE);
00990 break;
00991 }
00992 }
00993 ink_release_assert(0);
00994
00995 case READ_PROCESSING_COMPLETE:
00996 default:
00997 ink_release_assert(0);
00998
00999 }
01000
01001 }
01002 }
01003
01004
01005
01006
01007
01008
// Class allocator instance for ICPRequestCont objects.
// NOTE(review): it is not referenced in the visible part of this file, where
// ICPRequestCont objects are released with `delete`; confirm where it is used.
01009 ClassAllocator<ICPRequestCont> ICPRequestCont_allocator("ICPRequestCont_allocator");
01010
01011 ICPRequestCont::ICPRequestCont(ICPProcessor * pr, Continuation * c, URL * u)
01012 : Continuation(0), _cont(c), _url(u), _start_time(0),
01013 _ICPpr(pr), _timeout(0),
01014 npending_actions(0), pendingActions(NULL),
01015 _sequence_number(0), _expected_replies(0),
01016 _expected_replies_list(MAX_DEFINED_PEERS), _received_replies(0), _next_state(ICP_START)
01017 {
01018 memset((void *)&_ret_sockaddr, 0, sizeof(_ret_sockaddr));
01019 _ret_status = ICP_LOOKUP_FAILED;
01020 _act.cancelled = false;
01021 _act = c;
01022 memset((void *) &_ICPmsg, 0, sizeof(_ICPmsg));
01023 memset((void *) &_sendMsgHdr, 0, sizeof(_sendMsgHdr));
01024 memset((void *) &_sendMsgIOV, 0, sizeof(_sendMsgIOV[MSG_IOVECS]));
01025
01026 if (c)
01027 this->mutex = c->mutex;
01028 }
01029
01030 ICPRequestCont::~ICPRequestCont()
01031 {
01032 _act = NULL;
01033 this->mutex = NULL;
01034
01035 if (_timeout) {
01036 _timeout->cancel(this);
01037 _timeout = 0;
01038 }
01039 RemoveICPRequest(_sequence_number);
01040
01041 if (_ICPmsg.h.opcode == ICP_OP_QUERY) {
01042 if (_ICPmsg.un.query.URL) {
01043 ats_free(_ICPmsg.un.query.URL);
01044 }
01045 }
01046 if (pendingActions) {
01047 delete pendingActions;
01048 pendingActions = 0;
01049 }
01050 }
01051
01052 void
01053 ICPRequestCont::remove_from_pendingActions(Action * a)
01054 {
01055 if (!pendingActions) {
01056 npending_actions--;
01057 return;
01058 }
01059 for (intptr_t i = 0; i < pendingActions->length(); i++) {
01060 if ((*pendingActions)[i] == a) {
01061 for (intptr_t j = i; j < pendingActions->length() - 1; j++)
01062 (*pendingActions)[j] = (*pendingActions)[j + 1];
01063 pendingActions->set_length(pendingActions->length() - 1);
01064 npending_actions--;
01065 return;
01066 }
01067 }
01068 npending_actions--;
01069 }
01070
01071 void
01072 ICPRequestCont::remove_all_pendingActions()
01073 {
01074 int active_pendingActions = 0;
01075
01076 if (!pendingActions) {
01077 return;
01078 }
01079 for (intptr_t i = 0; i < pendingActions->length(); i++) {
01080 if ((*pendingActions)[i]
01081 && ((*pendingActions)[i] != ACTION_IO_ERROR)) {
01082 ((*pendingActions)[i])->cancel();
01083 (*pendingActions)[i] = 0;
01084 npending_actions--;
01085 active_pendingActions++;
01086 } else {
01087 (*pendingActions)[i] = 0;
01088 }
01089 }
01090 pendingActions->set_length(pendingActions->length() - active_pendingActions);
01091 }
01092
01093 int
01094 ICPRequestCont::ICPRequestEvent(int event, Event * e)
01095 {
01096
01097
01098
01099 ink_assert(event == NET_EVENT_DATAGRAM_WRITE_COMPLETE ||
01100 event == NET_EVENT_DATAGRAM_WRITE_ERROR ||
01101 event == EVENT_IMMEDIATE || event == EVENT_INTERVAL || event == ICP_RESPONSE_MESSAGE);
01102
01103 if ((event == NET_EVENT_DATAGRAM_WRITE_COMPLETE)
01104 || (event == NET_EVENT_DATAGRAM_WRITE_ERROR)) {
01105 ink_assert(npending_actions > 0);
01106 remove_from_pendingActions((Action *) e);
01107 return EVENT_DONE;
01108 }
01109
01110
01111 switch (_next_state) {
01112 case ICP_START:
01113 case ICP_OFF_TERMINATE:
01114 case ICP_QUEUE_REQUEST:
01115 case ICP_AWAITING_RESPONSE:
01116 case ICP_DEQUEUE_REQUEST:
01117 case ICP_POST_COMPLETION:
01118 case ICP_REQUEST_NOT_ACTIVE:
01119 {
01120 if (ICPStateMachine(event, (void *) e) == EVENT_CONT) {
01121
01122
01123
01124 eventProcessor.schedule_in(this, HRTIME_MSECONDS(RETRY_INTERVAL), ET_ICP);
01125 return EVENT_CONT;
01126
01127 } else if (_next_state == ICP_DONE) {
01128
01129
01130
01131 delete this;
01132 break;
01133 } else {
01134 break;
01135 }
01136 }
01137 ink_release_assert(0);
01138
01139 case ICP_DONE:
01140 default:
01141 ink_release_assert(0);
01142 }
01143
01144 return EVENT_DONE;
01145 }
01146
01147 int
01148 ICPRequestCont::NopICPRequestEvent(int , Event * )
01149 {
01150 delete this;
01151 return EVENT_DONE;
01152 }
01153
01154 int
01155 ICPRequestCont::ICPStateMachine(int event, void *d)
01156 {
01157
01158
01159
01160 ICPConfiguration *ICPcf = _ICPpr->GetConfig();
01161 ip_port_text_buffer ipb;
01162
01163 while (1) {
01164
01165 switch (_next_state) {
01166 case ICP_START:
01167 {
01168
01169 if (_act.cancelled) {
01170 _next_state = ICP_DONE;
01171 return EVENT_DONE;
01172 }
01173
01174 if (!_ICPpr->Lock())
01175 return EVENT_CONT;
01176
01177 if (_ICPpr->AllowICPQueries() && (ICPcf->globalConfig()->ICPconfigured() == ICP_MODE_SEND_RECEIVE)) {
01178
01179
01180 if (_url->valid()) {
01181 int host_len;
01182 const char *host = _url->host_get(&host_len);
01183 if (ptr_len_casecmp(host, host_len, "127.0.0.1") == 0 || ptr_len_casecmp(host, host_len, "localhost") == 0) {
01184 _ICPpr->Unlock();
01185
01186
01187 _next_state = ICP_OFF_TERMINATE;
01188 Debug("icp", "[ICP_START] NULL/localhost URL ignored Id=%d", _sequence_number);
01189 break;
01190 }
01191 }
01192
01193 _ICPpr->IncPendingQuery();
01194 _ICPpr->Unlock();
01195
01196
01197 char *urlstr = _url->string_get(NULL);
01198 int urlstr_len = strlen(urlstr) + 1;
01199
01200 int status = BuildICPMsg(ICP_OP_QUERY,
01201 _sequence_number = ICPReqSeqNumber(),
01202 0 , 0 ,
01203 0 ,
01204 (void *) urlstr, urlstr_len,
01205 &_sendMsgHdr, _sendMsgIOV,
01206 &_ICPmsg);
01207
01208 ink_assert(status == 0);
01209 Debug("icp", "[ICP_START] ICP_OP_QUERY for [%s], Id=%d", urlstr, _sequence_number);
01210
01211 _next_state = ICP_QUEUE_REQUEST;
01212 break;
01213
01214 } else {
01215 ICP_INCREMENT_DYN_STAT(icp_start_icpoff_stat);
01216 _ICPpr->Unlock();
01217
01218
01219 _next_state = ICP_OFF_TERMINATE;
01220 break;
01221 }
01222 }
01223 ink_release_assert(0);
01224
01225 case ICP_OFF_TERMINATE:
01226 {
01227 if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
01228 return EVENT_CONT;
01229 }
01230 Debug("icp", "[ICP_OFF_TERMINATE] Id=%d", _sequence_number);
01231
01232
01233 if (!_act.cancelled) {
01234 _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
01235 }
01236 MUTEX_UNTAKE_LOCK(mutex, this_ethread());
01237
01238 _next_state = ICP_DONE;
01239 return EVENT_DONE;
01240 }
01241 ink_release_assert(0);
01242
01243 case ICP_QUEUE_REQUEST:
01244 {
01245
01246 int ret = AddICPRequest(_sequence_number, this);
01247 ink_assert(ret == 0);
01248
01249
01250 int bias = _ICPpr->GetStartingSendPeerBias();
01251 int SendPeers = _ICPpr->GetSendPeers();
01252 npending_actions = 0;
01253 while (SendPeers > 0) {
01254 Peer *P = _ICPpr->GetNthSendPeer(SendPeers, bias);
01255 if (!P->IsOnline()) {
01256 SendPeers--;
01257 continue;
01258 }
01259
01260
01261
01262
01263
01264
01265 int was_expected = P->ExpectedReplies(&_expected_replies_list);
01266 _expected_replies += was_expected;
01267 npending_actions++;
01268 Action *a = P->SendMsg_re(this, P, &_sendMsgHdr, NULL);
01269 if (!a) {
01270 a = ACTION_IO_ERROR;
01271 }
01272 if (a != ACTION_IO_ERROR) {
01273 if (a != ACTION_RESULT_DONE) {
01274 if (!pendingActions) {
01275 pendingActions = new DynArray<Action *>(&default_action);
01276 }
01277 (*pendingActions) (npending_actions) = a;
01278 }
01279 P->LogSendMsg(&_ICPmsg, NULL);
01280 Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d send query to [%s]",
01281 _sequence_number, ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
01282 } else {
01283 _expected_replies_list.ClearBit(P->GetPeerID());
01284 _expected_replies -= was_expected;
01285
01286 ICP_INCREMENT_DYN_STAT(send_query_partial_write_stat);
01287
01288 Debug("icp_warn",
01289 "ICP query send, res=%d, ip=%s", ntohs(_ICPmsg.h.msglen),
01290 ats_ip_ntop(P->GetIP(), ipb, sizeof(ipb)));
01291 }
01292 SendPeers--;
01293 }
01294
01295 Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d expected replies=%d", _sequence_number, _expected_replies);
01296 if (!_expected_replies) {
01297
01298
01299
01300 ICP_INCREMENT_DYN_STAT(icp_queries_no_expected_replies_stat);
01301 _next_state = ICP_DEQUEUE_REQUEST;
01302 break;
01303 }
01304 ICP_SUM_DYN_STAT(total_udp_send_queries_stat, _expected_replies);
01305
01306
01307
01308
01309 int tval = _ICPpr->GetConfig()->globalConfig()->ICPqueryTimeout();
01310 _timeout = eventProcessor.schedule_in(this, HRTIME_SECONDS(tval), ET_ICP);
01311
01312 _next_state = ICP_AWAITING_RESPONSE;
01313 return EVENT_DONE;
01314 }
01315 ink_release_assert(0);
01316
01317 case ICP_AWAITING_RESPONSE:
01318 {
01319 Debug("icp", "[ICP_AWAITING_RESPONSE] Id=%d", _sequence_number);
01320 ink_assert(d);
01321 ICPRequestEventArgs_t dummyArgs;
01322 ICPRequestEventArgs_t *args = 0;
01323
01324 if (event == ICP_RESPONSE_MESSAGE) {
01325 args = (ICPRequestEventArgs_t *) d;
01326 } else if (event == EVENT_INTERVAL) {
01327 memset((void *) &dummyArgs, 0, sizeof(dummyArgs));
01328 args = &dummyArgs;
01329 } else {
01330 ink_release_assert(0);
01331 }
01332
01333
01334 if (ICPResponseMessage(event, args->rICPmsg, args->peer) == EVENT_DONE) {
01335
01336 _next_state = ICP_DEQUEUE_REQUEST;
01337 break;
01338
01339 } else {
01340
01341 return EVENT_DONE;
01342 }
01343 }
01344 ink_release_assert(0);
01345
01346 case ICP_DEQUEUE_REQUEST:
01347 {
01348
01349 int ret = RemoveICPRequest(_sequence_number);
01350 Debug("icp", "[ICP_DEQUEUE_REQUEST] Id=%d", _sequence_number);
01351 ink_assert(ret == 0);
01352
01353 _next_state = ICP_POST_COMPLETION;
01354 break;
01355 }
01356 ink_release_assert(0);
01357
01358 case ICP_POST_COMPLETION:
01359 {
01360 if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
01361 return EVENT_CONT;
01362 }
01363 Debug("icp", "[ICP_POST_COMPLETION] Id=%d", _sequence_number);
01364
01365
01366 if (!_act.cancelled) {
01367 _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
01368 }
01369 MUTEX_UNTAKE_LOCK(mutex, this_ethread());
01370 ICP_SUM_DYN_STAT(total_icp_request_time_stat, (ink_get_hrtime() - _start_time));
01371
01372 _next_state = ICP_WAIT_SEND_COMPLETE;
01373 break;
01374 }
01375 ink_release_assert(0);
01376 case ICP_WAIT_SEND_COMPLETE:
01377 {
01378
01379 if (npending_actions > 0) {
01380 Debug("icp", "[ICP_WAIT_SEND_COMPLETE] Id=%d active=%d", _sequence_number, npending_actions);
01381 } else {
01382 _next_state = ICP_REQUEST_NOT_ACTIVE;
01383
01384 break;
01385 }
01386 }
01387 break;
01388 ink_release_assert(0);
01389 case ICP_REQUEST_NOT_ACTIVE:
01390 {
01391 Debug("icp", "[ICP_REQUEST_NOT_ACTIVE] Id=%d", _sequence_number);
01392 _sequence_number = 0;
01393 if (!_ICPpr->Lock())
01394 return EVENT_CONT;
01395
01396
01397 _ICPpr->DecPendingQuery();
01398 _ICPpr->Unlock();
01399
01400 _next_state = ICP_DONE;
01401 return EVENT_DONE;
01402 }
01403 ink_release_assert(0);
01404
01405 case ICP_DONE:
01406 default:
01407 ink_release_assert(0);
01408
01409 }
01410
01411 }
01412 }
01413
01414 int
01415 ICPRequestCont::ICPResponseMessage(int event, ICPMsg_t * m, Peer * peer)
01416 {
01417 ip_port_text_buffer ipb, ipb2;
01418
01419 if (event == EVENT_INTERVAL) {
01420 _timeout = 0;
01421 remove_all_pendingActions();
01422
01423
01424
01425
01426 if (_received_replies) {
01427 int NumParentPeers = _ICPpr->GetParentPeers();
01428 if (NumParentPeers > 0) {
01429 int n;
01430 Peer *pp;
01431 for (n = 0; n < NumParentPeers; n++) {
01432 pp = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
01433 if (pp && !_expected_replies_list.IsBitSet(pp->GetPeerID())
01434 && pp->isUp()) {
01435 ats_ip_copy(&_ret_sockaddr.sa, pp->GetIP());
01436 _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(pp)->GetProxyPort());
01437 _ret_status = ICP_LOOKUP_FOUND;
01438
01439 Debug("icp",
01440 "ICP timeout using parent Id=%d from [%s] return [%s]",
01441 _sequence_number,
01442 ats_ip_nptop(pp->GetIP(), ipb, sizeof(ipb)),
01443 ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2))
01444 );
01445 return EVENT_DONE;
01446 }
01447 }
01448 }
01449 }
01450
01451 Debug("icp", "ICP Response timeout for Id=%d", _sequence_number);
01452 return EVENT_DONE;
01453
01454 } else {
01455
01456
01457
01458 ink_assert(m->h.requestno == _sequence_number);
01459
01460 switch (m->h.opcode) {
01461 case ICP_OP_HIT:
01462 case ICP_OP_HIT_OBJ:
01463 {
01464
01465 _timeout->cancel(this);
01466 _timeout = 0;
01467
01468 ICP_INCREMENT_DYN_STAT(icp_query_hits_stat);
01469 ++_received_replies;
01470 ats_ip_copy(&_ret_sockaddr, peer->GetIP());
01471 _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(peer)->GetProxyPort());
01472 _ret_status = ICP_LOOKUP_FOUND;
01473
01474 Debug("icp",
01475 "ICP Response HIT for Id=%d from [%s] return [%s]",
01476 _sequence_number,
01477 ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)),
01478 ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2))
01479 );
01480 return EVENT_DONE;
01481 }
01482 case ICP_OP_MISS:
01483 case ICP_OP_ERR:
01484 case ICP_OP_MISS_NOFETCH:
01485 case ICP_OP_DENIED:
01486 {
01487 Debug("icp", "ICP MISS response for Id=%d from [%s]",
01488 _sequence_number, ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)));
01489
01490
01491 int Id = peer->GetPeerID();
01492 if (_expected_replies_list.IsBitSet(Id)) {
01493
01494 _expected_replies_list.ClearBit(Id);
01495 ++_received_replies;
01496 }
01497
01498 if (_received_replies < _expected_replies)
01499 return EVENT_CONT;
01500
01501
01502 _timeout->cancel(this);
01503 _timeout = 0;
01504
01505 ICP_INCREMENT_DYN_STAT(icp_query_misses_stat);
01506
01507
01508
01509
01510 if (_ICPpr->GetParentPeers() > 0) {
01511
01512
01513 Peer *p = NULL;
01514
01515 {
01516 int i;
01517 for (i = 0; i < _ICPpr->GetParentPeers(); i++) {
01518 p = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
01519
01520 if (p->isUp())
01521 break;
01522 }
01523
01524 if (i >= _ICPpr->GetParentPeers()) {
01525 Debug("icp", "None of the %d ICP parent(s) is up", _ICPpr->GetParentPeers());
01526 p = NULL;
01527 }
01528 }
01529 if (p) {
01530 ats_ip_copy(&_ret_sockaddr, p->GetIP());
01531 _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(p)->GetProxyPort());
01532 _ret_status = ICP_LOOKUP_FOUND;
01533
01534 Debug("icp", "ICP ALL MISS(1) for Id=%d return [%s]",
01535 _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb)));
01536 return EVENT_DONE;
01537 }
01538 }
01539 Debug("icp", "ICP ALL MISS(2) for Id=%d return [%s]",
01540 _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb)));
01541 return EVENT_DONE;
01542 }
01543 default:
01544 {
01545 ICP_INCREMENT_DYN_STAT(invalid_icp_query_response_stat);
01546
01547 Warning("Invalid ICP response, op=%d reqno=%d ip=%s",
01548 m->h.opcode, m->h.requestno, ats_ip_ntop(peer->GetIP(), ipb, sizeof(ipb)));
01549 return EVENT_CONT;
01550 }
01551
01552 }
01553 }
01554 }
01555
01556
01557
01558
01559
01560
01561 void
01562 ICPRequestCont::NetToHostICPMsg(ICPMsg_t * in, ICPMsg_t * out)
01563 {
01564 out->h.opcode = in->h.opcode;
01565 out->h.version = in->h.version;
01566 out->h.msglen = ntohs(in->h.msglen);
01567 out->h.requestno = ntohl(in->h.requestno);
01568 out->h.optionflags = ntohl(in->h.optionflags);
01569 out->h.optiondata = ntohl(in->h.optiondata);
01570 out->h.shostid = ntohl(in->h.shostid);
01571
01572 switch (in->h.opcode) {
01573 case ICP_OP_QUERY:
01574 {
01575 memcpy((char *) &out->un.query.rhostid,
01576 (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid)), sizeof(out->un.query.rhostid));
01577 out->un.query.rhostid = ntohl(out->un.query.rhostid);
01578 out->un.query.URL = (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid) + sizeof(out->un.query.rhostid));
01579 break;
01580 }
01581 case ICP_OP_HIT:
01582 {
01583 out->un.hit.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01584 break;
01585 }
01586 case ICP_OP_MISS:
01587 {
01588 out->un.miss.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01589 break;
01590 }
01591 case ICP_OP_HIT_OBJ:
01592 {
01593 out->un.hitobj.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01594
01595
01596 out->un.hitobj.p_objsize = (char *) (out->un.hitobj.URL + strlen(out->un.hitobj.URL));
01597 memcpy((char *) &out->un.hitobj.objsize, out->un.hitobj.p_objsize, sizeof(out->un.hitobj.objsize));
01598 out->un.hitobj.objsize = ntohs(out->un.hitobj.objsize);
01599 out->un.hitobj.data = (char *) (out->un.hitobj.p_objsize + sizeof(out->un.hitobj.objsize));
01600 break;
01601 }
01602 default:
01603 break;
01604 }
01605 }
01606
01607 int
01608 ICPRequestCont::BuildICPMsg(ICPopcode_t op, unsigned int seqno,
01609 int optflags, int optdata, int shostid,
01610 void *data, int datalen, struct msghdr *mhdr, struct iovec *iov, ICPMsg_t * icpmsg)
01611 {
01612
01613 if (op == ICP_OP_QUERY) {
01614 icpmsg->un.query.rhostid = htonl(0);
01615 icpmsg->un.query.URL = (char *) data;
01616
01617 mhdr->msg_iov = iov;
01618 mhdr->msg_iovlen = 3;
01619
01620 iov[0].iov_base = (caddr_t) icpmsg;
01621 iov[0].iov_len = sizeof(ICPMsgHdr_t);
01622
01623 iov[1].iov_base = (caddr_t) & icpmsg->un.query.rhostid;
01624 iov[1].iov_len = sizeof(icpmsg->un.query.rhostid);
01625
01626 iov[2].iov_base = (caddr_t) data;
01627 iov[2].iov_len = datalen;
01628 icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len);
01629
01630 } else if (op == ICP_OP_HIT) {
01631 icpmsg->un.hit.URL = (char *) data;
01632
01633 mhdr->msg_iov = iov;
01634 mhdr->msg_iovlen = 2;
01635
01636 iov[0].iov_base = (caddr_t) icpmsg;
01637 iov[0].iov_len = sizeof(ICPMsgHdr_t);
01638
01639 iov[1].iov_base = (caddr_t) data;
01640 iov[1].iov_len = datalen;
01641 icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
01642
01643 } else if (op == ICP_OP_MISS) {
01644 icpmsg->un.miss.URL = (char *) data;
01645
01646 mhdr->msg_iov = iov;
01647 mhdr->msg_iovlen = 2;
01648
01649 iov[0].iov_base = (caddr_t) icpmsg;
01650 iov[0].iov_len = sizeof(ICPMsgHdr_t);
01651
01652 iov[1].iov_base = (caddr_t) data;
01653 iov[1].iov_len = datalen;
01654 icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
01655
01656 } else {
01657 ink_release_assert(0);
01658 return 1;
01659 }
01660
01661 mhdr->msg_name = (caddr_t) 0;
01662 mhdr->msg_namelen = 0;
01663
01664 #if !defined(linux) && !defined(freebsd) && !defined(darwin) && !defined(solaris) \
01665 && !defined(openbsd)
01666 mhdr->msg_accrights = (caddr_t) 0;
01667 mhdr->msg_accrightslen = 0;
01668 #elif !defined(solaris)
01669 mhdr->msg_control = 0;
01670 mhdr->msg_controllen = 0;
01671 mhdr->msg_flags = 0;
01672 #endif
01673
01674 icpmsg->h.opcode = op;
01675 icpmsg->h.version = ICP_VERSION_2;
01676 icpmsg->h.requestno = htonl(seqno);
01677 icpmsg->h.optionflags = htonl(optflags);
01678 icpmsg->h.optiondata = htonl(optdata);
01679 icpmsg->h.shostid = htonl(shostid);
01680
01681 return 0;
01682 }
01683
01684
01685 unsigned int
01686 ICPRequestCont::ICPRequestSeqno = 1;
01687 Queue<ICPRequestCont> ICPRequestQueue[ICPRequestCont::ICP_REQUEST_HASH_SIZE];
01688
01689
01690 unsigned int
01691 ICPRequestCont::ICPReqSeqNumber()
01692 {
01693
01694 unsigned int res = 0;
01695 do {
01696 res = (unsigned int) ink_atomic_increment((int *) &ICPRequestSeqno, 1);
01697 } while (!res);
01698
01699 return res;
01700 }
01701
01702
01703 inline int
01704 ICPRequestCont::ICPRequestHash(unsigned int seqno)
01705 {
01706
01707 return seqno % ICP_REQUEST_HASH_SIZE;
01708 }
01709
01710
01711 int
01712 ICPRequestCont::AddICPRequest(unsigned int seqno, ICPRequestCont * r)
01713 {
01714
01715
01716
01717 ICPRequestQueue[ICPRequestHash(seqno)].enqueue(r);
01718 return 0;
01719 }
01720
01721
01722 ICPRequestCont *
01723 ICPRequestCont::FindICPRequest(unsigned int seqno)
01724 {
01725
01726 int hash = ICPRequestHash(seqno);
01727 ICPRequestCont *r;
01728
01729 for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
01730 if (r->_sequence_number == seqno)
01731 return r;
01732 }
01733 return (ICPRequestCont *) 0;
01734 }
01735
01736
01737 int
01738 ICPRequestCont::RemoveICPRequest(unsigned int seqno)
01739 {
01740
01741
01742
01743
01744 if (!seqno) {
01745 return 1;
01746 }
01747 int hash = ICPRequestHash(seqno);
01748 ICPRequestCont *r;
01749
01750 for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
01751 if (r->_sequence_number == seqno) {
01752 ICPRequestQueue[hash].remove(r);
01753 return 0;
01754 }
01755 }
01756 return 1;
01757 }
01758
01759
01760
01761
01762
01763
01764
01765
01766
01767
01768
01769 void
01770 initialize_thread_for_icp(EThread * e)
01771 {
01772 (void) e;
01773 }
01774
01775 ICPProcessor icpProcessorInternal;
01776 ICPProcessorExt icpProcessor(&icpProcessorInternal);
01777
01778 ICPProcessor::ICPProcessor()
01779 : _l(0), _Initialized(0), _AllowIcpQueries(0),
01780 _PendingIcpQueries(0), _ICPConfig(0), _ICPPeriodic(0), _ICPHandler(0),
01781 _mcastCB_handler(NULL), _PeriodicEvent(0), _ICPHandlerEvent(0),
01782 _nPeerList(-1), _LocalPeer(0),
01783 _curSendPeer(0), _nSendPeerList(-1),
01784 _curRecvPeer(0), _nRecvPeerList(-1), _curParentPeer(0), _nParentPeerList(-1), _ValidPollData(0), _last_recv_peer_bias(0)
01785 {
01786 memset((void *)_PeerList, 0, sizeof(_PeerList[PEER_LIST_SIZE]));
01787 memset((void *)_SendPeerList, 0, sizeof(_SendPeerList[SEND_PEER_LIST_SIZE]));
01788 memset((void *)_RecvPeerList, 0, sizeof(_RecvPeerList[RECV_PEER_LIST_SIZE]));
01789 memset((void *)_ParentPeerList, 0, sizeof(_ParentPeerList[PARENT_PEER_LIST_SIZE]));
01790 memset((void *)_PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
01791 }
01792
01793 ICPProcessor::~ICPProcessor()
01794 {
01795 if (_ICPPeriodic) {
01796 MUTEX_TAKE_LOCK(_ICPPeriodic->mutex, this_ethread());
01797 _PeriodicEvent->cancel();
01798 Mutex_unlock(_ICPPeriodic->mutex, this_ethread());
01799 }
01800
01801 if (_ICPHandler) {
01802 MUTEX_TAKE_LOCK(_ICPHandler->mutex, this_ethread());
01803 _ICPHandlerEvent->cancel();
01804 Mutex_unlock(_ICPHandler->mutex, this_ethread());
01805 }
01806 }
01807
01808 void
01809 ICPProcessor::start()
01810 {
01811
01812
01813
01814
01815 if (_Initialized)
01816 return;
01817
01818
01819
01820
01821
01822 _l = new AtomicLock();
01823
01824
01825
01826
01827
01828 ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX);
01829
01830
01831
01832
01833 InitICPStatCallbacks();
01834
01835
01836
01837
01838 _ICPConfig = new ICPConfiguration();
01839
01840 _mcastCB_handler = new ICPHandlerCont(this);
01841 SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler) & ICPHandlerCont::TossEvent);
01842
01843
01844
01845
01846
01847 if (_ICPConfig->globalConfig()->ICPconfigured()) {
01848 if (BuildPeerList() == 0) {
01849 if (SetupListenSockets() == 0) {
01850 _AllowIcpQueries = 1;
01851 }
01852 }
01853 }
01854 DumpICPConfig();
01855
01856
01857
01858
01859 _ICPPeriodic = new ICPPeriodicCont(this);
01860 SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler) & ICPPeriodicCont::PeriodicEvent);
01861 _PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP);
01862
01863
01864
01865
01866 _ICPHandler = new ICPHandlerCont(this);
01867 SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler) & ICPHandlerCont::PeriodicEvent);
01868 _ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler,
01869 HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP);
01870
01871
01872
01873 if (!gclient_request.valid()) {
01874 gclient_request.create(HTTP_TYPE_REQUEST);
01875 }
01876 _Initialized = 1;
01877 }
01878
01879 Action *
01880 ICPProcessor::ICPQuery(Continuation * c, URL * url)
01881 {
01882
01883
01884
01885
01886
01887 EThread *thread = this_ethread();
01888 ProxyMutex *mutex = thread->mutex;
01889 ICPRequestCont *rc = new(ICPRequestCont_allocator.alloc()) ICPRequestCont(this, c, url);
01890
01891 ICP_INCREMENT_DYN_STAT(icp_query_requests_stat);
01892
01893 rc->SetRequestStartTime();
01894 SET_CONTINUATION_HANDLER(rc, (ICPRequestContHandler) & ICPRequestCont::ICPRequestEvent);
01895 eventProcessor.schedule_imm(rc, ET_ICP);
01896
01897 return rc->GetActionPtr();
01898 }
01899
01900 int
01901 ICPProcessor::BuildPeerList()
01902 {
01903
01904
01905
01906
01907
01908
01909
01910
01911
01912
01913
01914
01915
01916
01917
01918
01919
01920
01921
01922
01923
01924
01925
01926
01927
01928
01929
01930
01931
01932 PeerConfigData *Pcfg;
01933 Peer *P;
01934 Peer *mcP;
01935 int index;
01936 int status;
01937 PeerType_t type;
01938
01939
01940
01941
01942
01943
01944
01945 Pcfg = _ICPConfig->indexToPeerConfigData(0);
01946 ink_strlcpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname));
01947 Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL;
01948
01949
01950 IpEndpoint tmp_ip;
01951 if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &tmp_ip.sa)) {
01952 Pcfg->_ip_addr._family = AF_UNSPEC;
01953
01954 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface());
01955 } else {
01956 Pcfg->_my_ip_addr = Pcfg->_ip_addr = tmp_ip;
01957 }
01958 Pcfg->_proxy_port = 0;
01959 Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport();
01960 Pcfg->_mc_member = 0;
01961 Pcfg->_mc_ip_addr._family = AF_UNSPEC;
01962 Pcfg->_mc_ttl = 0;
01963
01964
01965
01966
01967
01968 P = new ParentSiblingPeer(PEER_LOCAL, Pcfg, this);
01969 status = AddPeer(P);
01970 ink_release_assert(status);
01971 status = AddPeerToRecvList(P);
01972 ink_release_assert(status);
01973 _LocalPeer = P;
01974
01975 for (index = 1; index < MAX_DEFINED_PEERS; ++index) {
01976 Pcfg = _ICPConfig->indexToPeerConfigData(index);
01977 type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType());
01978
01979
01980
01981
01982
01983
01984 if (Pcfg->GetIPAddr() == _LocalPeer->GetIP())
01985 continue;
01986
01987 if ((type == PEER_PARENT) || (type == PEER_SIBLING)) {
01988
01989 if (Pcfg->MultiCastMember()) {
01990 mcP = FindPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort());
01991 if (!mcP) {
01992
01993
01994
01995 mcP = new MultiCastPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this);
01996 status = AddPeer(mcP);
01997 ink_assert(status);
01998 status = AddPeerToSendList(mcP);
01999 ink_assert(status);
02000 status = AddPeerToRecvList(mcP);
02001 ink_assert(status);
02002 }
02003
02004
02005
02006 P = new ParentSiblingPeer(type, Pcfg, this);
02007 status = AddPeer(P);
02008 ink_assert(status);
02009 status = ((MultiCastPeer *) mcP)->AddMultiCastChild(P);
02010 ink_assert(status);
02011
02012 } else {
02013
02014
02015
02016 P = new ParentSiblingPeer(type, Pcfg, this);
02017 status = AddPeer(P);
02018 ink_assert(status);
02019 status = AddPeerToSendList(P);
02020 ink_assert(status);
02021 }
02022
02023
02024
02025 if (type == PEER_PARENT) {
02026 status = AddPeerToParentList(P);
02027 ink_assert(status);
02028 }
02029 }
02030 }
02031 return 0;
02032 }
02033
02034 void
02035 ICPProcessor::FreePeerList()
02036 {
02037
02038 int index;
02039 for (index = 0; index < (_nPeerList + 1); ++index) {
02040 if (_PeerList[index]) {
02041 _PeerList[index] = 0;
02042 }
02043 }
02044
02045 _nPeerList = -1;
02046 _LocalPeer = (Peer *) 0;
02047 _curSendPeer = 0;
02048 _nSendPeerList = -1;
02049 _curRecvPeer = 0;
02050 _nRecvPeerList = -1;
02051 _curParentPeer = 0;
02052 _nParentPeerList = -1;
02053 _ValidPollData = 0;
02054 _last_recv_peer_bias = 0;
02055
02056 for (index = 0; index < PEER_LIST_SIZE; index++) {
02057 _PeerList[index] = 0;
02058 }
02059 for (index = 0; index < SEND_PEER_LIST_SIZE; index++) {
02060 _SendPeerList[index] = 0;
02061 }
02062 for (index = 0; index < RECV_PEER_LIST_SIZE; index++) {
02063 _RecvPeerList[index] = 0;
02064 }
02065 for (index = 0; index < PARENT_PEER_LIST_SIZE; index++) {
02066 _ParentPeerList[index] = 0;
02067 }
02068 memset((void *) _PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
02069 }
02070
02071 int
02072 ICPProcessor::SetupListenSockets()
02073 {
02074 int allow_null_configuration;
02075
02076 if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY)
02077 && _ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) {
02078 allow_null_configuration = 1;
02079 } else {
02080 allow_null_configuration = 0;
02081 }
02082
02083
02084
02085
02086
02087
02088 if (!_LocalPeer) {
02089 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer");
02090 return 1;
02091 }
02092
02093 if (GetSendPeers() == 0) {
02094 if (!allow_null_configuration) {
02095 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)");
02096 return 1;
02097 }
02098 }
02099 if (GetRecvPeers() == 0) {
02100 if (!allow_null_configuration) {
02101 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)");
02102 return 1;
02103 }
02104 }
02105
02106
02107
02108 Peer *P;
02109 int status;
02110 int index;
02111 ip_port_text_buffer ipb, ipb2;
02112 for (index = 0; index < (_nPeerList + 1); ++index) {
02113
02114 if ((P = _PeerList[index])) {
02115
02116 if ((P->GetType() == PEER_PARENT)
02117 || (P->GetType() == PEER_SIBLING)) {
02118 ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
02119
02120 pPS->GetChan()->setRemote(pPS->GetIP());
02121
02122 } else if (P->GetType() == PEER_MULTICAST) {
02123 MultiCastPeer *pMC = (MultiCastPeer *) P;
02124 ink_assert(_mcastCB_handler != NULL);
02125 status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP(), _LocalPeer->GetIP(), NON_BLOCKING, pMC->GetTTL(), DISABLE_MC_LOOPBACK, _mcastCB_handler);
02126 if (status) {
02127
02128 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed, res=%d, ip=%s bind_ip=%s",
02129 status,
02130 ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)),
02131 ats_ip_nptop(_LocalPeer->GetIP(), ipb2, sizeof(ipb2))
02132 );
02133 return 1;
02134 }
02135
02136 status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP(),
02137 _LocalPeer->GetIP(),
02138 NON_BLOCKING, pMC->GetSendChan(), _mcastCB_handler);
02139 if (status) {
02140
02141 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed, res=%d, ip=%s",
02142 status, ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)));
02143 return 1;
02144 }
02145 }
02146 }
02147 }
02148
02149
02150
02151
02152
02153 ParentSiblingPeer *pPS = (ParentSiblingPeer *) ((Peer *) _LocalPeer);
02154
02155 NetVCOptions options;
02156 options.local_ip.assign(pPS->GetIP());
02157 options.local_port = pPS->GetICPPort();
02158 options.ip_proto = NetVCOptions::USE_UDP;
02159 options.addr_binding = NetVCOptions::INTF_ADDR;
02160 status = pPS->GetChan()->open(options);
02161 if (status) {
02162
02163 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP bind_connect failed, res=%d, ip=%s",
02164 status,
02165 ats_ip_nptop(pPS->GetIP(), ipb, sizeof(ipb))
02166 );
02167 return 1;
02168 }
02169
02170 return 0;
02171 }
02172
02173 void
02174 ICPProcessor::ShutdownListenSockets()
02175 {
02176
02177
02178
02179 ink_assert(!PendingQuery());
02180 Peer *P;
02181
02182 int index;
02183 for (index = 0; index < (_nPeerList + 1); ++index) {
02184 if ((P = _PeerList[index])) {
02185 if (P->GetType() == PEER_LOCAL) {
02186 ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
02187 (void) pPS->GetChan()->close();
02188
02189 } else if (P->GetType() == PEER_MULTICAST) {
02190 MultiCastPeer *pMC = (MultiCastPeer *) P;
02191 (void) pMC->GetSendChan()->close();
02192 (void) pMC->GetRecvChan()->close();
02193 }
02194 }
02195 }
02196 }
02197
02198 int
02199 ICPProcessor::Reconfigure(int , int )
02200 {
02201
02202
02203
02204
02205
02206 ink_assert(_ICPConfig->HaveLock());
02207 ink_assert(!AllowICPQueries());
02208 ink_assert(!PendingQuery());
02209
02210
02211
02212
02213 ShutdownListenSockets();
02214 FreePeerList();
02215
02216
02217
02218
02219 _ICPConfig->UpdateGlobalConfig();
02220 _ICPConfig->UpdatePeerConfig();
02221
02222 int status = -1;
02223 if (_ICPConfig->globalConfig()->ICPconfigured()) {
02224 if ((status = BuildPeerList()) == 0) {
02225 status = SetupListenSockets();
02226 }
02227 DumpICPConfig();
02228 }
02229 return status;
02230 }
02231
02232 ICPProcessor::ReconfigState_t
02233 ICPProcessor::ReconfigureStateMachine(ReconfigState_t s, int gconfig_changed, int pconfig_changed)
02234 {
02235
02236
02237
02238
02239
02240
02241
02242
02243
02244 ink_assert(_ICPConfig->HaveLock());
02245 int reconfig_status;
02246
02247 while (1) {
02248
02249 switch (s) {
02250 case RC_RECONFIG:
02251 {
02252 if (!Lock())
02253 return RC_RECONFIG;
02254
02255 if (PendingQuery()) {
02256 DisableICPQueries();
02257 Unlock();
02258 CancelPendingReads();
02259 return RC_RECONFIG;
02260
02261 } else {
02262 DisableICPQueries();
02263 Unlock();
02264
02265 reconfig_status = Reconfigure(gconfig_changed, pconfig_changed);
02266
02267 if (reconfig_status == 0) {
02268 s = RC_ENABLE_ICP;
02269 } else {
02270 s = RC_DONE;
02271 }
02272 break;
02273 }
02274 }
02275
02276 case RC_ENABLE_ICP:
02277 {
02278 if (!Lock())
02279 return RC_ENABLE_ICP;
02280
02281 EnableICPQueries();
02282 Unlock();
02283
02284 s = RC_DONE;
02285 break;
02286 }
02287
02288 case RC_DONE:
02289 {
02290
02291 _ICPConfig->Unlock();
02292 return RC_DONE;
02293 }
02294 default:
02295 {
02296 ink_release_assert(0);
02297 }
02298
02299 }
02300
02301 }
02302 return RC_DONE;
02303 }
02304
02305 void
02306 ICPProcessor::CancelPendingReads()
02307 {
02308
02309
02310
02311 ICPRequestCont *r = new(ICPRequestCont_allocator.alloc())
02312 ICPRequestCont(this, NULL, NULL);
02313 SET_CONTINUATION_HANDLER(r, (ICPRequestContHandler) & ICPRequestCont::NopICPRequestEvent);
02314 r->mutex = new_ProxyMutex();
02315
02316
02317 ICPRequestCont::BuildICPMsg(ICP_OP_HIT, 0, 0 , 0 , 0 ,
02318 (void *) 0, 0, &r->_sendMsgHdr, r->_sendMsgIOV, &r->_ICPmsg);
02319 r->_sendMsgHdr.msg_iovlen = 1;
02320 r->_ICPmsg.h.version = ~r->_ICPmsg.h.version;
02321
02322 Peer *lp = GetLocalPeer();
02323 r->_sendMsgHdr.msg_name = (caddr_t) & (lp->GetSendChan())->addr;
02324 r->_sendMsgHdr.msg_namelen = sizeof((lp->GetSendChan())->addr);
02325 udpNet.sendmsg_re(r, r, lp->GetSendFD(), &r->_sendMsgHdr);
02326 }
02327
02328 Peer *
02329 ICPProcessor::GenericFindListPeer(IpAddr const& ip, uint16_t port, int validListItems, Ptr<Peer> *List)
02330 {
02331 Peer *P;
02332 port = htons(port);
02333 for (int n = 0; n < validListItems; ++n) {
02334 if ((P = List[n])) {
02335 if ((P->GetIP() == ip)
02336 && ((port == 0) || (ats_ip_port_cast(P->GetIP()) == port)))
02337 return P;
02338 }
02339 }
02340 return NULL;
02341 }
02342
02343 Peer *
02344 ICPProcessor::FindPeer(IpAddr const& ip, uint16_t port)
02345 {
02346
02347 return GenericFindListPeer(ip, port, (_nPeerList + 1), _PeerList);
02348 }
02349
02350 Peer *
02351 ICPProcessor::FindSendListPeer(IpAddr const& ip, uint16_t port)
02352 {
02353
02354
02355 return GenericFindListPeer(ip, port, (_nSendPeerList + 1), _SendPeerList);
02356 }
02357
02358 Peer *
02359 ICPProcessor::FindRecvListPeer(IpAddr const& ip, uint16_t port)
02360 {
02361
02362
02363 return GenericFindListPeer(ip, port, (_nRecvPeerList + 1), _RecvPeerList);
02364 }
02365
02366 int
02367 ICPProcessor::AddPeer(Peer * P)
02368 {
02369
02370
02371
02372
02373
02374
02375
02376 if (FindPeer(P->GetIP())) {
02377 ip_port_text_buffer x;
02378
02379 RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "bad icp.config, multiple peer definitions for ip=%s", ats_ip_nptop(P->GetIP(), x, sizeof(x)));
02380
02381 return 0;
02382 } else {
02383
02384 if (_nPeerList + 1 < PEER_LIST_SIZE) {
02385 _nPeerList++;
02386 _PeerList[_nPeerList] = P;
02387 P->SetPeerID(_nPeerList);
02388 return 1;
02389 } else {
02390 return 0;
02391 }
02392 }
02393 }
02394
02395 int
02396 ICPProcessor::AddPeerToRecvList(Peer * P)
02397 {
02398
02399
02400
02401
02402
02403 ink_assert(FindRecvListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0);
02404
02405 if (_nRecvPeerList + 1 < RECV_PEER_LIST_SIZE) {
02406 _nRecvPeerList++;
02407 _RecvPeerList[_nRecvPeerList] = P;
02408 return 1;
02409 } else {
02410 return 0;
02411 }
02412 }
02413
02414 int
02415 ICPProcessor::AddPeerToSendList(Peer * P)
02416 {
02417
02418
02419
02420
02421
02422 ink_assert(FindSendListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0);
02423
02424 if (_nSendPeerList + 1 < SEND_PEER_LIST_SIZE) {
02425 _nSendPeerList++;
02426 _SendPeerList[_nSendPeerList] = P;
02427 return 1;
02428 } else {
02429 return 0;
02430 }
02431 }
02432
02433 int
02434 ICPProcessor::AddPeerToParentList(Peer * P)
02435 {
02436
02437
02438
02439 if (_nParentPeerList + 1 < PARENT_PEER_LIST_SIZE) {
02440 _nParentPeerList++;
02441 _ParentPeerList[_nParentPeerList] = P;
02442 return 1;
02443 } else {
02444 return 0;
02445 }
02446 }
02447
02448