multinode

package module
v0.0.0-...-508e798 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 29, 2025 License: MIT Imports: 18 Imported by: 7

README

MultiNode

Enables the use of multiple RPCs in chain integrations. It performs critical health checks, RPC selection, and node metrics collection, and is used to send transactions to all RPCs and aggregate the results. MultiNode is used by all other components that require reading from or writing to the chain.

Components

RPCClient

Interface for wrapping an RPC of any chain type. Required for integrating a new chain with MultiNode.

Node

Wraps an RPCClient with state and lifecycle management to track the health of an individual RPC.

MultiNode

Manages all nodes: performs node selection and load balancing, runs health checks, collects metrics, and runs actions across all nodes.

Poller

Used to poll for new heads and finalized heads within subscriptions.

Transaction Sender

Used to send transactions to all healthy RPCs and aggregate the results.

Documentation

Constants

const (
	Primary = NodeTier(iota)
	Secondary
)
const (
	NodeSelectionModeHighestHead     = "HighestHead"
	NodeSelectionModeRoundRobin      = "RoundRobin"
	NodeSelectionModeTotalDifficulty = "TotalDifficulty"
	NodeSelectionModePriorityLevel   = "PriorityLevel"
)
const QueryTimeout = 10 * time.Second

Variables

var ErrNodeError = fmt.Errorf("no live nodes available")

Functions

func CtxAddHealthCheckFlag

func CtxAddHealthCheckFlag(ctx context.Context) context.Context

func CtxIsHealthCheckRequest

func CtxIsHealthCheckRequest(ctx context.Context) bool

func MaxTotalDifficulty

func MaxTotalDifficulty(a, b *big.Int) *big.Int

func NewRedialBackoff

func NewRedialBackoff() backoff.Backoff

NewRedialBackoff is a standard backoff to use for redialling or reconnecting to unreachable network endpoints

Types

type ChainConfig

type ChainConfig interface {
	NodeNoNewHeadsThreshold() time.Duration
	NoNewFinalizedHeadsThreshold() time.Duration
	FinalityDepth() uint32
	FinalityTagEnabled() bool
	FinalizedBlockOffset() uint32
}

type ChainInfo

type ChainInfo struct {
	BlockNumber          int64
	FinalizedBlockNumber int64
	TotalDifficulty      *big.Int
}

ChainInfo - defines an RPC's or MultiNode's view of the chain

type Head

type Head interface {
	BlockNumber() int64
	BlockDifficulty() *big.Int
	GetTotalDifficulty() *big.Int
	IsValid() bool
}

Head is the interface required by the NodeClient

type ID

type ID fmt.Stringer

ID represents the base type for any chain's ID. It should be convertible to a string that uniquely identifies the chain.

func NewIDFromInt

func NewIDFromInt(id int64) ID

func RandomID

func RandomID() ID

type ManagedSubscription

type ManagedSubscription struct {
	Subscription
	// contains filtered or unexported fields
}

ManagedSubscription is a Subscription which contains an onUnsubscribe callback for cleanup

func (*ManagedSubscription) Unsubscribe

func (w *ManagedSubscription) Unsubscribe()

type MultiNode

type MultiNode[
	CHAIN_ID ID,
	RPC any,
] struct {
	services.Service
	// contains filtered or unexported fields
}

MultiNode is a generalized multi node client interface that includes methods to interact with different chains. It also handles multiple node RPC connections simultaneously.

func NewMultiNode

func NewMultiNode[
	CHAIN_ID ID,
	RPC any,
](
	lggr logger.Logger,
	metrics multiNodeMetrics,
	selectionMode string,
	leaseDuration time.Duration,
	primaryNodes []Node[CHAIN_ID, RPC],
	sendOnlyNodes []SendOnlyNode[CHAIN_ID, RPC],
	chainID CHAIN_ID,
	chainFamily string,
	deathDeclarationDelay time.Duration,
) *MultiNode[CHAIN_ID, RPC]

func (*MultiNode[CHAIN_ID, RPC]) ChainID

func (c *MultiNode[CHAIN_ID, RPC]) ChainID() CHAIN_ID

func (*MultiNode[CHAIN_ID, RPC]) DoAll

func (c *MultiNode[CHAIN_ID, RPC]) DoAll(ctx context.Context, do func(ctx context.Context, rpc RPC, isSendOnly bool)) error

func (*MultiNode[CHAIN_ID, RPC]) HighestUserObservations

func (c *MultiNode[CHAIN_ID, RPC]) HighestUserObservations() ChainInfo

HighestUserObservations - returns highest ChainInfo ever observed by any user of the MultiNode

func (*MultiNode[CHAIN_ID, RPC]) LatestChainInfo

func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo() (int, ChainInfo)

LatestChainInfo - returns the number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync. It also returns the highest ChainInfo most recently received by the alive nodes. E.g. if Node A's most recent block is 10 and its highest observed block is 15, and Node B's are 12 and 14, this method will return 12.
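The example in the description can be worked through with a small, self-contained sketch. It is not the package's implementation: `nodeView` and `latestAcrossAlive` are hypothetical names, the local `ChainInfo` omits TotalDifficulty for brevity, and taking the per-field maximum across alive nodes is an assumption based on the description.

```go
package main

import "fmt"

// ChainInfo is a reduced local copy of the package's ChainInfo fields.
type ChainInfo struct {
	BlockNumber          int64
	FinalizedBlockNumber int64
}

// nodeView is a hypothetical stand-in for a node's state and latest ChainInfo.
type nodeView struct {
	alive  bool
	latest ChainInfo
}

// latestAcrossAlive counts alive nodes and takes the per-field maximum of
// their latest ChainInfo (assumed aggregation rule).
func latestAcrossAlive(nodes []nodeView) (int, ChainInfo) {
	var live int
	var out ChainInfo
	for _, n := range nodes {
		if !n.alive {
			continue
		}
		live++
		if n.latest.BlockNumber > out.BlockNumber {
			out.BlockNumber = n.latest.BlockNumber
		}
		if n.latest.FinalizedBlockNumber > out.FinalizedBlockNumber {
			out.FinalizedBlockNumber = n.latest.FinalizedBlockNumber
		}
	}
	return live, out
}

func main() {
	nodes := []nodeView{
		{alive: true, latest: ChainInfo{BlockNumber: 10}},  // Node A
		{alive: true, latest: ChainInfo{BlockNumber: 12}},  // Node B
		{alive: false, latest: ChainInfo{BlockNumber: 20}}, // not alive, ignored
	}
	live, info := latestAcrossAlive(nodes)
	fmt.Println(live, info.BlockNumber) // prints "2 12"
}
```

The highest-ever values (15 and 14 in the description) play no part here; only each node's most recent value is aggregated.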

func (*MultiNode[CHAIN_ID, RPC]) NodeStates

func (c *MultiNode[CHAIN_ID, RPC]) NodeStates() map[string]string

func (*MultiNode[CHAIN_ID, RPC]) SelectRPC

func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC(ctx context.Context) (rpc RPC, err error)

SelectRPC returns an RPC of an active node. If there are no active nodes it returns an error, but tolerates undialed nodes by waiting for initial dial. Call this method from your chain-specific client implementation to access any chain-specific rpc calls.

type Node

type Node[
	CHAIN_ID ID,
	RPC any,
] interface {
	// State returns most accurate state of the Node on the moment of call.
	// While some of the checks may be performed in the background and State may return cached value, critical, like
	// `FinalizedBlockOutOfSync`, must be executed upon every call.
	State() nodeState
	// StateAndLatest returns nodeState with the latest ChainInfo observed by Node during current lifecycle.
	StateAndLatest() (nodeState, ChainInfo)
	// HighestUserObservations - returns highest ChainInfo ever observed by underlying RPC excluding results of health check requests
	HighestUserObservations() ChainInfo
	SetPoolChainInfoProvider(PoolChainInfoProvider)
	// Name is a unique identifier for this node.
	Name() string
	// String - returns string representation of the node, useful for debugging (name + URLS used to connect to the RPC)
	String() string
	RPC() RPC
	// UnsubscribeAllExceptAliveLoop - closes all subscriptions except the aliveLoop subscription
	UnsubscribeAllExceptAliveLoop()
	ConfiguredChainID() CHAIN_ID
	// Order - returns priority order configured for the RPC
	Order() int32
	// Start - starts health checks
	Start(context.Context) error
	Close() error
}

func NewNode

func NewNode[
	CHAIN_ID ID,
	HEAD Head,
	RPC RPCClient[CHAIN_ID, HEAD],
](
	nodeCfg NodeConfig,
	chainCfg ChainConfig,
	lggr logger.Logger,
	metrics nodeMetrics,
	wsuri *url.URL,
	httpuri *url.URL,
	name string,
	id int,
	chainID CHAIN_ID,
	nodeOrder int32,
	rpc RPC,
	chainFamily string,
	isLoadBalancedRPC bool,
) Node[CHAIN_ID, RPC]

type NodeConfig

type NodeConfig interface {
	PollFailureThreshold() uint32
	PollInterval() time.Duration
	SelectionMode() string
	SyncThreshold() uint32
	NodeIsSyncingEnabled() bool
	FinalizedBlockPollInterval() time.Duration
	EnforceRepeatableRead() bool
	DeathDeclarationDelay() time.Duration
	NewHeadsPollInterval() time.Duration
	VerifyChainID() bool
}

type NodeSelector

type NodeSelector[
	CHAIN_ID ID,
	RPC any,
] interface {
	// Select returns a Node, or nil if none can be selected.
	// Implementation must be thread-safe.
	Select() Node[CHAIN_ID, RPC]
	// Name returns the strategy name, e.g. "HighestHead" or "RoundRobin"
	Name() string
}
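To illustrate the thread-safety requirement on Select, here is a minimal round-robin selector sketch. It does not reproduce the package's selectors: `node`, `staticNode`, and `roundRobin` are hypothetical, and `Alive()` stands in for the package's unexported node state check.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is a hypothetical, reduced stand-in for the package's Node interface.
type node interface {
	Name() string
	Alive() bool
}

type staticNode struct {
	name  string
	alive bool
}

func (n staticNode) Name() string { return n.name }
func (n staticNode) Alive() bool  { return n.alive }

// roundRobin cycles through alive nodes. The counter is only touched through
// atomic operations, so Select is safe for concurrent callers.
type roundRobin struct {
	nodes []node
	count uint32
}

// Select returns the next alive node, or nil if none can be selected.
func (s *roundRobin) Select() node {
	var alive []node
	for _, n := range s.nodes {
		if n.Alive() {
			alive = append(alive, n)
		}
	}
	if len(alive) == 0 {
		return nil
	}
	// AddUint32 returns the incremented value; subtract 1 to start at index 0.
	i := atomic.AddUint32(&s.count, 1) - 1
	return alive[int(i%uint32(len(alive)))]
}

// Name returns the strategy name.
func (s *roundRobin) Name() string { return "RoundRobin" }

func main() {
	s := &roundRobin{nodes: []node{
		staticNode{name: "a", alive: true},
		staticNode{name: "b", alive: false},
		staticNode{name: "c", alive: true},
	}}
	fmt.Println(s.Select().Name(), s.Select().Name(), s.Select().Name()) // prints "a c a"
}
```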

func NewHighestHeadNodeSelector

func NewHighestHeadNodeSelector[
	CHAIN_ID ID,
	RPC any,
](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]

func NewPriorityLevelNodeSelector

func NewPriorityLevelNodeSelector[
	CHAIN_ID ID,
	RPC any,
](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]

func NewRoundRobinSelector

func NewRoundRobinSelector[
	CHAIN_ID ID,
	RPC any,
](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]

func NewTotalDifficultyNodeSelector

func NewTotalDifficultyNodeSelector[
	CHAIN_ID ID,
	RPC any,
](nodes []Node[CHAIN_ID, RPC]) NodeSelector[CHAIN_ID, RPC]

type NodeTier

type NodeTier int

func (NodeTier) String

func (n NodeTier) String() string

type Poller

type Poller[T any] struct {
	services.Service
	// contains filtered or unexported fields
}

Poller is a component that polls a function at a given interval and delivers the result to a channel. It is used by multinode to poll for new heads and implements the Subscription interface.

func NewPoller

func NewPoller[
	T any,
](pollingInterval time.Duration, pollingFunc func(ctx context.Context) (T, error), pollingTimeout time.Duration, lggr logger.Logger) (Poller[T], <-chan T)

NewPoller creates a new Poller instance and returns a channel to receive the polled data

func (*Poller[T]) Err

func (p *Poller[T]) Err() <-chan error

func (*Poller[T]) Unsubscribe

func (p *Poller[T]) Unsubscribe()

Unsubscribe cancels the sending of events to the data channel

type PoolChainInfoProvider

type PoolChainInfoProvider interface {
	// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being
	// moved to out-of-sync state. It is better to have one out-of-sync node than no nodes at all.
	// Returns highest latest ChainInfo within the alive nodes. E.g. most recent block number and highest block number
	// observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12.
	LatestChainInfo() (int, ChainInfo)
	// HighestUserObservations - returns highest ChainInfo ever observed by any user of MultiNode.
	HighestUserObservations() ChainInfo
}

PoolChainInfoProvider - provides aggregation of the node pool's ChainInfo

type RPCClient

type RPCClient[
	CHAIN_ID ID,
	HEAD Head,
] interface {
	// ChainID - fetches ChainID from the RPC to verify that it matches config
	ChainID(ctx context.Context) (CHAIN_ID, error)
	// Dial - prepares the RPC for usage. Can be called on fresh or closed RPC
	Dial(ctx context.Context) error
	// SubscribeToHeads - returns channel and subscription for new heads.
	SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
	// SubscribeToFinalizedHeads - returns channel and subscription for finalized heads.
	SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error)
	// ClientVersion - returns error if RPC is not reachable
	ClientVersion(context.Context) (string, error)
	// IsSyncing - returns true if the RPC is in Syncing state and can not process calls
	IsSyncing(ctx context.Context) (bool, error)
	// UnsubscribeAllExcept - close all subscriptions except `subs`
	UnsubscribeAllExcept(subs ...Subscription)
	// Close - closes all subscriptions and aborts all RPC calls
	Close()
	// GetInterceptedChainInfo - returns latest and highest observed by application layer ChainInfo.
	// latestChainInfo is the most recent value received within a NodeClient's current lifecycle between Dial and DisconnectAll.
	// highestUserObservations is the highest ChainInfo observed excluding health checks calls.
	// Its values must not be reset.
	// The results of corresponding calls, to get the most recent head and the latest finalized head, must be
	// intercepted and reflected in ChainInfo before being returned to a caller. Otherwise, MultiNode is not able to
	// provide repeatable read guarantee.
	// DisconnectAll must reset latest ChainInfo to default value.
	// Ensure implementation does not have a race condition when values are reset before request completion and as
	// a result latest ChainInfo contains information from the previous cycle.
	GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo)
}

RPCClient includes all the necessary generalized RPC methods used by Node to perform health checks

type RPCClientBase

type RPCClientBase[HEAD Head] struct {
	// contains filtered or unexported fields
}

RPCClientBase is used to integrate multinode into chain-specific clients. For new MultiNode integrations, we wrap the RPC client and embed the RPCClientBase to get the required RPCClient methods and enable the use of MultiNode.

The RPCClientBase provides chain-agnostic functionality such as head and finalized head subscriptions, which are required in each Node lifecycle to execute various health checks.

func NewRPCClientBase

func NewRPCClientBase[HEAD Head](
	cfg RPCClientBaseConfig, ctxTimeout time.Duration, log logger.Logger,
	latestBlock func(ctx context.Context) (HEAD, error),
	latestFinalizedBlock func(ctx context.Context) (HEAD, error),
) *RPCClientBase[HEAD]

func (*RPCClientBase[HEAD]) AcquireQueryCtx

func (m *RPCClientBase[HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
	lifeCycleCh chan struct{})

func (*RPCClientBase[HEAD]) CancelLifeCycle

func (m *RPCClientBase[HEAD]) CancelLifeCycle()

CancelLifeCycle closes and replaces the lifeCycleCh

func (*RPCClientBase[HEAD]) Close

func (m *RPCClientBase[HEAD]) Close()

func (*RPCClientBase[HEAD]) GetInterceptedChainInfo

func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo)

func (*RPCClientBase[HEAD]) LatestBlock

func (m *RPCClientBase[HEAD]) LatestBlock(ctx context.Context) (HEAD, error)

func (*RPCClientBase[HEAD]) LatestFinalizedBlock

func (m *RPCClientBase[HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error)

func (*RPCClientBase[HEAD]) OnNewFinalizedHead

func (m *RPCClientBase[HEAD]) OnNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD)

func (*RPCClientBase[HEAD]) OnNewHead

func (m *RPCClientBase[HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD)

func (*RPCClientBase[HEAD]) RegisterSub

func (m *RPCClientBase[HEAD]) RegisterSub(sub Subscription, lifeCycleCh chan struct{}) (*ManagedSubscription, error)

RegisterSub adds the sub to the RPCClientBase list and returns a managed sub which is removed on unsubscribe

func (*RPCClientBase[HEAD]) SubscribeToFinalizedHeads

func (m *RPCClientBase[HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error)

func (*RPCClientBase[HEAD]) SubscribeToHeads

func (m *RPCClientBase[HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error)

func (*RPCClientBase[HEAD]) UnsubscribeAllExcept

func (m *RPCClientBase[HEAD]) UnsubscribeAllExcept(subs ...Subscription)

type RPCClientBaseConfig

type RPCClientBaseConfig interface {
	NewHeadsPollInterval() time.Duration
	FinalizedBlockPollInterval() time.Duration
}

type SendOnlyNode

type SendOnlyNode[
	CHAIN_ID ID,
	RPC any,
] interface {
	// Start may attempt to connect to the node, but should only return error for misconfiguration - never for temporary errors.
	Start(context.Context) error
	Close() error

	ConfiguredChainID() CHAIN_ID
	RPC() RPC

	String() string
	// State returns nodeState
	State() nodeState
	// Name is a unique identifier for this node.
	Name() string
}

SendOnlyNode represents a single node used as a send-only node

func NewSendOnlyNode

func NewSendOnlyNode[
	CHAIN_ID ID,
	RPC sendOnlyClient[CHAIN_ID],
](
	lggr logger.Logger,
	metrics nodeMetrics,
	httpuri url.URL,
	name string,
	chainID CHAIN_ID,
	rpc RPC,
) SendOnlyNode[CHAIN_ID, RPC]

NewSendOnlyNode returns a new sendonly node

type SendTxRPCClient

type SendTxRPCClient[TX any, RESULT any] interface {
	// SendTransaction errors returned should include name or other unique identifier of the RPC
	SendTransaction(ctx context.Context, tx TX) (RESULT, SendTxReturnCode, error)
}

SendTxRPCClient - defines interface of an RPC used by TransactionSender to broadcast transaction

type SendTxReturnCode

type SendTxReturnCode int

const (
	Successful              SendTxReturnCode = iota + 1
	Fatal                                    // Unrecoverable error. Most likely the attempt should be thrown away.
	Retryable                                // The error returned by the RPC indicates that if we retry with the same attempt, the tx will eventually go through.
	Underpriced                              // Attempt was underpriced. New estimation is needed with bumped gas price.
	Unknown                                  // Tx failed with an error response that is not recognized by the client.
	Unsupported                              // Attempt failed with an error response that is not supported by the client for the given chain.
	TransactionAlreadyKnown                  // The transaction that was sent has already been received by the RPC.
	InsufficientFunds                        // Tx was rejected due to insufficient funds.
	ExceedsMaxFee                            // Attempt's fee was higher than the node's limit and got rejected.
	FeeOutOfValidRange                       // This error is returned when we use a fee price suggested from an RPC, but the network rejects the attempt due to an invalid range(mostly used by L2 chains). Retry by requesting a new suggested fee price.
	TerminallyStuck                          // The error returned when a transaction is or could get terminally stuck in the mempool without any chance of inclusion.

)

SendTxReturnCode is a generalized client error that dictates what should be the next action, depending on the RPC error response.

func (SendTxReturnCode) String

func (c SendTxReturnCode) String() string

type StringID

type StringID string

StringID enables using string directly as a ChainID

func (StringID) String

func (s StringID) String() string
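Because ID is declared as fmt.Stringer, any type with a String method can serve as a chain ID. The sketch below redeclares `ID` and `StringID` locally so it runs on its own. The body of `StringID.String` is an assumption, and `numericID` and `describe` are hypothetical.

```go
package main

import "fmt"

// ID mirrors the package's declaration.
type ID fmt.Stringer

// StringID mirrors the package's type; the method body is assumed.
type StringID string

func (s StringID) String() string { return string(s) }

// numericID is a hypothetical integer-backed chain ID.
type numericID int64

func (n numericID) String() string { return fmt.Sprintf("%d", int64(n)) }

// describe accepts any ID, regardless of its underlying representation.
func describe(id ID) string { return "chain " + id.String() }

func main() {
	fmt.Println(describe(StringID("devnet"))) // prints "chain devnet"
	fmt.Println(describe(numericID(1)))       // prints "chain 1"
}
```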

type Subscription

type Subscription interface {
	// Unsubscribe cancels the sending of events to the data channel
	// and closes the error channel. Unsubscribe should be callable multiple
	// times without causing an error.
	Unsubscribe()
	// Err returns the subscription error channel. The error channel receives
	// a value if there is an issue with the subscription (e.g. the network connection
	// delivering the events has been closed). Only one value will ever be sent.
	// The error channel is closed by Unsubscribe.
	Err() <-chan error
}

Subscription represents an event subscription where events are delivered on a data channel. This is a generic Subscription interface used by clients.

type TransactionSender

type TransactionSender[TX any, RESULT any, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]] struct {
	services.StateMachine
	// contains filtered or unexported fields
}

func NewTransactionSender

func NewTransactionSender[TX any, RESULT any, CHAIN_ID ID, RPC SendTxRPCClient[TX, RESULT]](
	lggr logger.Logger,
	chainID CHAIN_ID,
	chainFamily string,
	multiNode *MultiNode[CHAIN_ID, RPC],
	metrics transactionSenderMetrics,
	classifyErr func(err error) SendTxReturnCode,
	sendTxSoftTimeout time.Duration,
) *TransactionSender[TX, RESULT, CHAIN_ID, RPC]

func (*TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close() error

func (*TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Name

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Name() string

func (*TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) (result RESULT, code SendTxReturnCode, err error)

SendTransaction - broadcasts transaction to all the send-only and primary nodes in MultiNode. A returned nil or error does not guarantee that the transaction will or won't be included. Additional checks must be performed to determine the final state.

Send-only nodes' results are ignored as they tend to return false-positive responses. Broadcast to them is necessary to speed up the propagation of TX in the network.

Handling of primary nodes' results consists of collection and aggregation. In the collection step, we gather as many results as possible while minimizing waiting time. This operation succeeds on one of the following conditions:

* Received at least one success
* Received at least one result and `sendTxSoftTimeout` expired
* Received results from the sufficient number of nodes defined by sendTxQuorum.

The aggregation is based on the following conditions:

* If there is at least one success - returns success
* If there is at least one terminal error - returns terminal error
* If there is both success and terminal error - returns success and reports invariant violation
* Otherwise, returns any (effectively random) of the errors.

func (*TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start

func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start(ctx context.Context) error
