Documentation
Index ¶
- Constants
- Variables
- func CtxAddHealthCheckFlag(ctx context.Context) context.Context
- func CtxIsHealthCheckRequest(ctx context.Context) bool
- func MaxTotalDifficulty(a, b *big.Int) *big.Int
- func NewRedialBackoff() backoff.Backoff
- type ChainConfig
- type ChainInfo
- type Head
- type ID
- type ManagedSubscription
- type MultiNode
- func (c *MultiNode[CHAIN_ID, RPC]) ChainID() CHAIN_ID
- func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC, isSendOnly bool)) error
- func (c *MultiNode[CHAIN_ID, RPC]) HighestUserObservations() ChainInfo
- func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo() (int, ChainInfo)
- func (c *MultiNode[CHAIN_ID, RPC]) NodeStates() map[string]string
- func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC(ctx context.Context) (rpc RPC, err error)
- type Node
- type NodeConfig
- type NodeSelector
- func NewHighestHeadNodeSelector[CHAIN_ID ID, RPC any](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]
- func NewPriorityLevelNodeSelector[CHAIN_ID ID, RPC any](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]
- func NewRoundRobinSelector[CHAIN_ID ID, RPC any](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]
- func NewTotalDifficultyNodeSelector[CHAIN_ID ID, RPC any](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]
- type NodeTier
- type Poller
- type PoolChainInfoProvider
- type RPCClient
- type RPCClientBase
- func (m *RPCClientBase[HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, lifeCycleCh chan struct{})
- func (m *RPCClientBase[HEAD]) CancelLifeCycle()
- func (m *RPCClientBase[HEAD]) Close()
- func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo)
- func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error)
- func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error)
- func (m *RPCClientBase[HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD)
- func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD)
- func (m *RPCClientBase[HEAD]) RegisterSub(sub Subscription, lifeCycleCh chan struct{}) (*ManagedSubscription, error)
- func (m *RPCClientBase[HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
- func (m *RPCClientBase[HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
- func (m *RPCClientBase[HEAD]) UnsubscribeAllExcept(subs ...Subscription)
- type RPCClientBaseConfig
- type SendOnlyNode
- type SendTxRPCClient
- type SendTxReturnCode
- type StringID
- type Subscription
- type TransactionSender
- func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close() error
- func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Name() string
- func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) (result RESULT, code SendTxReturnCode, err error)
- func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start(ctx context.Context) error
Constants ¶
const (
	Primary = NodeTier(iota)
	Secondary
)
const (
	NodeSelectionModeHighestHead     = "HighestHead"
	NodeSelectionModeRoundRobin      = "RoundRobin"
	NodeSelectionModeTotalDifficulty = "TotalDifficulty"
	NodeSelectionModePriorityLevel   = "PriorityLevel"
)
const QueryTimeout = 10 * time.Second
Variables ¶
var ErrNodeError = fmt.Errorf("no live nodes available")
Functions ¶
func CtxIsHealthCheckRequest ¶
func NewRedialBackoff ¶
NewRedialBackoff is a standard backoff to use for redialling or reconnecting to unreachable network endpoints
Types ¶
type ChainConfig ¶
type Head ¶
type Head interface {
	BlockNumber() int64
	BlockDifficulty() *big.Int
	GetTotalDifficulty() *big.Int
	IsValid() bool
}
Head is the interface required by the NodeClient
type ID ¶
ID represents the base type for any chain's ID. It should be convertible to a string that uniquely identifies the chain.
func NewIDFromInt ¶
type ManagedSubscription ¶
type ManagedSubscription struct {
	Subscription
	// contains filtered or unexported fields
}
ManagedSubscription is a Subscription which contains an onUnsubscribe callback for cleanup
func (*ManagedSubscription) Unsubscribe ¶
func (w *ManagedSubscription) Unsubscribe()
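The cleanup-callback pattern described for ManagedSubscription can be sketched roughly as below. This is an illustration under assumptions, not the package's code: `managedSub`, `fakeSub`, `registry` and `register` are hypothetical names, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// Subscription mirrors the two-method shape of the package's Subscription interface.
type Subscription interface {
	Unsubscribe()
	Err() <-chan error
}

// fakeSub is a hypothetical Subscription that counts Unsubscribe calls.
type fakeSub struct{ unsubscribed int }

func (f *fakeSub) Unsubscribe()      { f.unsubscribed++ }
func (f *fakeSub) Err() <-chan error { return nil }

// managedSub wraps a Subscription and runs a cleanup callback on Unsubscribe.
type managedSub struct {
	Subscription
	onUnsubscribe func(sub Subscription)
}

// Unsubscribe first cancels the wrapped subscription, then runs the callback.
func (m *managedSub) Unsubscribe() {
	m.Subscription.Unsubscribe()
	if m.onUnsubscribe != nil {
		m.onUnsubscribe(m.Subscription)
	}
}

// registry is a hypothetical stand-in for the subscription list kept by an RPC client.
type registry struct{ subs map[Subscription]struct{} }

// register tracks sub and returns a wrapper that untracks it on Unsubscribe.
func (r *registry) register(sub Subscription) *managedSub {
	r.subs[sub] = struct{}{}
	return &managedSub{
		Subscription:  sub,
		onUnsubscribe: func(s Subscription) { delete(r.subs, s) },
	}
}

func main() {
	r := &registry{subs: map[Subscription]struct{}{}}
	m := r.register(&fakeSub{})
	fmt.Println(len(r.subs)) // prints "1"
	m.Unsubscribe()
	fmt.Println(len(r.subs)) // prints "0"
}
```

Embedding the wrapped Subscription keeps `Err()` available on the wrapper without extra code, while the overriding `Unsubscribe` adds the cleanup step.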
type MultiNode ¶
type MultiNode[
	CHAIN_ID ID,
	RPC any,
] struct {
	services.Service
	// contains filtered or unexported fields
}
MultiNode is a generalized multi node client interface that includes methods to interact with different chains. It also handles multiple node RPC connections simultaneously.
func NewMultiNode ¶
func NewMultiNode[ CHAIN_ID ID, RPC any, ]( lggr logger.Logger, metrics multiNodeMetrics, selectionMode string, leaseDuration time.Duration, primaryNodes []Node[CHAIN_ID, RPC], sendOnlyNodes []SendOnlyNode[CHAIN_ID, RPC], chainID CHAIN_ID, chainFamily string, deathDeclarationDelay time.Duration, ) *MultiNode[CHAIN_ID, RPC]
func (*MultiNode[CHAIN_ID, RPC]) HighestUserObservations ¶
HighestUserObservations - returns highest ChainInfo ever observed by any user of the MultiNode
func (*MultiNode[CHAIN_ID, RPC]) LatestChainInfo ¶
LatestChainInfo returns the number of live nodes available in the pool, so that callers can avoid marking the last alive node in the pool as out-of-sync. It also returns the highest ChainInfo most recently received by the alive nodes. For example, if Node A's most recent block is 10 and its highest observed block is 15, while Node B's are 12 and 14, this method returns 12.
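The Node A / Node B example can be reproduced with a small sketch. It assumes a simplified, hypothetical `nodeView` type that tracks block numbers only; the package's actual `ChainInfo` and node types are not shown in this section, so the names below are illustrative.

```go
package main

import "fmt"

// nodeView is a hypothetical, simplified view of one node in the pool.
type nodeView struct {
	name         string
	alive        bool
	latestBlock  int64 // most recent block received in the current lifecycle
	highestBlock int64 // highest block ever observed by the node
}

// latestChainInfo counts the alive nodes and returns the highest "latest"
// block among them. highestBlock is deliberately ignored here.
func latestChainInfo(nodes []nodeView) (liveNodes int, highestLatest int64) {
	for _, n := range nodes {
		if !n.alive {
			continue
		}
		liveNodes++
		if n.latestBlock > highestLatest {
			highestLatest = n.latestBlock
		}
	}
	return liveNodes, highestLatest
}

func main() {
	nodes := []nodeView{
		{name: "A", alive: true, latestBlock: 10, highestBlock: 15},
		{name: "B", alive: true, latestBlock: 12, highestBlock: 14},
	}
	live, latest := latestChainInfo(nodes)
	fmt.Println(live, latest) // prints "2 12"
}
```

The result is 12 rather than 15 because only the most recently received values are compared, not the historical maxima.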
func (*MultiNode[CHAIN_ID, RPC]) NodeStates ¶
func (*MultiNode[CHAIN_ID, RPC]) SelectRPC ¶
SelectRPC returns an RPC of an active node. If there are no active nodes it returns an error, but tolerates undialed nodes by waiting for initial dial. Call this method from your chain-specific client implementation to access any chain-specific rpc calls.
type Node ¶
type Node[
	CHAIN_ID ID,
	RPC any,
] interface {
	// State returns most accurate state of the Node on the moment of call.
	// While some of the checks may be performed in the background and State may return cached value, critical, like
	// `FinalizedBlockOutOfSync`, must be executed upon every call.
	State() nodeState
	// StateAndLatest returns nodeState with the latest ChainInfo observed by Node during current lifecycle.
	StateAndLatest() (nodeState, ChainInfo)
	// HighestUserObservations - returns highest ChainInfo ever observed by underlying RPC excluding results of health check requests
	HighestUserObservations() ChainInfo
	SetPoolChainInfoProvider(PoolChainInfoProvider)
	// Name is a unique identifier for this node.
	Name() string
	// String - returns string representation of the node, useful for debugging (name + URLS used to connect to the RPC)
	String() string
	RPC() RPC
	// UnsubscribeAllExceptAliveLoop - closes all subscriptions except the aliveLoop subscription
	UnsubscribeAllExceptAliveLoop()
	ConfiguredChainID() CHAIN_ID
	// Order - returns priority order configured for the RPC
	Order() int32
	// Start - starts health checks
	Start(context.Context) error
	Close() error
}
func NewNode ¶
func NewNode[ CHAIN_ID ID, HEAD Head, RPC RPCClient[CHAIN_ID, HEAD], ]( nodeCfg NodeConfig, chainCfg ChainConfig, lggr logger.Logger, metrics nodeMetrics, wsuri *url.URL, httpuri *url.URL, name string, id int, chainID CHAIN_ID, nodeOrder int32, rpc RPC, chainFamily string, isLoadBalancedRPC bool, ) Node[CHAIN_ID, RPC]
type NodeConfig ¶
type NodeConfig interface {
	PollFailureThreshold() uint32
	PollInterval() time.Duration
	SelectionMode() string
	SyncThreshold() uint32
	NodeIsSyncingEnabled() bool
	FinalizedBlockPollInterval() time.Duration
	EnforceRepeatableRead() bool
	DeathDeclarationDelay() time.Duration
	NewHeadsPollInterval() time.Duration
	VerifyChainID() bool
}
type NodeSelector ¶
type NodeSelector[
	CHAIN_ID ID,
	RPC any,
] interface {
	// Select returns a Node, or nil if none can be selected.
	// Implementation must be thread-safe.
	Select() Node[CHAIN_ID, RPC]
	// Name returns the strategy name, e.g. "HighestHead" or "RoundRobin"
	Name() string
}
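To illustrate the NodeSelector contract (return a node or nil, and stay safe under concurrent calls), here is a minimal round-robin sketch. It is not the package's implementation: `node` is a hypothetical simplified stand-in for `Node[CHAIN_ID, RPC]`, and the sketch assumes the node slice and `alive` flags are not mutated while `Select` runs.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is a hypothetical, simplified stand-in for Node[CHAIN_ID, RPC].
type node struct {
	name  string
	alive bool
}

// roundRobin cycles through alive nodes. The atomic counter lets
// concurrent callers advance the position without a mutex.
type roundRobin struct {
	nodes []*node
	count atomic.Uint32
}

// Select returns the next alive node, or nil if none is alive.
func (s *roundRobin) Select() *node {
	var alive []*node
	for _, n := range s.nodes {
		if n.alive {
			alive = append(alive, n)
		}
	}
	if len(alive) == 0 {
		return nil
	}
	// Add returns the incremented value, so subtract 1 to start at index 0.
	idx := (s.count.Add(1) - 1) % uint32(len(alive))
	return alive[idx]
}

// Name returns the strategy name.
func (s *roundRobin) Name() string { return "RoundRobin" }

func main() {
	s := &roundRobin{nodes: []*node{
		{name: "a", alive: true},
		{name: "b", alive: false},
		{name: "c", alive: true},
	}}
	for i := 0; i < 3; i++ {
		fmt.Println(s.Select().name) // prints "a", then "c", then "a"
	}
}
```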
func NewHighestHeadNodeSelector ¶
func NewHighestHeadNodeSelector[ CHAIN_ID ID, RPC any, ](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]
func NewPriorityLevelNodeSelector ¶
func NewPriorityLevelNodeSelector[ CHAIN_ID ID, RPC any, ](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]
func NewRoundRobinSelector ¶
func NewRoundRobinSelector[ CHAIN_ID ID, RPC any, ](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]
func NewTotalDifficultyNodeSelector ¶
func NewTotalDifficultyNodeSelector[ CHAIN_ID ID, RPC any, ](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]
type Poller ¶
Poller is a component that polls a function at a given interval and delivers the result to a channel. It is used by multinode to poll for new heads and implements the Subscription interface.
func NewPoller ¶
func NewPoller[ T any, ](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, lggr logger.Logger) (Poller[T], <-chan T)
NewPoller creates a new Poller instance and returns a channel to receive the polled data
func (*Poller[T]) Unsubscribe ¶
func (p *Poller[T]) Unsubscribe()
Unsubscribe cancels the sending of events to the data channel
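The polling behaviour described for Poller can be sketched as follows. This is a simplified illustration, not the package's code: `poller`, `newPoller` and `collect` are hypothetical names, and the polling timeout, logger and `Err()` channel of the real type are omitted.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// poller is a hypothetical, minimal poller that delivers results to a channel.
type poller[T any] struct {
	cancel context.CancelFunc
	done   chan struct{}
	once   sync.Once
}

// newPoller starts a goroutine that calls poll on every tick and forwards
// successful results to the returned channel. Failed polls are skipped.
func newPoller[T any](interval time.Duration, poll func(ctx context.Context) (T, error)) (*poller[T], <-chan T) {
	ctx, cancel := context.WithCancel(context.Background())
	out := make(chan T)
	p := &poller[T]{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(p.done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				v, err := poll(ctx)
				if err != nil {
					continue
				}
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return p, out
}

// Unsubscribe stops polling and waits for the goroutine to exit.
// sync.Once makes repeated calls harmless.
func (p *poller[T]) Unsubscribe() {
	p.once.Do(func() {
		p.cancel()
		<-p.done
	})
}

// collect is a demo helper: it polls an incrementing counter and
// returns the first n delivered values.
func collect(n int) []int {
	counter := 0
	p, ch := newPoller(5*time.Millisecond, func(ctx context.Context) (int, error) {
		counter++
		return counter, nil
	})
	defer p.Unsubscribe()
	got := make([]int, 0, n)
	for len(got) < n {
		got = append(got, <-ch)
	}
	return got
}

func main() {
	fmt.Println(collect(3)) // prints "[1 2 3]"
}
```

The output channel is unbuffered in this sketch, so a slow consumer delays the next poll rather than dropping values; the real Poller may make a different choice.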
type PoolChainInfoProvider ¶
type PoolChainInfoProvider interface {
	// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being
	// moved to out-of-sync state. It is better to have one out-of-sync node than no nodes at all.
	// Returns highest latest ChainInfo within the alive nodes. E.g. most recent block number and highest block number
	// observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12.
	LatestChainInfo() (int, ChainInfo)
	// HighestUserObservations - returns highest ChainInfo ever observed by any user of MultiNode.
	HighestUserObservations() ChainInfo
}
PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo
type RPCClient ¶
type RPCClient[
	CHAIN_ID ID,
	HEAD Head,
] interface {
	// ChainID - fetches ChainID from the RPC to verify that it matches config
	ChainID(ctx context.Context) (CHAIN_ID, error)
	// Dial - prepares the RPC for usage. Can be called on fresh or closed RPC
	Dial(ctx context.Context) error
	// SubscribeToHeads - returns channel and subscription for new heads.
	SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
	// SubscribeToFinalizedHeads - returns channel and subscription for finalized heads.
	SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
	// ClientVersion - returns error if RPC is not reachable
	ClientVersion(context.Context) (string, error)
	// IsSyncing - returns true if the RPC is in Syncing state and can not process calls
	IsSyncing(ctx context.Context) (bool, error)
	// UnsubscribeAllExcept - close all subscriptions except `subs`
	UnsubscribeAllExcept(subs ...Subscription)
	// Close - closes all subscriptions and aborts all RPC calls
	Close()
	// GetInterceptedChainInfo - returns latest and highest observed by application layer ChainInfo.
	// latestChainInfo is the most recent value received within a NodeClient's current lifecycle between Dial and DisconnectAll.
	// highestUserObservations is the highest ChainInfo observed excluding health checks calls.
	// Its values must not be reset.
	// The results of corresponding calls, to get the most recent head and the latest finalized head, must be
	// intercepted and reflected in ChainInfo before being returned to a caller. Otherwise, MultiNode is not able to
	// provide repeatable read guarantee.
	// DisconnectAll must reset latest ChainInfo to default value.
	// Ensure implementation does not have a race condition when values are reset before request completion and as
	// a result latest ChainInfo contains information from the previous cycle.
	GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo)
}
RPCClient includes all the necessary generalized RPC methods used by Node to perform health checks
type RPCClientBase ¶
type RPCClientBase[HEAD Head] struct {
	// contains filtered or unexported fields
}
RPCClientBase is used to integrate multinode into chain-specific clients. For new MultiNode integrations, wrap the chain-specific RPC client and embed RPCClientBase to obtain the required RPCClient methods and enable the use of MultiNode.
The RPCClientBase provides chain-agnostic functionality such as head and finalized head subscriptions, which are required in each Node lifecycle to execute various health checks.
func NewRPCClientBase ¶
func (*RPCClientBase[HEAD]) AcquireQueryCtx ¶
func (m *RPCClientBase[HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, lifeCycleCh chan struct{})
func (*RPCClientBase[HEAD]) CancelLifeCycle ¶
func (m *RPCClientBase[HEAD]) CancelLifeCycle()
CancelLifeCycle closes and replaces the lifeCycleCh
func (*RPCClientBase[HEAD]) Close ¶
func (m *RPCClientBase[HEAD]) Close()
func (*RPCClientBase[HEAD]) GetInterceptedChainInfo ¶
func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo)
func (*RPCClientBase[HEAD]) LatestBlock ¶
func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error)
func (*RPCClientBase[HEAD]) LatestFinalizedBlock ¶
func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error)
func (*RPCClientBase[HEAD]) OnNewFinalizedHead ¶
func (m *RPCClientBase[HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD)
func (*RPCClientBase[HEAD]) OnNewHead ¶
func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD)
func (*RPCClientBase[HEAD]) RegisterSub ¶
func (m *RPCClientBase[HEAD]) RegisterSub(sub Subscription, lifeCycleCh chan struct{}) (*ManagedSubscription, error)
RegisterSub adds the sub to the RPCClientBase list and returns a managed sub which is removed on unsubscribe
func (*RPCClientBase[HEAD]) SubscribeToFinalizedHeads ¶
func (m *RPCClientBase[HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
func (*RPCClientBase[HEAD]) SubscribeToHeads ¶
func (m *RPCClientBase[HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
func (*RPCClientBase[HEAD]) UnsubscribeAllExcept ¶
func (m *RPCClientBase[HEAD]) UnsubscribeAllExcept(subs ...Subscription)
type RPCClientBaseConfig ¶
type SendOnlyNode ¶
type SendOnlyNode[
	CHAIN_ID ID,
	RPC any,
] interface {
	// Start may attempt to connect to the node, but should only return error for misconfiguration - never for temporary errors.
	Start(context.Context) error
	Close() error
	ConfiguredChainID() CHAIN_ID
	RPC() RPC
	String() string
	// State returns nodeState
	State() nodeState
	// Name is a unique identifier for this node.
	Name() string
}
SendOnlyNode represents one node used as a sendonly
func NewSendOnlyNode ¶
func NewSendOnlyNode[ CHAIN_ID ID, RPC sendOnlyClient[CHAIN_ID], ]( lggr logger.Logger, metrics nodeMetrics, httpuri url.URL, name string, chainID CHAIN_ID, rpc RPC, ) SendOnlyNode[CHAIN_ID, RPC]
NewSendOnlyNode returns a new sendonly node
type SendTxRPCClient ¶
type SendTxRPCClient[TX any, RESULT any] interface {
	// SendTransaction errors returned should include name or other unique identifier of the RPC
	SendTransaction(ctx context.Context, tx TX) (RESULT, SendTxReturnCode, error)
}
SendTxRPCClient - defines interface of an RPC used by TransactionSender to broadcast transaction
type SendTxReturnCode ¶
type SendTxReturnCode int
const (
	Successful SendTxReturnCode = iota + 1
	Fatal // Unrecoverable error. Most likely the attempt should be thrown away.
	Retryable // The error returned by the RPC indicates that if we retry with the same attempt, the tx will eventually go through.
	Underpriced // Attempt was underpriced. New estimation is needed with bumped gas price.
	Unknown // Tx failed with an error response that is not recognized by the client.
	Unsupported // Attempt failed with an error response that is not supported by the client for the given chain.
	TransactionAlreadyKnown // The transaction that was sent has already been received by the RPC.
	InsufficientFunds // Tx was rejected due to insufficient funds.
	ExceedsMaxFee // Attempt's fee was higher than the node's limit and got rejected.
	FeeOutOfValidRange // This error is returned when we use a fee price suggested from an RPC, but the network rejects the attempt due to an invalid range(mostly used by L2 chains). Retry by requesting a new suggested fee price.
	TerminallyStuck // The error returned when a transaction is or could get terminally stuck in the mempool without any chance of inclusion.
)
SendTxReturnCode is a generalized client error that dictates what should be the next action, depending on the RPC error response.
func (SendTxReturnCode) String ¶
func (c SendTxReturnCode) String() string
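Since each return code is meant to dictate the caller's next action, a caller might dispatch on it roughly as below. The constants are redeclared locally so the sketch is self-contained; `nextAction` and the action strings are hypothetical, and the mapping is only inferred from the comments on the constants (in particular, treating `TransactionAlreadyKnown` like a success is an assumption).

```go
package main

import "fmt"

// SendTxReturnCode and its values are redeclared here to mirror the
// declarations above, so the sketch compiles on its own.
type SendTxReturnCode int

const (
	Successful SendTxReturnCode = iota + 1
	Fatal
	Retryable
	Underpriced
	Unknown
	Unsupported
	TransactionAlreadyKnown
	InsufficientFunds
	ExceedsMaxFee
	FeeOutOfValidRange
	TerminallyStuck
)

// nextAction is a hypothetical helper that maps a return code to a
// follow-up step. The mapping is an assumption, not package behaviour.
func nextAction(code SendTxReturnCode) string {
	switch code {
	case Successful, TransactionAlreadyKnown:
		return "wait for inclusion"
	case Retryable:
		return "retry the same attempt"
	case Underpriced:
		return "re-estimate with a bumped gas price"
	case FeeOutOfValidRange:
		return "request a new suggested fee and retry"
	case Fatal:
		return "discard the attempt"
	default:
		return "escalate to chain-specific handling"
	}
}

func main() {
	fmt.Println(nextAction(Underpriced)) // prints "re-estimate with a bumped gas price"
	fmt.Println(nextAction(Retryable))   // prints "retry the same attempt"
}
```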
type Subscription ¶
type Subscription interface {
	// Unsubscribe cancels the sending of events to the data channel
	// and closes the error channel. Unsubscribe should be callable multiple
	// times without causing an error.
	Unsubscribe()
	// Err returns the subscription error channel. The error channel receives
	// a value if there is an issue with the subscription (e.g. the network connection
	// delivering the events has been closed). Only one value will ever be sent.
	// The error channel is closed by Unsubscribe.
	Err() <-chan error
}
Subscription represents an event subscription where events are delivered on a data channel. It is a generic Subscription interface used by clients.
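One way to satisfy the Subscription contract (idempotent Unsubscribe, at most one error ever sent, error channel closed by Unsubscribe) is sketched below. The `sub` type, `newSub`, `fail` and `errConnClosed` are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConnClosed is a hypothetical error used by the demo.
var errConnClosed = errors.New("connection closed")

// sub is a hypothetical Subscription implementation.
type sub struct {
	mu     sync.Mutex
	closed bool
	sent   bool
	errCh  chan error
}

// newSub creates a subscription whose error channel has room for the
// single error that may ever be sent, so fail never blocks.
func newSub() *sub { return &sub{errCh: make(chan error, 1)} }

// Unsubscribe closes the error channel exactly once; later calls are no-ops.
func (s *sub) Unsubscribe() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return
	}
	s.closed = true
	close(s.errCh)
}

// Err returns the receive-only error channel.
func (s *sub) Err() <-chan error { return s.errCh }

// fail reports a subscription error. Only the first error is delivered,
// and nothing is sent after Unsubscribe.
func (s *sub) fail(err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed || s.sent {
		return
	}
	s.sent = true
	s.errCh <- err
}

func main() {
	s := newSub()
	s.fail(errConnClosed)
	fmt.Println(<-s.Err()) // prints "connection closed"
	s.Unsubscribe()
	s.Unsubscribe() // safe to call again
	_, open := <-s.Err()
	fmt.Println(open) // prints "false"
}
```

The mutex guards against sending on a channel that Unsubscribe has already closed, which would otherwise panic.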
type TransactionSender ¶
type TransactionSender[TX any, RESULT any, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]] struct {
	services.StateMachine
	// contains filtered or unexported fields
}
func NewTransactionSender ¶
func NewTransactionSender[TX any, RESULT any, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]]( lggr logger.Logger, chainID CHAIN_ID, chainFamily string, multiNode *MultiNode[CHAIN_ID, RPC], metrics transactionSenderMetrics, classifyErr func(err error) SendTxReturnCode, sendTxSoftTimeout time.Duration, ) *TransactionSender[TX, RESULT, CHAIN_ID, RPC]
func (*TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close ¶
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close() error
func (*TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Name ¶
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Name() string
func (*TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction ¶
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) (result RESULT, code SendTxReturnCode, err error)
SendTransaction - broadcasts transaction to all the send-only and primary nodes in MultiNode. A returned nil or error does not guarantee that the transaction will or won't be included. Additional checks must be performed to determine the final state.
Send-only nodes' results are ignored as they tend to return false-positive responses. Broadcast to them is necessary to speed up the propagation of TX in the network.
Handling of primary nodes' results consists of collection and aggregation. In the collection step, we gather as many results as possible while minimizing waiting time. This operation succeeds on one of the following conditions:
* Received at least one success
* Received at least one result and `sendTxSoftTimeout` expired
* Received results from the sufficient number of nodes defined by sendTxQuorum.
The aggregation is based on the following conditions:
* If there is at least one success - returns success
* If there is at least one terminal error - returns terminal error
* If there is both success and terminal error - returns success and reports invariant violation
* Otherwise, returns any (effectively random) of the errors.
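The aggregation rules above can be sketched as a small function. This is an illustration under simplifying assumptions rather than the package's implementation: `code`, `result`, `isSuccess`, `isTerminal` and `aggregate` are hypothetical, only one code is treated as success and one as terminal, and "any of the errors" is taken to be the first collected result.

```go
package main

import (
	"errors"
	"fmt"
)

// code is a hypothetical, reduced set of return codes.
type code int

const (
	successful code = iota + 1
	fatal
	retryable
)

// result is one primary node's response.
type result struct {
	code code
	err  error
}

// Assumption: only these two codes count as success and terminal error.
func isSuccess(c code) bool  { return c == successful }
func isTerminal(c code) bool { return c == fatal }

// aggregate picks the result to return and reports whether a success and
// a terminal error were observed together (an invariant violation).
func aggregate(results []result) (chosen result, invariantViolation bool, err error) {
	if len(results) == 0 {
		return result{}, false, errors.New("no results to aggregate")
	}
	var success, terminal *result
	for i := range results {
		switch {
		case isSuccess(results[i].code) && success == nil:
			success = &results[i]
		case isTerminal(results[i].code) && terminal == nil:
			terminal = &results[i]
		}
	}
	switch {
	case success != nil:
		// Success wins, but a coexisting terminal error is flagged.
		return *success, terminal != nil, nil
	case terminal != nil:
		return *terminal, false, nil
	default:
		// No success and no terminal error: return any of the errors.
		return results[0], false, nil
	}
}

func main() {
	results := []result{
		{code: retryable, err: errors.New("node-1: timeout")},
		{code: fatal, err: errors.New("node-2: invalid tx")},
		{code: successful},
	}
	chosen, violation, _ := aggregate(results)
	fmt.Println(chosen.code == successful, violation) // prints "true true"
}
```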
Source Files
- ctx.go
- models.go
- multi_node.go
- node.go
- node_fsm.go
- node_lifecycle.go
- node_selector.go
- node_selector_highest_head.go
- node_selector_priority_level.go
- node_selector_round_robin.go
- node_selector_total_difficulty.go
- poller.go
- rpc_client_base.go
- send_only_node.go
- send_only_node_lifecycle.go
- transaction_sender.go
- types.go
- utils.go