00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #ifndef _ICP_H_
00033 #define _ICP_H_
00034
00035 #include "P_Net.h"
00036 #include "P_Cache.h"
/* ET_ICP is an alias that expands to ET_CALL; ET_CALL itself is defined outside this header. */
#define ET_ICP ET_CALL
00038 #include "URL.h"
00039 #include "ICPevents.h"
00040 #include "ICPProcessor.h"
00041 #include "DynArray.h"
00042
00043
00044
00045
00046
/*
 * ICP debug switch, defined to 1.
 * NOTE(review): nothing in this header tests ICP_DEBUG; the state-history
 * code in ICPHandlerCont is keyed on DEBUG_ICP instead -- confirm whether
 * the two names are meant to be distinct.
 */
#define ICP_DEBUG 1
00048
00049
00050
00051
00052
00053
00054
00055
/**
 * Fixed-size header placed at the start of every ICPMsg_t.
 *
 * Field names and widths mirror the ICP version 2 header (RFC 2186).
 * This struct only describes the layout; any byte-order conversion is
 * done elsewhere.
 */
typedef struct ICPMsgHeader {
  uint8_t opcode;       /* message opcode; presumably one of ICPopcode_t */
  uint8_t version;      /* protocol version; presumably one of ICP_VERSION_* */
  uint16_t msglen;      /* message length field */
  uint32_t requestno;   /* request number field */
  uint32_t optionflags; /* option flags; presumably ICP_FLAG_* bits */
  uint32_t optiondata;  /* option data field */
  uint32_t shostid;     /* sender host id field */
} ICPMsgHdr_t;
00066
00067
00068
00069
/**
 * ICP opcode values.
 *
 * The enumerators are consecutive starting at zero; the ICP_OP_UNUSEDn
 * entries exist only to keep the named opcodes at their numeric values.
 * The values are spelled out explicitly so the numbering is visible at
 * a glance. ICP_OP_END_OF_OPS is a sentinel one past the last opcode.
 */
typedef enum {
  ICP_OP_INVALID = 0,
  ICP_OP_QUERY = 1,
  ICP_OP_HIT = 2,
  ICP_OP_MISS = 3,
  ICP_OP_ERR = 4,

  ICP_OP_UNUSED5 = 5,
  ICP_OP_UNUSED6 = 6,
  ICP_OP_UNUSED7 = 7,
  ICP_OP_UNUSED8 = 8,
  ICP_OP_UNUSED9 = 9,

  ICP_OP_SECHO = 10,
  ICP_OP_DECHO = 11,

  ICP_OP_UNUSED12 = 12,
  ICP_OP_UNUSED13 = 13,
  ICP_OP_UNUSED14 = 14,
  ICP_OP_UNUSED15 = 15,
  ICP_OP_UNUSED16 = 16,
  ICP_OP_UNUSED17 = 17,
  ICP_OP_UNUSED18 = 18,
  ICP_OP_UNUSED19 = 19,
  ICP_OP_UNUSED20 = 20,

  ICP_OP_MISS_NOFETCH = 21,
  ICP_OP_DENIED = 22,
  ICP_OP_HIT_OBJ = 23,
  ICP_OP_END_OF_OPS = 24
} ICPopcode_t;

/* Highest real opcode value (the entry just before the sentinel). */
#define ICP_OP_LAST (ICP_OP_END_OF_OPS - 1)
00104
00105
00106
00107
/* Known ICP protocol version numbers. */
#define ICP_VERSION_1 1
#define ICP_VERSION_2 2
#define ICP_VERSION_3 3

/* The version selected by this header: version 2. */
#define ICP_VERSION ICP_VERSION_2
00112
00113
00114
00115
/*
 * Option flag bits (unsigned long constants): bit 31 and bit 30.
 * Presumably carried in ICPMsgHdr_t::optionflags -- confirm against users.
 */
#define ICP_FLAG_HIT_OBJ 0x80000000UL
#define ICP_FLAG_SRC_RTT 0x40000000UL
00118
00119
00120
00121
/*
 * Size limits.
 *
 * MAX_ICP_MSGSIZE            - largest ICP message, 16 KiB.
 * MAX_ICP_MSG_PAYLOAD_SIZE   - what is left after the fixed header.
 * MAX_ICP_QUERY_PAYLOAD_SIZE - payload minus one further uint32_t
 *                              (ICPQuery_t carries a uint32_t rhostid
 *                              ahead of the URL).
 * MAX_DEFINED_PEERS          - peer limit used to size the config and
 *                              peer tables in this header.
 * MSG_IOVECS                 - number of iovec slots reserved per message.
 *
 * The header type is spelled ICPMsgHdr_t; the previous spelling
 * (ICPmsgHdr_t) named a type that does not exist, so any use of the two
 * payload macros failed to compile.
 */
#define MAX_ICP_MSGSIZE (16 * 1024)
#define MAX_ICP_MSG_PAYLOAD_SIZE (MAX_ICP_MSGSIZE - sizeof(ICPMsgHdr_t))
#define MAX_ICP_QUERY_PAYLOAD_SIZE (MAX_ICP_MSG_PAYLOAD_SIZE - sizeof(uint32_t))
#define MAX_DEFINED_PEERS 64
#define MSG_IOVECS 16
00127
00128
00129
00130
/** Generic message body view: a single URL pointer. */
typedef struct ICPData {
  char *URL; /* pointer to the URL text; ownership is not expressed here */
} ICPData_t;
00135
00136
00137
00138
/** Message body view for a query: a 32-bit host id followed by a URL pointer. */
typedef struct ICPQuery {
  uint32_t rhostid; /* host id field; presumably the requester's -- confirm */
  char *URL;        /* pointer to the URL text */
} ICPQuery_t;
00144
00145
00146
00147
/** Message body view for a hit reply: a single URL pointer. */
typedef struct ICPHit {
  char *URL; /* pointer to the URL text */
} ICPHit_t;
00152
00153
00154
00155
/** Message body view for a miss reply: a single URL pointer. */
typedef struct ICPMiss {
  char *URL; /* pointer to the URL text */
} ICPMiss_t;
00160
00161
00162
00163
/**
 * Message body view for a hit-with-object reply: the URL plus the
 * object size and a pointer to the object data.
 */
typedef struct ICPHitObj {
  char *URL;        /* pointer to the URL text */
  char *p_objsize;  /* presumably points at the size field inside the raw message -- confirm */
  uint16_t objsize; /* object size as a 16-bit value */
  char *data;       /* pointer to the object bytes */
} ICPHitObj_t;
00171
00172
00173
00174
00175 typedef struct ICPMsg
00176 {
00177 ICPMsgHdr_t h;
00178 union
00179 {
00180 ICPData_t data;
00181 ICPQuery_t query;
00182 ICPHit_t hit;
00183 ICPMiss_t miss;
00184 ICPHitObj_t hitobj;
00185 } un;
00186 } ICPMsg_t;
00187
00188
00189
00190
00191
// Forward declarations for classes that are referenced in this header
// before their full definitions appear.
class BitMap;
class ICPHandlerCont;
class ICPPeerReadCont;
class ICPPeriodicCont;
class ICPProcessor;
class ICPRequestCont;
00198
/** Kinds of ICP peer. Values are fixed at 0 through 4. */
typedef enum {
  PEER_NONE = 0,
  PEER_PARENT = 1,
  PEER_SIBLING = 2,
  PEER_LOCAL = 3,
  PEER_MULTICAST = 4
} PeerType_t;
00207
00208 #if !defined(USE_CAS_FOR_ATOMICLOCK)
00209 class AtomicLock
00210 {
00211 public:
00212 AtomicLock();
00213 ~AtomicLock();
00214 int Lock();
00215 int HaveLock();
00216 void Unlock();
00217
00218 private:
00219 Ptr<ProxyMutex> _mutex;
00220 };
00221
00222 #else // USE_CAS_FOR_ATOMICLOCK
00223 class AtomicLock
00224 {
00225 public:
00226 AtomicLock();
00227 ~AtomicLock();
00228 int Lock();
00229 int HaveLock();
00230 void Unlock();
00231
00232 private:
00233 enum
00234 { UNLOCKED = 0, LOCKED = 1 };
00235 int32_t _lock_word;
00236 };
00237 #endif // USE_CAS_FOR_ATOMICLOCK
00238
00239
00240
00241
/**
 * Holder for the global (non-peer) ICP settings.
 *
 * Every field starts at zero / null. The fields are private and only
 * readable through the accessors below; ICPConfiguration is a friend
 * and can access them directly.
 */
class ICPConfigData
{
  friend class ICPConfiguration;

public:
  ICPConfigData()
    : _icp_enabled(0),
      _icp_port(0),
      _icp_interface(0),
      _multicast_enabled(0),
      _icp_query_timeout(0),
      _cache_lookup_local(0),
      _stale_lookup(0),
      _reply_to_unknown_peer(0),
      _default_reply_port(0)
  {
  }
  ~ICPConfigData() {}

  // Comparison; defined outside this class body.
  inline int operator==(ICPConfigData &);

  // Plain accessors: each returns the stored value unchanged.
  int ICPconfigured() { return _icp_enabled; }
  int ICPport() { return _icp_port; }
  char *ICPinterface() { return _icp_interface; }
  int ICPmulticastConfigured() { return _multicast_enabled; }
  int ICPqueryTimeout() { return _icp_query_timeout; }
  int ICPLocalCacheLookup() { return _cache_lookup_local; }
  int ICPStaleLookup() { return _stale_lookup; }
  int ICPReplyToUnknownPeer() { return _reply_to_unknown_peer; }
  int ICPDefaultReplyPort() { return _default_reply_port; }

private:
  int _icp_enabled;
  int _icp_port;
  char *_icp_interface; // null until set; ownership not expressed here
  int _multicast_enabled;
  int _icp_query_timeout;
  int _cache_lookup_local;
  int _stale_lookup;
  int _reply_to_unknown_peer;
  int _default_reply_port;
};
00307
00308
00309
00310
00311 class PeerConfigData
00312 {
00313 friend class ICPConfiguration;
00314 friend class ICPProcessor;
00315
00316 public:
00317 PeerConfigData();
00318 PeerConfigData(int ctype, IpAddr const& ip_addr, int proxy_port, int icp_port)
00319 : _ctype(ctype), _ip_addr(ip_addr),
00320 _proxy_port(proxy_port), _icp_port(icp_port), _mc_member(0), _mc_ttl(0),
00321 _my_ip_addr(ip_addr)
00322 {
00323 _hostname[0] = 0;
00324 }
00325 ~PeerConfigData()
00326 {
00327 }
00328 bool operator==(PeerConfigData &);
00329 inline const char *GetHostname()
00330 {
00331 return _hostname;
00332 }
00333 inline int GetCType()
00334 {
00335 return _ctype;
00336 }
00337 inline IpAddr const& GetIPAddr()
00338 {
00339 return _my_ip_addr;
00340 }
00341 inline int GetProxyPort()
00342 {
00343 return _proxy_port;
00344 }
00345 inline int GetICPPort()
00346 {
00347 return _icp_port;
00348 }
00349 inline int MultiCastMember()
00350 {
00351 return _mc_member;
00352 }
00353 inline IpAddr const& GetMultiCastIPAddr()
00354 {
00355 return _mc_ip_addr;
00356 }
00357 inline int GetMultiCastTTL()
00358 {
00359 return _mc_ttl;
00360 }
00361
00362
00363 static PeerType_t CTypeToPeerType_t(int);
00364 static int GetHostIPByName(char *, IpAddr&);
00365
00366 enum
00367 { HOSTNAME_SIZE = 256 };
00368 enum
00369 { CTYPE_NONE = 0,
00370 CTYPE_PARENT = 1,
00371 CTYPE_SIBLING = 2,
00372 CTYPE_LOCAL = 3
00373 };
00374
00375 private:
00376
00377
00378
00379 char _hostname[HOSTNAME_SIZE];
00380 int _ctype;
00381 IpAddr _ip_addr;
00382 int _proxy_port;
00383 int _icp_port;
00384
00385
00386
00387 int _mc_member;
00388 IpAddr _mc_ip_addr;
00389 int _mc_ttl;
00390
00391
00392
00393
00394 IpAddr _my_ip_addr;
00395 };
00396
00397
00398
00399
00400
00401
00402 class ICPConfigUpdateCont:public Continuation
00403 {
00404 public:
00405 ICPConfigUpdateCont(void *data, void *value);
00406 ~ICPConfigUpdateCont()
00407 {
00408 }
00409 int RetryICPconfigUpdate(int, Event *);
00410
00411 enum
00412 { RETRY_INTERVAL = 10 };
00413
00414 private:
00415 void *_data;
00416 void *_value;
00417 };
00418
00419
00420
00421
00422 class ICPConfiguration
00423 {
00424 public:
00425 ICPConfiguration();
00426 ~ICPConfiguration();
00427 int GlobalConfigChange();
00428 void UpdateGlobalConfig();
00429 int PeerConfigChange();
00430 void UpdatePeerConfig();
00431
00432 inline ICPConfigData *globalConfig()
00433 {
00434 return _icp_cdata;
00435 }
00436 inline PeerConfigData *indexToPeerConfigData(int index)
00437 {
00438 ink_assert(index <= MAX_DEFINED_PEERS);
00439
00440
00441 return _peer_cdata[index];
00442 }
00443
00444
00445 static int mgr_icp_config_change_callback(const char *, RecDataT, RecData, void *);
00446
00447
00448 static void *icp_config_change_callback(void *, void *, int startup = 0);
00449
00450 inline int Lock()
00451 {
00452 return _l.Lock();
00453 }
00454 inline void Unlock()
00455 {
00456 _l.Unlock();
00457 return;
00458 }
00459 inline int HaveLock()
00460 {
00461 return _l.HaveLock();
00462 }
00463
00464 inline int ICPConfigCallouts()
00465 {
00466 return _icp_config_callouts;
00467 }
00468
00469 private:
00470
00471 AtomicLock _l;
00472 int _icp_config_callouts;
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483 ICPConfigData *_icp_cdata;
00484 ICPConfigData *_icp_cdata_current;
00485 PeerConfigData *_peer_cdata[MAX_DEFINED_PEERS + 1];
00486 PeerConfigData *_peer_cdata_current[MAX_DEFINED_PEERS + 1];
00487 };
00488
00489
00490
00491
00492
00493
00494
/* Bit flags stored in Peer::_state. */
#define PEER_UP (1 << 0)
#define PEER_MULTICAST_COUNT_EVENT (1 << 1) /* member probe event active */
#define PEER_DYNAMIC (1 << 2)               /* dynamically added, not in config */

/* Forward declaration; the full type is defined outside this header. */
struct CacheVConnection;
00500
00501 class Peer:public RefCountObj
00502 {
00503 public:
00504 Peer(PeerType_t, ICPProcessor *, bool dynamic_peer = false);
00505 virtual ~ Peer()
00506 {
00507 }
00508 void LogRecvMsg(ICPMsg_t *, int);
00509
00510
00511 virtual sockaddr* GetIP() = 0;
00512 virtual Action *SendMsg_re(Continuation *, void *, struct msghdr *, struct sockaddr const* to) = 0;
00513 virtual Action *RecvFrom_re(Continuation *, void *, IOBufferBlock *, int, struct sockaddr *, socklen_t *) = 0;
00514 virtual int GetRecvFD() = 0;
00515 virtual int GetSendFD() = 0;
00516 virtual int ExpectedReplies(BitMap *) = 0;
00517 virtual int ValidSender(sockaddr *) = 0;
00518 virtual void LogSendMsg(ICPMsg_t *, sockaddr const*) = 0;
00519 virtual int IsOnline() = 0;
00520 virtual Connection *GetSendChan() = 0;
00521 virtual Connection *GetRecvChan() = 0;
00522 virtual int ExtToIntRecvSockAddr(sockaddr const*, sockaddr *) = 0;
00523
00524 enum
00525 { OFFLINE_THRESHOLD = 20 };
00526
00527 inline PeerType_t GetType()
00528 {
00529 return _type;
00530 }
00531 inline int GetPeerID()
00532 {
00533 return _id;
00534 }
00535 inline void SetPeerID(int newid)
00536 {
00537 _id = newid;
00538 }
00539 inline void SetNext(Peer * p)
00540 {
00541 _next = p;
00542 }
00543 inline Peer *GetNext()
00544 {
00545 return _next;
00546 }
00547 inline bool shouldStartRead()
00548 {
00549 return !notFirstRead;
00550 }
00551 inline void startingRead()
00552 {
00553 notFirstRead = 1;
00554 }
00555 inline void cancelRead()
00556 {
00557 notFirstRead = 0;
00558 }
00559 inline bool readActive()
00560 {
00561 return (readAction != NULL);
00562 }
00563 inline bool isUp()
00564 {
00565 return (_state & PEER_UP);
00566 }
00567
00568
00569
00570 Ptr<IOBufferBlock> buf;
00571 IpEndpoint fromaddr;
00572 socklen_t fromaddrlen;
00573 int notFirstRead;
00574 Action *readAction;
00575 Action *writeAction;
00576
00577 protected:
00578 PeerType_t _type;
00579 int _id;
00580 Peer *_next;
00581 ICPProcessor *_ICPpr;
00582
00583
00584
00585
00586 int _state;
00587
00588
00589
00590
00591 struct PeerStats
00592 {
00593 ink_hrtime last_send;
00594 ink_hrtime last_receive;
00595 int sent[ICP_OP_LAST + 1];
00596 int recv[ICP_OP_LAST + 1];
00597 int total_sent;
00598 int total_received;
00599 int dropped_replies;
00600 } _stats;
00601 };
00602
00603
00604
00605
00606 class ParentSiblingPeer:public Peer
00607 {
00608 public:
00609 ParentSiblingPeer(PeerType_t, PeerConfigData *, ICPProcessor *, bool dynamic_peer = false);
00610 ~ParentSiblingPeer()
00611 {
00612 if (_pconfig && (_state & PEER_DYNAMIC))
00613 delete _pconfig;
00614 }
00615 int GetProxyPort();
00616 int GetICPPort();
00617 virtual sockaddr* GetIP();
00618 virtual Action *SendMsg_re(Continuation *, void *, struct msghdr *, struct sockaddr const* to);
00619 virtual Action *RecvFrom_re(Continuation *, void *, IOBufferBlock *, int, struct sockaddr *, socklen_t *);
00620 virtual int GetRecvFD();
00621 virtual int GetSendFD();
00622 virtual int ExpectedReplies(BitMap *);
00623 virtual int ValidSender(struct sockaddr*);
00624 virtual void LogSendMsg(ICPMsg_t *, sockaddr const*);
00625 virtual int ExtToIntRecvSockAddr(sockaddr const* in, sockaddr* out);
00626 inline virtual int IsOnline()
00627 {
00628 return 1;
00629 }
00630 inline virtual Connection *GetSendChan()
00631 {
00632 return &_chan;
00633 }
00634 inline virtual Connection *GetRecvChan()
00635 {
00636 return &_chan;
00637 }
00638 inline PeerConfigData *GetConfig()
00639 {
00640 return _pconfig;
00641 }
00642 inline Connection *GetChan()
00643 {
00644 return &_chan;
00645 }
00646
00647 private:
00648
00649 PeerConfigData * _pconfig;
00650 IpEndpoint _ip;
00651 Connection _chan;
00652 };
00653
00654
00655
00656
00657 class MultiCastPeer:public Peer
00658 {
00659 public:
00660 MultiCastPeer(IpAddr const&, uint16_t, int, ICPProcessor *);
00661 ~MultiCastPeer()
00662 {
00663 }
00664 int GetTTL();
00665 int AddMultiCastChild(Peer * P);
00666
00667
00668
00669 Peer *FindMultiCastChild(
00670 IpAddr const& ip,
00671 uint16_t port = 0
00672 );
00673
00674 virtual sockaddr* GetIP();
00675 virtual Action *SendMsg_re(Continuation *, void *, struct msghdr *, struct sockaddr const* to);
00676 virtual Action *RecvFrom_re(Continuation *, void *, IOBufferBlock *, int, struct sockaddr *, socklen_t *);
00677 virtual int GetRecvFD();
00678 virtual int GetSendFD();
00679 virtual int ExpectedReplies(BitMap *);
00680 virtual int ValidSender(struct sockaddr*);
00681 virtual void LogSendMsg(ICPMsg_t *, sockaddr const*);
00682 virtual int IsOnline();
00683 inline virtual Connection *GetRecvChan()
00684 {
00685 return &_recv_chan;
00686 }
00687 inline virtual Connection *GetSendChan()
00688 {
00689 return &_send_chan;
00690 }
00691 inline virtual int ExtToIntRecvSockAddr(sockaddr const* in, sockaddr* out)
00692 {
00693 Peer *P = FindMultiCastChild(IpAddr(in));
00694 if (P) {
00695 ats_ip_copy(out, in);
00696 ats_ip_port_cast(out) = ats_ip_port_cast(P->GetIP());
00697 return 1;
00698 } else {
00699 return 0;
00700 }
00701 }
00702
00703 private:
00704
00705 Connection _send_chan;
00706 Connection _recv_chan;
00707
00708
00709
00710 IpEndpoint _mc_ip;
00711 int _mc_ttl;
00712 struct multicast_data
00713 {
00714 double avg_members;
00715 int defined_members;
00716 int n_count_events;
00717 int count_event_reqno;
00718 int expected_replies;
00719 } _mc;
00720 };
00721
00722
00723
00724
/**
 * Bit set with set / clear / test operations on integer bit positions.
 *
 * The object embeds a 16-byte array and also holds a separate `_bitmap`
 * pointer. NOTE(review): presumably `_bitmap` refers either to the
 * embedded array or to a larger allocation depending on the requested
 * size -- confirm in the implementation.
 */
class BitMap
{
public:
  BitMap(int);
  ~BitMap();

  void SetBit(int);
  void ClearBit(int);
  int IsBitSet(int);

private:
  enum {
    STATIC_BITMAP_BYTE_SIZE = 16,
    BITS_PER_BYTE = 8
  };

  char _static_bitmap[STATIC_BITMAP_BYTE_SIZE];
  char *_bitmap;
  int _bitmap_size;
  int _bitmap_byte_size;
};
00744
00745
00746
00747
00748 class ICPProcessor
00749 {
00750 friend class ICPHandlerCont;
00751 friend class ICPPeerReadCont;
00752 friend class ICPRequestCont;
00753
00754 public:
00755 ICPProcessor();
00756 ~ICPProcessor();
00757
00758
00759 void start();
00760 Action *ICPQuery(Continuation *, URL *);
00761
00762
00763 typedef enum
00764 {
00765 RC_RECONFIG,
00766 RC_ENABLE_ICP,
00767 RC_DONE
00768 } ReconfigState_t;
00769 ReconfigState_t ReconfigureStateMachine(ReconfigState_t, int, int);
00770
00771 Peer *FindPeer(IpAddr const& ip, uint16_t port = 0);
00772 Peer *FindPeer(IpEndpoint const& ip) {
00773 return this->FindPeer(IpAddr(&ip), ats_ip_port_host_order(&ip));
00774 }
00775 Peer *FindPeer(sockaddr const* ip) {
00776 return this->FindPeer(IpAddr(ip), ats_ip_port_host_order(ip));
00777 }
00778
00779 inline Peer *GetLocalPeer()
00780 {
00781 return _LocalPeer;
00782 }
00783 inline Peer *IdToPeer(int id)
00784 {
00785 return _PeerList[id];
00786 }
00787 inline ICPConfiguration *GetConfig()
00788 {
00789 return _ICPConfig;
00790 }
00791
00792 inline int GetFreePeers()
00793 {
00794 return PEER_LIST_SIZE - (_nPeerList + 1);
00795 }
00796 inline int GetFreeSendPeers()
00797 {
00798 return SEND_PEER_LIST_SIZE - (_nSendPeerList + 1);
00799 }
00800 inline int GetFreeRecvPeers()
00801 {
00802 return RECV_PEER_LIST_SIZE - (_nRecvPeerList + 1);
00803 }
00804
00805 private:
00806 inline int Lock()
00807 {
00808 return _l->Lock();
00809 }
00810 inline void Unlock()
00811 {
00812 _l->Unlock();
00813 return;
00814 }
00815 inline int HaveLock()
00816 {
00817 return _l->HaveLock();
00818 }
00819 int BuildPeerList();
00820 void FreePeerList();
00821 int SetupListenSockets();
00822 void ShutdownListenSockets();
00823 int Reconfigure(int, int);
00824 void InitICPStatCallbacks();
00825
00826 inline void DisableICPQueries()
00827 {
00828 _AllowIcpQueries = 0;
00829 }
00830 inline void EnableICPQueries()
00831 {
00832 _AllowIcpQueries = 1;
00833 }
00834 inline int AllowICPQueries()
00835 {
00836 return _AllowIcpQueries;
00837 }
00838 inline int PendingQuery()
00839 {
00840 return _PendingIcpQueries;
00841 }
00842 inline void IncPendingQuery()
00843 {
00844 _PendingIcpQueries++;
00845 }
00846 inline void DecPendingQuery()
00847 {
00848 _PendingIcpQueries--;
00849 }
00850
00851 Peer *GenericFindListPeer(IpAddr const&, uint16_t, int, Ptr<Peer> *);
00852 Peer *FindSendListPeer(IpAddr const&, uint16_t);
00853 Peer *FindRecvListPeer(IpAddr const&, uint16_t);
00854 int AddPeer(Peer *);
00855 int AddPeerToSendList(Peer *);
00856 int AddPeerToRecvList(Peer *);
00857 int AddPeerToParentList(Peer *);
00858
00859 inline int GetSendPeers()
00860 {
00861 return _nSendPeerList + 1;
00862 }
00863 inline Peer *GetNthSendPeer(int n, int bias)
00864 {
00865 return _SendPeerList[(bias + n) % (_nSendPeerList + 1)];
00866 }
00867
00868 inline int GetRecvPeers()
00869 {
00870 return _nRecvPeerList + 1;
00871 }
00872 inline Peer *GetNthRecvPeer(int n, int bias)
00873 {
00874 return _RecvPeerList[(bias + n) % (_nRecvPeerList + 1)];
00875 }
00876
00877 inline int GetStartingSendPeerBias()
00878 {
00879 return ++_curSendPeer;
00880 }
00881 inline int GetStartingRecvPeerBias()
00882 {
00883 return ++_curRecvPeer;
00884 }
00885
00886 inline int GetParentPeers()
00887 {
00888 return _nParentPeerList + 1;
00889 }
00890 inline Peer *GetNthParentPeer(int n, int bias)
00891 {
00892 return _ParentPeerList[(bias + n) % (_nParentPeerList + 1)];
00893 }
00894 inline int GetStartingParentPeerBias()
00895 {
00896 return ++_curParentPeer;
00897 }
00898
00899 inline void SetLastRecvPeerBias(int b)
00900 {
00901 _last_recv_peer_bias = b;
00902 }
00903 inline int GetLastRecvPeerBias()
00904 {
00905 return _last_recv_peer_bias;
00906 }
00907 void CancelPendingReads();
00908 void DumpICPConfig();
00909
00910 private:
00911
00912 AtomicLock * _l;
00913 int _Initialized;
00914 int _AllowIcpQueries;
00915 int _PendingIcpQueries;
00916 ICPConfiguration *_ICPConfig;
00917 ICPPeriodicCont *_ICPPeriodic;
00918 ICPHandlerCont *_ICPHandler;
00919 ICPHandlerCont *_mcastCB_handler;
00920 Event *_PeriodicEvent;
00921 Event *_ICPHandlerEvent;
00922
00923 enum
00924 {
00925 PEER_LIST_SIZE = 2 * MAX_DEFINED_PEERS,
00926 SEND_PEER_LIST_SIZE = 2 * MAX_DEFINED_PEERS,
00927 RECV_PEER_LIST_SIZE = 2 * MAX_DEFINED_PEERS,
00928 PARENT_PEER_LIST_SIZE = 2 * MAX_DEFINED_PEERS,
00929 PEER_ID_POLL_INDEX_SIZE = 2 * MAX_DEFINED_PEERS
00930 };
00931
00932
00933 int _nPeerList;
00934 Ptr<Peer> _PeerList[PEER_LIST_SIZE];
00935 Ptr<Peer> _LocalPeer;
00936
00937
00938 int _curSendPeer;
00939 int _nSendPeerList;
00940 Ptr<Peer> _SendPeerList[SEND_PEER_LIST_SIZE];
00941
00942
00943 int _curRecvPeer;
00944 int _nRecvPeerList;
00945 Ptr<Peer> _RecvPeerList[RECV_PEER_LIST_SIZE];
00946
00947
00948 int _curParentPeer;
00949 int _nParentPeerList;
00950 Ptr<Peer> _ParentPeerList[PARENT_PEER_LIST_SIZE];
00951
00952
00953 int _ValidPollData;
00954 int _PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE];
00955 int _last_recv_peer_bias;
00956 };
00957
00958
00959
00960
00961
00962 class PeriodicCont:public Continuation
00963 {
00964 public:
00965 PeriodicCont(ICPProcessor * p);
00966 virtual ~ PeriodicCont();
00967 virtual int PeriodicEvent(int, Event *) = 0;
00968
00969 protected:
00970 ICPProcessor * _ICPpr;
00971 };
00972
00973
00974
00975
00976
00977
00978 class ICPPeriodicCont:public PeriodicCont
00979 {
00980 public:
00981 enum
00982 { PERIODIC_INTERVAL = 5000 };
00983 enum
00984 { RETRY_INTERVAL_MSECS = 10 };
00985 ICPPeriodicCont(ICPProcessor *);
00986 ~ICPPeriodicCont()
00987 {
00988 }
00989 virtual int PeriodicEvent(int, Event *);
00990 int DoReconfigAction(int, Event *);
00991
00992 private:
00993 int _last_icp_config_callouts;
00994 int _global_config_changed;
00995 int _peer_config_changed;
00996 };
00997
00998
00999
01000
01001 class ICPHandlerCont:public PeriodicCont
01002 {
01003 public:
01004 enum
01005 {
01006 ICP_HANDLER_INTERVAL = 10
01007 };
01008 ICPHandlerCont(ICPProcessor *);
01009 ~ICPHandlerCont()
01010 {
01011 }
01012 virtual int PeriodicEvent(int, Event *);
01013 virtual int TossEvent(int, Event *);
01014
01015 #ifdef DEBUG_ICP
01016
01017 #define MAX_ICP_HISTORY 20
01018 struct statehistory
01019 {
01020 int event;
01021 int newstate;
01022 char *file;
01023 int line;
01024 };
01025 statehistory _history[MAX_ICP_HISTORY];
01026 int _nhistory;
01027
01028 #define RECORD_ICP_STATE_CHANGE(peerreaddata,event_,newstate_) \
01029 peerreaddata->_history[peerreaddata->_nhistory].event = event_; \
01030 peerreaddata->_history[peerreaddata->_nhistory].newstate = newstate_; \
01031 peerreaddata->_history[peerreaddata->_nhistory].file = __FILE__; \
01032 peerreaddata->_history[peerreaddata->_nhistory].line = __LINE__; \
01033 peerreaddata->_nhistory = (peerreaddata->_nhistory + 1) % MAX_ICP_HISTORY;
01034
01035 #else
01036 #define RECORD_ICP_STATE_CHANGE(x,y,z)
01037 #endif
01038
01039 static int64_t ICPDataBuf_IOBuffer_sizeindex;
01040 };
01041
01042
01043
01044
01045 class ICPPeerReadCont:public Continuation
01046 {
01047 public:
01048 typedef enum
01049 {
01050 READ_ACTIVE,
01051 READ_DATA,
01052 READ_DATA_DONE,
01053 PROCESS_READ_DATA,
01054 ADD_PEER,
01055 AWAITING_CACHE_LOOKUP_RESPONSE,
01056 SEND_REPLY,
01057 WRITE_DONE,
01058 GET_ICP_REQUEST,
01059 GET_ICP_REQUEST_MUTEX,
01060 READ_NOT_ACTIVE,
01061 READ_NOT_ACTIVE_EXIT,
01062 READ_PROCESSING_COMPLETE
01063 } PeerReadState_t;
01064
01065 class PeerReadData
01066 {
01067 public:
01068 PeerReadData();
01069 void init();
01070 ~PeerReadData();
01071 void reset(int full_reset = 0);
01072
01073 ink_hrtime _start_time;
01074 ICPPeerReadCont *_mycont;
01075 Ptr<Peer> _peer;
01076 PeerReadState_t _next_state;
01077 int _cache_lookup_local;
01078 Ptr<IOBufferBlock> _buf;
01079 ICPMsg_t *_rICPmsg;
01080 int _rICPmsg_len;
01081 IpEndpoint _sender;
01082 URL _cachelookupURL;
01083 int _queryResult;
01084 ICPRequestCont *_ICPReqCont;
01085 int _bytesReceived;
01086
01087 struct msghdr _mhdr;
01088 struct iovec _iov[MSG_IOVECS];
01089 };
01090
01091 ICPPeerReadCont();
01092 void init(ICPProcessor *, Peer *, int);
01093 ~ICPPeerReadCont();
01094 void reset(int full_reset = 0);
01095 int ICPPeerReadEvent(int, Event *);
01096 int ICPPeerQueryCont(int, Event *);
01097 int ICPPeerQueryEvent(int, Event *);
01098 int StaleCheck(int, Event *);
01099 int PeerReadStateMachine(PeerReadData *, Event *);
01100
01101 enum
01102 { RETRY_INTERVAL = 10 };
01103
01104
01105 CacheVConnection *_object_vc;
01106 HTTPInfo *_object_read;
01107 HdrHeapSDKHandle *_cache_req_hdr_heap_handle;
01108 HdrHeapSDKHandle *_cache_resp_hdr_heap_handle;
01109
01110 private:
01111
01112 ICPProcessor * _ICPpr;
01113 PeerReadData *_state;
01114 ink_hrtime _start_time;
01115 int _recursion_depth;
01116 };
01117
01118
01119
01120
01121 class ICPRequestCont:public Continuation
01122 {
01123 friend class ICPProcessor;
01124
01125 public:
01126 ICPRequestCont(ICPProcessor * i = 0, Continuation * c = 0, URL * u = 0);
01127 ~ICPRequestCont();
01128 void *operator new(size_t size, void *mem);
01129 void operator delete(void *mem);
01130 inline void SetRequestStartTime()
01131 {
01132 _start_time = ink_get_hrtime();
01133 }
01134 inline ink_hrtime GetRequestStartTime()
01135 {
01136 return _start_time;
01137 }
01138 inline class Action *GetActionPtr()
01139 {
01140 return &_act;
01141 }
01142
01143 enum
01144 { RETRY_INTERVAL = 10 };
01145 enum
01146 { ICP_REQUEST_HASH_SIZE = 1024 };
01147
01148
01149
01150
01151
01152
01153
01154
01155 typedef struct ICPRequestEventArgs
01156 {
01157 ICPMsg_t *rICPmsg;
01158 int rICPmsg_len;
01159 Peer *peer;
01160 } ICPRequestEventArgs_t;
01161
01162 int ICPRequestEvent(int, Event *);
01163 int NopICPRequestEvent(int, Event *);
01164
01165
01166 static void NetToHostICPMsg(ICPMsg_t *, ICPMsg_t *);
01167 static int BuildICPMsg(ICPopcode_t op, unsigned int sequence_number,
01168 int optflags, int optdata, int shostid,
01169 void *data, int datalen, struct msghdr *mhdr, struct iovec *iov, ICPMsg_t * icpmsg);
01170
01171 static unsigned int ICPReqSeqNumber();
01172 static int ICPRequestHash(unsigned int);
01173 static int AddICPRequest(unsigned int, ICPRequestCont *);
01174 static ICPRequestCont *FindICPRequest(unsigned int);
01175 static int RemoveICPRequest(unsigned int);
01176
01177 private:
01178 typedef enum
01179 {
01180 ICP_START,
01181 ICP_OFF_TERMINATE,
01182 ICP_QUEUE_REQUEST,
01183 ICP_AWAITING_RESPONSE,
01184 ICP_DEQUEUE_REQUEST,
01185 ICP_POST_COMPLETION,
01186 ICP_WAIT_SEND_COMPLETE,
01187 ICP_REQUEST_NOT_ACTIVE,
01188 ICP_DONE
01189 } ICPstate_t;
01190 int ICPStateMachine(int, void *);
01191 int ICPResponseMessage(int, ICPMsg_t *, Peer *);
01192 void remove_from_pendingActions(Action *);
01193 void remove_all_pendingActions();
01194
01195
01196 static uint32_t ICPRequestSeqno;
01197
01198
01199 Continuation *_cont;
01200 URL *_url;
01201
01202
01203 IpEndpoint _ret_sockaddr;
01204 ICPreturn_t _ret_status;
01205 class Action _act;
01206
01207
01208 ink_hrtime _start_time;
01209 ICPProcessor *_ICPpr;
01210 Event *_timeout;
01211
01212
01213 int npending_actions;
01214 DynArray<Action *>*pendingActions;
01215
01216 ICPMsg_t _ICPmsg;
01217 struct msghdr _sendMsgHdr;
01218 struct iovec _sendMsgIOV[MSG_IOVECS];
01219
01220 unsigned int _sequence_number;
01221 int _expected_replies;
01222 BitMap _expected_replies_list;
01223 int _received_replies;
01224 ICPstate_t _next_state;
01225 };
01226
01227
01228 extern ClassAllocator<ICPRequestCont> ICPRequestCont_allocator;
01229
/* Signature of a freshness-calculation callback: takes an opaque pointer, returns int. */
typedef int (*PluginFreshnessCalcFunc)(void *contp);

/* Callback pointer of that type; defined in another translation unit. */
extern PluginFreshnessCalcFunc pluginFreshnessCalcFunc;
01232
01233 inline void *
01234 ICPRequestCont::operator new(size_t , void *mem)
01235 {
01236 return mem;
01237 }
01238
01239 inline void
01240 ICPRequestCont::operator delete(void *mem)
01241 {
01242 ICPRequestCont_allocator.free((ICPRequestCont *) mem);
01243 }
01244
/* Raw stat block used by the ICP_*_DYN_STAT macros; defined in another translation unit. */
extern struct RecRawStatBlock *icp_rsb;

/*
 * Identifiers of the ICP statistics, numbered consecutively from 0.
 * icp_stat_count is the sentinel and equals the number of identifiers
 * before it.
 */
enum {
  icp_stat_def = 0,
  config_mgmt_callouts_stat,
  reconfig_polls_stat,
  reconfig_events_stat,
  invalid_poll_data_stat,
  no_data_read_stat,
  short_read_stat,
  invalid_sender_stat,
  read_not_v2_icp_stat,
  icp_remote_query_requests_stat,
  icp_remote_responses_stat,
  icp_cache_lookup_success_stat,
  icp_cache_lookup_fail_stat,
  query_response_write_stat,
  query_response_partial_write_stat,
  no_icp_request_for_response_stat,
  icp_response_request_nolock_stat,
  icp_start_icpoff_stat,
  send_query_partial_write_stat,
  icp_queries_no_expected_replies_stat,
  icp_query_hits_stat,
  icp_query_misses_stat,
  invalid_icp_query_response_stat,
  icp_query_requests_stat,
  total_icp_response_time_stat,
  total_udp_send_queries_stat,
  total_icp_request_time_stat,
  icp_total_reloads,
  icp_pending_reloads,
  icp_reload_start_aborts,
  icp_reload_connect_aborts,
  icp_reload_read_aborts,
  icp_reload_write_aborts,
  icp_reload_successes,
  icp_stat_count
};
01285
/*
 * Thin ICP-named wrappers over the Rec* configuration and statistics API.
 *
 * The *_DYN_STAT macros operate on the global icp_rsb block. The
 * increment/decrement/sum forms also expect a `mutex` with a
 * `thread_holding` member to be in scope at the point of use.
 *
 * Macro arguments are parenthesized, and ICP_READ_DYN_STAT is wrapped in
 * do { } while (0) so that it is a single statement and can be used
 * safely under an unbraced if/else. Invoke it with a trailing ';'.
 */
#define ICP_EstablishStaticConfigInteger(_ix,_n) \
  REC_EstablishStaticConfigInt32(_ix,_n)

#define ICP_EstablishStaticConfigStringAlloc(_ix, n) \
  REC_EstablishStaticConfigStringAlloc(_ix, n)

/* Add +1 to stat x. */
#define ICP_INCREMENT_DYN_STAT(x) \
  RecIncrRawStat(icp_rsb, mutex->thread_holding, (int) (x), 1)
/* Add -1 to stat x. */
#define ICP_DECREMENT_DYN_STAT(x) \
  RecIncrRawStat(icp_rsb, mutex->thread_holding, (int) (x), -1)
/* Add y to stat x. */
#define ICP_SUM_DYN_STAT(x, y) \
  RecIncrRawStat(icp_rsb, mutex->thread_holding, (int) (x), (y))
/* Read stat x: count into C, sum into S (both passed as lvalues). */
#define ICP_READ_DYN_STAT(x, C, S) \
  do { \
    RecGetRawStatCount(icp_rsb, (int) (x), &(C)); \
    RecGetRawStatSum(icp_rsb, (int) (x), &(S)); \
  } while (0)

#define ICP_ReadConfigString REC_ReadConfigString
#define ICP_RegisterConfigUpdateFunc REC_RegisterConfigUpdateFunc
01304
01305
01306
01307
01308 #endif // _ICP_H_