logpoller

package
v1.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2023 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package logpoller is a service for querying EVM log data.

It can be thought of as a more performant and sophisticated version of eth_getLogs (https://ethereum.org/en/developers/docs/apis/json-rpc/#eth_getlogs). Having a local table of relevant, continually canonical logs gives us two main advantages:
- Hundreds of jobs/clients can query for logs without overloading the underlying RPC provider.
- We can do more sophisticated querying (filter by confirmations/time/log contents, efficiently join between the logs table and other tables on the node, etc.).

Guarantees provided by the poller:
- Queries always return the logs from the _current_ canonical chain (same as eth_getLogs). In particular, this means that the results of querying unfinalized logs may change between queries, but finalized logs remain stable. The threshold between unfinalized and finalized logs is the finalityDepth parameter, chosen such that, with exceedingly high probability, logs finalityDepth deep cannot be reorged.
- After RegisterFilter is called with a particular event, the poller will never miss logs for that event, despite node crashes and reorgs. The granularity of the filter is always at least one block (more when backfilling).
- After Replay(fromBlock) is called, all blocks from that one up to the latest chain tip will be polled with the current filter. This can be used when a job is first added, to specify a start block from which you wish to capture existing logs.

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors describing why a replay was aborted.
// NOTE(review): presumably returned by Replay; the returning code is not
// visible on this page — confirm against the implementation.
var (
	// ErrReplayAbortedByClient carries the message "replay aborted by client".
	ErrReplayAbortedByClient   = errors.New("replay aborted by client")
	// ErrReplayAbortedOnShutdown carries the message
	// "replay aborted, log poller shutdown".
	ErrReplayAbortedOnShutdown = errors.New("replay aborted, log poller shutdown")
)

Functions

func EvmWord added in v1.5.0

func EvmWord(i uint64) common.Hash

func NewLogPoller

func NewLogPoller(orm *ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration,
	finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepBlocksDepth int64) *logPoller

NewLogPoller creates a log poller. Note there is an assumption that blocks can be processed faster than they are produced for the given chain, or the poller will fall behind. Block processing involves the following calls in steady state (without reorgs):
- eth_getBlockByNumber - headers only (transaction hashes, not full transaction objects),
- eth_getLogs - get the logs for the block,
- 1 db read of the latest block - for checking reorgs,
- 1 db tx including the block write and the logs write to the logs table.
How fast that can be done depends largely on network speed and DB, but even for the fastest supported chain, Polygon, which has 2s block times, we need RPCs with roughly <= 500ms latency.

Types

type Client added in v1.10.0

// Client is the chain-client dependency of the log poller; NewLogPoller
// accepts a value of this type as its ec argument.
type Client interface {
	// HeadByNumber returns the head for block number n.
	// NOTE(review): behavior for a nil n is not visible here — confirm.
	HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error)
	// HeadByHash returns the head for the given block hash (the parameter is
	// named n but is a common.Hash).
	HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error)
	// BatchCallContext issues the batch elements in b and reports an error.
	// NOTE(review): presumably per-element results/errors are written back
	// into b, as with go-ethereum's rpc client — confirm.
	BatchCallContext(ctx context.Context, b []rpc.BatchElem) error
	// FilterLogs returns the logs matching the filter query q.
	// NOTE(review): presumably backed by eth_getLogs — confirm.
	FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
	// ChainID returns the chain ID as a *big.Int.
	ChainID() *big.Int
}

type Filter added in v1.10.0

// Filter pairs a list of event signatures with a list of addresses. It is the
// argument type of LogPoller.RegisterFilter.
type Filter struct {
	// EventSigs holds event signature hashes.
	// NOTE(review): presumably the first topic of matching logs — confirm.
	EventSigs []common.Hash
	// Addresses holds addresses.
	// NOTE(review): presumably the contracts emitting the events — confirm.
	Addresses []common.Address
}

type Log

// Log represents an EVM log.
type Log struct {
	// EvmChainId is the chain ID associated with the log.
	EvmChainId  *utils.Big
	// LogIndex is the index of the log.
	// NOTE(review): presumably its position within the block — confirm.
	LogIndex    int64
	// BlockHash and BlockNumber identify the block containing the log.
	BlockHash   common.Hash
	BlockNumber int64
	// Topics holds the log topics as raw byte arrays; GetTopics exposes them
	// as []common.Hash.
	Topics      pq.ByteaArray
	// EventSig is the event signature hash.
	// NOTE(review): presumably equal to the first topic — confirm.
	EventSig    common.Hash
	// Address is the address associated with the log.
	// NOTE(review): presumably the emitting contract — confirm.
	Address     common.Address
	// TxHash is the hash of the transaction associated with the log.
	TxHash      common.Hash
	// Data is the log's data payload as bytes.
	Data        []byte
	// CreatedAt is a timestamp.
	// NOTE(review): presumably the DB insertion time, not block time — confirm.
	CreatedAt   time.Time
}

Log represents an EVM log.

func (*Log) GetTopics added in v1.5.0

func (l *Log) GetTopics() []common.Hash

func (*Log) ToGethLog added in v1.10.0

func (l *Log) ToGethLog() types.Log

type LogPoller

// LogPoller is the service interface of this package. It embeds
// services.ServiceCtx and adds replay, filter management and log queries.
//
// Most query methods take optional pg.QOpt values; those taking confs filter
// by a confirmation count.
// NOTE(review): exact confs semantics are not visible on this page — confirm.
type LogPoller interface {
	services.ServiceCtx
	// Replay polls, per the package overview, all blocks from fromBlock up to
	// the latest chain tip with the current filter.
	Replay(ctx context.Context, fromBlock int64) error
	// RegisterFilter registers filter and returns an int and an error.
	// NOTE(review): the int is presumably the ID accepted by UnregisterFilter — confirm.
	RegisterFilter(filter Filter) (int, error)
	// UnregisterFilter removes the filter identified by filterID.
	UnregisterFilter(filterID int) error
	// LatestBlock returns a block number.
	// NOTE(review): presumably the latest block stored by the poller — confirm.
	LatestBlock(qopts ...pg.QOpt) (int64, error)
	// GetBlocksRange returns LogPollerBlock records for the given block numbers.
	GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error)
	// General querying
	// Logs and LogsWithSigs query by block range (start, end), address and one
	// or several event signatures.
	// NOTE(review): inclusivity of start/end is not visible here — confirm.
	Logs(start, end int64, eventSig common.Hash, address common.Address, qopts ...pg.QOpt) ([]Log, error)
	LogsWithSigs(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]Log, error)
	// LatestLogByEventSigWithConfs returns a single log for the given event
	// signature and address.
	LatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error)
	// LatestLogEventSigsAddrsWithConfs is the multi-signature, multi-address
	// variant, bounded below by fromBlock.
	LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) ([]Log, error)

	// Content based querying
	// The IndexedLogs* methods select on the topic at topicIndex; the
	// LogsDataWord* methods select on the data word at wordIndex.
	// NOTE(review): index base and bound inclusivity are not visible here — confirm.
	IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
	IndexedLogsTopicGreaterThan(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
	IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
	LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
	LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
}
var (
	// ErrDisabled is a sentinel error with the message "log poller disabled".
	ErrDisabled                 = errors.New("log poller disabled")
	// LogPollerDisabled is a LogPoller backed by the unexported type disabled.
	// NOTE(review): presumably its methods return ErrDisabled — confirm.
	LogPollerDisabled LogPoller = disabled{}
)

type LogPollerBlock

// LogPollerBlock represents an unfinalized block used for reorg detection
// when polling.
type LogPollerBlock struct {
	// EvmChainId is the chain ID associated with the block.
	EvmChainId *utils.Big
	// BlockHash is the hash of the block.
	BlockHash  common.Hash
	// Note geth uses int64 internally https://github.com/ethereum/go-ethereum/blob/f66f1a16b3c480d3a43ac7e8a09ab3e362e96ae4/eth/filters/api.go#L340
	BlockNumber int64
	// CreatedAt is a timestamp.
	// NOTE(review): presumably the DB insertion time — confirm.
	CreatedAt   time.Time
}

LogPollerBlock represents an unfinalized block used for reorg detection when polling.

type ORM

type ORM struct {
	// contains filtered or unexported fields
}

func NewORM

func NewORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QConfig) *ORM

NewORM creates an ORM scoped to chainID.

func (*ORM) DeleteBlocksAfter added in v1.10.0

func (o *ORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error

DeleteBlocksAfter deletes all blocks after and including start.

func (*ORM) DeleteBlocksBefore added in v1.10.0

func (o *ORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error

DeleteBlocksBefore deletes all blocks before and including end.

func (*ORM) DeleteLogsAfter added in v1.10.0

func (o *ORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error

func (*ORM) GetBlocksRange added in v1.12.0

func (o *ORM) GetBlocksRange(start uint64, end uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error)

func (*ORM) InsertBlock

func (o *ORM) InsertBlock(h common.Hash, n int64, qopts ...pg.QOpt) error

InsertBlock is idempotent to support replays.

func (*ORM) InsertLogs

func (o *ORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error

InsertLogs is idempotent to support replays.

func (*ORM) SelectBlockByHash

func (o *ORM) SelectBlockByHash(h common.Hash, qopts ...pg.QOpt) (*LogPollerBlock, error)

func (*ORM) SelectBlockByNumber

func (o *ORM) SelectBlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error)

func (*ORM) SelectDataWordGreaterThan added in v1.5.0

func (o *ORM) SelectDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)

func (*ORM) SelectDataWordRange added in v1.5.0

func (o *ORM) SelectDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)

func (*ORM) SelectIndexLogsTopicGreaterThan added in v1.5.0

func (o *ORM) SelectIndexLogsTopicGreaterThan(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)

func (*ORM) SelectIndexLogsTopicRange added in v1.5.0

func (o *ORM) SelectIndexLogsTopicRange(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)

func (*ORM) SelectIndexedLogs added in v1.5.0

func (o *ORM) SelectIndexedLogs(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)

func (*ORM) SelectLatestBlock

func (o *ORM) SelectLatestBlock(qopts ...pg.QOpt) (*LogPollerBlock, error)

func (*ORM) SelectLatestLogEventSigWithConfs

func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error)

func (*ORM) SelectLatestLogEventSigsAddrsWithConfs added in v1.9.0

func (o *ORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)

SelectLatestLogEventSigsAddrsWithConfs finds the latest log for each (address, event) combination that matches a list of addresses and a list of events.

func (*ORM) SelectLogsByBlockRangeFilter

func (o *ORM) SelectLogsByBlockRangeFilter(start, end int64, address common.Address, eventSig common.Hash, qopts ...pg.QOpt) ([]Log, error)

SelectLogsByBlockRangeFilter finds the logs in a given block range.

func (*ORM) SelectLogsWithSigsByBlockRangeFilter added in v1.7.0

func (o *ORM) SelectLogsWithSigsByBlockRangeFilter(start, end int64, address common.Address, eventSigs []common.Hash, qopts ...pg.QOpt) (logs []Log, err error)

SelectLogsWithSigsByBlockRangeFilter finds the logs in the given block range with the given event signatures emitted from the given address.

type ReplayRequest added in v1.9.0

type ReplayRequest struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL