• Main Page
  • Related Pages
  • Namespaces
  • Data Structures
  • Files
  • File List
  • Globals

ICP.cc

Go to the documentation of this file.
00001 /** @file
00002 
00003   A brief file description
00004 
00005   @section license License
00006 
00007   Licensed to the Apache Software Foundation (ASF) under one
00008   or more contributor license agreements.  See the NOTICE file
00009   distributed with this work for additional information
00010   regarding copyright ownership.  The ASF licenses this file
00011   to you under the Apache License, Version 2.0 (the
00012   "License"); you may not use this file except in compliance
00013   with the License.  You may obtain a copy of the License at
00014 
00015       http://www.apache.org/licenses/LICENSE-2.0
00016 
00017   Unless required by applicable law or agreed to in writing, software
00018   distributed under the License is distributed on an "AS IS" BASIS,
00019   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00020   See the License for the specific language governing permissions and
00021   limitations under the License.
00022  */
00023 
00024 /****************************************************************************
00025 
00026   ICP.cc
00027 
00028 
00029 ****************************************************************************/
00030 
00031 #include "libts.h"
00032 #include "Main.h"
00033 #include "P_EventSystem.h"
00034 #include "P_Cache.h"
00035 #include "P_Net.h"
00036 #include "MgmtUtils.h"
00037 #include "P_RecProcess.h"
00038 #include "ICP.h"
00039 #include "ICPProcessor.h"
00040 #include "ICPlog.h"
00041 #include "logging/Log.h"
00042 #include "logging/LogAccessICP.h"
00043 #include "BaseManager.h"
00044 #include "HdrUtils.h"
00045 
00046 extern CacheLookupHttpConfig global_cache_lookup_config;
00047 HTTPHdr gclient_request;
00048 
00049 //****************************************************************************
00050 //  File Overview:
00051 //  ==============
00052 //      ICP files
00053 //        ICP.h           -- All ICP class definitions.
00054 //        ICPlog.h        -- ICP log object for logging system
00055 //        ICP.cc          -- Incoming/outgoing ICP request and ICP configuration
00056 //                           data base management.
00057 //        ICPConfig.cc    -- ICP interface to Traffic Server configuration
00058 //                           management, member functions for ICPlog (object
00059 //                           passed to logging system) along with
00060 //                           miscellaneous support routines.
00061 //        ICPevents.h     -- Event definitions specific to ICP.
00062 //        ICPProcessor.h  -- ICP external interface for other subsystems.
00063 //                           External subsystems only need to include this
00064 //                           header to use ICP.
00065 //        ICPProcessor.cc -- ICP external interface implementation.
00066 //        ICPStats.cc     -- ICP statistic callback registration.
00067 //
00068 //
00069 //  Class Overview:
00070 //  ===============
00071 //    ICPConfigData  -- Manages global ICP data from the TS configuration
00072 //                      manager.
00073 //    PeerConfigData -- Manages  ICP peer data from the TS configuration
00074 //                      manager.
00075 //    ICPConfigUpdateCont -- Used by
00076 //                        ICPConfiguration::icp_config_change_callback()
00077 //                        to retry callout after a delay in cases where
00078 //                        we cannot acquire the configuration lock.
00079 //    ICPConfiguration -- Overall manager of ICP configuration from TS
00080 //                        configuration.  Acts as interface and uses
00081 //                        ICPConfigData and PeerConfigData to implement
00082 //                        actions.  Also fields/processes TS configuration
00083 //                        callouts for "icp.config" changes.  ICP classes only
00084 //                        see ICPConfiguration when dealing with TS
00085 //                        configuration info.
00086 //
00087 //    Peer (base class) -- abstract base class
00088 //      ParentSiblingPeer : Peer  -- ICP object describing parent/sibling
00089 //                                   peer which is initialized from the
00090 //                                   TS configuration data.
00091 //      MultiCastPeer : Peer -- ICP object describing MultiCast peer.
00092 //                              Object is initialized from the TS
00093 //                              configuration data.
00094 //
00095 //    BitMap -- Generic bit map management class
00096 //
00097 //    ICPProcessor -- Central class which starts all periodic events
00098 //                 and maintains ICP configuration database.  Delegates
00099 //                 incoming data processing to ICPHandlerCont and
00100 //                 outgoing data processing to ICPRequestCont. Implements
00101 //                 reconfiguration actions and query requests from the
00102 //                 external interface.
00103 //
00104 //    ICPRequestCont -- Implements the state machine which processes
00105 //                 locally generated ICP queries.  Generates message
00106 //                 queries and processes query responses.  Responses
00107 //                 received via callout from ICPPeerReadCont.
00108 //
00109 //    PeriodicCont (base class) -- abstract base class
00110 //      ICPPeriodicCont : PeriodicCont -- Periodic which looks for ICP
00111 //                 configuration changes sent by the Traffic Server
00112 //                 configuration manager, and initiates ICP reconfiguration
00113 //                 in the event we have a valid configuration change via
00114 //                 ICPProcessor::ReconfigureStateMachine().
00115 //
00116 //      ICPHandlerCont : PeriodicCont -- Periodic which monitors incoming
00117 //                 ICP sockets and starts processing of the incoming ICP data.
00118 //
00119 //    ICPPeerReadCont -- Implements the incoming data state machine.
00120 //                 Processes remote ICP query requests and passes query
00121 //                 responses to ICPRequestCont via a callout.
00122 //    ICPlog -- Logging object which encapsulates ICP query info required
00123 //              by the new logging subsystem to produce squid access log
00124 //              data for ICP queries.
00125 //
00126 //****************************************************************************
00127 //
00128 //  ICP is integrated into HTTP miss processing as follows.
00129 //
00130 //  if (HTTP Traffic Server Miss) {
00131 //    if (proxy.config.icp.enabled) {
00132 //      Status = QueryICP(URL, &target_ip);
00133 //      if (Status == ICP_HIT)
00134 //        Issue Http Request to (target_ip, proxy_port);
00135 //    }
00136 //    if (proxy.config.http.parent_proxy_routing_enable) {
00137 //      Issue Http Request to (proxy.config.http.parent_proxy_hostname,
00138 //                             proxy.config.http.parent_proxy_port)
00139 //    }
00140 //    else
00141 //      Issue Http Request to Origin Server
00142 //  }
00143 //
00144 //****************************************************************************
00145 
00146 // VC++ 5.0 is rather picky
00147 typedef int (ICPPeerReadCont::*ICPPeerReadContHandler) (int, void *);
00148 typedef int (ICPPeriodicCont::*ICPPeriodicContHandler) (int, void *);
00149 typedef int (ICPHandlerCont::*ICPHandlerContHandler) (int, void *);
00150 typedef int (ICPRequestCont::*ICPRequestContHandler) (int, void *);
00151 
00152 // Plugin freshness function
00153 PluginFreshnessCalcFunc pluginFreshnessCalcFunc = (PluginFreshnessCalcFunc) NULL;
00154 
00155 //---------------------------------------
00156 // Class ICPHandlerCont member functions
00157 //      Deal with incoming ICP data
00158 //---------------------------------------
00159 
00160 // Static data declarations
00161 //Allocator *ICPHandlerCont::IncomingICPDataBuf;
00162 int64_t ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex;
00163 static ClassAllocator <ICPPeerReadCont::PeerReadData>PeerReadDataAllocator("PeerReadDataAllocator");
00164 static ClassAllocator<ICPPeerReadCont> ICPPeerReadContAllocator("ICPPeerReadContAllocator");
00165 
00166 static Action *default_action = NULL;
00167 
00168 
00169 ICPHandlerCont::ICPHandlerCont(ICPProcessor * icpP)
00170  : PeriodicCont(icpP)
00171 {
00172 }
00173 
00174 // do nothing continuation handler
00175 int
00176 ICPHandlerCont::TossEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00177 {
00178   return EVENT_DONE;
00179 }
00180 
00181 int
00182 ICPHandlerCont::PeriodicEvent(int event, Event * /* e ATS_UNUSED */)
00183 {
00184   int n_peer, valid_peers;
00185   Peer *P;
00186 
00187   // Periodic handler which initiates incoming message processing
00188   // on the defined peers.
00189 
00190   valid_peers = _ICPpr->GetRecvPeers();
00191 
00192   // get peer info from the completionEvent token.
00193   switch (event) {
00194   case EVENT_POLL:
00195   case EVENT_INTERVAL:
00196     {
00197       // start read I/Os on peers which don't have outstanding I/Os
00198       for (n_peer = 0; n_peer < valid_peers; ++n_peer) {
00199         P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias());
00200         if (!P || (P && !P->IsOnline()))
00201           continue;
00202         if (P->shouldStartRead()) {
00203           P->startingRead();
00204           ///////////////////////////////////////////
00205           // Setup state machine
00206           ///////////////////////////////////////////
00207           ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc();
00208           int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup();
00209 
00210           s->init(_ICPpr, P, local_lookup);
00211           RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE);
00212 
00213           ///////////////////////////////////////////
00214           // Start processing
00215           ///////////////////////////////////////////
00216           s->handleEvent(EVENT_INTERVAL, (Event *) 0);
00217         }
00218       }
00219       break;
00220     }
00221   default:
00222     {
00223       ink_release_assert(!"unexpected event");
00224       break;
00225     }
00226   }                             // End of switch
00227   return EVENT_CONT;
00228 }
00229 
00230 //***************************************************************************
00231 // Nested Class PeerReadData member functions
00232 //      Used by ICPPeerReadCont to encapsulate the data required by
00233 //      PeerReadStateMachine
00234 //***************************************************************************
00235 ICPPeerReadCont::PeerReadData::PeerReadData()
00236 {
00237   init();
00238 }
00239 
00240 void
00241 ICPPeerReadCont::PeerReadData::init()
00242 {
00243   _start_time = 0;
00244   _mycont = 0;
00245   _peer = 0;
00246   _next_state = READ_ACTIVE;
00247   _cache_lookup_local = 0;
00248   _buf = 0;
00249   _rICPmsg = 0;
00250   _rICPmsg_len = 0;
00251   _cachelookupURL.clear();
00252   _queryResult = 0;
00253   _ICPReqCont = 0;
00254   _bytesReceived = 0;
00255 #ifdef DEBUG_ICP
00256   _nhistory = 0;
00257 #endif
00258   memset((void *) &_sender, 0, sizeof(_sender));
00259 }
00260 
00261 ICPPeerReadCont::PeerReadData::~PeerReadData()
00262 {
00263   reset(1);
00264 }
00265 
00266 void
00267 ICPPeerReadCont::PeerReadData::reset(int full_reset)
00268 {
00269   if (full_reset) {
00270     _peer = 0;
00271     _buf = 0;
00272   }
00273   if (_rICPmsg) {
00274     _rICPmsg = 0;
00275     _rICPmsg_len = 0;
00276   }
00277 
00278   if (_cachelookupURL.valid()) {
00279     _cachelookupURL.destroy();
00280   }
00281 }
00282 
00283 //***************************************************************************
00284 
00285 //------------------------------------------------------------------------
00286 // ICPPeerReadCont -- ICP incoming message processing state machine
00287 //------------------------------------------------------------------------
00288 ICPPeerReadCont::ICPPeerReadCont():Continuation(0), _object_vc(NULL), _object_read(NULL),
00289 _cache_req_hdr_heap_handle(NULL), _cache_resp_hdr_heap_handle(NULL), _ICPpr(NULL), _state(NULL),
00290 _start_time(0), _recursion_depth(0)
00291 {
00292 }
00293 
00294 void
00295 ICPPeerReadCont::init(ICPProcessor * ICPpr, Peer * p, int lookup_local)
00296 {
00297   PeerReadData *s = PeerReadDataAllocator.alloc();
00298   s->init();
00299   s->_start_time = ink_get_hrtime();
00300   s->_peer = p;
00301   s->_next_state = READ_ACTIVE;
00302   s->_cache_lookup_local = lookup_local;
00303   SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00304   _ICPpr = ICPpr;
00305   _state = s;
00306   _recursion_depth = -1;
00307   _object_vc = NULL;
00308   _object_read = NULL;
00309   _cache_req_hdr_heap_handle = NULL;
00310   _cache_resp_hdr_heap_handle = NULL;
00311   mutex = new_ProxyMutex();
00312 }
00313 
00314 ICPPeerReadCont::~ICPPeerReadCont()
00315 {
00316   reset(1);                     // Full reset
00317 }
00318 
00319 void
00320 ICPPeerReadCont::reset(int full_reset)
00321 {
00322   mutex = 0;
00323   if (this->_state) {
00324     this->_state->reset(full_reset);
00325     PeerReadDataAllocator.free(this->_state);
00326   }
00327   if (_cache_req_hdr_heap_handle) {
00328     ats_free(_cache_req_hdr_heap_handle);
00329     _cache_req_hdr_heap_handle = NULL;
00330   }
00331   if (_cache_resp_hdr_heap_handle) {
00332     ats_free(_cache_resp_hdr_heap_handle);
00333     _cache_resp_hdr_heap_handle = NULL;
00334   }
00335 }
00336 
00337 int
00338 ICPPeerReadCont::ICPPeerReadEvent(int event, Event * e)
00339 {
00340   switch (event) {
00341   case EVENT_INTERVAL:
00342   case EVENT_IMMEDIATE:
00343     {
00344       break;
00345     }
00346   case NET_EVENT_DATAGRAM_WRITE_COMPLETE:
00347   case NET_EVENT_DATAGRAM_READ_COMPLETE:
00348   case NET_EVENT_DATAGRAM_READ_ERROR:
00349   case NET_EVENT_DATAGRAM_WRITE_ERROR:
00350     {
00351       ink_assert((event != NET_EVENT_DATAGRAM_READ_COMPLETE)
00352                  || (_state->_next_state == READ_DATA_DONE));
00353       ink_assert((event != NET_EVENT_DATAGRAM_WRITE_COMPLETE)
00354                  || (_state->_next_state == WRITE_DONE));
00355 
00356       ink_release_assert(this == (ICPPeerReadCont *)
00357                          completionUtil::getHandle(e));
00358       break;
00359     }
00360   case CACHE_EVENT_LOOKUP_FAILED:
00361   case CACHE_EVENT_LOOKUP:
00362     {
00363       ink_assert(_state->_next_state == AWAITING_CACHE_LOOKUP_RESPONSE);
00364       break;
00365     }
00366   default:
00367     {
00368       ink_release_assert(!"unexpected event");
00369     }
00370   }                             // End of switch
00371 
00372   // Front end to PeerReadStateMachine(), invoked by Event subsystem.
00373   if (PeerReadStateMachine(_state, e) == EVENT_CONT) {
00374     eventProcessor.schedule_in(this, RETRY_INTERVAL, ET_ICP);
00375     return EVENT_DONE;
00376 
00377   } else if (_state->_next_state == READ_PROCESSING_COMPLETE) {
00378     _state->_peer->cancelRead();
00379     this->reset(1);             // Full reset
00380     ICPPeerReadContAllocator.free(this);
00381     return EVENT_DONE;
00382 
00383   } else {
00384     return EVENT_DONE;
00385   }
00386 }
00387 
00388 int
00389 ICPPeerReadCont::StaleCheck(int event, Event * /* e ATS_UNUSED */)
00390 {
00391   ip_port_text_buffer ipb;
00392 
00393   ink_release_assert(mutex->thread_holding == this_ethread());
00394 
00395   Debug("icp-stale", "Stale check res=%d for id=%d, [%s] from [%s]",
00396         event, _state->_rICPmsg->h.requestno,
00397     _state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
00398 
00399   switch (event) {
00400   case ICP_STALE_OBJECT:
00401     {
00402       _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
00403       break;
00404     }
00405   case ICP_FRESH_OBJECT:
00406     {
00407       _state->_queryResult = CACHE_EVENT_LOOKUP;
00408       break;
00409     }
00410   default:
00411     {
00412       Debug("icp-stale", "ICPPeerReadCont::StaleCheck: Invalid Event %d\n", event);
00413       _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED;
00414       break;
00415     }
00416   }
00417   _object_vc->do_io(VIO::CLOSE);
00418   _object_vc = 0;
00419   SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00420   return handleEvent(_state->_queryResult, 0);
00421 }
00422 
00423 int
00424 ICPPeerReadCont::ICPPeerQueryEvent(int event, Event * e)
00425 {
00426   ip_port_text_buffer ipb;
00427 
00428   Debug("icp", "Remote Query lookup res=%d for id=%d, [%s] from [%s]",
00429         event, _state->_rICPmsg->h.requestno,
00430     _state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)));
00431   if (pluginFreshnessCalcFunc) {
00432     switch (event) {
00433     case CACHE_EVENT_OPEN_READ:
00434       {
00435         _object_vc = (CacheVConnection *) e;
00436         SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::StaleCheck);
00437         _object_vc->get_http_info(&_object_read);
00438         (*pluginFreshnessCalcFunc) ((void *) this);
00439         return EVENT_DONE;
00440       }
00441     case CACHE_EVENT_OPEN_READ_FAILED:
00442       {
00443         event = CACHE_EVENT_LOOKUP_FAILED;
00444         break;
00445       }
00446     default:
00447       break;
00448     }
00449   }
00450   // Process result
00451   _state->_queryResult = event;
00452   SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent);
00453   return handleEvent(event, e);
00454 }
00455 
00456 int
00457 ICPPeerReadCont::ICPPeerQueryCont(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
00458 {
00459   ip_port_text_buffer ipb;
00460   Action *a;
00461 
00462   // Perform lookup()/open_read() on behalf of PeerReadStateMachine()
00463 
00464   ((char *) _state->_rICPmsg)[MAX_ICP_MSGSIZE - 1] = 0; // null terminate
00465   _state->_cachelookupURL.create(NULL);
00466   const char *qurl = (const char *) _state->_rICPmsg->un.query.URL;
00467   _state->_cachelookupURL.parse(qurl, strlen(qurl));
00468   Debug("icp", "Remote Query for id=%d, [%s] from [%s]",
00469         _state->_rICPmsg->h.requestno,
00470         _state->_rICPmsg->un.query.URL,
00471     ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb))
00472   );
00473 
00474   SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerQueryEvent);
00475   if (_state->_rICPmsg->un.query.URL && *_state->_rICPmsg->un.query.URL) {
00476     _state->_queryResult = ~CACHE_EVENT_LOOKUP_FAILED;
00477     _start_time = ink_get_hrtime();
00478     if (pluginFreshnessCalcFunc && _ICPpr->GetConfig()->globalConfig()->ICPStaleLookup()) {
00479       //////////////////////////////////////////////////////////////
00480       // Note: _cache_lookup_local is ignored in this case, since
00481       //       cache clustering is not used with stale lookup.
00482       //////////////////////////////////////////////////////////////
00483       a = cacheProcessor.open_read(this, &_state->_cachelookupURL, false,
00484                                    &gclient_request, &global_cache_lookup_config, (time_t) 0);
00485     } else {
00486       a = cacheProcessor.lookup(this, &_state->_cachelookupURL, false, _state->_cache_lookup_local);
00487     }
00488     if (!a) {
00489       a = ACTION_IO_ERROR;
00490     }
00491     if (a == ACTION_RESULT_DONE) {
00492       return EVENT_DONE;        // callback complete
00493     } else if (a == ACTION_IO_ERROR) {
00494       handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
00495       return EVENT_DONE;        // callback complete
00496     } else {
00497       return EVENT_CONT;        // callback pending
00498     }
00499   } else {
00500     // Null URL, return failed lookup
00501     handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0);
00502     return EVENT_DONE;          // callback done
00503   }
00504 }
00505 
00506 struct AutoReference
00507 {
00508   AutoReference(int *cnt)
00509   {
00510     _cnt = cnt;
00511     (*_cnt)++;
00512   }
00513    ~AutoReference()
00514   {
00515     (*_cnt)--;
00516   }
00517   int *_cnt;
00518 };
00519 
00520 int
00521 ICPPeerReadCont::PeerReadStateMachine(PeerReadData * s, Event * e)
00522 {
00523   AutoReference l(&_recursion_depth);
00524   ip_port_text_buffer ipb; // scratch buffer for diagnostic messages.
00525   //-----------------------------------------------------------
00526   // State machine to process ICP data received on UDP socket
00527   //-----------------------------------------------------------
00528   MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
00529   if (!lock) {
00530     // we didn't get the lock, so we don't need to unlock it
00531     // coverity[missing_unlock]
00532     return EVENT_CONT;          // try again later
00533   }
00534 
00535   while (1) {                   // loop forever
00536 
00537     switch (s->_next_state) {
00538     case READ_ACTIVE:
00539       {
00540         ink_release_assert(_recursion_depth == 0);
00541         if (!_ICPpr->Lock())
00542           return EVENT_CONT;    // unable to get lock, try again later
00543 
00544         bool valid_peer = (_ICPpr->IdToPeer(s->_peer->GetPeerID()) == s->_peer);
00545 
00546         if (valid_peer && _ICPpr->AllowICPQueries()
00547             && _ICPpr->GetConfig()->globalConfig()->ICPconfigured()) {
00548 
00549           // Note pending incoming ICP request or response
00550           _ICPpr->IncPendingQuery();
00551           _ICPpr->Unlock();
00552 
00553           s->_next_state = READ_DATA;
00554           RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA);
00555           break;                // move to next_state
00556 
00557         } else {
00558           _ICPpr->Unlock();
00559 
00560           // ICP NOT enabled, do nothing
00561           s->_next_state = READ_PROCESSING_COMPLETE;
00562           RECORD_ICP_STATE_CHANGE(s, 0, READ_PROCESSING_COMPLETE);
00563           return EVENT_DONE;
00564         }
00565       }
00566       ink_release_assert(0);    // Should never happen
00567 
00568     case READ_DATA:
00569       {
00570         ink_release_assert(_recursion_depth == 0);
00571 
00572         // Assumption of one outstanding read per peer...
00573         // Setup read from FD
00574         ink_assert(s->_peer->buf == NULL);
00575         Ptr<IOBufferBlock> buf = s->_peer->buf = new_IOBufferBlock();
00576         buf->alloc(ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex);
00577         s->_peer->fromaddrlen = sizeof(s->_peer->fromaddr);
00578         buf->fill(sizeof(ICPMsg_t));    // reserve space for decoding
00579         char *be = buf->buf_end() - 1;
00580         be[0] = 0;              // null terminate buffer
00581         s->_next_state = READ_DATA_DONE;
00582         RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA_DONE);
00583         ink_assert(s->_peer->readAction == NULL);
00584         Action *a = s->_peer->RecvFrom_re(this, this, buf,
00585                                           buf->write_avail() - 1,
00586                                           &s->_peer->fromaddr.sa,
00587                                           &s->_peer->fromaddrlen);
00588         if (!a) {
00589           a = ACTION_IO_ERROR;
00590         }
00591         if (a == ACTION_RESULT_DONE) {
00592           // we will have been called back already and our state updated
00593           // appropriately.
00594           // move to next state
00595           ink_assert(s->_next_state == PROCESS_READ_DATA);
00596           break;
00597         } else if (a == ACTION_IO_ERROR) {
00598           // actually, this *could* be taken care of by the main handler, but
00599           // error processing makes more sense at this point.  Therefore,
00600           // the main handler ignores the errors.
00601           //
00602           // No data, terminate read loop.
00603           //
00604           ICP_INCREMENT_DYN_STAT(no_data_read_stat);
00605           s->_peer->buf = NULL; // release reference
00606           s->_next_state = READ_NOT_ACTIVE_EXIT;
00607           RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
00608           // move to next state
00609           break;
00610         } else {
00611           s->_peer->readAction = a;
00612           return EVENT_DONE;
00613         }
00614       }
00615       ink_release_assert(0);    // Should never happen
00616 
00617     case READ_DATA_DONE:
00618       {
00619         // Convert ICP message from network to host format
00620         if (s->_peer->readAction != NULL) {
00621           ink_assert(s->_peer->readAction == e);
00622           s->_peer->readAction = NULL;
00623         }
00624         s->_bytesReceived = completionUtil::getBytesTransferred(e);
00625 
00626         if (s->_bytesReceived >= 0) {
00627           s->_next_state = PROCESS_READ_DATA;
00628           RECORD_ICP_STATE_CHANGE(s, 0, PROCESS_READ_DATA);
00629         } else {
00630           ICP_INCREMENT_DYN_STAT(no_data_read_stat);
00631           s->_peer->buf = NULL; // release reference
00632           s->_next_state = READ_NOT_ACTIVE_EXIT;
00633           RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT);
00634         }
00635         if (_recursion_depth > 0) {
00636           return EVENT_DONE;
00637         } else {
00638           break;
00639         }
00640       }
00641       ink_release_assert(0);    // Should never happen
00642 
00643     case PROCESS_READ_DATA:
00644     case ADD_PEER:
00645       {
00646         ink_release_assert(_recursion_depth == 0);
00647 
00648         Ptr<IOBufferBlock> bufblock = s->_peer->buf;
00649         char *buf = bufblock->start();
00650 
00651         if (s->_next_state == PROCESS_READ_DATA) {
00652           ICPRequestCont::NetToHostICPMsg((ICPMsg_t *)
00653                                           (buf + sizeof(ICPMsg_t)), (ICPMsg_t *) buf);
00654 
00655           // adjust buffer pointers to point to decoded message.
00656           bufblock->reset();
00657           bufblock->fill(s->_bytesReceived);
00658 
00659           // Validate message length for sanity
00660           if (s->_bytesReceived < ((ICPMsg_t *) buf)->h.msglen) {
00661             //
00662             // Short read, terminate
00663             //
00664             ICP_INCREMENT_DYN_STAT(short_read_stat);
00665             s->_peer->buf = NULL;
00666             s->_next_state = READ_NOT_ACTIVE;
00667             RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00668             break;              // move to next_state
00669           }
00670         }
00671         // Validate receiver and convert the received sockaddr
00672         //   to internal sockaddr format.
00673         IpEndpoint from;
00674         if (!s->_peer->ExtToIntRecvSockAddr(&s->_peer->fromaddr.sa, &from.sa)) {
00675           int status;
00676           ICPConfigData *cfg = _ICPpr->GetConfig()->globalConfig();
00677           ICPMsg_t *ICPmsg = (ICPMsg_t *) buf;
00678 
00679           if ((cfg->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) &&
00680               cfg->ICPReplyToUnknownPeer() &&
00681               ((ICPmsg->h.version == ICP_VERSION_2) ||
00682                (ICPmsg->h.version == ICP_VERSION_3)) && (ICPmsg->h.opcode == ICP_OP_QUERY)) {
00683 
00684             //
00685             // Add the unknown Peer to our database to
00686             // allow us to resolve the lookup request.
00687             //
00688             if (!_ICPpr->GetConfig()->Lock()) {
00689               s->_next_state = ADD_PEER;
00690               RECORD_ICP_STATE_CHANGE(s, 0, ADD_PEER);
00691               return EVENT_CONT;
00692             }
00693             if (!_ICPpr->GetFreePeers() || !_ICPpr->GetFreeSendPeers()) {
00694               RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP Peer limit exceeded");
00695               _ICPpr->GetConfig()->Unlock();
00696               goto invalid_message;
00697             }
00698 
00699             int icp_reply_port = cfg->ICPDefaultReplyPort();
00700             if (!icp_reply_port) {
00701               icp_reply_port = ntohs(ats_ip_port_cast(&s->_peer->fromaddr));
00702             }
00703             PeerConfigData *Pcfg = new PeerConfigData(PeerConfigData::CTYPE_SIBLING, IpAddr(s->_peer->fromaddr), 0,
00704                                                       icp_reply_port);
00705             ParentSiblingPeer *P = new ParentSiblingPeer(PEER_SIBLING, Pcfg, _ICPpr, true);
00706             status = _ICPpr->AddPeer(P);
00707             ink_release_assert(status);
00708             status = _ICPpr->AddPeerToSendList(P);
00709             ink_release_assert(status);
00710 
00711             P->GetChan()->setRemote(P->GetIP());
00712 
00713             // coverity[uninit_use_in_call]
00714             Note("ICP Peer added ip=%s", ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
00715             from = s->_peer->fromaddr;
00716           } else {
00717           invalid_message:
00718             //
00719             // Sender does not exist in ICP configuration, terminate
00720             //
00721             ICP_INCREMENT_DYN_STAT(invalid_sender_stat);
00722             Debug("icp", "Received msg from invalid sender [%s]",
00723               ats_ip_nptop(&s->_peer->fromaddr, ipb, sizeof(ipb)));
00724 
00725             s->_peer->buf = NULL;
00726             s->_next_state = READ_NOT_ACTIVE;
00727             RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00728             break;              // move to next_state
00729           }
00730         }
00731         // we hand off the decoded buffer from the Peer to the PeerReadData
00732         s->_sender = from;
00733         s->_rICPmsg_len = s->_bytesReceived;
00734         ink_assert(s->_buf == NULL);
00735         s->_buf = s->_peer->buf;
00736         s->_rICPmsg = (ICPMsg_t *) s->_buf->start();
00737         s->_peer->buf = NULL;
00738 
00739         //
00740         // Handle only ICP_VERSION_2/3 messages.  Reject all others.
00741         //
00742         if ((s->_rICPmsg->h.version != ICP_VERSION_2)
00743             && (s->_rICPmsg->h.version != ICP_VERSION_3)) {
00744           ICP_INCREMENT_DYN_STAT(read_not_v2_icp_stat);
00745           Debug("icp", "Received (v=%d) !v2 && !v3 msg from sender [%s]",
00746             (uint32_t) s->_rICPmsg->h.version, ats_ip_nptop(&from, ipb, sizeof(ipb)));
00747 
00748           s->_rICPmsg = NULL;
00749           s->_buf = NULL;
00750           s->_next_state = READ_NOT_ACTIVE;
00751           RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00752           break;                // move to next_state
00753         }
00754         //
00755         // If this is a query message, redirect to
00756         // the query specific handlers.
00757         //
00758         if (s->_rICPmsg->h.opcode == ICP_OP_QUERY) {
00759           ICP_INCREMENT_DYN_STAT(icp_remote_query_requests_stat);
00760           ink_assert(!s->_mycont);
00761           s->_next_state = AWAITING_CACHE_LOOKUP_RESPONSE;
00762           RECORD_ICP_STATE_CHANGE(s, 0, AWAITING_CACHE_LOOKUP_RESPONSE);
00763 
00764           if (ICPPeerQueryCont(0, (Event *) 0) == EVENT_DONE) {
00765             break;              // Callback complete
00766           } else {
00767             return EVENT_DONE;  // Callback pending
00768           }
00769         } else {
00770           // We have a response message for an ICP query.
00771           Debug("icp", "Response for Id=%d, from [%s]",
00772             s->_rICPmsg->h.requestno, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00773           ICP_INCREMENT_DYN_STAT(icp_remote_responses_stat);
00774           s->_next_state = GET_ICP_REQUEST;
00775           RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
00776           break;                // move to next_state
00777         }
00778       }
00779       ink_release_assert(0);    // Should never happen
00780 
00781     case AWAITING_CACHE_LOOKUP_RESPONSE:
00782       {
00783         int status = 0;
00784         void *data = s->_rICPmsg->un.query.URL;
00785         int datalen = strlen((const char *) data) + 1;
00786 
00787         if (s->_queryResult == CACHE_EVENT_LOOKUP) {
00788           // Use the received ICP data buffer for the response message
00789           Debug("icp", "Sending ICP_OP_HIT for id=%d, [%.*s] to [%s]",
00790             s->_rICPmsg->h.requestno, datalen, (const char *)data, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00791           ICP_INCREMENT_DYN_STAT(icp_cache_lookup_success_stat);
00792           status = ICPRequestCont::BuildICPMsg(ICP_OP_HIT,
00793                                                s->_rICPmsg->h.requestno, 0 /* optflags */ , 0 /* optdata */ ,
00794                                                0 /* shostid */ ,
00795                                                data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
00796         } else if (s->_queryResult == CACHE_EVENT_LOOKUP_FAILED) {
00797           // Use the received ICP data buffer for response message
00798           Debug("icp", "Sending ICP_OP_MISS for id=%d, [%.*s] to [%s]",
00799             s->_rICPmsg->h.requestno, datalen, (const char *)data, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb)));
00800           ICP_INCREMENT_DYN_STAT(icp_cache_lookup_fail_stat);
00801           status = ICPRequestCont::BuildICPMsg(ICP_OP_MISS,
00802                                                s->_rICPmsg->h.requestno, 0 /* optflags */ , 0 /* optdata */ ,
00803                                                0 /* shostid */ ,
00804                                                data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg);
00805         } else {
00806           Warning("Bad cache lookup event: %d", s->_queryResult);
00807           ink_release_assert(!"Invalid cache lookup event");
00808         }
00809         ink_assert(status == 0);
00810 
00811         // Make system log entry for ICP query
00812         ICPlog logentry(s);
00813         LogAccessICP accessor(&logentry);
00814         Log::access(&accessor);
00815 
00816         s->_next_state = SEND_REPLY;
00817         RECORD_ICP_STATE_CHANGE(s, 0, SEND_REPLY);
00818 
00819         if (_recursion_depth > 0) {
00820           return EVENT_DONE;
00821         } else {
00822           break;
00823         }
00824       }
00825       ink_release_assert(0);    // Should never happen
00826 
00827     case SEND_REPLY:
00828       {
00829         ink_release_assert(_recursion_depth == 0);
00830         //
00831         // Send the query response back to the sender
00832         //
00833         s->_next_state = WRITE_DONE;
00834         RECORD_ICP_STATE_CHANGE(s, 0, WRITE_DONE);
00835         ink_assert(s->_peer->writeAction == NULL);
00836         Action *a = s->_peer->SendMsg_re(this, this,
00837                                          &s->_mhdr, &s->_sender.sa);
00838         if (!a) {
00839           a = ACTION_IO_ERROR;
00840         }
00841         if (a == ACTION_RESULT_DONE) {
00842           // we have been called back already and our state updated
00843           // appropriately
00844           break;
00845 
00846         } else if (a == ACTION_IO_ERROR) {
00847           // Partial write.
00848           ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
00849           // coverity[uninit_use_in_call]
00850           Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s",
00851             ntohs(s->_rICPmsg->h.msglen), -1, ats_ip_ntop(&s->_sender, ipb, sizeof(ipb)));
00852           s->_next_state = READ_NOT_ACTIVE;
00853           RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00854           break;
00855         } else {
00856           s->_peer->writeAction = a;
00857           return EVENT_DONE;
00858         }
00859       }
00860       ink_release_assert(0);    // Should never happen
00861 
00862     case WRITE_DONE:
00863       {
00864         s->_peer->writeAction = NULL;
00865         int len = completionUtil::getBytesTransferred(e);
00866 
00867         if (len == (int)ntohs(s->_rICPmsg->h.msglen)) {
00868           ICP_INCREMENT_DYN_STAT(query_response_write_stat);
00869           s->_peer->LogSendMsg(s->_rICPmsg, &s->_sender.sa);       // log query reply
00870         } else {
00871           // Partial write.
00872           ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat);
00873           // coverity[uninit_use_in_call]
00874           Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s",
00875             ntohs(s->_rICPmsg->h.msglen), len, ats_ip_ntop(&s->_sender, ipb, sizeof(ipb)));
00876         }
00877         // Processing complete, perform completion actions
00878         s->_next_state = READ_NOT_ACTIVE;
00879         RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00880         Debug("icp", "state->READ_NOT_ACTIVE");
00881 
00882         if (_recursion_depth > 0) {
00883           return EVENT_DONE;
00884         } else {
00885           break;                // move to next_state
00886         }
00887       }
00888       ink_release_assert(0);    // Should never happen
00889 
00890     case GET_ICP_REQUEST:
00891       {
00892         ink_release_assert(_recursion_depth == 0);
00893         ink_assert(s->_rICPmsg && s->_rICPmsg_len);     // Sanity check
00894 
00895         // Get ICP request associated with response message
00896         s->_ICPReqCont = ICPRequestCont::FindICPRequest(s->_rICPmsg->h.requestno);
00897         if (s->_ICPReqCont) {
00898           s->_next_state = GET_ICP_REQUEST_MUTEX;
00899           RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST_MUTEX);
00900           break;                // move to next_state
00901         }
00902         //
00903         // No ICP request for response message, log as "response
00904         // for non-existent ICP request" and terminate processing
00905         //
00906         Debug("icp", "No ICP Request for Id=%d", s->_rICPmsg->h.requestno);
00907         ICP_INCREMENT_DYN_STAT(no_icp_request_for_response_stat);
00908         Peer *p = _ICPpr->FindPeer(s->_sender);
00909         p->LogRecvMsg(s->_rICPmsg, 0);
00910         s->_next_state = READ_NOT_ACTIVE;
00911         RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00912         break;                  // move to next_state
00913       }
00914       ink_release_assert(0);    // Should never happen
00915 
00916     case GET_ICP_REQUEST_MUTEX:
00917       {
00918         ink_release_assert(_recursion_depth == 0);
00919         ink_assert(s->_ICPReqCont);
00920         Ptr<ProxyMutex> ICPReqContMutex(s->_ICPReqCont->mutex);
00921         EThread *ethread = this_ethread();
00922         ink_hrtime request_start_time;
00923 
00924         if (!MUTEX_TAKE_TRY_LOCK(ICPReqContMutex, ethread)) {
00925           ICP_INCREMENT_DYN_STAT(icp_response_request_nolock_stat);
00926           //
00927           // Unable to get ICP request mutex, delay and move back
00928           // to the GET_ICP_REQUEST state.  We need to do this
00929           // since the ICP request may be deallocated by the active
00930           // continuation.
00931           //
00932           s->_ICPReqCont = (ICPRequestCont *) 0;
00933           s->_next_state = GET_ICP_REQUEST;
00934           RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST);
00935           return EVENT_CONT;
00936         }
00937         // Log as "response for ICP request"
00938         Peer *p = _ICPpr->FindPeer(s->_sender);
00939         p->LogRecvMsg(s->_rICPmsg, 1);
00940 
00941         // Process the ICP response for the given ICP request
00942         ICPRequestCont::ICPRequestEventArgs_t args;
00943         args.rICPmsg = s->_rICPmsg;
00944         args.rICPmsg_len = s->_rICPmsg_len;
00945         args.peer = p;
00946         if (!s->_ICPReqCont->GetActionPtr()->cancelled) {
00947           request_start_time = s->_ICPReqCont->GetRequestStartTime();
00948           Debug("icp", "Passing Reply for ICP Id=%d", s->_rICPmsg->h.requestno);
00949           s->_ICPReqCont->handleEvent((int) ICP_RESPONSE_MESSAGE, (void *) &args);
00950         } else {
00951           request_start_time = 0;
00952           delete s->_ICPReqCont;
00953           Debug("icp", "User cancelled ICP request Id=%d", s->_rICPmsg->h.requestno);
00954         }
00955 
00956         // Note: s->_ICPReqCont is deallocated at this point.
00957         s->_ICPReqCont = 0;
00958 
00959         MUTEX_UNTAKE_LOCK(ICPReqContMutex, ethread);
00960         if (request_start_time) {
00961           ICP_SUM_DYN_STAT(total_icp_response_time_stat, (ink_get_hrtime() - request_start_time));
00962         }
00963         RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE);
00964         s->_next_state = READ_NOT_ACTIVE;
00965         break;                  // move to next_state
00966       }
00967       ink_release_assert(0);    // Should never happen
00968 
00969     case READ_NOT_ACTIVE:
00970     case READ_NOT_ACTIVE_EXIT:
00971       {
00972         ink_release_assert(_recursion_depth == 0);
00973         if (!_ICPpr->Lock())
00974           return EVENT_CONT;    // unable to get lock, try again later
00975 
00976         // Note incoming ICP request or response completion
00977         _ICPpr->DecPendingQuery();
00978         _ICPpr->Unlock();
00979 
00980         s->_buf = 0;
00981         if (s->_next_state == READ_NOT_ACTIVE_EXIT) {
00982           s->_next_state = READ_PROCESSING_COMPLETE;
00983           return EVENT_DONE;
00984         } else {
00985           // Last read was valid, see if any more read data before exiting
00986           s->reset();
00987           s->_start_time = ink_get_hrtime();
00988           s->_next_state = READ_ACTIVE;
00989           RECORD_ICP_STATE_CHANGE(s, 0, READ_ACTIVE);
00990           break;                // restart
00991         }
00992       }
00993       ink_release_assert(0);    // Should never happen
00994 
00995     case READ_PROCESSING_COMPLETE:
00996     default:
00997       ink_release_assert(0);    // Should never happen
00998 
00999     }                           // End of switch
01000 
01001   }                             // End of while(1)
01002 }
01003 
01004 //------------------------------------------------------------------------
01005 // Class ICPRequestCont member functions
01006 //      Implements the state machine which processes locally generated
01007 //      ICP queries.
01008 //------------------------------------------------------------------------
01009 ClassAllocator<ICPRequestCont> ICPRequestCont_allocator("ICPRequestCont_allocator");
01010 
01011 ICPRequestCont::ICPRequestCont(ICPProcessor * pr, Continuation * c, URL * u)
01012   : Continuation(0), _cont(c), _url(u), _start_time(0),
01013     _ICPpr(pr), _timeout(0),
01014     npending_actions(0), pendingActions(NULL),
01015     _sequence_number(0), _expected_replies(0),
01016     _expected_replies_list(MAX_DEFINED_PEERS), _received_replies(0), _next_state(ICP_START)
01017 {
01018   memset((void *)&_ret_sockaddr, 0, sizeof(_ret_sockaddr));
01019   _ret_status = ICP_LOOKUP_FAILED;
01020   _act.cancelled = false;
01021   _act = c;
01022   memset((void *) &_ICPmsg, 0, sizeof(_ICPmsg));
01023   memset((void *) &_sendMsgHdr, 0, sizeof(_sendMsgHdr));
01024   memset((void *) &_sendMsgIOV, 0, sizeof(_sendMsgIOV[MSG_IOVECS]));
01025 
01026   if (c)
01027     this->mutex = c->mutex;
01028 }
01029 
01030 ICPRequestCont::~ICPRequestCont()
01031 {
01032   _act = NULL;
01033   this->mutex = NULL;
01034 
01035   if (_timeout) {
01036     _timeout->cancel(this);
01037     _timeout = 0;
01038   }
01039   RemoveICPRequest(_sequence_number);
01040 
01041   if (_ICPmsg.h.opcode == ICP_OP_QUERY) {
01042     if (_ICPmsg.un.query.URL) {
01043       ats_free(_ICPmsg.un.query.URL);
01044     }
01045   }
01046   if (pendingActions) {
01047     delete pendingActions;
01048     pendingActions = 0;
01049   }
01050 }
01051 
01052 void
01053 ICPRequestCont::remove_from_pendingActions(Action * a)
01054 {
01055   if (!pendingActions) {
01056     npending_actions--;
01057     return;
01058   }
01059   for (intptr_t i = 0; i < pendingActions->length(); i++) {
01060     if ((*pendingActions)[i] == a) {
01061       for (intptr_t j = i; j < pendingActions->length() - 1; j++)
01062         (*pendingActions)[j] = (*pendingActions)[j + 1];
01063       pendingActions->set_length(pendingActions->length() - 1);
01064       npending_actions--;
01065       return;
01066     }
01067   }
01068   npending_actions--;           // completed inline
01069 }
01070 
01071 void
01072 ICPRequestCont::remove_all_pendingActions()
01073 {
01074   int active_pendingActions = 0;
01075 
01076   if (!pendingActions) {
01077     return;
01078   }
01079   for (intptr_t i = 0; i < pendingActions->length(); i++) {
01080     if ((*pendingActions)[i]
01081         && ((*pendingActions)[i] != ACTION_IO_ERROR)) {
01082       ((*pendingActions)[i])->cancel();
01083       (*pendingActions)[i] = 0;
01084       npending_actions--;
01085       active_pendingActions++;
01086     } else {
01087       (*pendingActions)[i] = 0;
01088     }
01089   }
01090   pendingActions->set_length(pendingActions->length() - active_pendingActions);
01091 }
01092 
01093 int
01094 ICPRequestCont::ICPRequestEvent(int event, Event * e)
01095 {
01096   // Note: Passed parameter 'e' is not an Event *
01097   //       if event == ICP_RESPONSE_MESSAGE
01098 
01099   ink_assert(event == NET_EVENT_DATAGRAM_WRITE_COMPLETE ||
01100              event == NET_EVENT_DATAGRAM_WRITE_ERROR ||
01101              event == EVENT_IMMEDIATE || event == EVENT_INTERVAL || event == ICP_RESPONSE_MESSAGE);
01102   // handle reentrant callback
01103   if ((event == NET_EVENT_DATAGRAM_WRITE_COMPLETE)
01104       || (event == NET_EVENT_DATAGRAM_WRITE_ERROR)) {
01105     ink_assert(npending_actions > 0);
01106     remove_from_pendingActions((Action *) e);
01107     return EVENT_DONE;
01108   }
01109   // Start of user ICP query request processing.  We start here after
01110   // the reschedule in ICPProcessor::ICPQuery().
01111   switch (_next_state) {
01112   case ICP_START:
01113   case ICP_OFF_TERMINATE:
01114   case ICP_QUEUE_REQUEST:
01115   case ICP_AWAITING_RESPONSE:
01116   case ICP_DEQUEUE_REQUEST:
01117   case ICP_POST_COMPLETION:
01118   case ICP_REQUEST_NOT_ACTIVE:
01119     {
01120       if (ICPStateMachine(event, (void *) e) == EVENT_CONT) {
01121         //
01122         // Unable to acquire lock, reschedule continuation
01123         //
01124         eventProcessor.schedule_in(this, HRTIME_MSECONDS(RETRY_INTERVAL), ET_ICP);
01125         return EVENT_CONT;
01126 
01127       } else if (_next_state == ICP_DONE) {
01128         //
01129         // ICP request processing complete.
01130         //
01131         delete this;
01132         break;
01133       } else {
01134         break;
01135       }
01136     }
01137     ink_release_assert(0);      // should never happen
01138 
01139   case ICP_DONE:
01140   default:
01141     ink_release_assert(0);      // should never happen
01142   }                             // End of switch
01143 
01144   return EVENT_DONE;
01145 }
01146 
01147 int
01148 ICPRequestCont::NopICPRequestEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
01149 {
01150   delete this;
01151   return EVENT_DONE;
01152 }
01153 
01154 int
01155 ICPRequestCont::ICPStateMachine(int event, void *d)
01156 {
01157   //*******************************************
01158   // ICP message processing state machine
01159   //*******************************************
01160   ICPConfiguration *ICPcf = _ICPpr->GetConfig();
01161   ip_port_text_buffer ipb;
01162 
01163   while (1) {                   // loop forever
01164 
01165     switch (_next_state) {
01166     case ICP_START:
01167       {
01168         // User may have cancelled request, if so abort request.
01169         if (_act.cancelled) {
01170           _next_state = ICP_DONE;
01171           return EVENT_DONE;
01172         }
01173 
01174         if (!_ICPpr->Lock())
01175           return EVENT_CONT;    // Unable to get lock, try again later
01176 
01177         if (_ICPpr->AllowICPQueries() && (ICPcf->globalConfig()->ICPconfigured() == ICP_MODE_SEND_RECEIVE)) {
01178 
01179           // Reject NULL pointer or "localhost" URLs
01180           if (_url->valid()) {
01181             int host_len;
01182             const char *host = _url->host_get(&host_len);
01183             if (ptr_len_casecmp(host, host_len, "127.0.0.1") == 0 || ptr_len_casecmp(host, host_len, "localhost") == 0) {
01184               _ICPpr->Unlock();
01185 
01186               // NULL pointer or "localhost" URL, terminate request
01187               _next_state = ICP_OFF_TERMINATE;
01188               Debug("icp", "[ICP_START] NULL/localhost URL ignored Id=%d", _sequence_number);
01189               break;            // move to next_state
01190             }
01191           }
01192           // Note pending ICP request
01193           _ICPpr->IncPendingQuery();
01194           _ICPpr->Unlock();
01195 
01196           // Build the ICP query message
01197           char *urlstr = _url->string_get(NULL);
01198           int urlstr_len = strlen(urlstr) + 1;
01199 
01200           int status = BuildICPMsg(ICP_OP_QUERY,
01201                                    _sequence_number = ICPReqSeqNumber(),
01202                                    0 /* optflags */ , 0 /* optdata */ ,
01203                                    0 /* shostid */ ,
01204                                    (void *) urlstr, urlstr_len,
01205                                    &_sendMsgHdr, _sendMsgIOV,
01206                                    &_ICPmsg);
01207           // urlstr memory freed in destructor
01208           ink_assert(status == 0);
01209           Debug("icp", "[ICP_START] ICP_OP_QUERY for [%s], Id=%d", urlstr, _sequence_number);
01210 
01211           _next_state = ICP_QUEUE_REQUEST;
01212           break;                // move to next_state
01213 
01214         } else {
01215           ICP_INCREMENT_DYN_STAT(icp_start_icpoff_stat);
01216           _ICPpr->Unlock();
01217 
01218           // ICP NOT enabled, terminate request
01219           _next_state = ICP_OFF_TERMINATE;
01220           break;                // move to next_state
01221         }
01222       }
01223       ink_release_assert(0);    // should never happen
01224 
01225     case ICP_OFF_TERMINATE:
01226       {
01227         if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
01228           return EVENT_CONT;    // unable to get lock, delay and retry
01229         }
01230         Debug("icp", "[ICP_OFF_TERMINATE] Id=%d", _sequence_number);
01231 
01232         // ICP NOT enabled, post completion on request
01233         if (!_act.cancelled) {
01234           _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
01235         }
01236         MUTEX_UNTAKE_LOCK(mutex, this_ethread());
01237 
01238         _next_state = ICP_DONE;
01239         return EVENT_DONE;
01240       }
01241       ink_release_assert(0);    // should never happen
01242 
01243     case ICP_QUEUE_REQUEST:
01244       {
01245         // Place ICP request on the pending request queue
01246         int ret = AddICPRequest(_sequence_number, this);
01247         ink_assert(ret == 0);
01248 
01249         // Generate ICP requests to peers
01250         int bias = _ICPpr->GetStartingSendPeerBias();
01251         int SendPeers = _ICPpr->GetSendPeers();
01252         npending_actions = 0;
01253         while (SendPeers > 0) {
01254           Peer *P = _ICPpr->GetNthSendPeer(SendPeers, bias);
01255           if (!P->IsOnline()) {
01256             SendPeers--;
01257             continue;
01258           }
01259           //
01260           // Send query request to Peers
01261           //
01262 
01263           // because of reentrancy, we have to do this first, just
01264           // in case we get called back immediately.
01265           int was_expected = P->ExpectedReplies(&_expected_replies_list);
01266           _expected_replies += was_expected;
01267           npending_actions++;
01268           Action *a = P->SendMsg_re(this, P, &_sendMsgHdr, NULL);
01269           if (!a) {
01270             a = ACTION_IO_ERROR;
01271           }
01272           if (a != ACTION_IO_ERROR) {
01273             if (a != ACTION_RESULT_DONE) {
01274               if (!pendingActions) {
01275                 pendingActions = new DynArray<Action *>(&default_action);
01276               }
01277               (*pendingActions) (npending_actions) = a;
01278             }
01279             P->LogSendMsg(&_ICPmsg, NULL);  // log as send query
01280             Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d send query to [%s]",
01281               _sequence_number, ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb)));
01282           } else {
01283             _expected_replies_list.ClearBit(P->GetPeerID());
01284             _expected_replies -= was_expected;
01285             // Partial or failed write.
01286             ICP_INCREMENT_DYN_STAT(send_query_partial_write_stat);
01287             // coverity[uninit_use_in_call]
01288             Debug("icp_warn",
01289                   "ICP query send, res=%d, ip=%s", ntohs(_ICPmsg.h.msglen),
01290               ats_ip_ntop(P->GetIP(), ipb, sizeof(ipb)));
01291           }
01292           SendPeers--;
01293         }
01294 
01295         Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d expected replies=%d", _sequence_number, _expected_replies);
01296         if (!_expected_replies) {
01297           //
01298           // Nothing to wait for, terminate ICP processing
01299           //
01300           ICP_INCREMENT_DYN_STAT(icp_queries_no_expected_replies_stat);
01301           _next_state = ICP_DEQUEUE_REQUEST;
01302           break;                // move to next_state
01303         }
01304         ICP_SUM_DYN_STAT(total_udp_send_queries_stat, _expected_replies);
01305 
01306         //
01307         // Setup ICP request response timeout
01308         //
01309         int tval = _ICPpr->GetConfig()->globalConfig()->ICPqueryTimeout();
01310         _timeout = eventProcessor.schedule_in(this, HRTIME_SECONDS(tval), ET_ICP);
01311 
01312         _next_state = ICP_AWAITING_RESPONSE;
01313         return EVENT_DONE;
01314       }
01315       ink_release_assert(0);    // should never happen
01316 
01317     case ICP_AWAITING_RESPONSE:
01318       {
01319         Debug("icp", "[ICP_AWAITING_RESPONSE] Id=%d", _sequence_number);
01320         ink_assert(d);
01321         ICPRequestEventArgs_t dummyArgs;
01322         ICPRequestEventArgs_t *args = 0;
01323 
01324         if (event == ICP_RESPONSE_MESSAGE) {
01325           args = (ICPRequestEventArgs_t *) d;
01326         } else if (event == EVENT_INTERVAL) {
01327           memset((void *) &dummyArgs, 0, sizeof(dummyArgs));
01328           args = &dummyArgs;
01329         } else {
01330           ink_release_assert(0);        // should never happen
01331         }
01332 
01333         // Process ICP response
01334         if (ICPResponseMessage(event, args->rICPmsg, args->peer) == EVENT_DONE) {
01335           // ICP Request processing is complete, do completion actions
01336           _next_state = ICP_DEQUEUE_REQUEST;
01337           break;                // move to next_state
01338 
01339         } else {
01340           // Continue to wait for additional replies
01341           return EVENT_DONE;
01342         }
01343       }
01344       ink_release_assert(0);    // should never happen
01345 
01346     case ICP_DEQUEUE_REQUEST:
01347       {
01348         // Remove ICP request from active queue
01349         int ret = RemoveICPRequest(_sequence_number);
01350         Debug("icp", "[ICP_DEQUEUE_REQUEST] Id=%d", _sequence_number);
01351         ink_assert(ret == 0);
01352         //_sequence_number = 0; // moved to REQUEST_NOT_ACTIVE
01353         _next_state = ICP_POST_COMPLETION;
01354         break;                  // move to next_state
01355       }
01356       ink_release_assert(0);    // should never happen
01357 
01358     case ICP_POST_COMPLETION:
01359       {
01360         if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) {
01361           return EVENT_CONT;    // unable to get lock, delay and retry
01362         }
01363         Debug("icp", "[ICP_POST_COMPLETION] Id=%d", _sequence_number);
01364 
01365         // Post completion on the ICP request.
01366         if (!_act.cancelled) {
01367           _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr);
01368         }
01369         MUTEX_UNTAKE_LOCK(mutex, this_ethread());
01370         ICP_SUM_DYN_STAT(total_icp_request_time_stat, (ink_get_hrtime() - _start_time));
01371 
01372         _next_state = ICP_WAIT_SEND_COMPLETE;
01373         break;                  // move to next_state
01374       }
01375       ink_release_assert(0);    // should never happen
01376     case ICP_WAIT_SEND_COMPLETE:
01377       {
01378         // wait for all the sends to complete.
01379         if (npending_actions > 0) {
01380           Debug("icp", "[ICP_WAIT_SEND_COMPLETE] Id=%d active=%d", _sequence_number, npending_actions);
01381         } else {
01382           _next_state = ICP_REQUEST_NOT_ACTIVE;
01383           // move to next state
01384           break;
01385         }
01386       }
01387       break;
01388       ink_release_assert(0);    // should never happen
01389     case ICP_REQUEST_NOT_ACTIVE:
01390       {
01391         Debug("icp", "[ICP_REQUEST_NOT_ACTIVE] Id=%d", _sequence_number);
01392         _sequence_number = 0;
01393         if (!_ICPpr->Lock())
01394           return EVENT_CONT;    // Unable to get lock, try again later
01395 
01396         // Note pending ICP request completion
01397         _ICPpr->DecPendingQuery();
01398         _ICPpr->Unlock();
01399 
01400         _next_state = ICP_DONE;
01401         return EVENT_DONE;
01402       }
01403       ink_release_assert(0);    // should never happen
01404 
01405     case ICP_DONE:
01406     default:
01407       ink_release_assert(0);    // should never happen
01408 
01409     }                           // End of switch
01410 
01411   }                             // End of while(1)
01412 }
01413 
01414 int
01415 ICPRequestCont::ICPResponseMessage(int event, ICPMsg_t * m, Peer * peer)
01416 {
01417   ip_port_text_buffer ipb, ipb2;
01418 
01419   if (event == EVENT_INTERVAL) {
01420     _timeout = 0;
01421     remove_all_pendingActions();
01422 
01423     // ICP request response timeout, if we received a response from
01424     // any parent, return it to resolve the miss.
01425 
01426     if (_received_replies) {
01427       int NumParentPeers = _ICPpr->GetParentPeers();
01428       if (NumParentPeers > 0) {
01429         int n;
01430         Peer *pp;
01431         for (n = 0; n < NumParentPeers; n++) {
01432           pp = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
01433           if (pp && !_expected_replies_list.IsBitSet(pp->GetPeerID())
01434               && pp->isUp()) {
01435             ats_ip_copy(&_ret_sockaddr.sa, pp->GetIP());
01436             _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(pp)->GetProxyPort());
01437             _ret_status = ICP_LOOKUP_FOUND;
01438 
01439             Debug("icp",
01440               "ICP timeout using parent Id=%d from [%s] return [%s]",
01441               _sequence_number,
01442               ats_ip_nptop(pp->GetIP(), ipb, sizeof(ipb)),
01443               ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2))
01444             );
01445             return EVENT_DONE;
01446           }
01447         }
01448       }
01449     }
01450     // Timeout received on ICP request, return ICP_LOOKUP_FAILED
01451     Debug("icp", "ICP Response timeout for Id=%d", _sequence_number);
01452     return EVENT_DONE;
01453 
01454   } else {
01455     // We have received a response to our ICP query request.
01456     // See if this response resolves the ICP query.
01457     //
01458     ink_assert(m->h.requestno == _sequence_number);
01459 
01460     switch (m->h.opcode) {
01461     case ICP_OP_HIT:
01462     case ICP_OP_HIT_OBJ:
01463       {
01464         // Kill timeout event
01465         _timeout->cancel(this);
01466         _timeout = 0;
01467 
01468         ICP_INCREMENT_DYN_STAT(icp_query_hits_stat);
01469         ++_received_replies;
01470         ats_ip_copy(&_ret_sockaddr, peer->GetIP());
01471         _ret_sockaddr.port() =  htons(static_cast<ParentSiblingPeer*>(peer)->GetProxyPort());
01472         _ret_status = ICP_LOOKUP_FOUND;
01473 
01474         Debug("icp",
01475           "ICP Response HIT for Id=%d from [%s] return [%s]",
01476           _sequence_number,
01477           ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)),
01478           ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2))
01479         );
01480         return EVENT_DONE;
01481       }
01482     case ICP_OP_MISS:
01483     case ICP_OP_ERR:
01484     case ICP_OP_MISS_NOFETCH:
01485     case ICP_OP_DENIED:
01486       {
01487         Debug("icp", "ICP MISS response for Id=%d from [%s]",
01488           _sequence_number, ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)));
01489         // "received_replies" is only for Peers who we expect a reply
01490         //  from (Peers which are in the expected_replies_list).
01491         int Id = peer->GetPeerID();
01492         if (_expected_replies_list.IsBitSet(Id)) {
01493           // Clear bit to note receipt of reply
01494           _expected_replies_list.ClearBit(Id);
01495           ++_received_replies;
01496         }
01497 
01498         if (_received_replies < _expected_replies)
01499           return EVENT_CONT;    // wait for more responses
01500 
01501         // Kill timeout event
01502         _timeout->cancel(this);
01503         _timeout = 0;
01504 
01505         ICP_INCREMENT_DYN_STAT(icp_query_misses_stat);
01506         //
01507         // All responders have returned ICP_OP_MISS.
01508         // If parents exists, select one to resolve the request.
01509         //
01510         if (_ICPpr->GetParentPeers() > 0) {
01511           // In cases where multiple parents exist, we use
01512           // a round robin scheme.
01513           Peer *p = NULL;
01514           // try to find an UP parent, if none, return ICP_LOOKUP_FAILED
01515           {
01516             int i;
01517             for (i = 0; i < _ICPpr->GetParentPeers(); i++) {
01518               p = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias());
01519               // find an UP parent
01520               if (p->isUp())
01521                 break;
01522             }
01523             // if no parent is selected, then return ICP_LOOKUP_FAILED
01524             if (i >= _ICPpr->GetParentPeers()) {
01525               Debug("icp", "None of the %d ICP parent(s) is up", _ICPpr->GetParentPeers());
01526               p = NULL;
01527             }
01528           }
01529           if (p) {
01530             ats_ip_copy(&_ret_sockaddr, p->GetIP());
01531             _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(p)->GetProxyPort());
01532             _ret_status = ICP_LOOKUP_FOUND;
01533 
01534             Debug("icp", "ICP ALL MISS(1) for Id=%d return [%s]",
01535               _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb)));
01536             return EVENT_DONE;
01537           }
01538         }
01539         Debug("icp", "ICP ALL MISS(2) for Id=%d return [%s]",
01540           _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb)));
01541         return EVENT_DONE;
01542       }
01543     default:
01544       {
01545         ICP_INCREMENT_DYN_STAT(invalid_icp_query_response_stat);
01546         // coverity[uninit_use_in_call]
01547         Warning("Invalid ICP response, op=%d reqno=%d ip=%s",
01548           m->h.opcode, m->h.requestno, ats_ip_ntop(peer->GetIP(), ipb, sizeof(ipb)));
01549         return EVENT_CONT;      // wait for more responses
01550       }
01551 
01552     }                           // End of switch
01553   }
01554 }
01555 
01556 //------------------------------------------------
01557 // Class ICPRequestCont static member functions
01558 //------------------------------------------------
01559 
01560 // Static member function
01561 void
01562 ICPRequestCont::NetToHostICPMsg(ICPMsg_t * in, ICPMsg_t * out)
01563 {
01564   out->h.opcode = in->h.opcode;
01565   out->h.version = in->h.version;
01566   out->h.msglen = ntohs(in->h.msglen);
01567   out->h.requestno = ntohl(in->h.requestno);
01568   out->h.optionflags = ntohl(in->h.optionflags);
01569   out->h.optiondata = ntohl(in->h.optiondata);
01570   out->h.shostid = ntohl(in->h.shostid);
01571 
01572   switch (in->h.opcode) {
01573   case ICP_OP_QUERY:
01574     {
01575       memcpy((char *) &out->un.query.rhostid,
01576              (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid)), sizeof(out->un.query.rhostid));
01577       out->un.query.rhostid = ntohl(out->un.query.rhostid);
01578       out->un.query.URL = (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid) + sizeof(out->un.query.rhostid));
01579       break;
01580     }
01581   case ICP_OP_HIT:
01582     {
01583       out->un.hit.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01584       break;
01585     }
01586   case ICP_OP_MISS:
01587     {
01588       out->un.miss.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01589       break;
01590     }
01591   case ICP_OP_HIT_OBJ:
01592     {
01593       out->un.hitobj.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid));
01594 
01595       // strlen() is bounded since buffer in null terminated.
01596       out->un.hitobj.p_objsize = (char *) (out->un.hitobj.URL + strlen(out->un.hitobj.URL));
01597       memcpy((char *) &out->un.hitobj.objsize, out->un.hitobj.p_objsize, sizeof(out->un.hitobj.objsize));
01598       out->un.hitobj.objsize = ntohs(out->un.hitobj.objsize);
01599       out->un.hitobj.data = (char *) (out->un.hitobj.p_objsize + sizeof(out->un.hitobj.objsize));
01600       break;
01601     }
01602   default:
01603     break;
01604   }
01605 }
01606 
01607 int
01608 ICPRequestCont::BuildICPMsg(ICPopcode_t op, unsigned int seqno,
01609                             int optflags, int optdata, int shostid,
01610                             void *data, int datalen, struct msghdr *mhdr, struct iovec *iov, ICPMsg_t * icpmsg)
01611 {
01612   // Build ICP message for transmission in network byte order.
01613   if (op == ICP_OP_QUERY) {
01614     icpmsg->un.query.rhostid = htonl(0);
01615     icpmsg->un.query.URL = (char *) data;
01616 
01617     mhdr->msg_iov = iov;
01618     mhdr->msg_iovlen = 3;
01619 
01620     iov[0].iov_base = (caddr_t) icpmsg;
01621     iov[0].iov_len = sizeof(ICPMsgHdr_t);
01622 
01623     iov[1].iov_base = (caddr_t) & icpmsg->un.query.rhostid;
01624     iov[1].iov_len = sizeof(icpmsg->un.query.rhostid);
01625 
01626     iov[2].iov_base = (caddr_t) data;
01627     iov[2].iov_len = datalen;
01628     icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len);
01629 
01630   } else if (op == ICP_OP_HIT) {
01631     icpmsg->un.hit.URL = (char *) data;
01632 
01633     mhdr->msg_iov = iov;
01634     mhdr->msg_iovlen = 2;
01635 
01636     iov[0].iov_base = (caddr_t) icpmsg;
01637     iov[0].iov_len = sizeof(ICPMsgHdr_t);
01638 
01639     iov[1].iov_base = (caddr_t) data;
01640     iov[1].iov_len = datalen;
01641     icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
01642 
01643   } else if (op == ICP_OP_MISS) {
01644     icpmsg->un.miss.URL = (char *) data;
01645 
01646     mhdr->msg_iov = iov;
01647     mhdr->msg_iovlen = 2;
01648 
01649     iov[0].iov_base = (caddr_t) icpmsg;
01650     iov[0].iov_len = sizeof(ICPMsgHdr_t);
01651 
01652     iov[1].iov_base = (caddr_t) data;
01653     iov[1].iov_len = datalen;
01654     icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len);
01655 
01656   } else {
01657     ink_release_assert(0);
01658     return 1;                   // failed
01659   }
01660 
01661   mhdr->msg_name = (caddr_t) 0;
01662   mhdr->msg_namelen = 0;
01663   // TODO: The following is just awkward
01664 #if !defined(linux) && !defined(freebsd) && !defined(darwin) && !defined(solaris) \
01665  && !defined(openbsd)
01666   mhdr->msg_accrights = (caddr_t) 0;
01667   mhdr->msg_accrightslen = 0;
01668 #elif !defined(solaris)
01669   mhdr->msg_control = 0;
01670   mhdr->msg_controllen = 0;
01671   mhdr->msg_flags = 0;
01672 #endif
01673 
01674   icpmsg->h.opcode = op;
01675   icpmsg->h.version = ICP_VERSION_2;
01676   icpmsg->h.requestno = htonl(seqno);
01677   icpmsg->h.optionflags = htonl(optflags);
01678   icpmsg->h.optiondata = htonl(optdata);
01679   icpmsg->h.shostid = htonl(shostid);
01680 
01681   return 0;                     // Success
01682 }
01683 
01684 // Static ICPRequestCont data declarations
01685 unsigned int
01686   ICPRequestCont::ICPRequestSeqno = 1;
01687 Queue<ICPRequestCont> ICPRequestQueue[ICPRequestCont::ICP_REQUEST_HASH_SIZE];
01688 
01689 // Static member function
01690 unsigned int
01691 ICPRequestCont::ICPReqSeqNumber()
01692 {
01693   // Generate ICP request sequence numbers.  This must be unique.
01694   unsigned int res = 0;
01695   do {
01696     res = (unsigned int) ink_atomic_increment((int *) &ICPRequestSeqno, 1);
01697   } while (!res);
01698 
01699   return res;
01700 }
01701 
01702 // Static member function
01703 inline int
01704 ICPRequestCont::ICPRequestHash(unsigned int seqno)
01705 {
01706   // ICPRequestQueue hash
01707   return seqno % ICP_REQUEST_HASH_SIZE;
01708 }
01709 
01710 // Static member function
01711 int
01712 ICPRequestCont::AddICPRequest(unsigned int seqno, ICPRequestCont * r)
01713 {
01714   // Add ICP request to ICP outstanding queue (ICPRequestQueue).
01715   // return: 0 - success
01716 
01717   ICPRequestQueue[ICPRequestHash(seqno)].enqueue(r);
01718   return 0;                     // Success
01719 }
01720 
01721 // Static member function
01722 ICPRequestCont *
01723 ICPRequestCont::FindICPRequest(unsigned int seqno)
01724 {
01725   // Find ICP request on outstanding queue with the given sequence number
01726   int hash = ICPRequestHash(seqno);
01727   ICPRequestCont *r;
01728 
01729   for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
01730     if (r->_sequence_number == seqno)
01731       return r;
01732   }
01733   return (ICPRequestCont *) 0;  // Not found
01734 }
01735 
01736 // Static member function
01737 int
01738 ICPRequestCont::RemoveICPRequest(unsigned int seqno)
01739 {
01740   // Remove ICP request from outstanding queue with the given
01741   //  sequence number
01742   // Return: 0 - success; 1 - not found
01743 
01744   if (!seqno) {
01745     return 1;                   // Not found
01746   }
01747   int hash = ICPRequestHash(seqno);
01748   ICPRequestCont *r;
01749 
01750   for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) {
01751     if (r->_sequence_number == seqno) {
01752       ICPRequestQueue[hash].remove(r);
01753       return 0;
01754     }
01755   }
01756   return 1;                     // Not found
01757 }
01758 
01759 //------------------------------------------------------------------------
01760 // Class ICPProcessor member functions
01761 //      Central class which initializes the ICP world.
01762 //      Delegates incoming message processing to ICPHandlerCont
01763 //      and outgoing message processing to ICPRequestCont.
01764 //      Manages the ICP configuration database derived from TS
01765 //      configuration info.
01766 //------------------------------------------------------------------------
01767 
01768 // Static data declarations for ICPProcessor
01769 void
01770 initialize_thread_for_icp(EThread * e)
01771 {
01772   (void) e;
01773 }
01774 
01775 ICPProcessor icpProcessorInternal;
01776 ICPProcessorExt icpProcessor(&icpProcessorInternal);
01777 
01778 ICPProcessor::ICPProcessor()
01779  : _l(0), _Initialized(0), _AllowIcpQueries(0),
01780    _PendingIcpQueries(0), _ICPConfig(0), _ICPPeriodic(0), _ICPHandler(0),
01781    _mcastCB_handler(NULL), _PeriodicEvent(0), _ICPHandlerEvent(0),
01782    _nPeerList(-1), _LocalPeer(0),
01783    _curSendPeer(0), _nSendPeerList(-1),
01784    _curRecvPeer(0), _nRecvPeerList(-1), _curParentPeer(0), _nParentPeerList(-1), _ValidPollData(0), _last_recv_peer_bias(0)
01785 {
01786   memset((void *)_PeerList, 0, sizeof(_PeerList[PEER_LIST_SIZE]));
01787   memset((void *)_SendPeerList, 0, sizeof(_SendPeerList[SEND_PEER_LIST_SIZE]));
01788   memset((void *)_RecvPeerList, 0, sizeof(_RecvPeerList[RECV_PEER_LIST_SIZE]));
01789   memset((void *)_ParentPeerList, 0, sizeof(_ParentPeerList[PARENT_PEER_LIST_SIZE]));
01790   memset((void *)_PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
01791 }
01792 
01793 ICPProcessor::~ICPProcessor()
01794 {
01795   if (_ICPPeriodic) {
01796     MUTEX_TAKE_LOCK(_ICPPeriodic->mutex, this_ethread());
01797     _PeriodicEvent->cancel();
01798     Mutex_unlock(_ICPPeriodic->mutex, this_ethread());
01799   }
01800 
01801   if (_ICPHandler) {
01802     MUTEX_TAKE_LOCK(_ICPHandler->mutex, this_ethread());
01803     _ICPHandlerEvent->cancel();
01804     Mutex_unlock(_ICPHandler->mutex, this_ethread());
01805   }
01806 }
01807 
01808 void
01809 ICPProcessor::start()
01810 {
01811   //*****************************************************
01812   // Perform initialization actions for ICPProcessor
01813   // (called at system startup)
01814   //*****************************************************
01815   if (_Initialized)             // Do only once
01816     return;
01817 
01818   //
01819   // Setup ICPProcessor lock, required since ICPProcessor is instantiated
01820   //  as static object.
01821   //
01822   _l = new AtomicLock();
01823 
01824   //
01825   // Setup custom allocators
01826   //
01827   // replaced with generic IOBufferBlock allocator
01828   ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX);
01829 
01830   //
01831   // Setup ICP stats callbacks
01832   //
01833   InitICPStatCallbacks();
01834 
01835   //
01836   // Create ICP configuration objects
01837   //
01838   _ICPConfig = new ICPConfiguration();
01839 
01840   _mcastCB_handler = new ICPHandlerCont(this);
01841   SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler) & ICPHandlerCont::TossEvent);
01842 
01843 
01844   //
01845   // Build ICP peer list and setup listen sockets
01846   //
01847   if (_ICPConfig->globalConfig()->ICPconfigured()) {
01848     if (BuildPeerList() == 0) {
01849       if (SetupListenSockets() == 0) {
01850         _AllowIcpQueries = 1;   // allow receipt of queries
01851       }
01852     }
01853   }
01854   DumpICPConfig();
01855 
01856   //
01857   // Start ICP configuration monitor (periodic continuation)
01858   //
01859   _ICPPeriodic = new ICPPeriodicCont(this);
01860   SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler) & ICPPeriodicCont::PeriodicEvent);
01861   _PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP);
01862 
01863   //
01864   // Start ICP receive handler continuation
01865   //
01866   _ICPHandler = new ICPHandlerCont(this);
01867   SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler) & ICPHandlerCont::PeriodicEvent);
01868   _ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler,
01869                                                    HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP);
01870   //
01871   // Stale lookup data initializations
01872   //
01873   if (!gclient_request.valid()) {
01874     gclient_request.create(HTTP_TYPE_REQUEST);
01875   }
01876   _Initialized = 1;
01877 }
01878 
01879 Action *
01880 ICPProcessor::ICPQuery(Continuation * c, URL * url)
01881 {
01882   //**************************************
01883   // HTTP state machine interface to ICP
01884   //**************************************
01885 
01886   // Build continuation to process ICP request
01887   EThread *thread = this_ethread();
01888   ProxyMutex *mutex = thread->mutex;
01889   ICPRequestCont *rc = new(ICPRequestCont_allocator.alloc()) ICPRequestCont(this, c, url);
01890 
01891   ICP_INCREMENT_DYN_STAT(icp_query_requests_stat);
01892   
01893   rc->SetRequestStartTime();
01894   SET_CONTINUATION_HANDLER(rc, (ICPRequestContHandler) & ICPRequestCont::ICPRequestEvent);
01895   eventProcessor.schedule_imm(rc, ET_ICP);
01896 
01897   return rc->GetActionPtr();
01898 }
01899 
01900 int
01901 ICPProcessor::BuildPeerList()
01902 {
01903   // Returns 0 on Success
01904 
01905   //
01906   //---------------------------------------------------------------------
01907   //  We always place all allocated Peer elements onto PeerList[],
01908   //  which is used to track allocated elements and validate (ip, port)
01909   //  uniqueness in the ICP configuration.
01910   //
01911   //  All MultiCastPeer(s) link the underlying ParentSiblingPeer structures
01912   //  using a singly linked list off the MultiCastPeer.
01913   //
01914   //  Peer elements placed onto SendPeerList[] are elements which are
01915   //  the target of ICP queries.
01916   //  In the case where MultiCasting is used, a pseudo peer element
01917   //  (MultiCastPeer) is placed onto the SendPeerList[] to act as a place
01918   //  holder for the underlying Peers.
01919   //
01920   //  RecvPeerList[] is the list of Peer(s) we perform reads on for
01921   //  ICP messages.  In the case of MultiCast, the pseudo MultiCast peer
01922   //  element (MultiCastPeer) is placed on this list.  Since we currently
01923   //  funnel all unicast receives through the local peer UDP socket,
01924   //  only the local peer and any pseudo MultiCastPeer structures reside
01925   //  on this list.
01926   //
01927   //  Parent (PEER_PARENT) Peer elements are also added to ParentPeerList
01928   //  which is used to select a parent in the case where all ICP queries
01929   //  have returned ICP_MISS.
01930   //---------------------------------------------------------------------
01931   //
01932   PeerConfigData *Pcfg;
01933   Peer *P;
01934   Peer *mcP;
01935   int index;
01936   int status;
01937   PeerType_t type;
01938 
01939   //
01940   // From the working copy of the ICP configuration data, build the
01941   // internal Peer data structures for ICP processing.
01942   // First, establish the Local Peer descriptor before processing
01943   // parents and siblings.
01944   //
01945   Pcfg = _ICPConfig->indexToPeerConfigData(0);
01946   ink_strlcpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname));
01947   Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL;
01948 
01949   // Get IP address for given interface
01950   IpEndpoint tmp_ip;
01951   if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &tmp_ip.sa)) {
01952     Pcfg->_ip_addr._family = AF_UNSPEC;
01953     // No IP address for given interface
01954     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface());
01955   } else {
01956     Pcfg->_my_ip_addr = Pcfg->_ip_addr = tmp_ip;
01957   }
01958   Pcfg->_proxy_port = 0;
01959   Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport();
01960   Pcfg->_mc_member = 0;
01961   Pcfg->_mc_ip_addr._family = AF_UNSPEC;
01962   Pcfg->_mc_ttl = 0;
01963 
01964   //***************************************************
01965   // Descriptor for local host, add to PeerList and
01966   // RecvPeerList
01967   //***************************************************
01968   P = new ParentSiblingPeer(PEER_LOCAL, Pcfg, this);
01969   status = AddPeer(P);
01970   ink_release_assert(status);
01971   status = AddPeerToRecvList(P);
01972   ink_release_assert(status);
01973   _LocalPeer = P;
01974 
01975   for (index = 1; index < MAX_DEFINED_PEERS; ++index) {
01976     Pcfg = _ICPConfig->indexToPeerConfigData(index);
01977     type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType());
01978     //
01979     // Ignore parent and sibling entries corresponding to "localhost".
01980     // This is possible in a cluster configuration where parents and
01981     // siblings are cluster members.  Note that in a cluster
01982     // configuration, "icp.config" is shared by all nodes.
01983     //
01984     if (Pcfg->GetIPAddr() == _LocalPeer->GetIP())
01985       continue;                 // ignore
01986 
01987     if ((type == PEER_PARENT) || (type == PEER_SIBLING)) {
01988 
01989       if (Pcfg->MultiCastMember()) {
01990         mcP = FindPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort());
01991         if (!mcP) {
01992           //*********************************
01993           // Create multicast peer structure
01994           //*********************************
01995           mcP = new MultiCastPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this);
01996           status = AddPeer(mcP);
01997           ink_assert(status);
01998           status = AddPeerToSendList(mcP);
01999           ink_assert(status);
02000           status = AddPeerToRecvList(mcP);
02001           ink_assert(status);
02002         }
02003         //*****************************
02004         // Add child to MultiCast peer
02005         //*****************************
02006         P = new ParentSiblingPeer(type, Pcfg, this);
02007         status = AddPeer(P);
02008         ink_assert(status);
02009         status = ((MultiCastPeer *) mcP)->AddMultiCastChild(P);
02010         ink_assert(status);
02011 
02012       } else {
02013         //*****************************
02014         // Add parent/sibling peer
02015         //*****************************
02016         P = new ParentSiblingPeer(type, Pcfg, this);
02017         status = AddPeer(P);
02018         ink_assert(status);
02019         status = AddPeerToSendList(P);
02020         ink_assert(status);
02021       }
02022       //****************************************
02023       // Also, add parent peers to parent list.
02024       //****************************************
02025       if (type == PEER_PARENT) {
02026         status = AddPeerToParentList(P);
02027         ink_assert(status);
02028       }
02029     }
02030   }
02031   return 0;                     // Success
02032 }
02033 
02034 void
02035 ICPProcessor::FreePeerList()
02036 {
02037   // Deallocate all Peer structures
02038   int index;
02039   for (index = 0; index < (_nPeerList + 1); ++index) {
02040     if (_PeerList[index]) {
02041       _PeerList[index] = 0;
02042     }
02043   }
02044   // Reset all control data
02045   _nPeerList = -1;
02046   _LocalPeer = (Peer *) 0;
02047   _curSendPeer = 0;
02048   _nSendPeerList = -1;
02049   _curRecvPeer = 0;
02050   _nRecvPeerList = -1;
02051   _curParentPeer = 0;
02052   _nParentPeerList = -1;
02053   _ValidPollData = 0;
02054   _last_recv_peer_bias = 0;
02055 
02056   for (index = 0; index < PEER_LIST_SIZE; index++) {
02057     _PeerList[index] = 0;
02058   }
02059   for (index = 0; index < SEND_PEER_LIST_SIZE; index++) {
02060     _SendPeerList[index] = 0;
02061   }
02062   for (index = 0; index < RECV_PEER_LIST_SIZE; index++) {
02063     _RecvPeerList[index] = 0;
02064   }
02065   for (index = 0; index < PARENT_PEER_LIST_SIZE; index++) {
02066     _ParentPeerList[index] = 0;
02067   }
02068   memset((void *) _PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE]));
02069 }
02070 
02071 int
02072 ICPProcessor::SetupListenSockets()
02073 {
02074   int allow_null_configuration;
02075 
02076   if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY)
02077       && _ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) {
02078     allow_null_configuration = 1;
02079   } else {
02080     allow_null_configuration = 0;
02081   }
02082 
02083   // Returns 0 on Success.
02084 
02085   //
02086   // Perform some basic sanity checks on the ICP configuration.
02087   //
02088   if (!_LocalPeer) {
02089     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer");
02090     return 1;                   // Failed
02091   }
02092 
02093   if (GetSendPeers() == 0) {
02094     if (!allow_null_configuration) {
02095       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)");
02096       return 1;                 // Failed
02097     }
02098   }
02099   if (GetRecvPeers() == 0) {
02100     if (!allow_null_configuration) {
02101       RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)");
02102       return 1;                 // Failed
02103     }
02104   }
02105   //
02106   // Establish the required sockets for elements on the PeerList[].
02107   //
02108   Peer *P;
02109   int status;
02110   int index;
02111   ip_port_text_buffer ipb, ipb2;
02112   for (index = 0; index < (_nPeerList + 1); ++index) {
02113 
02114     if ((P = _PeerList[index])) {
02115 
02116       if ((P->GetType() == PEER_PARENT)
02117           || (P->GetType() == PEER_SIBLING)) {
02118         ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
02119 
02120         pPS->GetChan()->setRemote(pPS->GetIP());
02121 
02122       } else if (P->GetType() == PEER_MULTICAST) {
02123         MultiCastPeer *pMC = (MultiCastPeer *) P;
02124         ink_assert(_mcastCB_handler != NULL);
02125         status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP(), _LocalPeer->GetIP(), NON_BLOCKING, pMC->GetTTL(), DISABLE_MC_LOOPBACK, _mcastCB_handler);
02126         if (status) {
02127           // coverity[uninit_use_in_call]
02128           RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed, res=%d, ip=%s bind_ip=%s",
02129             status,
02130             ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)),
02131             ats_ip_nptop(_LocalPeer->GetIP(), ipb2, sizeof(ipb2))
02132           );
02133           return 1;             // Failed
02134         }
02135 
02136         status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP(),
02137                                                       _LocalPeer->GetIP(),
02138                                                       NON_BLOCKING, pMC->GetSendChan(), _mcastCB_handler);
02139         if (status) {
02140           // coverity[uninit_use_in_call]
02141           RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed, res=%d, ip=%s",
02142             status, ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)));
02143           return 1;             // Failed
02144         }
02145       }
02146     }
02147   }
02148   //
02149   // Setup the socket for the local host.
02150   // We funnel all unicast sends and receives through
02151   // the local peer UDP socket.
02152   //
02153   ParentSiblingPeer *pPS = (ParentSiblingPeer *) ((Peer *) _LocalPeer);
02154 
02155   NetVCOptions options;
02156   options.local_ip.assign(pPS->GetIP());
02157   options.local_port = pPS->GetICPPort();
02158   options.ip_proto = NetVCOptions::USE_UDP;
02159   options.addr_binding = NetVCOptions::INTF_ADDR;
02160   status = pPS->GetChan()->open(options);
02161   if (status) {
02162     // coverity[uninit_use_in_call] ?
02163     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP bind_connect failed, res=%d, ip=%s",
02164       status,
02165       ats_ip_nptop(pPS->GetIP(), ipb, sizeof(ipb))
02166     );
02167     return 1;             // Failed
02168   }
02169 
02170   return 0;                     // Success
02171 }
02172 
02173 void
02174 ICPProcessor::ShutdownListenSockets()
02175 {
02176   //
02177   // Close all open sockets for elements on the PeerList[]
02178   //
02179   ink_assert(!PendingQuery());
02180   Peer *P;
02181 
02182   int index;
02183   for (index = 0; index < (_nPeerList + 1); ++index) {
02184     if ((P = _PeerList[index])) {
02185       if (P->GetType() == PEER_LOCAL) {
02186         ParentSiblingPeer *pPS = (ParentSiblingPeer *) P;
02187         (void) pPS->GetChan()->close();
02188 
02189       } else if (P->GetType() == PEER_MULTICAST) {
02190         MultiCastPeer *pMC = (MultiCastPeer *) P;
02191         (void) pMC->GetSendChan()->close();
02192         (void) pMC->GetRecvChan()->close();
02193       }
02194     }
02195   }
02196 }
02197 
02198 int
02199 ICPProcessor::Reconfigure(int /* global_config_changed ATS_UNUSED */, int /* peer_config_changed ATS_UNUSED */)
02200 {
02201   // Returns 0 on Success
02202   //
02203   // At this point, ICP requests processing is disabled and
02204   // no pending ICP requests exist.
02205   //
02206   ink_assert(_ICPConfig->HaveLock());
02207   ink_assert(!AllowICPQueries());
02208   ink_assert(!PendingQuery());
02209   //
02210   // Shutdown and deallocate all structures associated with the
02211   // current configuration.
02212   //
02213   ShutdownListenSockets();
02214   FreePeerList();
02215   //
02216   // Copy the new configuration into the working copy and
02217   // rebuild all associated structures.
02218   //
02219   _ICPConfig->UpdateGlobalConfig();
02220   _ICPConfig->UpdatePeerConfig();
02221 
02222   int status = -1;
02223   if (_ICPConfig->globalConfig()->ICPconfigured()) {
02224     if ((status = BuildPeerList()) == 0) {
02225       status = SetupListenSockets();
02226     }
02227     DumpICPConfig();
02228   }
02229   return status;
02230 }
02231 
02232 ICPProcessor::ReconfigState_t
02233   ICPProcessor::ReconfigureStateMachine(ReconfigState_t s, int gconfig_changed, int pconfig_changed)
02234 {
02235   //*****************************************************************
02236   // State machine which performs the ICP reconfiguration actions.
02237   // Defined states are as follows:
02238   //  1) (RC_RECONFIG) disable ICP, reconfigure if no request pending,
02239   //     else delay and retry.  Reconfigure and if success move to
02240   //     RC_ENABLE_ICP else RC_DONE.
02241   //  2) (RC_ENABLE_ICP) enable ICP, free ICP configuration lock.
02242   //  3) (RC_DONE) free ICP configuration lock.
02243   //*****************************************************************
02244   ink_assert(_ICPConfig->HaveLock());
02245   int reconfig_status;
02246 
02247   while (1) {
02248 
02249     switch (s) {
02250     case RC_RECONFIG:
02251       {
02252         if (!Lock())
02253           return RC_RECONFIG;   // Unable to get lock, try again
02254 
02255         if (PendingQuery()) {
02256           DisableICPQueries();  // disable ICP processing
02257           Unlock();
02258           CancelPendingReads();
02259           return RC_RECONFIG;   // Pending requests, delay and retry
02260 
02261         } else {
02262           DisableICPQueries();  // disable ICP processing
02263           Unlock();
02264           // No pending ICP queries, perform reconfiguration
02265           reconfig_status = Reconfigure(gconfig_changed, pconfig_changed);
02266 
02267           if (reconfig_status == 0) {
02268             s = RC_ENABLE_ICP;  // reconfig OK, enable ICP
02269           } else {
02270             s = RC_DONE;        // reconfig failed, do not enable ICP
02271           }
02272           break;                // move to next state
02273         }
02274       }
02275 
02276     case RC_ENABLE_ICP:
02277       {
02278         if (!Lock())
02279           return RC_ENABLE_ICP; // Unable to get lock, try again
02280 
02281         EnableICPQueries();     // Enable ICP processing
02282         Unlock();
02283 
02284         s = RC_DONE;
02285         break;                  // move to next state
02286       }
02287 
02288     case RC_DONE:
02289       {
02290         // Release configuration lock
02291         _ICPConfig->Unlock();
02292         return RC_DONE;         // Reconfiguration complete
02293       }
02294     default:
02295       {
02296         ink_release_assert(0);  // Should never happen
02297       }
02298 
02299     }                           // End of switch
02300 
02301   }                             // End of while
02302   return RC_DONE;
02303 }
02304 
02305 void
02306 ICPProcessor::CancelPendingReads()
02307 {
02308   // Cancel pending ICP read by sending a bogus message to
02309   //  the local ICP port.
02310 
02311   ICPRequestCont *r = new(ICPRequestCont_allocator.alloc())
02312     ICPRequestCont(this, NULL, NULL);
02313   SET_CONTINUATION_HANDLER(r, (ICPRequestContHandler) & ICPRequestCont::NopICPRequestEvent);
02314   r->mutex = new_ProxyMutex();
02315 
02316   // TODO: Check return value?
02317   ICPRequestCont::BuildICPMsg(ICP_OP_HIT, 0, 0 /* optflags */ , 0 /* optdata */ , 0 /* shostid */ ,
02318                                        (void *) 0, 0, &r->_sendMsgHdr, r->_sendMsgIOV, &r->_ICPmsg);
02319   r->_sendMsgHdr.msg_iovlen = 1;
02320   r->_ICPmsg.h.version = ~r->_ICPmsg.h.version; // bogus message
02321 
02322   Peer *lp = GetLocalPeer();
02323   r->_sendMsgHdr.msg_name = (caddr_t) & (lp->GetSendChan())->addr;
02324   r->_sendMsgHdr.msg_namelen = sizeof((lp->GetSendChan())->addr);
02325   udpNet.sendmsg_re(r, r, lp->GetSendFD(), &r->_sendMsgHdr);
02326 }
02327 
02328 Peer *
02329 ICPProcessor::GenericFindListPeer(IpAddr const& ip, uint16_t port, int validListItems, Ptr<Peer> *List)
02330 {
02331   Peer *P;
02332   port = htons(port);
02333   for (int n = 0; n < validListItems; ++n) {
02334     if ((P = List[n])) {
02335       if ((P->GetIP() == ip)
02336         && ((port == 0) || (ats_ip_port_cast(P->GetIP()) == port)))
02337         return P;
02338     }
02339   }
02340   return NULL;
02341 }
02342 
02343 Peer *
02344 ICPProcessor::FindPeer(IpAddr const& ip, uint16_t port)
02345 {
02346   // Find (Peer *) with the given (ip,port) on the global list (PeerList)
02347   return GenericFindListPeer(ip, port, (_nPeerList + 1), _PeerList);
02348 }
02349 
02350 Peer *
02351 ICPProcessor::FindSendListPeer(IpAddr const& ip, uint16_t port)
02352 {
02353   // Find (Peer *) with the given (ip,port) on the
02354   //  scheduler list (SendPeerList)
02355   return GenericFindListPeer(ip, port, (_nSendPeerList + 1), _SendPeerList);
02356 }
02357 
02358 Peer *
02359 ICPProcessor::FindRecvListPeer(IpAddr const& ip, uint16_t port)
02360 {
02361   // Find (Peer *) with the given (ip,port) on the
02362   //  receive list (RecvPeerList)
02363   return GenericFindListPeer(ip, port, (_nRecvPeerList + 1), _RecvPeerList);
02364 }
02365 
02366 int
02367 ICPProcessor::AddPeer(Peer * P)
02368 {
02369   // Add (Peer *) to the global list (PeerList).  Make sure (ip,port) is
02370   //  unique.
02371   // Returns 1 - added; 0 - Not added
02372 
02373   //
02374   // Make sure no duplicate exists
02375   //
02376   if (FindPeer(P->GetIP())) {
02377     ip_port_text_buffer x;
02378     // coverity[uninit_use_in_call]
02379     RecSignalWarning(REC_SIGNAL_CONFIG_ERROR, "bad icp.config, multiple peer definitions for ip=%s", ats_ip_nptop(P->GetIP(), x, sizeof(x)));
02380 
02381     return 0;                   // Not added
02382   } else {
02383     // Valid entry
02384     if (_nPeerList + 1 < PEER_LIST_SIZE) {
02385       _nPeerList++;
02386       _PeerList[_nPeerList] = P;
02387       P->SetPeerID(_nPeerList);
02388       return 1;                 // Added
02389     } else {
02390       return 0;                 // Not added
02391     }
02392   }
02393 }
02394 
02395 int
02396 ICPProcessor::AddPeerToRecvList(Peer * P)
02397 {
02398   // Add (Peer *) to the listen list (RecvPeerList).
02399   //  Make sure (ip,port) is unique.
02400   // Returns 1 - added; 0 - Not added
02401 
02402   // Assert that no duplicate exists
02403   ink_assert(FindRecvListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0);
02404 
02405   if (_nRecvPeerList + 1 < RECV_PEER_LIST_SIZE) {
02406     _nRecvPeerList++;
02407     _RecvPeerList[_nRecvPeerList] = P;
02408     return 1;                   // Added
02409   } else {
02410     return 0;                   // Not added
02411   }
02412 }
02413 
02414 int
02415 ICPProcessor::AddPeerToSendList(Peer * P)
02416 {
02417   // Add (Peer *) to the scheduler list (SendPeerList).
02418   //  Make sure (ip,port) is unique.
02419   // Returns 1 - added; 0 - Not added
02420 
02421   // Assert that no duplicate exists
02422   ink_assert(FindSendListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0);
02423 
02424   if (_nSendPeerList + 1 < SEND_PEER_LIST_SIZE) {
02425     _nSendPeerList++;
02426     _SendPeerList[_nSendPeerList] = P;
02427     return 1;                   // Added
02428   } else {
02429     return 0;                   // Not added
02430   }
02431 }
02432 
02433 int
02434 ICPProcessor::AddPeerToParentList(Peer * P)
02435 {
02436   // Add (Peer *) to the parent list (ParentPeerList).
02437   // Returns 1 - added; 0 - Not added
02438 
02439   if (_nParentPeerList + 1 < PARENT_PEER_LIST_SIZE) {
02440     _nParentPeerList++;
02441     _ParentPeerList[_nParentPeerList] = P;
02442     return 1;                   // Added
02443   } else {
02444     return 0;                   // Not added
02445   }
02446 }
02447 
02448 // End of ICP.cc

Generated by  doxygen 1.7.1