00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 #ifndef _ICP_H_
00033 #define _ICP_H_
00034 
00035 #include "P_Net.h"
00036 #include "P_Cache.h"
00037 #define  ET_ICP ET_CALL
00038 #include "URL.h"
00039 #include "ICPevents.h"
00040 #include "ICPProcessor.h"
00041 #include "DynArray.h"
00042 
00043 
00044 
00045 
00046 
// ICP debug flag, defined to 1.
// NOTE(review): nothing in this header tests ICP_DEBUG; the state-history
// code inside ICPHandlerCont is guarded by DEBUG_ICP instead -- confirm
// which spelling is intended.
#define ICP_DEBUG 1
00048 
00049 
00050 
00051 
00052 
00053 
00054 
00055 
// Fixed-size header placed at the front of every ICP message (it is the `h`
// member of struct ICPMsg).  With the fixed-width fields below the struct
// occupies 20 bytes with no padding on common ABIs.
// NOTE(review): field names follow the RFC 2186 ICP header; byte-order
// handling is not visible in this header -- confirm at the call sites.
typedef struct ICPMsgHeader {
  uint8_t opcode;       // message type (presumably an ICPopcode_t value)
  uint8_t version;      // protocol version (presumably ICP_VERSION_*)
  uint16_t msglen;      // message length
  uint32_t requestno;   // request number
  uint32_t optionflags; // option flags (presumably ICP_FLAG_* bits)
  uint32_t optiondata;  // option data
  uint32_t shostid;     // sender host id
} ICPMsgHdr_t;
00066 
00067 
00068 
00069 
// ICP opcodes.  The enumerators are consecutive starting at 0; the
// ICP_OP_UNUSEDn placeholders only exist to keep the named opcodes at their
// numeric positions.  Values are spelled out explicitly so the numbering is
// visible at a glance.
// NOTE(review): the numbering matches the RFC 2186 opcode table -- confirm.
typedef enum {
  ICP_OP_INVALID = 0,
  ICP_OP_QUERY = 1,
  ICP_OP_HIT = 2,
  ICP_OP_MISS = 3,
  ICP_OP_ERR = 4,

  ICP_OP_UNUSED5 = 5,
  ICP_OP_UNUSED6 = 6,
  ICP_OP_UNUSED7 = 7,
  ICP_OP_UNUSED8 = 8,
  ICP_OP_UNUSED9 = 9,

  ICP_OP_SECHO = 10,
  ICP_OP_DECHO = 11,

  ICP_OP_UNUSED12 = 12,
  ICP_OP_UNUSED13 = 13,
  ICP_OP_UNUSED14 = 14,
  ICP_OP_UNUSED15 = 15,
  ICP_OP_UNUSED16 = 16,
  ICP_OP_UNUSED17 = 17,
  ICP_OP_UNUSED18 = 18,
  ICP_OP_UNUSED19 = 19,
  ICP_OP_UNUSED20 = 20,

  ICP_OP_MISS_NOFETCH = 21,
  ICP_OP_DENIED = 22,
  ICP_OP_HIT_OBJ = 23,
  ICP_OP_END_OF_OPS = 24 // one past the last opcode; keep last
} ICPopcode_t;

// Highest opcode value; per-opcode arrays are sized ICP_OP_LAST + 1.
#define ICP_OP_LAST (ICP_OP_END_OF_OPS - 1)
00104 
00105 
00106 
00107 
// Known ICP protocol version numbers.
#define ICP_VERSION_1 1
#define ICP_VERSION_2 2
#define ICP_VERSION_3 3
// ICP_VERSION is an alias for version 2.
#define ICP_VERSION ICP_VERSION_2
00112 
00113 
00114 
00115 
// Option flag bits (unsigned long constants).
// NOTE(review): presumably stored in ICPMsgHdr_t::optionflags -- confirm.
#define ICP_FLAG_HIT_OBJ 0x80000000UL // bit 31
#define ICP_FLAG_SRC_RTT 0x40000000UL // bit 30
00118 
00119 
00120 
00121 
// Message size limits and table sizes.
//
// MAX_ICP_MSGSIZE            - upper bound on a whole ICP message, in bytes.
// MAX_ICP_MSG_PAYLOAD_SIZE   - what is left after the fixed ICPMsgHdr_t.
// MAX_ICP_QUERY_PAYLOAD_SIZE - payload left after one further uint32_t
//                              (ICPQuery_t carries a uint32_t rhostid).
// MAX_DEFINED_PEERS          - size basis for the peer configuration tables.
// MSG_IOVECS                 - number of iovec slots kept per message.
//
// The payload macro previously named the non-existent type `ICPmsgHdr_t`
// (lower-case 'm'); the header typedef is `ICPMsgHdr_t`, so any expansion of
// either payload macro failed to compile.
#define MAX_ICP_MSGSIZE (16 * 1024)
#define MAX_ICP_MSG_PAYLOAD_SIZE (MAX_ICP_MSGSIZE - sizeof(ICPMsgHdr_t))
#define MAX_ICP_QUERY_PAYLOAD_SIZE (MAX_ICP_MSG_PAYLOAD_SIZE - sizeof(uint32_t))
#define MAX_DEFINED_PEERS 64
#define MSG_IOVECS 16
00127 
00128 
00129 
00130 
// Generic payload view: a single URL pointer.
// NOTE(review): ownership and termination of the string are not visible in
// this header -- confirm with the code that fills it in.
typedef struct ICPData {
  char *URL;
} ICPData_t;
00135 
00136 
00137 
00138 
// Query payload view: a 32-bit requester host id followed by a URL pointer.
// NOTE(review): string ownership is not visible here -- confirm.
typedef struct ICPQuery {
  uint32_t rhostid; // requester host id
  char *URL;
} ICPQuery_t;
00144 
00145 
00146 
00147 
// Hit payload view: a single URL pointer.
// NOTE(review): string ownership is not visible here -- confirm.
typedef struct ICPHit {
  char *URL;
} ICPHit_t;
00152 
00153 
00154 
00155 
// Miss payload view: a single URL pointer.
// NOTE(review): string ownership is not visible here -- confirm.
typedef struct ICPMiss {
  char *URL;
} ICPMiss_t;
00160 
00161 
00162 
00163 
// Hit-with-object payload view: URL plus an object size and object data.
// NOTE(review): `p_objsize` presumably points at the size field inside the
// received buffer while `objsize` holds the decoded 16-bit value -- confirm
// with the code that decodes the message.
typedef struct ICPHitObj {
  char *URL;
  char *p_objsize;
  uint16_t objsize;
  char *data;
} ICPHitObj_t;
00171 
00172 
00173 
00174 
00175 typedef struct ICPMsg
00176 {
00177   ICPMsgHdr_t h;
00178   union
00179   {
00180     ICPData_t data;
00181     ICPQuery_t query;
00182     ICPHit_t hit;
00183     ICPMiss_t miss;
00184     ICPHitObj_t hitobj;
00185   } un;
00186 } ICPMsg_t;
00187 
00188 
00189 
00190 
00191 
// Forward declarations for classes that are referenced by pointer before
// their full definitions appear further down in this header.
class BitMap;
class ICPHandlerCont;
class ICPPeerReadCont;
class ICPPeriodicCont;
class ICPProcessor;
class ICPRequestCont;
00198 
// Kind of peer.  Values 0-3 coincide numerically with the CTYPE_* constants
// declared in PeerConfigData; PEER_MULTICAST has no CTYPE_* counterpart.
typedef enum {
  PEER_NONE = 0,
  PEER_PARENT = 1,
  PEER_SIBLING = 2,
  PEER_LOCAL = 3,
  PEER_MULTICAST = 4
} PeerType_t;
00207 
00208 #if !defined(USE_CAS_FOR_ATOMICLOCK)
00209 class AtomicLock
00210 {
00211 public:
00212   AtomicLock();
00213   ~AtomicLock();
00214   int Lock();
00215   int HaveLock();
00216   void Unlock();
00217 
00218 private:
00219     Ptr<ProxyMutex> _mutex;
00220 };
00221 
00222 #else // USE_CAS_FOR_ATOMICLOCK
00223 class AtomicLock
00224 {
00225 public:
00226   AtomicLock();
00227   ~AtomicLock();
00228   int Lock();
00229   int HaveLock();
00230   void Unlock();
00231 
00232 private:
00233   enum
00234   { UNLOCKED = 0, LOCKED = 1 };
00235   int32_t _lock_word;
00236 };
00237 #endif // USE_CAS_FOR_ATOMICLOCK
00238 
00239 
00240 
00241 
// ICPConfigData -- plain holder for the global ICP settings.
//
// The default constructor zeroes every field (the interface pointer becomes
// null).  This class exposes read accessors only; ICPConfiguration is a
// friend and can reach the fields directly.
class ICPConfigData
{
  friend class ICPConfiguration;

public:
  ICPConfigData()
    : _icp_enabled(0),
      _icp_port(0),
      _icp_interface(0),
      _multicast_enabled(0),
      _icp_query_timeout(0),
      _cache_lookup_local(0),
      _stale_lookup(0),
      _reply_to_unknown_peer(0),
      _default_reply_port(0)
  {
  }
  ~ICPConfigData() {}

  // Declared inline here; the definition lives outside this header.
  inline int operator==(ICPConfigData &);

  // Accessors: each returns the stored value unchanged.
  int ICPconfigured() { return _icp_enabled; }
  int ICPport() { return _icp_port; }
  char *ICPinterface() { return _icp_interface; }
  int ICPmulticastConfigured() { return _multicast_enabled; }
  int ICPqueryTimeout() { return _icp_query_timeout; }
  int ICPLocalCacheLookup() { return _cache_lookup_local; }
  int ICPStaleLookup() { return _stale_lookup; }
  int ICPReplyToUnknownPeer() { return _reply_to_unknown_peer; }
  int ICPDefaultReplyPort() { return _default_reply_port; }

private:
  // NOTE(review): units of _icp_query_timeout and the ownership of
  // _icp_interface are not visible in this header -- confirm.
  int _icp_enabled;
  int _icp_port;
  char *_icp_interface;
  int _multicast_enabled;
  int _icp_query_timeout;
  int _cache_lookup_local;
  int _stale_lookup;
  int _reply_to_unknown_peer;
  int _default_reply_port;
};
00307 
00308 
00309 
00310 
00311 class PeerConfigData
00312 {
00313   friend class ICPConfiguration;
00314   friend class ICPProcessor;
00315 
00316 public:
00317     PeerConfigData();
00318     PeerConfigData(int ctype, IpAddr const& ip_addr, int proxy_port, int icp_port)
00319       : _ctype(ctype), _ip_addr(ip_addr),
00320       _proxy_port(proxy_port), _icp_port(icp_port), _mc_member(0), _mc_ttl(0),
00321       _my_ip_addr(ip_addr)
00322   {
00323     _hostname[0] = 0;
00324   }
00325    ~PeerConfigData()
00326   {
00327   }
00328   bool operator==(PeerConfigData &);
00329   inline const char *GetHostname()
00330   {
00331     return _hostname;
00332   }
00333   inline int GetCType()
00334   {
00335     return _ctype;
00336   }
00337   inline IpAddr const& GetIPAddr()
00338   {
00339     return _my_ip_addr;
00340   }
00341   inline int GetProxyPort()
00342   {
00343     return _proxy_port;
00344   }
00345   inline int GetICPPort()
00346   {
00347     return _icp_port;
00348   }
00349   inline int MultiCastMember()
00350   {
00351     return _mc_member;
00352   }
00353   inline IpAddr const& GetMultiCastIPAddr()
00354   {
00355     return _mc_ip_addr;
00356   }
00357   inline int GetMultiCastTTL()
00358   {
00359     return _mc_ttl;
00360   }
00361 
00362   
00363   static PeerType_t CTypeToPeerType_t(int);
00364   static int GetHostIPByName(char *, IpAddr&);
00365 
00366   enum
00367   { HOSTNAME_SIZE = 256 };
00368   enum
00369   { CTYPE_NONE = 0,
00370     CTYPE_PARENT = 1,
00371     CTYPE_SIBLING = 2,
00372     CTYPE_LOCAL = 3
00373   };
00374 
00375 private:
00376   
00377   
00378   
00379   char _hostname[HOSTNAME_SIZE];
00380   int _ctype;
00381   IpAddr _ip_addr;
00382   int _proxy_port;
00383   int _icp_port;
00384   
00385   
00386   
00387   int _mc_member;
00388   IpAddr _mc_ip_addr;
00389   int _mc_ttl;
00390 
00391   
00392   
00393   
00394   IpAddr _my_ip_addr;
00395 };
00396 
00397 
00398 
00399 
00400 
00401 
00402 class ICPConfigUpdateCont:public Continuation
00403 {
00404 public:
00405   ICPConfigUpdateCont(void *data, void *value);
00406    ~ICPConfigUpdateCont()
00407   {
00408   }
00409   int RetryICPconfigUpdate(int, Event *);
00410 
00411   enum
00412   { RETRY_INTERVAL = 10 };
00413 
00414 private:
00415   void *_data;
00416   void *_value;
00417 };
00418 
00419 
00420 
00421 
00422 class ICPConfiguration
00423 {
00424 public:
00425   ICPConfiguration();
00426   ~ICPConfiguration();
00427   int GlobalConfigChange();
00428   void UpdateGlobalConfig();
00429   int PeerConfigChange();
00430   void UpdatePeerConfig();
00431 
00432   inline ICPConfigData *globalConfig()
00433   {
00434     return _icp_cdata;
00435   }
00436   inline PeerConfigData *indexToPeerConfigData(int index)
00437   {
00438     ink_assert(index <= MAX_DEFINED_PEERS);
00439     
00440     
00441     return _peer_cdata[index];
00442   }
00443 
00444   
00445   static int mgr_icp_config_change_callback(const char *, RecDataT, RecData, void *);
00446 
00447   
00448   static void *icp_config_change_callback(void *, void *, int startup = 0);
00449 
00450   inline int Lock()
00451   {
00452     return _l.Lock();
00453   }
00454   inline void Unlock()
00455   {
00456     _l.Unlock();
00457     return;
00458   }
00459   inline int HaveLock()
00460   {
00461     return _l.HaveLock();
00462   }
00463 
00464   inline int ICPConfigCallouts()
00465   {
00466     return _icp_config_callouts;
00467   }
00468 
00469 private:
00470   
00471   AtomicLock _l;
00472   int _icp_config_callouts;
00473 
00474   
00475   
00476   
00477   
00478   
00479   
00480   
00481   
00482   
00483   ICPConfigData *_icp_cdata;
00484   ICPConfigData *_icp_cdata_current;
00485   PeerConfigData *_peer_cdata[MAX_DEFINED_PEERS + 1];
00486   PeerConfigData *_peer_cdata_current[MAX_DEFINED_PEERS + 1];
00487 };
00488 
00489 
00490 
00491 
00492 
00493 
00494 
// Bit flags kept in Peer::_state.
#define PEER_UP 0x1                    // tested by Peer::isUp()
#define PEER_MULTICAST_COUNT_EVENT 0x2 // Member probe event active
#define PEER_DYNAMIC 0x4               // Dynamically added, not in config

// Only used by pointer in this header.
struct CacheVConnection;
00500 
00501 class Peer:public RefCountObj
00502 {
00503 public:
00504   Peer(PeerType_t, ICPProcessor *, bool dynamic_peer = false);
00505   virtual ~ Peer()
00506   {
00507   }
00508   void LogRecvMsg(ICPMsg_t *, int);
00509 
00510   
00511   virtual sockaddr* GetIP() = 0;
00512   virtual Action *SendMsg_re(Continuation *, void *, struct msghdr *, struct sockaddr const* to) = 0;
00513   virtual Action *RecvFrom_re(Continuation *, void *, IOBufferBlock *, int, struct sockaddr *, socklen_t *) = 0;
00514   virtual int GetRecvFD() = 0;
00515   virtual int GetSendFD() = 0;
00516   virtual int ExpectedReplies(BitMap *) = 0;
00517   virtual int ValidSender(sockaddr *) = 0;
00518   virtual void LogSendMsg(ICPMsg_t *, sockaddr const*) = 0;
00519   virtual int IsOnline() = 0;
00520   virtual Connection *GetSendChan() = 0;
00521   virtual Connection *GetRecvChan() = 0;
00522   virtual int ExtToIntRecvSockAddr(sockaddr const*, sockaddr *) = 0;
00523 
00524   enum
00525   { OFFLINE_THRESHOLD = 20 };
00526 
00527   inline PeerType_t GetType()
00528   {
00529     return _type;
00530   }
00531   inline int GetPeerID()
00532   {
00533     return _id;
00534   }
00535   inline void SetPeerID(int newid)
00536   {
00537     _id = newid;
00538   }
00539   inline void SetNext(Peer * p)
00540   {
00541     _next = p;
00542   }
00543   inline Peer *GetNext()
00544   {
00545     return _next;
00546   }
00547   inline bool shouldStartRead()
00548   {
00549     return !notFirstRead;
00550   }
00551   inline void startingRead()
00552   {
00553     notFirstRead = 1;
00554   }
00555   inline void cancelRead()
00556   {
00557     notFirstRead = 0;
00558   }
00559   inline bool readActive()
00560   {
00561     return (readAction != NULL);
00562   }
00563   inline bool isUp()
00564   {
00565     return (_state & PEER_UP);
00566   }
00567 
00568   
00569   
00570   Ptr<IOBufferBlock> buf;
00571   IpEndpoint fromaddr;
00572   socklen_t fromaddrlen;
00573   int notFirstRead;             
00574   Action *readAction;           
00575   Action *writeAction;          
00576 
00577 protected:
00578   PeerType_t _type;
00579   int _id;                      
00580   Peer *_next;
00581   ICPProcessor *_ICPpr;
00582 
00583   
00584   
00585   
00586   int _state;
00587 
00588   
00589   
00590   
00591   struct PeerStats
00592   {
00593     ink_hrtime last_send;
00594     ink_hrtime last_receive;
00595     int sent[ICP_OP_LAST + 1];
00596     int recv[ICP_OP_LAST + 1];
00597     int total_sent;
00598     int total_received;
00599     int dropped_replies;        
00600   } _stats;
00601 };
00602 
00603 
00604 
00605 
00606 class ParentSiblingPeer:public Peer
00607 {
00608 public:
00609   ParentSiblingPeer(PeerType_t, PeerConfigData *, ICPProcessor *, bool dynamic_peer = false);
00610   ~ParentSiblingPeer()
00611   {
00612     if (_pconfig && (_state & PEER_DYNAMIC))
00613       delete _pconfig;
00614   }
00615   int GetProxyPort();
00616   int GetICPPort();
00617   virtual sockaddr* GetIP();
00618   virtual Action *SendMsg_re(Continuation *, void *, struct msghdr *, struct sockaddr const* to);
00619   virtual Action *RecvFrom_re(Continuation *, void *, IOBufferBlock *, int, struct sockaddr *, socklen_t *);
00620   virtual int GetRecvFD();
00621   virtual int GetSendFD();
00622   virtual int ExpectedReplies(BitMap *);
00623   virtual int ValidSender(struct sockaddr*);
00624   virtual void LogSendMsg(ICPMsg_t *, sockaddr const*);
00625   virtual int ExtToIntRecvSockAddr(sockaddr const* in, sockaddr* out);
00626   inline virtual int IsOnline()
00627   {
00628     return 1;
00629   }
00630   inline virtual Connection *GetSendChan()
00631   {
00632     return &_chan;
00633   }
00634   inline virtual Connection *GetRecvChan()
00635   {
00636     return &_chan;
00637   }
00638   inline PeerConfigData *GetConfig()
00639   {
00640     return _pconfig;
00641   }
00642   inline Connection *GetChan()
00643   {
00644     return &_chan;
00645   }
00646 
00647 private:
00648   
00649   PeerConfigData * _pconfig;    
00650   IpEndpoint _ip; 
00651   Connection _chan;
00652 };
00653 
00654 
00655 
00656 
00657 class MultiCastPeer:public Peer
00658 {
00659 public:
00660   MultiCastPeer(IpAddr const&, uint16_t, int, ICPProcessor *);
00661   ~MultiCastPeer()
00662   {
00663   }
00664   int GetTTL();
00665   int AddMultiCastChild(Peer * P);
00666 
00667 
00668 
00669   Peer *FindMultiCastChild(
00670     IpAddr const& ip, 
00671     uint16_t port = 0 
00672   );
00673 
00674   virtual sockaddr* GetIP();
00675   virtual Action *SendMsg_re(Continuation *, void *, struct msghdr *, struct sockaddr const* to);
00676   virtual Action *RecvFrom_re(Continuation *, void *, IOBufferBlock *, int, struct sockaddr *, socklen_t *);
00677   virtual int GetRecvFD();
00678   virtual int GetSendFD();
00679   virtual int ExpectedReplies(BitMap *);
00680   virtual int ValidSender(struct sockaddr*);
00681   virtual void LogSendMsg(ICPMsg_t *, sockaddr const*);
00682   virtual int IsOnline();
00683   inline virtual Connection *GetRecvChan()
00684   {
00685     return &_recv_chan;
00686   }
00687   inline virtual Connection *GetSendChan()
00688   {
00689     return &_send_chan;
00690   }
00691   inline virtual int ExtToIntRecvSockAddr(sockaddr const* in, sockaddr* out)
00692   {
00693     Peer *P = FindMultiCastChild(IpAddr(in));
00694     if (P) {
00695       ats_ip_copy(out, in);
00696       ats_ip_port_cast(out) = ats_ip_port_cast(P->GetIP());
00697       return 1;
00698     } else {
00699       return 0;
00700     }
00701   }
00702 
00703 private:
00704   
00705   Connection _send_chan;
00706   Connection _recv_chan;
00707   
00708   
00709   
00710   IpEndpoint _mc_ip;
00711   int _mc_ttl;
00712   struct multicast_data
00713   {
00714     double avg_members;         
00715     int defined_members;        
00716     int n_count_events;         
00717     int count_event_reqno;      
00718     int expected_replies;       
00719   } _mc;
00720 };
00721 
00722 
00723 
00724 
// BitMap -- set/clear/test individual bits by integer index.
//
// The object embeds a 16-byte array and also keeps a `_bitmap` pointer.
// NOTE(review): presumably `_bitmap` points at the embedded array for small
// maps and at separately allocated storage otherwise, and the constructor
// argument is the number of bits -- confirm with the implementation.  No
// copy constructor or assignment operator is declared, so check that
// instances are never copied if the storage can be heap-allocated.
class BitMap
{
public:
  BitMap(int);
  ~BitMap();
  void SetBit(int);
  void ClearBit(int);
  int IsBitSet(int);

private:
  enum {
    STATIC_BITMAP_BYTE_SIZE = 16,
    BITS_PER_BYTE = 8
  };
  char _static_bitmap[STATIC_BITMAP_BYTE_SIZE];
  char *_bitmap;
  int _bitmap_size;
  int _bitmap_byte_size;
};
00744 
00745 
00746 
00747 
00748 class ICPProcessor
00749 {
00750   friend class ICPHandlerCont;  
00751   friend class ICPPeerReadCont; 
00752   friend class ICPRequestCont;  
00753 
00754 public:
00755     ICPProcessor();
00756    ~ICPProcessor();
00757 
00758   
00759   void start();
00760   Action *ICPQuery(Continuation *, URL *);
00761 
00762   
00763   typedef enum
00764   {
00765     RC_RECONFIG,
00766     RC_ENABLE_ICP,
00767     RC_DONE
00768   } ReconfigState_t;
00769   ReconfigState_t ReconfigureStateMachine(ReconfigState_t, int, int);
00770 
00771   Peer *FindPeer(IpAddr const& ip, uint16_t port = 0);
00772   Peer *FindPeer(IpEndpoint const& ip) {
00773     return this->FindPeer(IpAddr(&ip), ats_ip_port_host_order(&ip));
00774   }
00775   Peer *FindPeer(sockaddr const* ip) {
00776     return this->FindPeer(IpAddr(ip), ats_ip_port_host_order(ip));
00777   }
00778 
00779   inline Peer *GetLocalPeer()
00780   {
00781     return _LocalPeer;
00782   }
00783   inline Peer *IdToPeer(int id)
00784   {
00785     return _PeerList[id];
00786   }
00787   inline ICPConfiguration *GetConfig()
00788   {
00789     return _ICPConfig;
00790   }
00791 
00792   inline int GetFreePeers()
00793   {
00794     return PEER_LIST_SIZE - (_nPeerList + 1);
00795   }
00796   inline int GetFreeSendPeers()
00797   {
00798     return SEND_PEER_LIST_SIZE - (_nSendPeerList + 1);
00799   }
00800   inline int GetFreeRecvPeers()
00801   {
00802     return RECV_PEER_LIST_SIZE - (_nRecvPeerList + 1);
00803   }
00804 
00805 private:
00806   inline int Lock()
00807   {
00808     return _l->Lock();
00809   }
00810   inline void Unlock()
00811   {
00812     _l->Unlock();
00813     return;
00814   }
00815   inline int HaveLock()
00816   {
00817     return _l->HaveLock();
00818   }
00819   int BuildPeerList();
00820   void FreePeerList();
00821   int SetupListenSockets();
00822   void ShutdownListenSockets();
00823   int Reconfigure(int, int);
00824   void InitICPStatCallbacks();
00825 
00826   inline void DisableICPQueries()
00827   {
00828     _AllowIcpQueries = 0;
00829   }
00830   inline void EnableICPQueries()
00831   {
00832     _AllowIcpQueries = 1;
00833   }
00834   inline int AllowICPQueries()
00835   {
00836     return _AllowIcpQueries;
00837   }
00838   inline int PendingQuery()
00839   {
00840     return _PendingIcpQueries;
00841   }
00842   inline void IncPendingQuery()
00843   {
00844     _PendingIcpQueries++;
00845   }
00846   inline void DecPendingQuery()
00847   {
00848     _PendingIcpQueries--;
00849   }
00850 
00851   Peer *GenericFindListPeer(IpAddr const&, uint16_t, int, Ptr<Peer> *);
00852   Peer *FindSendListPeer(IpAddr const&, uint16_t);
00853   Peer *FindRecvListPeer(IpAddr const&, uint16_t);
00854   int AddPeer(Peer *);
00855   int AddPeerToSendList(Peer *);
00856   int AddPeerToRecvList(Peer *);
00857   int AddPeerToParentList(Peer *);
00858 
00859   inline int GetSendPeers()
00860   {
00861     return _nSendPeerList + 1;
00862   }
00863   inline Peer *GetNthSendPeer(int n, int bias)
00864   {
00865     return _SendPeerList[(bias + n) % (_nSendPeerList + 1)];
00866   }
00867 
00868   inline int GetRecvPeers()
00869   {
00870     return _nRecvPeerList + 1;
00871   }
00872   inline Peer *GetNthRecvPeer(int n, int bias)
00873   {
00874     return _RecvPeerList[(bias + n) % (_nRecvPeerList + 1)];
00875   }
00876 
00877   inline int GetStartingSendPeerBias()
00878   {
00879     return ++_curSendPeer;
00880   }
00881   inline int GetStartingRecvPeerBias()
00882   {
00883     return ++_curRecvPeer;
00884   }
00885 
00886   inline int GetParentPeers()
00887   {
00888     return _nParentPeerList + 1;
00889   }
00890   inline Peer *GetNthParentPeer(int n, int bias)
00891   {
00892     return _ParentPeerList[(bias + n) % (_nParentPeerList + 1)];
00893   }
00894   inline int GetStartingParentPeerBias()
00895   {
00896     return ++_curParentPeer;
00897   }
00898 
00899   inline void SetLastRecvPeerBias(int b)
00900   {
00901     _last_recv_peer_bias = b;
00902   }
00903   inline int GetLastRecvPeerBias()
00904   {
00905     return _last_recv_peer_bias;
00906   }
00907   void CancelPendingReads();
00908   void DumpICPConfig();
00909 
00910 private:
00911   
00912   AtomicLock * _l;
00913   int _Initialized;
00914   int _AllowIcpQueries;
00915   int _PendingIcpQueries;
00916   ICPConfiguration *_ICPConfig;
00917   ICPPeriodicCont *_ICPPeriodic;
00918   ICPHandlerCont *_ICPHandler;
00919   ICPHandlerCont *_mcastCB_handler;
00920   Event *_PeriodicEvent;
00921   Event *_ICPHandlerEvent;
00922 
00923   enum
00924   {
00925     PEER_LIST_SIZE = 2 * MAX_DEFINED_PEERS,
00926     SEND_PEER_LIST_SIZE = 2 * MAX_DEFINED_PEERS,
00927     RECV_PEER_LIST_SIZE = 2 * MAX_DEFINED_PEERS,
00928     PARENT_PEER_LIST_SIZE = 2 * MAX_DEFINED_PEERS,
00929     PEER_ID_POLL_INDEX_SIZE = 2 * MAX_DEFINED_PEERS
00930   };
00931 
00932   
00933   int _nPeerList;               
00934   Ptr<Peer> _PeerList[PEER_LIST_SIZE];
00935   Ptr<Peer> _LocalPeer;
00936 
00937   
00938   int _curSendPeer;             
00939   int _nSendPeerList;           
00940   Ptr<Peer> _SendPeerList[SEND_PEER_LIST_SIZE];
00941 
00942   
00943   int _curRecvPeer;             
00944   int _nRecvPeerList;           
00945   Ptr<Peer> _RecvPeerList[RECV_PEER_LIST_SIZE];
00946 
00947   
00948   int _curParentPeer;           
00949   int _nParentPeerList;         
00950   Ptr<Peer> _ParentPeerList[PARENT_PEER_LIST_SIZE];
00951 
00952   
00953   int _ValidPollData;
00954   int _PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE];
00955   int _last_recv_peer_bias;     
00956 };
00957 
00958 
00959 
00960 
00961 
00962 class PeriodicCont:public Continuation
00963 {
00964 public:
00965   PeriodicCont(ICPProcessor * p);
00966   virtual ~ PeriodicCont();
00967   virtual int PeriodicEvent(int, Event *) = 0;
00968 
00969 protected:
00970     ICPProcessor * _ICPpr;
00971 };
00972 
00973 
00974 
00975 
00976 
00977 
00978 class ICPPeriodicCont:public PeriodicCont
00979 {
00980 public:
00981   enum
00982   { PERIODIC_INTERVAL = 5000 };
00983   enum
00984   { RETRY_INTERVAL_MSECS = 10 };
00985     ICPPeriodicCont(ICPProcessor *);
00986    ~ICPPeriodicCont()
00987   {
00988   }
00989   virtual int PeriodicEvent(int, Event *);
00990   int DoReconfigAction(int, Event *);
00991 
00992 private:
00993   int _last_icp_config_callouts;
00994   int _global_config_changed;
00995   int _peer_config_changed;
00996 };
00997 
00998 
00999 
01000 
01001 class ICPHandlerCont:public PeriodicCont
01002 {
01003 public:
01004   enum
01005   {
01006     ICP_HANDLER_INTERVAL = 10
01007   };
01008     ICPHandlerCont(ICPProcessor *);
01009    ~ICPHandlerCont()
01010   {
01011   }
01012   virtual int PeriodicEvent(int, Event *);
01013   virtual int TossEvent(int, Event *);
01014 
01015 #ifdef DEBUG_ICP
01016   
01017 #define MAX_ICP_HISTORY 20
01018   struct statehistory
01019   {
01020     int event;
01021     int newstate;
01022     char *file;
01023     int line;
01024   };
01025   statehistory _history[MAX_ICP_HISTORY];
01026   int _nhistory;
01027 
01028 #define RECORD_ICP_STATE_CHANGE(peerreaddata,event_,newstate_) \
01029     peerreaddata->_history[peerreaddata->_nhistory].event = event_; \
01030     peerreaddata->_history[peerreaddata->_nhistory].newstate = newstate_; \
01031     peerreaddata->_history[peerreaddata->_nhistory].file = __FILE__; \
01032     peerreaddata->_history[peerreaddata->_nhistory].line = __LINE__; \
01033     peerreaddata->_nhistory = (peerreaddata->_nhistory + 1) % MAX_ICP_HISTORY;
01034 
01035 #else
01036 #define RECORD_ICP_STATE_CHANGE(x,y,z)
01037 #endif
01038 
01039   static int64_t ICPDataBuf_IOBuffer_sizeindex;
01040 };
01041 
01042 
01043 
01044 
01045 class ICPPeerReadCont:public Continuation
01046 {
01047 public:
01048   typedef enum
01049   {
01050     READ_ACTIVE,
01051     READ_DATA,
01052     READ_DATA_DONE,
01053     PROCESS_READ_DATA,
01054     ADD_PEER,
01055     AWAITING_CACHE_LOOKUP_RESPONSE,
01056     SEND_REPLY,
01057     WRITE_DONE,
01058     GET_ICP_REQUEST,
01059     GET_ICP_REQUEST_MUTEX,
01060     READ_NOT_ACTIVE,
01061     READ_NOT_ACTIVE_EXIT,
01062     READ_PROCESSING_COMPLETE
01063   } PeerReadState_t;
01064 
01065   class PeerReadData
01066   {
01067   public:
01068     PeerReadData();
01069     void init();
01070      ~PeerReadData();
01071     void reset(int full_reset = 0);
01072 
01073     ink_hrtime _start_time;
01074     ICPPeerReadCont *_mycont;
01075       Ptr<Peer> _peer;
01076     PeerReadState_t _next_state;
01077     int _cache_lookup_local;
01078       Ptr<IOBufferBlock> _buf;       
01079     ICPMsg_t *_rICPmsg;
01080     int _rICPmsg_len;
01081     IpEndpoint _sender; 
01082     URL _cachelookupURL;
01083     int _queryResult;
01084     ICPRequestCont *_ICPReqCont;
01085     int _bytesReceived;
01086     
01087     struct msghdr _mhdr;
01088     struct iovec _iov[MSG_IOVECS];
01089   };
01090 
01091     ICPPeerReadCont();
01092   void init(ICPProcessor *, Peer *, int);
01093    ~ICPPeerReadCont();
01094   void reset(int full_reset = 0);
01095   int ICPPeerReadEvent(int, Event *);
01096   int ICPPeerQueryCont(int, Event *);
01097   int ICPPeerQueryEvent(int, Event *);
01098   int StaleCheck(int, Event *);
01099   int PeerReadStateMachine(PeerReadData *, Event *);
01100 
01101   enum
01102   { RETRY_INTERVAL = 10 };
01103 
01104   
01105   CacheVConnection *_object_vc;
01106   HTTPInfo *_object_read;
01107   HdrHeapSDKHandle *_cache_req_hdr_heap_handle;
01108   HdrHeapSDKHandle *_cache_resp_hdr_heap_handle;
01109 
01110 private:
01111   
01112     ICPProcessor * _ICPpr;
01113   PeerReadData *_state;
01114   ink_hrtime _start_time;
01115   int _recursion_depth;
01116 };
01117 
01118 
01119 
01120 
01121 class ICPRequestCont:public Continuation
01122 {
01123   friend class ICPProcessor;
01124 
01125 public:
01126     ICPRequestCont(ICPProcessor * i = 0, Continuation * c = 0, URL * u = 0);
01127    ~ICPRequestCont();
01128   void *operator  new(size_t size, void *mem);
01129   void operator  delete(void *mem);
01130   inline void SetRequestStartTime()
01131   {
01132     _start_time = ink_get_hrtime();
01133   }
01134   inline ink_hrtime GetRequestStartTime()
01135   {
01136     return _start_time;
01137   }
01138   inline class Action *GetActionPtr()
01139   {
01140     return &_act;
01141   }
01142 
01143   enum
01144   { RETRY_INTERVAL = 10 };
01145   enum
01146   { ICP_REQUEST_HASH_SIZE = 1024 };
01147 
01148   
01149   
01150   
01151   
01152   
01153   
01154   
01155   typedef struct ICPRequestEventArgs
01156   {
01157     ICPMsg_t *rICPmsg;
01158     int rICPmsg_len;
01159     Peer *peer;
01160   } ICPRequestEventArgs_t;
01161   
01162   int ICPRequestEvent(int, Event *);
01163   int NopICPRequestEvent(int, Event *);
01164 
01165   
01166   static void NetToHostICPMsg(ICPMsg_t *, ICPMsg_t *);
01167   static int BuildICPMsg(ICPopcode_t op, unsigned int sequence_number,
01168                          int optflags, int optdata, int shostid,
01169                          void *data, int datalen, struct msghdr *mhdr, struct iovec *iov, ICPMsg_t * icpmsg);
01170 
01171   static unsigned int ICPReqSeqNumber();
01172   static int ICPRequestHash(unsigned int);
01173   static int AddICPRequest(unsigned int, ICPRequestCont *);
01174   static ICPRequestCont *FindICPRequest(unsigned int);
01175   static int RemoveICPRequest(unsigned int);
01176 
01177 private:
01178   typedef enum
01179   {
01180     ICP_START,
01181     ICP_OFF_TERMINATE,
01182     ICP_QUEUE_REQUEST,
01183     ICP_AWAITING_RESPONSE,
01184     ICP_DEQUEUE_REQUEST,
01185     ICP_POST_COMPLETION,
01186     ICP_WAIT_SEND_COMPLETE,
01187     ICP_REQUEST_NOT_ACTIVE,
01188     ICP_DONE
01189   } ICPstate_t;
01190   int ICPStateMachine(int, void *);
01191   int ICPResponseMessage(int, ICPMsg_t *, Peer *);
01192   void remove_from_pendingActions(Action *);
01193   void remove_all_pendingActions();
01194 
01195   
01196   static uint32_t ICPRequestSeqno;
01197 
01198   
01199   Continuation *_cont;
01200   URL *_url;
01201 
01202   
01203   IpEndpoint _ret_sockaddr;
01204   ICPreturn_t _ret_status;
01205   class Action _act;
01206 
01207   
01208   ink_hrtime _start_time;
01209   ICPProcessor *_ICPpr;
01210   Event *_timeout;
01211 
01212   
01213   int npending_actions;
01214   DynArray<Action *>*pendingActions;
01215 
01216   ICPMsg_t _ICPmsg;
01217   struct msghdr _sendMsgHdr;
01218   struct iovec _sendMsgIOV[MSG_IOVECS];
01219 
01220   unsigned int _sequence_number;
01221   int _expected_replies;
01222   BitMap _expected_replies_list;
01223   int _received_replies;
01224   ICPstate_t _next_state;
01225 };
01226 
01227 
01228 extern ClassAllocator<ICPRequestCont> ICPRequestCont_allocator;
01229 
// Signature of a plugin-supplied freshness function: takes an opaque
// pointer and returns an int.  The global hook itself is defined outside
// this header.
// NOTE(review): the meaning of the return value is not visible here --
// confirm with the code that calls the hook.
typedef int (*PluginFreshnessCalcFunc)(void *contp);
extern PluginFreshnessCalcFunc pluginFreshnessCalcFunc;
01232 
01233 inline void *
01234 ICPRequestCont::operator new(size_t , void *mem)
01235 {
01236   return mem;
01237 }
01238 
01239 inline void
01240 ICPRequestCont::operator delete(void *mem)
01241 {
01242   ICPRequestCont_allocator.free((ICPRequestCont *) mem);
01243 }
01244 
// Raw stat block used by the ICP_*_DYN_STAT macros; defined outside this
// header.
extern struct RecRawStatBlock *icp_rsb;
01246 
// Indices of the ICP statistics, consecutive from 0.  These are the `x`
// arguments passed to the ICP_*_DYN_STAT macros.  icp_stat_count is the
// number of entries and must stay last.
enum {
  icp_stat_def = 0,

  // configuration / reconfiguration
  config_mgmt_callouts_stat,
  reconfig_polls_stat,
  reconfig_events_stat,

  // receive path
  invalid_poll_data_stat,
  no_data_read_stat,
  short_read_stat,
  invalid_sender_stat,
  read_not_v2_icp_stat,
  icp_remote_query_requests_stat,
  icp_remote_responses_stat,
  icp_cache_lookup_success_stat,
  icp_cache_lookup_fail_stat,
  query_response_write_stat,
  query_response_partial_write_stat,
  no_icp_request_for_response_stat,
  icp_response_request_nolock_stat,

  // query path
  icp_start_icpoff_stat,
  send_query_partial_write_stat,
  icp_queries_no_expected_replies_stat,
  icp_query_hits_stat,
  icp_query_misses_stat,
  invalid_icp_query_response_stat,
  icp_query_requests_stat,
  total_icp_response_time_stat,
  total_udp_send_queries_stat,
  total_icp_request_time_stat,

  // reloads
  icp_total_reloads,
  icp_pending_reloads,
  icp_reload_start_aborts,
  icp_reload_connect_aborts,
  icp_reload_read_aborts,
  icp_reload_write_aborts,
  icp_reload_successes,

  icp_stat_count
};
01285 
// ICP-named wrappers around the records API.
//
// The *_DYN_STAT macros operate on the raw stat block `icp_rsb`.  The
// increment, decrement and sum forms also reference `mutex->thread_holding`,
// so a variable or member named `mutex` must be in scope where they are
// expanded.
//
// Macro arguments are parenthesized so that the `(int)` cast and the
// address-of operator apply to the whole argument expression rather than to
// its first operand only.

#define ICP_EstablishStaticConfigInteger(_ix, _n) REC_EstablishStaticConfigInt32(_ix, _n)

#define ICP_EstablishStaticConfigStringAlloc(_ix, n) REC_EstablishStaticConfigStringAlloc(_ix, n)

// Add +1 / -1 / y to stat `x`.
#define ICP_INCREMENT_DYN_STAT(x) RecIncrRawStat(icp_rsb, mutex->thread_holding, (int) (x), 1)
#define ICP_DECREMENT_DYN_STAT(x) RecIncrRawStat(icp_rsb, mutex->thread_holding, (int) (x), -1)
#define ICP_SUM_DYN_STAT(x, y) RecIncrRawStat(icp_rsb, mutex->thread_holding, (int) (x), (y))

// Reads the count of stat `x` into C and its sum into S.  Expands to two
// complete statements (including the trailing semicolon) and evaluates `x`
// twice; do not use it as the unbraced body of an if/else.
#define ICP_READ_DYN_STAT(x, C, S)                \
  RecGetRawStatCount(icp_rsb, (int) (x), &(C)); \
  RecGetRawStatSum(icp_rsb, (int) (x), &(S));

#define ICP_ReadConfigString REC_ReadConfigString
#define ICP_RegisterConfigUpdateFunc REC_RegisterConfigUpdateFunc
01304 
01305 
01306 
01307 
01308 #endif // _ICP_H_