00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 #include "libts.h"
00032 #include "Main.h"
00033 #include "P_EventSystem.h"
00034 #include "P_Cache.h"
00035 #include "P_Net.h"
00036 #include "MgmtUtils.h"
00037 #include "P_RecProcess.h"
00038 #include "ICP.h"
00039 #include "ICPProcessor.h"
00040 #include "ICPlog.h"
00041 #include "logging/Log.h"
00042 #include "logging/LogAccessICP.h"
00043 #include "BaseManager.h"
00044 #include "HdrUtils.h"
00045 
00046 extern CacheLookupHttpConfig global_cache_lookup_config; // defined elsewhere; passed to cacheProcessor.open_read() in ICPPeerQueryCont()
00047 HTTPHdr gclient_request; // request header object passed to cacheProcessor.open_read() in ICPPeerQueryCont()
00048 
00049 
00050 
00051 
00052 
00053 
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 
00062 
00063 
00064 
00065 
00066 
00067 
00068 
00069 
00070 
00071 
00072 
00073 
00074 
00075 
00076 
00077 
00078 
00079 
00080 
00081 
00082 
00083 
00084 
00085 
00086 
00087 
00088 
00089 
00090 
00091 
00092 
00093 
00094 
00095 
00096 
00097 
00098 
00099 
00100 
00101 
00102 
00103 
00104 
00105 
00106 
00107 
00108 
00109 
00110 
00111 
00112 
00113 
00114 
00115 
00116 
00117 
00118 
00119 
00120 
00121 
00122 
00123 
00124 
00125 
00126 
00127 
00128 
00129 
00130 
00131 
00132 
00133 
00134 
00135 
00136 
00137 
00138 
00139 
00140 
00141 
00142 
00143 
00144 
00145 
00146 
00147 typedef int (ICPPeerReadCont::*ICPPeerReadContHandler) (int, void *); // member-function handler type used with SET_HANDLER in ICPPeerReadCont
00148 typedef int (ICPPeriodicCont::*ICPPeriodicContHandler) (int, void *); // same (event, data) handler shape for ICPPeriodicCont
00149 typedef int (ICPHandlerCont::*ICPHandlerContHandler) (int, void *); // same (event, data) handler shape for ICPHandlerCont
00150 typedef int (ICPRequestCont::*ICPRequestContHandler) (int, void *); // same (event, data) handler shape for ICPRequestCont
00151 
00152 // Optional freshness hook. When non-NULL, ICPPeerQueryEvent() calls it on CACHE_EVENT_OPEN_READ.
00153 PluginFreshnessCalcFunc pluginFreshnessCalcFunc = (PluginFreshnessCalcFunc) NULL;
00154 
00155 
00156 
00157 
00158 
00159 
00160 
00161 // Size index handed to IOBufferBlock::alloc() when the READ_DATA state allocates a receive buffer.
00162 int64_t ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex;
00163 static ClassAllocator <ICPPeerReadCont::PeerReadData>PeerReadDataAllocator("PeerReadDataAllocator"); // backs ICPPeerReadCont::_state
00164 static ClassAllocator<ICPPeerReadCont> ICPPeerReadContAllocator("ICPPeerReadContAllocator"); // backs the read continuations created in PeriodicEvent()
00165 
00166 static Action *default_action = NULL; // NOTE(review): not referenced in the visible part of this file
00167 
00168 
00169 ICPHandlerCont::ICPHandlerCont(ICPProcessor * icpP)
00170  : PeriodicCont(icpP)
00171 {
00172 }
00173 
00174 
00175 int
00176 ICPHandlerCont::TossEvent(int , Event * )
00177 {
00178   return EVENT_DONE;
00179 }
00180 
00181 int
00182 ICPHandlerCont::PeriodicEvent(int event, Event * )
00183 {
00184   int n_peer, valid_peers;
00185   Peer *P;
00186 
00187   
00188   
00189 
00190   valid_peers = _ICPpr->GetRecvPeers();
00191 
00192   
00193   switch (event) {
00194   case EVENT_POLL:
00195   case EVENT_INTERVAL:
00196     {
00197       
00198       for (n_peer = 0; n_peer < valid_peers; ++n_peer) {
00199         P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias());
00200         if (!P || (P && !P->IsOnline()))
00201           continue;
00202         if (P->shouldStartRead()) {
00203           P->startingRead();
00204 
00205           
00206 
00207           ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc();
00208           int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup();
00209 
00210           s->init(_ICPpr, P, local_lookup);
00211           RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE);
00212 
00213 
00214           
00215 
00216           s->handleEvent(EVENT_INTERVAL, (Event *) 0);
00217         }
00218       }
00219       break;
00220     }
00221   default:
00222     {
00223       ink_release_assert(!"unexpected event");
00224       break;
00225     }
00226   }                             
00227   return EVENT_CONT;
00228 }
00229 
00230 
00231 
00232 
00233 
00234 
00235 ICPPeerReadCont::PeerReadData::PeerReadData()
00236 {
00237   init();
00238 }
00239 
00240 void
00241 ICPPeerReadCont::PeerReadData::init()
00242 {
00243   _start_time = 0;
00244   _mycont = 0;
00245   _peer = 0;
00246   _next_state = READ_ACTIVE;
00247   _cache_lookup_local = 0;
00248   _buf = 0;
00249   _rICPmsg = 0;
00250   _rICPmsg_len = 0;
00251   _cachelookupURL.clear();
00252   _queryResult = 0;
00253   _ICPReqCont = 0;
00254   _bytesReceived = 0;
00255 #ifdef DEBUG_ICP
00256   _nhistory = 0;
00257 #endif
00258   memset((void *) &_sender, 0, sizeof(_sender));
00259 }
00260 
00261 ICPPeerReadCont::PeerReadData::~PeerReadData()
00262 {
00263   reset(1);
00264 }
00265 
00266 void
00267 ICPPeerReadCont::PeerReadData::reset(int full_reset)
00268 {
00269   if (full_reset) {
00270     _peer = 0;
00271     _buf = 0;
00272   }
00273   if (_rICPmsg) {
00274     _rICPmsg = 0;
00275     _rICPmsg_len = 0;
00276   }
00277 
00278   if (_cachelookupURL.valid()) {
00279     _cachelookupURL.destroy();
00280   }
00281 }
00282 
00283 
00284 
00285 
00286 
00287 
00288 ICPPeerReadCont::ICPPeerReadCont():Continuation(0), _object_vc(NULL), _object_read(NULL),
00289 _cache_req_hdr_heap_handle(NULL), _cache_resp_hdr_heap_handle(NULL), _ICPpr(NULL), _state(NULL),
00290 _start_time(0), _recursion_depth(0)
00291 {
00292 }
00293 
00294 void
00295 ICPPeerReadCont::init(ICPProcessor * ICPpr, Peer * p, int lookup_local)
00296 {
00297   PeerReadData *s = PeerReadDataAllocator.alloc();
00298   s->init();
00299   s->_start_time = ink_get_hrtime();
00300   s->_peer = p;
00301   s->_next_state = READ_ACTIVE;
00302   s->_cache_lookup_local = lookup_local;
00303   SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00304   _ICPpr = ICPpr;
00305   _state = s;
00306   _recursion_depth = -1;
00307   _object_vc = NULL;
00308   _object_read = NULL;
00309   _cache_req_hdr_heap_handle = NULL;
00310   _cache_resp_hdr_heap_handle = NULL;
00311   mutex = new_ProxyMutex();
00312 }
00313 
00314 ICPPeerReadCont::~ICPPeerReadCont()
00315 {
00316   reset(1);                     
00317 }
00318 
00319 void
00320 ICPPeerReadCont::reset(int full_reset)
00321 {
00322   mutex = 0;
00323   if (this->_state) {
00324     this->_state->reset(full_reset);
00325     PeerReadDataAllocator.free(this->_state);
00326   }
00327   if (_cache_req_hdr_heap_handle) {
00328     ats_free(_cache_req_hdr_heap_handle);
00329     _cache_req_hdr_heap_handle = NULL;
00330   }
00331   if (_cache_resp_hdr_heap_handle) {
00332     ats_free(_cache_resp_hdr_heap_handle);
00333     _cache_resp_hdr_heap_handle = NULL;
00334   }
00335 }
00336 
00337 int
00338 ICPPeerReadCont::ICPPeerReadEvent(int event, Event * e)
00339 {
00340   switch (event) {
00341   case EVENT_INTERVAL:
00342   case EVENT_IMMEDIATE:
00343     {
00344       break;
00345     }
00346   case NET_EVENT_DATAGRAM_WRITE_COMPLETE:
00347   case NET_EVENT_DATAGRAM_READ_COMPLETE:
00348   case NET_EVENT_DATAGRAM_READ_ERROR:
00349   case NET_EVENT_DATAGRAM_WRITE_ERROR:
00350     {
00351       ink_assert((event != NET_EVENT_DATAGRAM_READ_COMPLETE)
00352                  || (_state->_next_state == READ_DATA_DONE));
00353       ink_assert((event != NET_EVENT_DATAGRAM_WRITE_COMPLETE)
00354                  || (_state->_next_state == WRITE_DONE));
00355 
00356       ink_release_assert(this == (ICPPeerReadCont *)
00357                          completionUtil::getHandle(e));
00358       break;
00359     }
00360   case CACHE_EVENT_LOOKUP_FAILED:
00361   case CACHE_EVENT_LOOKUP:
00362     {
00363       ink_assert(_state->_next_state == AWAITING_CACHE_LOOKUP_RESPONSE);
00364       break;
00365     }
00366   default:
00367     {
00368       ink_release_assert(!"unexpected event");
00369     }
00370   }                             
00371 
00372   
00373   if (PeerReadStateMachine(_state, e) == EVENT_CONT) {
00374     eventProcessor.schedule_in(this, RETRY_INTERVAL, ET_ICP);
00375     return EVENT_DONE;
00376 
00377   } else if (_state->_next_state == READ_PROCESSING_COMPLETE) {
00378     _state->_peer->cancelRead();
00379     this->reset(1);             
00380     ICPPeerReadContAllocator.free(this);
00381     return EVENT_DONE;
00382 
00383   } else {
00384     return EVENT_DONE;
00385   }
00386 }
00387 
00388 int
00389 ICPPeerReadCont::StaleCheck(int event, Event * )
00390 {
00391   ip_port_text_buffer ipb;
00392 
00393   ink_release_assert(mutex->thread_holding == this_ethread());
00394 
00395   Debug("icp-stale", "Stale check res=%d for id=%d, [%s] from [%s]",
00396         event, _state->_rICPmsg->h.requestno,
00397     _state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
00398 
00399   switch (event) {
00400   case ICP_STALE_OBJECT:
00401     {
00402       _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
00403       break;
00404     }
00405   case ICP_FRESH_OBJECT:
00406     {
00407       _state->_queryResult = CACHE_EVENT_LOOKUP;
00408       break;
00409     }
00410   default:
00411     {
00412       Debug("icp-stale", "ICPPeerReadCont::StaleCheck: Invalid Event %d\n", event);
00413       _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
00414       break;
00415     }
00416   }
00417   _object_vc->do_io(VIO::CLOSE);
00418   _object_vc = 0;
00419   SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00420   return handleEvent(_state->_queryResult, 0);
00421 }
00422 
00423 int
00424 ICPPeerReadCont::ICPPeerQueryEvent(int event, Event * e)
00425 {
00426   ip_port_text_buffer ipb;
00427 
00428   Debug("icp", "Remote Query lookup res=%d for id=%d, [%s] from [%s]",
00429         event, _state->_rICPmsg->h.requestno,
00430     _state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
00431   if (pluginFreshnessCalcFunc) {
00432     switch (event) {
00433     case CACHE_EVENT_OPEN_READ:
00434       {
00435         _object_vc = (CacheVConnection *) e;
00436         SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::StaleCheck);
00437         _object_vc->get_http_info(&_object_read);
00438         (*pluginFreshnessCalcFunc) ((void *) this);
00439         return EVENT_DONE;
00440       }
00441     case CACHE_EVENT_OPEN_READ_FAILED:
00442       {
00443         event = CACHE_EVENT_LOOKUP_FAILED;
00444         break;
00445       }
00446     default:
00447       break;
00448     }
00449   }
00450   
00451   _state->_queryResult = event;
00452   SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00453   return handleEvent(event, e);
00454 }
00455 
00456 int
00457 ICPPeerReadCont::ICPPeerQueryCont(int , Event * )
00458 {
00459   ip_port_text_buffer ipb;
00460   Action *a;
00461 
00462   
00463 
00464   ((char *) _state->_rICPmsg)[MAX_ICP_MSGSIZE - 1] = 0; 
00465   _state->_cachelookupURL.create(NULL);
00466   const char *qurl = (const char *) _state->_rICPmsg->un.query.URL;
00467   _state->_cachelookupURL.parse(qurl, strlen(qurl));
00468   Debug("icp", "Remote Query for id=%d, [%s] from [%s]",
00469         _state->_rICPmsg->h.requestno,
00470         _state->_rICPmsg->un.query.URL,
00471     ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb))
00472   );
00473 
00474   SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerQueryEvent);
00475   if (_state->_rICPmsg->un.query.URL && *_state->_rICPmsg->un.query.URL) {
00476     _state->_queryResult = ~CACHE_EVENT_LOOKUP_FAILED;
00477     _start_time = ink_get_hrtime();
00478     if (pluginFreshnessCalcFunc && _ICPpr->GetConfig()->globalConfig()->ICPStaleLookup()) {
00479 
00480       
00481       
00482 
00483       a = cacheProcessor.open_read(this, &_state->_cachelookupURL, false,
00484                                    &gclient_request, &global_cache_lookup_config, (time_t) 0);
00485     } else {
00486       a = cacheProcessor.lookup(this, &_state->_cachelookupURL, false, _state->_cache_lookup_local);
00487     }
00488     if (!a) {
00489       a = ACTION_IO_ERROR;
00490     }
00491     if (a == ACTION_RESULT_DONE) {
00492       return EVENT_DONE;        
00493     } else if (a == ACTION_IO_ERROR) {
00494       handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
00495       return EVENT_DONE;        
00496     } else {
00497       return EVENT_CONT;        
00498     }
00499   } else {
00500     
00501     handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
00502     return EVENT_DONE;          
00503   }
00504 }
00505 
// Scope guard around an integer counter: the counter is incremented when the
// guard is constructed and decremented again when it is destroyed.
struct AutoReference
{
  int *_cnt;   // counter being tracked; not owned

  AutoReference(int *cnt) : _cnt(cnt) { ++(*_cnt); }

  ~AutoReference() { --(*_cnt); }
};
00519 
// Drives one peer read through its states: READ_ACTIVE -> READ_DATA ->
// READ_DATA_DONE -> PROCESS_READ_DATA, then either the query path
// (AWAITING_CACHE_LOOKUP_RESPONSE -> SEND_REPLY -> WRITE_DONE) or the
// response path (GET_ICP_REQUEST -> GET_ICP_REQUEST_MUTEX), and finally
// READ_NOT_ACTIVE / READ_NOT_ACTIVE_EXIT.
//
// s  per-read state; s->_next_state selects the state to execute.
// e  event that triggered the call; used by READ_DATA_DONE and WRITE_DONE to
//    fetch the number of bytes transferred.
//
// Returns EVENT_CONT when a lock could not be taken (the caller,
// ICPPeerReadEvent, reschedules a retry) and EVENT_DONE when the machine is
// waiting for a callback or has reached READ_PROCESSING_COMPLETE.
//
// _recursion_depth is -1 after init() and is bumped by the AutoReference
// guard, so it is 0 in the outermost invocation and > 0 when this function
// is re-entered from a callback issued inside an outer invocation.
00520 int
00521 ICPPeerReadCont::PeerReadStateMachine(PeerReadData * s, Event * e)
00522 {
00523   AutoReference l(&_recursion_depth);
00524   ip_port_text_buffer ipb; 
00525   // Take our own mutex without blocking.
00526   // 
00527   // 
00528   MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00529   if (!lock) {
00530     // Mutex is busy: ask the caller to retry later.
00531     
00532     return EVENT_CONT;          
00533   }
00534 
00535   while (1) {                   // run states back-to-back until one of them returns
00536 
00537     switch (s->_next_state) {
00538     case READ_ACTIVE:
00539       {
00540         ink_release_assert(_recursion_depth == 0);
00541         if (!_ICPpr->Lock())
00542           return EVENT_CONT;    // processor lock not available; retry later
00543 
00544         bool valid_peer = (_ICPpr->IdToPeer(s->_peer->GetPeerID()) == s->_peer);
00545 
00546         if (valid_peer && _ICPpr->AllowICPQueries()
00547             && _ICPpr->GetConfig()->globalConfig()->ICPconfigured()) {
00548 
00549           // Account for this read while the processor lock is held; READ_NOT_ACTIVE undoes it.
00550           _ICPpr->IncPendingQuery();
00551           _ICPpr->Unlock();
00552 
00553           s->_next_state = READ_DATA;
00554           RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA);
00555           break;                
00556 
00557         } else {
00558           _ICPpr->Unlock();
00559 
00560           // Peer no longer registered, or ICP queries not allowed/configured: stop reading.
00561           s->_next_state = READ_PROCESSING_COMPLETE;
00562           RECORD_ICP_STATE_CHANGE(s, 0, READ_PROCESSING_COMPLETE);
00563           return EVENT_DONE;
00564         }
00565       }
00566       ink_release_assert(0);    // not reached: every path above breaks or returns
00567 
00568     case READ_DATA:
00569       {
00570         ink_release_assert(_recursion_depth == 0);
00571 
00572         // Allocate a receive buffer on the peer and post RecvFrom_re() on it.
00573         // 
00574         ink_assert(s->_peer->buf == NULL);
00575         Ptr<IOBufferBlock> buf = s->_peer->buf = new_IOBufferBlock();
00576         buf->alloc(ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex);
00577         s->_peer->fromaddrlen = sizeof(s->_peer->fromaddr);
00578         buf->fill(sizeof(ICPMsg_t));    // skip one ICPMsg_t at the front; PROCESS_READ_DATA passes both buf and buf + sizeof(ICPMsg_t) to NetToHostICPMsg()
00579         char *be = buf->buf_end() - 1;
00580         be[0] = 0;              // force a NUL in the last byte of the buffer
00581         s->_next_state = READ_DATA_DONE;
00582         RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA_DONE);
00583         ink_assert(s->_peer->readAction == NULL);
00584         Action *a = s->_peer->RecvFrom_re(this, this, buf,
00585                                           buf->write_avail() - 1,
00586                                           &s->_peer->fromaddr.sa,
00587                                           &s->_peer->fromaddrlen);
00588         if (!a) {
00589           a = ACTION_IO_ERROR;
00590         }
00591         if (a == ACTION_RESULT_DONE) {
00592           // Completed synchronously: the completion callback re-entered this
00593           // function (READ_DATA_DONE) and is expected to have advanced the
00594           // state to PROCESS_READ_DATA already.
00595           ink_assert(s->_next_state == PROCESS_READ_DATA);
00596           break;
00597         } else if (a == ACTION_IO_ERROR) {
00598           // Read could not be started: count it, drop the buffer and leave
00599           // through READ_NOT_ACTIVE_EXIT.
00600           
00601           
00602           
00603           
00604           ICP_INCREMENT_DYN_STAT(no_data_read_stat);
00605           s->_peer->buf = NULL; 
00606           s->_next_state = READ_NOT_ACTIVE_EXIT;
00607           RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
00608           
00609           break;
00610         } else {
00611           s->_peer->readAction = a;
00612           return EVENT_DONE;
00613         }
00614       }
00615       ink_release_assert(0);    // not reached
00616 
00617     case READ_DATA_DONE:
00618       {
00619         // Read completion: clear the outstanding action and collect the byte count.
00620         if (s->_peer->readAction != NULL) {
00621           ink_assert(s->_peer->readAction == e);
00622           s->_peer->readAction = NULL;
00623         }
00624         s->_bytesReceived = completionUtil::getBytesTransferred(e);
00625 
00626         if (s->_bytesReceived >= 0) {
00627           s->_next_state = PROCESS_READ_DATA;
00628           RECORD_ICP_STATE_CHANGE(s, 0, PROCESS_READ_DATA);
00629         } else {
00630           ICP_INCREMENT_DYN_STAT(no_data_read_stat);
00631           s->_peer->buf = NULL; 
00632           s->_next_state = READ_NOT_ACTIVE_EXIT;
00633           RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
00634         }
00635         if (_recursion_depth > 0) {
00636           return EVENT_DONE;
00637         } else {
00638           break;
00639         }
00640       }
00641       ink_release_assert(0);    // not reached
00642 
00643     case PROCESS_READ_DATA:
00644     case ADD_PEER:
00645       {
00646         ink_release_assert(_recursion_depth == 0);
00647 
00648         Ptr<IOBufferBlock> bufblock = s->_peer->buf;
00649         char *buf = bufblock->start();
00650 
00651         if (s->_next_state == PROCESS_READ_DATA) {
00652           ICPRequestCont::NetToHostICPMsg((ICPMsg_t *)
00653                                           (buf + sizeof(ICPMsg_t)), (ICPMsg_t *) buf);
00654 
00655           // Make the block describe exactly the bytes that were received.
00656           bufblock->reset();
00657           bufblock->fill(s->_bytesReceived);
00658 
00659           // Reject datagrams shorter than the length announced in the header.
00660           if (s->_bytesReceived < ((ICPMsg_t *) buf)->h.msglen) {
00661             
00662             
00663             
00664             ICP_INCREMENT_DYN_STAT(short_read_stat);
00665             s->_peer->buf = NULL;
00666             s->_next_state = READ_NOT_ACTIVE;
00667             RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00668             break;              
00669           }
00670         }
00671         // Map the datagram's source address onto a configured peer address.
00672         // A failed mapping means the sender is not a known peer.
00673         IpEndpoint from;
00674         if (!s->_peer->ExtToIntRecvSockAddr(&s->_peer->fromaddr.sa, &from.sa)) {
00675           int status;
00676           ICPConfigData *cfg = _ICPpr->GetConfig()->globalConfig();
00677           ICPMsg_t *ICPmsg = (ICPMsg_t *) buf;
00678 
00679           if ((cfg->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) &&
00680               cfg->ICPReplyToUnknownPeer() &&
00681               ((ICPmsg->h.version == ICP_VERSION_2) ||
00682                (ICPmsg->h.version == ICP_VERSION_3)) && (ICPmsg->h.opcode == ICP_OP_QUERY)) {
00683 
00684             // Receive-only mode with reply-to-unknown-peer enabled: register
00685             // the sender as a new sibling peer.  If the config lock is busy,
00686             // park in ADD_PEER so the retry skips the PROCESS_READ_DATA-only
00687             // steps above.
00688             if (!_ICPpr->GetConfig()->Lock()) {
00689               s->_next_state = ADD_PEER;
00690               RECORD_ICP_STATE_CHANGE(s, 0, ADD_PEER);
00691               return EVENT_CONT;
00692             }
00693             if (!_ICPpr->GetFreePeers() || !_ICPpr->GetFreeSendPeers()) {
00694               RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP Peer limit exceeded");
00695               _ICPpr->GetConfig()->Unlock();
00696               goto invalid_message;
00697             }
00698 
00699             int icp_reply_port = cfg->ICPDefaultReplyPort();
00700             if (!icp_reply_port) {
00701               icp_reply_port = ntohs(ats_ip_port_cast(&s->_peer->fromaddr));
00702             }
00703             PeerConfigData *Pcfg = new PeerConfigData(PeerConfigData::CTYPE_SIBLING, IpAddr(s->_peer->fromaddr), 0,
00704                                                       icp_reply_port);
00705             ParentSiblingPeer *P = new ParentSiblingPeer(PEER_SIBLING, Pcfg, _ICPpr, true);
00706             status = _ICPpr->AddPeer(P);
00707             ink_release_assert(status);
00708             status = _ICPpr->AddPeerToSendList(P);
00709             ink_release_assert(status);
00710 
00711             P->GetChan()->setRemote(P->GetIP());
00712 
00713             // NOTE(review): no GetConfig()->Unlock() is visible on this success path - confirm whether the lock is released elsewhere.
00714             Note("ICP Peer added ip=%s", ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
00715             from = s->_peer->fromaddr;
00716           } else {
00717           invalid_message:
00718             // Unknown sender that we will not add: count it, drop the buffer
00719             // and go back to reading.
00720             
00721             ICP_INCREMENT_DYN_STAT(invalid_sender_stat);
00722             Debug("icp", "Received msg from invalid sender [%s]",
00723               ats_ip_nptop(&s->_peer->fromaddr, ipb, sizeof(ipb)));
00724 
00725             s->_peer->buf = NULL;
00726             s->_next_state = READ_NOT_ACTIVE;
00727             RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00728             break;              
00729           }
00730         }
00731         // Move the buffer from the peer into the read state.
00732         s->_sender = from;
00733         s->_rICPmsg_len = s->_bytesReceived;
00734         ink_assert(s->_buf == NULL);
00735         s->_buf = s->_peer->buf;
00736         s->_rICPmsg = (ICPMsg_t *) s->_buf->start();
00737         s->_peer->buf = NULL;
00738 
00739         // Only ICP version 2 and version 3 messages are processed.
00740         
00741         
00742         if ((s->_rICPmsg->h.version != ICP_VERSION_2)
00743             && (s->_rICPmsg->h.version != ICP_VERSION_3)) {
00744           ICP_INCREMENT_DYN_STAT(read_not_v2_icp_stat);
00745           Debug("icp", "Received (v=%d) !v2 && !v3 msg from sender [%s]",
00746             (uint32_t) s->_rICPmsg->h.version, ats_ip_nptop(&from, ipb, sizeof(ipb)));
00747 
00748           s->_rICPmsg = NULL;
00749           s->_buf = NULL;
00750           s->_next_state = READ_NOT_ACTIVE;
00751           RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00752           break;                
00753         }
00754         // Queries trigger a cache lookup; every other opcode is treated as a
00755         // response to one of our own requests.
00756         
00757         
00758         if (s->_rICPmsg->h.opcode == ICP_OP_QUERY) {
00759           ICP_INCREMENT_DYN_STAT(icp_remote_query_requests_stat);
00760           ink_assert(!s->_mycont);
00761           s->_next_state = AWAITING_CACHE_LOOKUP_RESPONSE;
00762           RECORD_ICP_STATE_CHANGE(s, 0, AWAITING_CACHE_LOOKUP_RESPONSE);
00763 
00764           if (ICPPeerQueryCont(0, (Event *) 0) == EVENT_DONE) {
00765             break;              // lookup already answered; continue with the new state
00766           } else {
00767             return EVENT_DONE;  // lookup callback still outstanding
00768           }
00769         } else {
00770           // Response message.
00771           Debug("icp", "Response for Id=%d, from [%s]",
00772             s->_rICPmsg->h.requestno, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00773           ICP_INCREMENT_DYN_STAT(icp_remote_responses_stat);
00774           s->_next_state = GET_ICP_REQUEST;
00775           RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
00776           break;                
00777         }
00778       }
00779       ink_release_assert(0);    // not reached
00780 
00781     case AWAITING_CACHE_LOOKUP_RESPONSE:
00782       {
00783         int status = 0;
00784         void *data = s->_rICPmsg->un.query.URL;
00785         int datalen = strlen((const char *) data) + 1;
00786 
00787         if (s->_queryResult == CACHE_EVENT_LOOKUP) {
00788           // Lookup succeeded: build an ICP_OP_HIT reply echoing the URL.
00789           Debug("icp", "Sending ICP_OP_HIT for id=%d, [%.*s] to [%s]",
00790             s->_rICPmsg->h.requestno, datalen, (const char *)data, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00791           ICP_INCREMENT_DYN_STAT(icp_cache_lookup_success_stat);
00792           status = ICPRequestCont::BuildICPMsg(ICP_OP_HIT,
00793                                                s->_rICPmsg->h.requestno, 0  , 0  ,
00794                                                0  ,
00795                                                data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
00796         } else if (s->_queryResult == CACHE_EVENT_LOOKUP_FAILED) {
00797           // Lookup failed: build an ICP_OP_MISS reply echoing the URL.
00798           Debug("icp", "Sending ICP_OP_MISS for id=%d, [%.*s] to [%s]",
00799             s->_rICPmsg->h.requestno, datalen, (const char *)data, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00800           ICP_INCREMENT_DYN_STAT(icp_cache_lookup_fail_stat);
00801           status = ICPRequestCont::BuildICPMsg(ICP_OP_MISS,
00802                                                s->_rICPmsg->h.requestno, 0  , 0  ,
00803                                                0  ,
00804                                                data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
00805         } else {
00806           Warning("Bad cache lookup event: %d", s->_queryResult);
00807           ink_release_assert(!"Invalid cache lookup event");
00808         }
00809         ink_assert(status == 0);
00810 
00811         // Write an access-log entry for this query.
00812         ICPlog logentry(s);
00813         LogAccessICP accessor(&logentry);
00814         Log::access(&accessor);
00815 
00816         s->_next_state = SEND_REPLY;
00817         RECORD_ICP_STATE_CHANGE(s, 0, SEND_REPLY);
00818 
00819         if (_recursion_depth > 0) {
00820           return EVENT_DONE;
00821         } else {
00822           break;
00823         }
00824       }
00825       ink_release_assert(0);    // not reached
00826 
00827     case SEND_REPLY:
00828       {
00829         ink_release_assert(_recursion_depth == 0);
00830         // Send the reply built in AWAITING_CACHE_LOOKUP_RESPONSE back to the sender.
00831         
00832         
00833         s->_next_state = WRITE_DONE;
00834         RECORD_ICP_STATE_CHANGE(s, 0, WRITE_DONE);
00835         ink_assert(s->_peer->writeAction == NULL);
00836         Action *a = s->_peer->SendMsg_re(this, this,
00837                                          &s->_mhdr, &s->_sender.sa);
00838         if (!a) {
00839           a = ACTION_IO_ERROR;
00840         }
00841         if (a == ACTION_RESULT_DONE) {
00842           // Completed synchronously; loop on with whatever state is now set.
00843           
00844           break;
00845 
00846         } else if (a == ACTION_IO_ERROR) {
00847           // Send failed: count it as a partial write and resume reading.
00848           ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
00849           
00850           Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s",
00851             ntohs(s->_rICPmsg->h.msglen), -1, ats_ip_ntop(&s->_sender, ipb, sizeof(ipb)));
00852           s->_next_state = READ_NOT_ACTIVE;
00853           RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00854           break;
00855         } else {
00856           s->_peer->writeAction = a;
00857           return EVENT_DONE;
00858         }
00859       }
00860       ink_release_assert(0);    // not reached
00861 
00862     case WRITE_DONE:
00863       {
00864         s->_peer->writeAction = NULL;
00865         int len = completionUtil::getBytesTransferred(e);
00866 
00867         if (len == (int)ntohs(s->_rICPmsg->h.msglen)) {
00868           ICP_INCREMENT_DYN_STAT(query_response_write_stat);
00869           s->_peer->LogSendMsg(s->_rICPmsg, &s->_sender.sa);       
00870         } else {
00871           // Byte count differs from the message length: record a partial write.
00872           ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
00873           
00874           Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s",
00875             ntohs(s->_rICPmsg->h.msglen), len, ats_ip_ntop(&s->_sender, ipb, sizeof(ipb)));
00876         }
00877         // Either way this query is finished.
00878         s->_next_state = READ_NOT_ACTIVE;
00879         RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00880         Debug("icp", "state->READ_NOT_ACTIVE");
00881 
00882         if (_recursion_depth > 0) {
00883           return EVENT_DONE;
00884         } else {
00885           break;                
00886         }
00887       }
00888       ink_release_assert(0);    // not reached
00889 
00890     case GET_ICP_REQUEST:
00891       {
00892         ink_release_assert(_recursion_depth == 0);
00893         ink_assert(s->_rICPmsg && s->_rICPmsg_len);     
00894 
00895         // Find the outstanding request this response belongs to.
00896         s->_ICPReqCont = ICPRequestCont::FindICPRequest(s->_rICPmsg->h.requestno);
00897         if (s->_ICPReqCont) {
00898           s->_next_state = GET_ICP_REQUEST_MUTEX;
00899           RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST_MUTEX);
00900           break;                
00901         }
00902         // No matching request: log the message against the sending peer and
00903         // resume reading.
00904         // NOTE(review): the FindPeer() result is dereferenced without a NULL check - confirm it cannot fail here.
00905         
00906         Debug("icp", "No ICP Request for Id=%d", s->_rICPmsg->h.requestno);
00907         ICP_INCREMENT_DYN_STAT(no_icp_request_for_response_stat);
00908         Peer *p = _ICPpr->FindPeer(s->_sender);
00909         p->LogRecvMsg(s->_rICPmsg, 0);
00910         s->_next_state = READ_NOT_ACTIVE;
00911         RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00912         break;                  
00913       }
00914       ink_release_assert(0);    // not reached
00915 
00916     case GET_ICP_REQUEST_MUTEX:
00917       {
00918         ink_release_assert(_recursion_depth == 0);
00919         ink_assert(s->_ICPReqCont);
00920         Ptr<ProxyMutex> ICPReqContMutex(s->_ICPReqCont->mutex);
00921         EThread *ethread = this_ethread();
00922         ink_hrtime request_start_time;
00923 
00924         if (!MUTEX_TAKE_TRY_LOCK(ICPReqContMutex, ethread)) {
00925           ICP_INCREMENT_DYN_STAT(icp_response_request_nolock_stat);
00926           // Request mutex is busy.  Forget the request pointer and go back
00927           // to GET_ICP_REQUEST so the retry looks the request up again.
00928           
00929           
00930           
00931           
00932           s->_ICPReqCont = (ICPRequestCont *) 0;
00933           s->_next_state = GET_ICP_REQUEST;
00934           RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
00935           return EVENT_CONT;
00936         }
00937         // NOTE(review): the FindPeer() result is dereferenced without a NULL check - confirm it cannot fail here.
00938         Peer *p = _ICPpr->FindPeer(s->_sender);
00939         p->LogRecvMsg(s->_rICPmsg, 1);
00940 
00941         // Hand the response to the request continuation unless it was cancelled.
00942         ICPRequestCont::ICPRequestEventArgs_t args;
00943         args.rICPmsg = s->_rICPmsg;
00944         args.rICPmsg_len = s->_rICPmsg_len;
00945         args.peer = p;
00946         if (!s->_ICPReqCont->GetActionPtr()->cancelled) {
00947           request_start_time = s->_ICPReqCont->GetRequestStartTime();
00948           Debug("icp", "Passing Reply for ICP Id=%d", s->_rICPmsg->h.requestno);
00949           s->_ICPReqCont->handleEvent((int) ICP_RESPONSE_MESSAGE, (void *) &args);
00950         } else {
00951           request_start_time = 0;
00952           delete s->_ICPReqCont;
00953           Debug("icp", "User cancelled ICP request Id=%d", s->_rICPmsg->h.requestno);
00954         }
00955 
00956         // The request continuation must not be touched past this point.
00957         s->_ICPReqCont = 0;
00958 
00959         MUTEX_UNTAKE_LOCK(ICPReqContMutex, ethread);
00960         if (request_start_time) {
00961           ICP_SUM_DYN_STAT(total_icp_response_time_stat, (ink_get_hrtime() - request_start_time));
00962         }
00963         RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00964         s->_next_state = READ_NOT_ACTIVE;
00965         break;                  
00966       }
00967       ink_release_assert(0);    // not reached
00968 
00969     case READ_NOT_ACTIVE:
00970     case READ_NOT_ACTIVE_EXIT:
00971       {
00972         ink_release_assert(_recursion_depth == 0);
00973         if (!_ICPpr->Lock())
00974           return EVENT_CONT;    // processor lock not available; retry later
00975 
00976         // Undo the IncPendingQuery() done in READ_ACTIVE.
00977         _ICPpr->DecPendingQuery();
00978         _ICPpr->Unlock();
00979 
00980         s->_buf = 0;
00981         if (s->_next_state == READ_NOT_ACTIVE_EXIT) {
00982           s->_next_state = READ_PROCESSING_COMPLETE;
00983           return EVENT_DONE;
00984         } else {
00985           // Start over with another read on the same peer.
00986           s->reset();
00987           s->_start_time = ink_get_hrtime();
00988           s->_next_state = READ_ACTIVE;
00989           RECORD_ICP_STATE_CHANGE(s, 0, READ_ACTIVE);
00990           break;                
00991         }
00992       }
00993       ink_release_assert(0);    // not reached
00994 
00995     case READ_PROCESSING_COMPLETE:
00996     default:
00997       ink_release_assert(0);    // the machine must never be run in these states
00998 
00999     }                           // end of switch
01000 
01001   }                             // end of while (1)
01002 }
01003 
01004 
01005 
01006 
01007 
01008 
01009 ClassAllocator<ICPRequestCont> ICPRequestCont_allocator("ICPRequestCont_allocator"); // class allocator for ICPRequestCont objects
01010 
01011 ICPRequestCont::ICPRequestCont(ICPProcessor * pr, Continuation * c, URL * u)
01012   : Continuation(0), _cont(c), _url(u), _start_time(0),
01013     _ICPpr(pr), _timeout(0),
01014     npending_actions(0), pendingActions(NULL),
01015     _sequence_number(0), _expected_replies(0),
01016     _expected_replies_list(MAX_DEFINED_PEERS), _received_replies(0), _next_state(ICP_START)
01017 {
01018   memset((void *)&_ret_sockaddr, 0, sizeof(_ret_sockaddr));
01019   _ret_status = ICP_LOOKUP_FAILED;
01020   _act.cancelled = false;
01021   _act = c;
01022   memset((void *) &_ICPmsg, 0, sizeof(_ICPmsg));
01023   memset((void *) &_sendMsgHdr, 0, sizeof(_sendMsgHdr));
01024   memset((void *) &_sendMsgIOV, 0, sizeof(_sendMsgIOV[MSG_IOVECS]));
01025 
01026   if (c)
01027     this->mutex = c->mutex;
01028 }
01029 
01030 ICPRequestCont::~ICPRequestCont()
01031 {
01032   _act = NULL;
01033   this->mutex = NULL;
01034 
01035   if (_timeout) {
01036     _timeout->cancel(this);
01037     _timeout = 0;
01038   }
01039   RemoveICPRequest(_sequence_number);
01040 
01041   if (_ICPmsg.h.opcode == ICP_OP_QUERY) {
01042     if (_ICPmsg.un.query.URL) {
01043       ats_free(_ICPmsg.un.query.URL);
01044     }
01045   }
01046   if (pendingActions) {
01047     delete pendingActions;
01048     pendingActions = 0;
01049   }
01050 }
01051 
01052 void
01053 ICPRequestCont::remove_from_pendingActions(Action * a)
01054 {
01055   if (!pendingActions) {
01056     npending_actions--;
01057     return;
01058   }
01059   for (intptr_t i = 0; i < pendingActions->length(); i++) {
01060     if ((*pendingActions)[i] == a) {
01061       for (intptr_t j = i; j < pendingActions->length() - 1; j++)
01062         (*pendingActions)[j] = (*pendingActions)[j + 1];
01063       pendingActions->set_length(pendingActions->length() - 1);
01064       npending_actions--;
01065       return;
01066     }
01067   }
01068   npending_actions--;           
01069 }
01070 
01071 void
01072 ICPRequestCont::remove_all_pendingActions()
01073 {
01074   int active_pendingActions = 0;
01075 
01076   if (!pendingActions) {
01077     return;
01078   }
01079   for (intptr_t i = 0; i < pendingActions->length(); i++) {
01080     if ((*pendingActions)[i]
01081         && ((*pendingActions)[i] != ACTION_IO_ERROR)) {
01082       ((*pendingActions)[i])->cancel();
01083       (*pendingActions)[i] = 0;
01084       npending_actions--;
01085       active_pendingActions++;
01086     } else {
01087       (*pendingActions)[i] = 0;
01088     }
01089   }
01090   pendingActions->set_length(pendingActions->length() - active_pendingActions);
01091 }
01092 
01093 int
01094 ICPRequestCont::ICPRequestEvent(int event, Event * e)
01095 {
01096   
01097   
01098 
01099   ink_assert(event == NET_EVENT_DATAGRAM_WRITE_COMPLETE ||
01100              event == NET_EVENT_DATAGRAM_WRITE_ERROR ||
01101              event == EVENT_IMMEDIATE || event == EVENT_INTERVAL || event == ICP_RESPONSE_MESSAGE);
01102   
01103   if ((event == NET_EVENT_DATAGRAM_WRITE_COMPLETE)
01104       || (event == NET_EVENT_DATAGRAM_WRITE_ERROR)) {
01105     ink_assert(npending_actions > 0);
01106     remove_from_pendingActions((Action *) e);
01107     return EVENT_DONE;
01108   }
01109   
01110   
01111   switch (_next_state) {
01112   case ICP_START:
01113   case ICP_OFF_TERMINATE:
01114   case ICP_QUEUE_REQUEST:
01115   case ICP_AWAITING_RESPONSE:
01116   case ICP_DEQUEUE_REQUEST:
01117   case ICP_POST_COMPLETION:
01118   case ICP_REQUEST_NOT_ACTIVE:
01119     {
01120       if (ICPStateMachine(event, (void *) e) == EVENT_CONT) {
01121         
01122         
01123         
01124         eventProcessor.schedule_in(this, HRTIME_MSECONDS(RETRY_INTERVAL), ET_ICP);
01125         return EVENT_CONT;
01126 
01127       } else if (_next_state == ICP_DONE) {
01128         
01129         
01130         
01131         delete this;
01132         break;
01133       } else {
01134         break;
01135       }
01136     }
01137     ink_release_assert(0);      
01138 
01139   case ICP_DONE:
01140   default:
01141     ink_release_assert(0);      
01142   }                             
01143 
01144   return EVENT_DONE;
01145 }
01146 
01147 int
01148 ICPRequestCont::NopICPRequestEvent(int , Event * )
01149 {
01150   delete this;
01151   return EVENT_DONE;
01152 }
01153 
01154 int
01155 ICPRequestCont::ICPStateMachine(int event, void *d)
01156 {
01157   
01158   
01159   
01160   ICPConfiguration *ICPcf = _ICPpr->GetConfig();
01161   ip_port_text_buffer ipb;
01162 
01163   while (1) {                   
01164 
01165     switch (_next_state) {
01166     case ICP_START:
01167       {
01168         
01169         if (_act.cancelled) {
01170           _next_state = ICP_DONE;
01171           return EVENT_DONE;
01172         }
01173 
01174         if (!_ICPpr->Lock())
01175           return EVENT_CONT;    
01176 
01177         if (_ICPpr->AllowICPQueries() && (ICPcf->globalConfig()->ICPconfigured() == ICP_MODE_SEND_RECEIVE)) {
01178 
01179           
01180           if (_url->valid()) {
01181             int host_len;
01182             const char *host = _url->host_get(&host_len);
01183             if (ptr_len_casecmp(host, host_len, "127.0.0.1") == 0 || ptr_len_casecmp(host, host_len, "localhost") == 0) {
01184               _ICPpr->Unlock();
01185 
01186               
01187               _next_state = ICP_OFF_TERMINATE;
01188               Debug("icp", "[ICP_START] NULL/localhost URL ignored Id=%d", _sequence_number);
01189               break;            
01190             }
01191           }
01192           
01193           _ICPpr->IncPendingQuery();
01194           _ICPpr->Unlock();
01195 
01196           
01197           char *urlstr = _url->string_get(NULL);
01198           int urlstr_len = strlen(urlstr) + 1;
01199 
01200           int status = BuildICPMsg(ICP_OP_QUERY,
01201                                    _sequence_number = ICPReqSeqNumber(),
01202                                    0  , 0  ,
01203                                    0  ,
01204                                    (void *) urlstr, urlstr_len,
01205                                    &_sendMsgHdr, _sendMsgIOV,
01206                                    &_ICPmsg);
01207           
01208           ink_assert(status == 0);
01209           Debug("icp", "[ICP_START] ICP_OP_QUERY for [%s], Id=%d", urlstr, _sequence_number);
01210 
01211           _next_state = ICP_QUEUE_REQUEST;
01212           break;                
01213 
01214         } else {
01215           ICP_INCREMENT_DYN_STAT(icp_start_icpoff_stat);
01216           _ICPpr->Unlock();
01217 
01218           
01219           _next_state = ICP_OFF_TERMINATE;
01220           break;                
01221         }
01222       }
01223       ink_release_assert(0);    
01224 
01225     case ICP_OFF_TERMINATE:
01226       {
01227         if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
01228           return EVENT_CONT;    
01229         }
01230         Debug("icp", "[ICP_OFF_TERMINATE] Id=%d", _sequence_number);
01231 
01232         
01233         if (!_act.cancelled) {
01234           _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
01235         }
01236         MUTEX_UNTAKE_LOCK(mutex, this_ethread());
01237 
01238         _next_state = ICP_DONE;
01239         return EVENT_DONE;
01240       }
01241       ink_release_assert(0);    
01242 
01243     case ICP_QUEUE_REQUEST:
01244       {
01245         
01246         int ret = AddICPRequest(_sequence_number, this);
01247         ink_assert(ret == 0);
01248 
01249         
01250         int bias = _ICPpr->GetStartingSendPeerBias();
01251         int SendPeers = _ICPpr->GetSendPeers();
01252         npending_actions = 0;
01253         while (SendPeers > 0) {
01254           Peer *P = _ICPpr->GetNthSendPeer(SendPeers, bias);
01255           if (!P->IsOnline()) {
01256             SendPeers--;
01257             continue;
01258           }
01259           
01260           
01261           
01262 
01263           
01264           
01265           int was_expected = P->ExpectedReplies(&_expected_replies_list);
01266           _expected_replies += was_expected;
01267           npending_actions++;
01268           Action *a = P->SendMsg_re(this, P, &_sendMsgHdr, NULL);
01269           if (!a) {
01270             a = ACTION_IO_ERROR;
01271           }
01272           if (a != ACTION_IO_ERROR) {
01273             if (a != ACTION_RESULT_DONE) {
01274               if (!pendingActions) {
01275                 pendingActions = new DynArray<Action *>(&default_action);
01276               }
01277               (*pendingActions) (npending_actions) = a;
01278             }
01279             P->LogSendMsg(&_ICPmsg, NULL);  
01280             Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d send query to [%s]",
01281               _sequence_number, ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
01282           } else {
01283             _expected_replies_list.ClearBit(P->GetPeerID());
01284             _expected_replies -= was_expected;
01285             
01286             ICP_INCREMENT_DYN_STAT(send_query_partial_write_stat);
01287             
01288             Debug("icp_warn",
01289                   "ICP query send, res=%d, ip=%s", ntohs(_ICPmsg.h.msglen),
01290               ats_ip_ntop(P->GetIP(), ipb, sizeof(ipb)));
01291           }
01292           SendPeers--;
01293         }
01294 
01295         Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d expected replies=%d", _sequence_number, _expected_replies);
01296         if (!_expected_replies) {
01297           
01298           
01299           
01300           ICP_INCREMENT_DYN_STAT(icp_queries_no_expected_replies_stat);
01301           _next_state = ICP_DEQUEUE_REQUEST;
01302           break;                
01303         }
01304         ICP_SUM_DYN_STAT(total_udp_send_queries_stat, _expected_replies);
01305 
01306         
01307         
01308         
01309         int tval = _ICPpr->GetConfig()->globalConfig()->ICPqueryTimeout();
01310         _timeout = eventProcessor.schedule_in(this, HRTIME_SECONDS(tval), ET_ICP);
01311 
01312         _next_state = ICP_AWAITING_RESPONSE;
01313         return EVENT_DONE;
01314       }
01315       ink_release_assert(0);    
01316 
01317     case ICP_AWAITING_RESPONSE:
01318       {
01319         Debug("icp", "[ICP_AWAITING_RESPONSE] Id=%d", _sequence_number);
01320         ink_assert(d);
01321         ICPRequestEventArgs_t dummyArgs;
01322         ICPRequestEventArgs_t *args = 0;
01323 
01324         if (event == ICP_RESPONSE_MESSAGE) {
01325           args = (ICPRequestEventArgs_t *) d;
01326         } else if (event == EVENT_INTERVAL) {
01327           memset((void *) &dummyArgs, 0, sizeof(dummyArgs));
01328           args = &dummyArgs;
01329         } else {
01330           ink_release_assert(0);        
01331         }
01332 
01333         
01334         if (ICPResponseMessage(event, args->rICPmsg, args->peer) == EVENT_DONE) {
01335           
01336           _next_state = ICP_DEQUEUE_REQUEST;
01337           break;                
01338 
01339         } else {
01340           
01341           return EVENT_DONE;
01342         }
01343       }
01344       ink_release_assert(0);    
01345 
01346     case ICP_DEQUEUE_REQUEST:
01347       {
01348         
01349         int ret = RemoveICPRequest(_sequence_number);
01350         Debug("icp", "[ICP_DEQUEUE_REQUEST] Id=%d", _sequence_number);
01351         ink_assert(ret == 0);
01352         
01353         _next_state = ICP_POST_COMPLETION;
01354         break;                  
01355       }
01356       ink_release_assert(0);    
01357 
01358     case ICP_POST_COMPLETION:
01359       {
01360         if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
01361           return EVENT_CONT;    
01362         }
01363         Debug("icp", "[ICP_POST_COMPLETION] Id=%d", _sequence_number);
01364 
01365         
01366         if (!_act.cancelled) {
01367           _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
01368         }
01369         MUTEX_UNTAKE_LOCK(mutex, this_ethread());
01370         ICP_SUM_DYN_STAT(total_icp_request_time_stat, (ink_get_hrtime() - _start_time));
01371 
01372         _next_state = ICP_WAIT_SEND_COMPLETE;
01373         break;                  
01374       }
01375       ink_release_assert(0);    
01376     case ICP_WAIT_SEND_COMPLETE:
01377       {
01378         
01379         if (npending_actions > 0) {
01380           Debug("icp", "[ICP_WAIT_SEND_COMPLETE] Id=%d active=%d", _sequence_number, npending_actions);
01381         } else {
01382           _next_state = ICP_REQUEST_NOT_ACTIVE;
01383           
01384           break;
01385         }
01386       }
01387       break;
01388       ink_release_assert(0);    
01389     case ICP_REQUEST_NOT_ACTIVE:
01390       {
01391         Debug("icp", "[ICP_REQUEST_NOT_ACTIVE] Id=%d", _sequence_number);
01392         _sequence_number = 0;
01393         if (!_ICPpr->Lock())
01394           return EVENT_CONT;    
01395 
01396         
01397         _ICPpr->DecPendingQuery();
01398         _ICPpr->Unlock();
01399 
01400         _next_state = ICP_DONE;
01401         return EVENT_DONE;
01402       }
01403       ink_release_assert(0);    
01404 
01405     case ICP_DONE:
01406     default:
01407       ink_release_assert(0);    
01408 
01409     }                           
01410 
01411   }                             
01412 }
01413 
01414 int
01415 ICPRequestCont::ICPResponseMessage(int event, ICPMsg_t * m, Peer * peer)
01416 {
01417   ip_port_text_buffer ipb, ipb2;
01418 
01419   if (event == EVENT_INTERVAL) {
01420     _timeout = 0;
01421     remove_all_pendingActions();
01422 
01423     
01424     
01425 
01426     if (_received_replies) {
01427       int NumParentPeers = _ICPpr->GetParentPeers();
01428       if (NumParentPeers > 0) {
01429         int n;
01430         Peer *pp;
01431         for (n = 0; n < NumParentPeers; n++) {
01432           pp = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
01433           if (pp && !_expected_replies_list.IsBitSet(pp->GetPeerID())
01434               && pp->isUp()) {
01435             ats_ip_copy(&_ret_sockaddr.sa, pp->GetIP());
01436             _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(pp)->GetProxyPort());
01437             _ret_status = ICP_LOOKUP_FOUND;
01438 
01439             Debug("icp",
01440               "ICP timeout using parent Id=%d from [%s] return [%s]",
01441               _sequence_number,
01442               ats_ip_nptop(pp->GetIP(), ipb, sizeof(ipb)),
01443               ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2))
01444             );
01445             return EVENT_DONE;
01446           }
01447         }
01448       }
01449     }
01450     
01451     Debug("icp", "ICP Response timeout for Id=%d", _sequence_number);
01452     return EVENT_DONE;
01453 
01454   } else {
01455     
01456     
01457     
01458     ink_assert(m->h.requestno == _sequence_number);
01459 
01460     switch (m->h.opcode) {
01461     case ICP_OP_HIT:
01462     case ICP_OP_HIT_OBJ:
01463       {
01464         
01465         _timeout->cancel(this);
01466         _timeout = 0;
01467 
01468         ICP_INCREMENT_DYN_STAT(icp_query_hits_stat);
01469         ++_received_replies;
01470         ats_ip_copy(&_ret_sockaddr, peer->GetIP());
01471         _ret_sockaddr.port() =  htons(static_cast<ParentSiblingPeer*>(peer)->GetProxyPort());
01472         _ret_status = ICP_LOOKUP_FOUND;
01473 
01474         Debug("icp",
01475           "ICP Response HIT for Id=%d from [%s] return [%s]",
01476           _sequence_number,
01477           ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)),
01478           ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2))
01479         );
01480         return EVENT_DONE;
01481       }
01482     case ICP_OP_MISS:
01483     case ICP_OP_ERR:
01484     case ICP_OP_MISS_NOFETCH:
01485     case ICP_OP_DENIED:
01486       {
01487         Debug("icp", "ICP MISS response for Id=%d from [%s]",
01488           _sequence_number, ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)));
01489         
01490         
01491         int Id = peer->GetPeerID();
01492         if (_expected_replies_list.IsBitSet(Id)) {
01493           
01494           _expected_replies_list.ClearBit(Id);
01495           ++_received_replies;
01496         }
01497 
01498         if (_received_replies < _expected_replies)
01499           return EVENT_CONT;    
01500 
01501         
01502         _timeout->cancel(this);
01503         _timeout = 0;
01504 
01505         ICP_INCREMENT_DYN_STAT(icp_query_misses_stat);
01506         
01507         
01508         
01509         
01510         if (_ICPpr->GetParentPeers() > 0) {
01511           
01512           
01513           Peer *p = NULL;
01514           
01515           {
01516             int i;
01517             for (i = 0; i < _ICPpr->GetParentPeers(); i++) {
01518               p = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
01519               
01520               if (p->isUp())
01521                 break;
01522             }
01523             
01524             if (i >= _ICPpr->GetParentPeers()) {
01525               Debug("icp", "None of the %d ICP parent(s) is up", _ICPpr->GetParentPeers());
01526               p = NULL;
01527             }
01528           }
01529           if (p) {
01530             ats_ip_copy(&_ret_sockaddr, p->GetIP());
01531             _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(p)->GetProxyPort());
01532             _ret_status = ICP_LOOKUP_FOUND;
01533 
01534             Debug("icp", "ICP ALL MISS(1) for Id=%d return [%s]",
01535               _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb)));
01536             return EVENT_DONE;
01537           }
01538         }
01539         Debug("icp", "ICP ALL MISS(2) for Id=%d return [%s]",
01540           _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb)));
01541         return EVENT_DONE;
01542       }
01543     default:
01544       {
01545         ICP_INCREMENT_DYN_STAT(invalid_icp_query_response_stat);
01546         
01547         Warning("Invalid ICP response, op=%d reqno=%d ip=%s",
01548           m->h.opcode, m->h.requestno, ats_ip_ntop(peer->GetIP(), ipb, sizeof(ipb)));
01549         return EVENT_CONT;      
01550       }
01551 
01552     }                           
01553   }
01554 }
01555 
01556 
01557 
01558 
01559 
01560 
01561 void
01562 ICPRequestCont::NetToHostICPMsg(ICPMsg_t * in, ICPMsg_t * out)
01563 {
01564   out->h.opcode = in->h.opcode;
01565   out->h.version = in->h.version;
01566   out->h.msglen = ntohs(in->h.msglen);
01567   out->h.requestno = ntohl(in->h.requestno);
01568   out->h.optionflags = ntohl(in->h.optionflags);
01569   out->h.optiondata = ntohl(in->h.optiondata);
01570   out->h.shostid = ntohl(in->h.shostid);
01571 
01572   switch (in->h.opcode) {
01573   case ICP_OP_QUERY:
01574     {
01575       memcpy((char *) &out->un.query.rhostid,
01576              (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid)), sizeof(out->un.query.rhostid));
01577       out->un.query.rhostid = ntohl(out->un.query.rhostid);
01578       out->un.query.URL = (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid) + sizeof(out->un.query.rhostid));
01579       break;
01580     }
01581   case ICP_OP_HIT:
01582     {
01583       out->un.hit.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01584       break;
01585     }
01586   case ICP_OP_MISS:
01587     {
01588       out->un.miss.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01589       break;
01590     }
01591   case ICP_OP_HIT_OBJ:
01592     {
01593       out->un.hitobj.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01594 
01595       
01596       out->un.hitobj.p_objsize = (char *) (out->un.hitobj.URL + strlen(out->un.hitobj.URL));
01597       memcpy((char *) &out->un.hitobj.objsize, out->un.hitobj.p_objsize, sizeof(out->un.hitobj.objsize));
01598       out->un.hitobj.objsize = ntohs(out->un.hitobj.objsize);
01599       out->un.hitobj.data = (char *) (out->un.hitobj.p_objsize + sizeof(out->un.hitobj.objsize));
01600       break;
01601     }
01602   default:
01603     break;
01604   }
01605 }
01606 
01607 int
01608 ICPRequestCont::BuildICPMsg(ICPopcode_t op, unsigned int seqno,
01609                             int optflags, int optdata, int shostid,
01610                             void *data, int datalen, struct msghdr *mhdr, struct iovec *iov, ICPMsg_t * icpmsg)
01611 {
01612   
01613   if (op == ICP_OP_QUERY) {
01614     icpmsg->un.query.rhostid = htonl(0);
01615     icpmsg->un.query.URL = (char *) data;
01616 
01617     mhdr->msg_iov = iov;
01618     mhdr->msg_iovlen = 3;
01619 
01620     iov[0].iov_base = (caddr_t) icpmsg;
01621     iov[0].iov_len = sizeof(ICPMsgHdr_t);
01622 
01623     iov[1].iov_base = (caddr_t) & icpmsg->un.query.rhostid;
01624     iov[1].iov_len = sizeof(icpmsg->un.query.rhostid);
01625 
01626     iov[2].iov_base = (caddr_t) data;
01627     iov[2].iov_len = datalen;
01628     icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len);
01629 
01630   } else if (op == ICP_OP_HIT) {
01631     icpmsg->un.hit.URL = (char *) data;
01632 
01633     mhdr->msg_iov = iov;
01634     mhdr->msg_iovlen = 2;
01635 
01636     iov[0].iov_base = (caddr_t) icpmsg;
01637     iov[0].iov_len = sizeof(ICPMsgHdr_t);
01638 
01639     iov[1].iov_base = (caddr_t) data;
01640     iov[1].iov_len = datalen;
01641     icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
01642 
01643   } else if (op == ICP_OP_MISS) {
01644     icpmsg->un.miss.URL = (char *) data;
01645 
01646     mhdr->msg_iov = iov;
01647     mhdr->msg_iovlen = 2;
01648 
01649     iov[0].iov_base = (caddr_t) icpmsg;
01650     iov[0].iov_len = sizeof(ICPMsgHdr_t);
01651 
01652     iov[1].iov_base = (caddr_t) data;
01653     iov[1].iov_len = datalen;
01654     icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
01655 
01656   } else {
01657     ink_release_assert(0);
01658     return 1;                   
01659   }
01660 
01661   mhdr->msg_name = (caddr_t) 0;
01662   mhdr->msg_namelen = 0;
01663   
01664 #if !defined(linux) && !defined(freebsd) && !defined(darwin) && !defined(solaris) \
01665  && !defined(openbsd)
01666   mhdr->msg_accrights = (caddr_t) 0;
01667   mhdr->msg_accrightslen = 0;
01668 #elif !defined(solaris)
01669   mhdr->msg_control = 0;
01670   mhdr->msg_controllen = 0;
01671   mhdr->msg_flags = 0;
01672 #endif
01673 
01674   icpmsg->h.opcode = op;
01675   icpmsg->h.version = ICP_VERSION_2;
01676   icpmsg->h.requestno = htonl(seqno);
01677   icpmsg->h.optionflags = htonl(optflags);
01678   icpmsg->h.optiondata = htonl(optdata);
01679   icpmsg->h.shostid = htonl(shostid);
01680 
01681   return 0;                     
01682 }
01683 
01684 
01685 unsigned int
01686   ICPRequestCont::ICPRequestSeqno = 1;
01687 Queue<ICPRequestCont> ICPRequestQueue[ICPRequestCont::ICP_REQUEST_HASH_SIZE];
01688 
01689 
01690 unsigned int
01691 ICPRequestCont::ICPReqSeqNumber()
01692 {
01693   
01694   unsigned int res = 0;
01695   do {
01696     res = (unsigned int) ink_atomic_increment((int *) &ICPRequestSeqno, 1);
01697   } while (!res);
01698 
01699   return res;
01700 }
01701 
01702 
01703 inline int
01704 ICPRequestCont::ICPRequestHash(unsigned int seqno)
01705 {
01706   
01707   return seqno % ICP_REQUEST_HASH_SIZE;
01708 }
01709 
01710 
01711 int
01712 ICPRequestCont::AddICPRequest(unsigned int seqno, ICPRequestCont * r)
01713 {
01714   
01715   
01716 
01717   ICPRequestQueue[ICPRequestHash(seqno)].enqueue(r);
01718   return 0;                     
01719 }
01720 
01721 
01722 ICPRequestCont *
01723 ICPRequestCont::FindICPRequest(unsigned int seqno)
01724 {
01725   
01726   int hash = ICPRequestHash(seqno);
01727   ICPRequestCont *r;
01728 
01729   for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
01730     if (r->_sequence_number == seqno)
01731       return r;
01732   }
01733   return (ICPRequestCont *) 0;  
01734 }
01735 
01736 
01737 int
01738 ICPRequestCont::RemoveICPRequest(unsigned int seqno)
01739 {
01740   
01741   
01742   
01743 
01744   if (!seqno) {
01745     return 1;                   
01746   }
01747   int hash = ICPRequestHash(seqno);
01748   ICPRequestCont *r;
01749 
01750   for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
01751     if (r->_sequence_number == seqno) {
01752       ICPRequestQueue[hash].remove(r);
01753       return 0;
01754     }
01755   }
01756   return 1;                     
01757 }
01758 
01759 
01760 
01761 
01762 
01763 
01764 
01765 
01766 
01767 
01768 
01769 void
01770 initialize_thread_for_icp(EThread * e)
01771 {
01772   (void) e;
01773 }
01774 
01775 ICPProcessor icpProcessorInternal;
01776 ICPProcessorExt icpProcessor(&icpProcessorInternal);
01777 
01778 ICPProcessor::ICPProcessor()
01779  : _l(0), _Initialized(0), _AllowIcpQueries(0),
01780    _PendingIcpQueries(0), _ICPConfig(0), _ICPPeriodic(0), _ICPHandler(0),
01781    _mcastCB_handler(NULL), _PeriodicEvent(0), _ICPHandlerEvent(0),
01782    _nPeerList(-1), _LocalPeer(0),
01783    _curSendPeer(0), _nSendPeerList(-1),
01784    _curRecvPeer(0), _nRecvPeerList(-1), _curParentPeer(0), _nParentPeerList(-1), _ValidPollData(0), _last_recv_peer_bias(0)
01785 {
01786   memset((void *)_PeerList, 0, sizeof(_PeerList[PEER_LIST_SIZE]));
01787   memset((void *)_SendPeerList, 0, sizeof(_SendPeerList[SEND_PEER_LIST_SIZE]));
01788   memset((void *)_RecvPeerList, 0, sizeof(_RecvPeerList[RECV_PEER_LIST_SIZE]));
01789   memset((void *)_ParentPeerList, 0, sizeof(_ParentPeerList[PARENT_PEER_LIST_SIZE]));
01790   memset((void *)_PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
01791 }
01792 
01793 ICPProcessor::~ICPProcessor()
01794 {
01795   if (_ICPPeriodic) {
01796     MUTEX_TAKE_LOCK(_ICPPeriodic->mutex, this_ethread());
01797     _PeriodicEvent->cancel();
01798     Mutex_unlock(_ICPPeriodic->mutex, this_ethread());
01799   }
01800 
01801   if (_ICPHandler) {
01802     MUTEX_TAKE_LOCK(_ICPHandler->mutex, this_ethread());
01803     _ICPHandlerEvent->cancel();
01804     Mutex_unlock(_ICPHandler->mutex, this_ethread());
01805   }
01806 }
01807 
01808 void
01809 ICPProcessor::start()
01810 {
01811   
01812   
01813   
01814   
01815   if (_Initialized)             
01816     return;
01817 
01818   
01819   
01820   
01821   
01822   _l = new AtomicLock();
01823 
01824   
01825   
01826   
01827   
01828   ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX);
01829 
01830   
01831   
01832   
01833   InitICPStatCallbacks();
01834 
01835   
01836   
01837   
01838   _ICPConfig = new ICPConfiguration();
01839 
01840   _mcastCB_handler = new ICPHandlerCont(this);
01841   SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler) & ICPHandlerCont::TossEvent);
01842 
01843 
01844   
01845   
01846   
01847   if (_ICPConfig->globalConfig()->ICPconfigured()) {
01848     if (BuildPeerList() == 0) {
01849       if (SetupListenSockets() == 0) {
01850         _AllowIcpQueries = 1;   
01851       }
01852     }
01853   }
01854   DumpICPConfig();
01855 
01856   
01857   
01858   
01859   _ICPPeriodic = new ICPPeriodicCont(this);
01860   SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler) & ICPPeriodicCont::PeriodicEvent);
01861   _PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP);
01862 
01863   
01864   
01865   
01866   _ICPHandler = new ICPHandlerCont(this);
01867   SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler) & ICPHandlerCont::PeriodicEvent);
01868   _ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler,
01869                                                    HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP);
01870   
01871   
01872   
01873   if (!gclient_request.valid()) {
01874     gclient_request.create(HTTP_TYPE_REQUEST);
01875   }
01876   _Initialized = 1;
01877 }
01878 
01879 Action *
01880 ICPProcessor::ICPQuery(Continuation * c, URL * url)
01881 {
01882   
01883   
01884   
01885 
01886   
01887   EThread *thread = this_ethread();
01888   ProxyMutex *mutex = thread->mutex;
01889   ICPRequestCont *rc = new(ICPRequestCont_allocator.alloc()) ICPRequestCont(this, c, url);
01890 
01891   ICP_INCREMENT_DYN_STAT(icp_query_requests_stat);
01892   
01893   rc->SetRequestStartTime();
01894   SET_CONTINUATION_HANDLER(rc, (ICPRequestContHandler) & ICPRequestCont::ICPRequestEvent);
01895   eventProcessor.schedule_imm(rc, ET_ICP);
01896 
01897   return rc->GetActionPtr();
01898 }
01899 
01900 int
01901 ICPProcessor::BuildPeerList()
01902 {
01903   
01904 
01905   
01906   
01907   
01908   
01909   
01910   
01911   
01912   
01913   
01914   
01915   
01916   
01917   
01918   
01919   
01920   
01921   
01922   
01923   
01924   
01925   
01926   
01927   
01928   
01929   
01930   
01931   
01932   PeerConfigData *Pcfg;
01933   Peer *P;
01934   Peer *mcP;
01935   int index;
01936   int status;
01937   PeerType_t type;
01938 
01939   
01940   
01941   
01942   
01943   
01944   
01945   Pcfg = _ICPConfig->indexToPeerConfigData(0);
01946   ink_strlcpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname));
01947   Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL;
01948 
01949   
01950   IpEndpoint tmp_ip;
01951   if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &tmp_ip.sa)) {
01952     Pcfg->_ip_addr._family = AF_UNSPEC;
01953     
01954     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface());
01955   } else {
01956     Pcfg->_my_ip_addr = Pcfg->_ip_addr = tmp_ip;
01957   }
01958   Pcfg->_proxy_port = 0;
01959   Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport();
01960   Pcfg->_mc_member = 0;
01961   Pcfg->_mc_ip_addr._family = AF_UNSPEC;
01962   Pcfg->_mc_ttl = 0;
01963 
01964   
01965   
01966   
01967   
01968   P = new ParentSiblingPeer(PEER_LOCAL, Pcfg, this);
01969   status = AddPeer(P);
01970   ink_release_assert(status);
01971   status = AddPeerToRecvList(P);
01972   ink_release_assert(status);
01973   _LocalPeer = P;
01974 
01975   for (index = 1; index < MAX_DEFINED_PEERS; ++index) {
01976     Pcfg = _ICPConfig->indexToPeerConfigData(index);
01977     type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType());
01978     
01979     
01980     
01981     
01982     
01983     
01984     if (Pcfg->GetIPAddr() == _LocalPeer->GetIP())
01985       continue;                 
01986 
01987     if ((type == PEER_PARENT) || (type == PEER_SIBLING)) {
01988 
01989       if (Pcfg->MultiCastMember()) {
01990         mcP = FindPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort());
01991         if (!mcP) {
01992           
01993           
01994           
01995           mcP = new MultiCastPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this);
01996           status = AddPeer(mcP);
01997           ink_assert(status);
01998           status = AddPeerToSendList(mcP);
01999           ink_assert(status);
02000           status = AddPeerToRecvList(mcP);
02001           ink_assert(status);
02002         }
02003         
02004         
02005         
02006         P = new ParentSiblingPeer(type, Pcfg, this);
02007         status = AddPeer(P);
02008         ink_assert(status);
02009         status = ((MultiCastPeer *) mcP)->AddMultiCastChild(P);
02010         ink_assert(status);
02011 
02012       } else {
02013         
02014         
02015         
02016         P = new ParentSiblingPeer(type, Pcfg, this);
02017         status = AddPeer(P);
02018         ink_assert(status);
02019         status = AddPeerToSendList(P);
02020         ink_assert(status);
02021       }
02022       
02023       
02024       
02025       if (type == PEER_PARENT) {
02026         status = AddPeerToParentList(P);
02027         ink_assert(status);
02028       }
02029     }
02030   }
02031   return 0;                     
02032 }
02033 
02034 void
02035 ICPProcessor::FreePeerList()
02036 {
02037   
02038   int index;
02039   for (index = 0; index < (_nPeerList + 1); ++index) {
02040     if (_PeerList[index]) {
02041       _PeerList[index] = 0;
02042     }
02043   }
02044   
02045   _nPeerList = -1;
02046   _LocalPeer = (Peer *) 0;
02047   _curSendPeer = 0;
02048   _nSendPeerList = -1;
02049   _curRecvPeer = 0;
02050   _nRecvPeerList = -1;
02051   _curParentPeer = 0;
02052   _nParentPeerList = -1;
02053   _ValidPollData = 0;
02054   _last_recv_peer_bias = 0;
02055 
02056   for (index = 0; index < PEER_LIST_SIZE; index++) {
02057     _PeerList[index] = 0;
02058   }
02059   for (index = 0; index < SEND_PEER_LIST_SIZE; index++) {
02060     _SendPeerList[index] = 0;
02061   }
02062   for (index = 0; index < RECV_PEER_LIST_SIZE; index++) {
02063     _RecvPeerList[index] = 0;
02064   }
02065   for (index = 0; index < PARENT_PEER_LIST_SIZE; index++) {
02066     _ParentPeerList[index] = 0;
02067   }
02068   memset((void *) _PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
02069 }
02070 
02071 int
02072 ICPProcessor::SetupListenSockets()
02073 {
02074   int allow_null_configuration;
02075 
02076   if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY)
02077       && _ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) {
02078     allow_null_configuration = 1;
02079   } else {
02080     allow_null_configuration = 0;
02081   }
02082 
02083   
02084 
02085   
02086   
02087   
02088   if (!_LocalPeer) {
02089     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer");
02090     return 1;                   
02091   }
02092 
02093   if (GetSendPeers() == 0) {
02094     if (!allow_null_configuration) {
02095       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)");
02096       return 1;                 
02097     }
02098   }
02099   if (GetRecvPeers() == 0) {
02100     if (!allow_null_configuration) {
02101       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)");
02102       return 1;                 
02103     }
02104   }
02105   
02106   
02107   
02108   Peer *P;
02109   int status;
02110   int index;
02111   ip_port_text_buffer ipb, ipb2;
02112   for (index = 0; index < (_nPeerList + 1); ++index) {
02113 
02114     if ((P = _PeerList[index])) {
02115 
02116       if ((P->GetType() == PEER_PARENT)
02117           || (P->GetType() == PEER_SIBLING)) {
02118         ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
02119 
02120         pPS->GetChan()->setRemote(pPS->GetIP());
02121 
02122       } else if (P->GetType() == PEER_MULTICAST) {
02123         MultiCastPeer *pMC = (MultiCastPeer *) P;
02124         ink_assert(_mcastCB_handler != NULL);
02125         status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP(), _LocalPeer->GetIP(), NON_BLOCKING, pMC->GetTTL(), DISABLE_MC_LOOPBACK, _mcastCB_handler);
02126         if (status) {
02127           
02128           RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed, res=%d, ip=%s bind_ip=%s",
02129             status,
02130             ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)),
02131             ats_ip_nptop(_LocalPeer->GetIP(), ipb2, sizeof(ipb2))
02132           );
02133           return 1;             
02134         }
02135 
02136         status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP(),
02137                                                       _LocalPeer->GetIP(),
02138                                                       NON_BLOCKING, pMC->GetSendChan(), _mcastCB_handler);
02139         if (status) {
02140           
02141           RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed, res=%d, ip=%s",
02142             status, ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)));
02143           return 1;             
02144         }
02145       }
02146     }
02147   }
02148   
02149   
02150   
02151   
02152   
02153   ParentSiblingPeer *pPS = (ParentSiblingPeer *) ((Peer *) _LocalPeer);
02154 
02155   NetVCOptions options;
02156   options.local_ip.assign(pPS->GetIP());
02157   options.local_port = pPS->GetICPPort();
02158   options.ip_proto = NetVCOptions::USE_UDP;
02159   options.addr_binding = NetVCOptions::INTF_ADDR;
02160   status = pPS->GetChan()->open(options);
02161   if (status) {
02162     
02163     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP bind_connect failed, res=%d, ip=%s",
02164       status,
02165       ats_ip_nptop(pPS->GetIP(), ipb, sizeof(ipb))
02166     );
02167     return 1;             
02168   }
02169 
02170   return 0;                     
02171 }
02172 
02173 void
02174 ICPProcessor::ShutdownListenSockets()
02175 {
02176   
02177   
02178   
02179   ink_assert(!PendingQuery());
02180   Peer *P;
02181 
02182   int index;
02183   for (index = 0; index < (_nPeerList + 1); ++index) {
02184     if ((P = _PeerList[index])) {
02185       if (P->GetType() == PEER_LOCAL) {
02186         ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
02187         (void) pPS->GetChan()->close();
02188 
02189       } else if (P->GetType() == PEER_MULTICAST) {
02190         MultiCastPeer *pMC = (MultiCastPeer *) P;
02191         (void) pMC->GetSendChan()->close();
02192         (void) pMC->GetRecvChan()->close();
02193       }
02194     }
02195   }
02196 }
02197 
02198 int
02199 ICPProcessor::Reconfigure(int , int )
02200 {
02201   
02202   
02203   
02204   
02205   
02206   ink_assert(_ICPConfig->HaveLock());
02207   ink_assert(!AllowICPQueries());
02208   ink_assert(!PendingQuery());
02209   
02210   
02211   
02212   
02213   ShutdownListenSockets();
02214   FreePeerList();
02215   
02216   
02217   
02218   
02219   _ICPConfig->UpdateGlobalConfig();
02220   _ICPConfig->UpdatePeerConfig();
02221 
02222   int status = -1;
02223   if (_ICPConfig->globalConfig()->ICPconfigured()) {
02224     if ((status = BuildPeerList()) == 0) {
02225       status = SetupListenSockets();
02226     }
02227     DumpICPConfig();
02228   }
02229   return status;
02230 }
02231 
02232 ICPProcessor::ReconfigState_t
02233   ICPProcessor::ReconfigureStateMachine(ReconfigState_t s, int gconfig_changed, int pconfig_changed)
02234 {
02235   
02236   
02237   
02238   
02239   
02240   
02241   
02242   
02243   
02244   ink_assert(_ICPConfig->HaveLock());
02245   int reconfig_status;
02246 
02247   while (1) {
02248 
02249     switch (s) {
02250     case RC_RECONFIG:
02251       {
02252         if (!Lock())
02253           return RC_RECONFIG;   
02254 
02255         if (PendingQuery()) {
02256           DisableICPQueries();  
02257           Unlock();
02258           CancelPendingReads();
02259           return RC_RECONFIG;   
02260 
02261         } else {
02262           DisableICPQueries();  
02263           Unlock();
02264           
02265           reconfig_status = Reconfigure(gconfig_changed, pconfig_changed);
02266 
02267           if (reconfig_status == 0) {
02268             s = RC_ENABLE_ICP;  
02269           } else {
02270             s = RC_DONE;        
02271           }
02272           break;                
02273         }
02274       }
02275 
02276     case RC_ENABLE_ICP:
02277       {
02278         if (!Lock())
02279           return RC_ENABLE_ICP; 
02280 
02281         EnableICPQueries();     
02282         Unlock();
02283 
02284         s = RC_DONE;
02285         break;                  
02286       }
02287 
02288     case RC_DONE:
02289       {
02290         
02291         _ICPConfig->Unlock();
02292         return RC_DONE;         
02293       }
02294     default:
02295       {
02296         ink_release_assert(0);  
02297       }
02298 
02299     }                           
02300 
02301   }                             
02302   return RC_DONE;
02303 }
02304 
02305 void
02306 ICPProcessor::CancelPendingReads()
02307 {
02308   
02309   
02310 
02311   ICPRequestCont *r = new(ICPRequestCont_allocator.alloc())
02312     ICPRequestCont(this, NULL, NULL);
02313   SET_CONTINUATION_HANDLER(r, (ICPRequestContHandler) & ICPRequestCont::NopICPRequestEvent);
02314   r->mutex = new_ProxyMutex();
02315 
02316   
02317   ICPRequestCont::BuildICPMsg(ICP_OP_HIT, 0, 0  , 0  , 0  ,
02318                                        (void *) 0, 0, &r->_sendMsgHdr, r->_sendMsgIOV, &r->_ICPmsg);
02319   r->_sendMsgHdr.msg_iovlen = 1;
02320   r->_ICPmsg.h.version = ~r->_ICPmsg.h.version; 
02321 
02322   Peer *lp = GetLocalPeer();
02323   r->_sendMsgHdr.msg_name = (caddr_t) & (lp->GetSendChan())->addr;
02324   r->_sendMsgHdr.msg_namelen = sizeof((lp->GetSendChan())->addr);
02325   udpNet.sendmsg_re(r, r, lp->GetSendFD(), &r->_sendMsgHdr);
02326 }
02327 
02328 Peer *
02329 ICPProcessor::GenericFindListPeer(IpAddr const& ip, uint16_t port, int validListItems, Ptr<Peer> *List)
02330 {
02331   Peer *P;
02332   port = htons(port);
02333   for (int n = 0; n < validListItems; ++n) {
02334     if ((P = List[n])) {
02335       if ((P->GetIP() == ip)
02336         && ((port == 0) || (ats_ip_port_cast(P->GetIP()) == port)))
02337         return P;
02338     }
02339   }
02340   return NULL;
02341 }
02342 
02343 Peer *
02344 ICPProcessor::FindPeer(IpAddr const& ip, uint16_t port)
02345 {
02346   
02347   return GenericFindListPeer(ip, port, (_nPeerList + 1), _PeerList);
02348 }
02349 
02350 Peer *
02351 ICPProcessor::FindSendListPeer(IpAddr const& ip, uint16_t port)
02352 {
02353   
02354   
02355   return GenericFindListPeer(ip, port, (_nSendPeerList + 1), _SendPeerList);
02356 }
02357 
02358 Peer *
02359 ICPProcessor::FindRecvListPeer(IpAddr const& ip, uint16_t port)
02360 {
02361   
02362   
02363   return GenericFindListPeer(ip, port, (_nRecvPeerList + 1), _RecvPeerList);
02364 }
02365 
02366 int
02367 ICPProcessor::AddPeer(Peer * P)
02368 {
02369   
02370   
02371   
02372 
02373   
02374   
02375   
02376   if (FindPeer(P->GetIP())) {
02377     ip_port_text_buffer x;
02378     
02379     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "bad icp.config, multiple peer definitions for ip=%s", ats_ip_nptop(P->GetIP(), x, sizeof(x)));
02380 
02381     return 0;                   
02382   } else {
02383     
02384     if (_nPeerList + 1 < PEER_LIST_SIZE) {
02385       _nPeerList++;
02386       _PeerList[_nPeerList] = P;
02387       P->SetPeerID(_nPeerList);
02388       return 1;                 
02389     } else {
02390       return 0;                 
02391     }
02392   }
02393 }
02394 
02395 int
02396 ICPProcessor::AddPeerToRecvList(Peer * P)
02397 {
02398   
02399   
02400   
02401 
02402   
02403   ink_assert(FindRecvListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0);
02404 
02405   if (_nRecvPeerList + 1 < RECV_PEER_LIST_SIZE) {
02406     _nRecvPeerList++;
02407     _RecvPeerList[_nRecvPeerList] = P;
02408     return 1;                   
02409   } else {
02410     return 0;                   
02411   }
02412 }
02413 
02414 int
02415 ICPProcessor::AddPeerToSendList(Peer * P)
02416 {
02417   
02418   
02419   
02420 
02421   
02422   ink_assert(FindSendListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0);
02423 
02424   if (_nSendPeerList + 1 < SEND_PEER_LIST_SIZE) {
02425     _nSendPeerList++;
02426     _SendPeerList[_nSendPeerList] = P;
02427     return 1;                   
02428   } else {
02429     return 0;                   
02430   }
02431 }
02432 
02433 int
02434 ICPProcessor::AddPeerToParentList(Peer * P)
02435 {
02436   
02437   
02438 
02439   if (_nParentPeerList + 1 < PARENT_PEER_LIST_SIZE) {
02440     _nParentPeerList++;
02441     _ParentPeerList[_nParentPeerList] = P;
02442     return 1;                   
02443   } else {
02444     return 0;                   
02445   }
02446 }
02447 
02448