processor

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultHardFallbackBlocks is the number of blocks to fall back by during a
// reorg in case no common ancestor is found.
const DefaultHardFallbackBlocks = 1000

Variables

This section is empty.

Functions

func NewChainProgress

func NewChainProgress(startBlock uint64) *chainProgress

Types

type BlockHashCache

// BlockHashCache is an LRU cache for storing block hashes, keyed by block
// number. It is used for reorg detection by comparing parent hashes.
// Instances are created with NewBlockHashCache, which takes the capacity.
type BlockHashCache struct {
	// contains filtered or unexported fields
}

BlockHashCache is an LRU cache for storing block hashes. It is used for reorg detection by comparing parent hashes.

func NewBlockHashCache

func NewBlockHashCache(capacity int) *BlockHashCache

func (*BlockHashCache) Clear

func (c *BlockHashCache) Clear()

Clear removes all entries from the cache.

func (*BlockHashCache) DropAfter

func (c *BlockHashCache) DropAfter(after uint64)

DropAfter removes all entries with blockNum > after. It is used during a reorg to invalidate orphaned block hashes.

func (*BlockHashCache) Get

func (c *BlockHashCache) Get(blockNum uint64) (string, bool)

Get retrieves a block hash. Returns the hash and true if found.

func (*BlockHashCache) Len

func (c *BlockHashCache) Len() int

Len returns the number of entries in the cache.

func (*BlockHashCache) Set

func (c *BlockHashCache) Set(blockNum uint64, hash string)

Set stores a block hash. If the block already exists, it updates the hash and moves it to the back (most recent). If capacity is exceeded, evicts the oldest.

type BlockRange

// BlockRange is a pair of block numbers delimiting a span of blocks.
//
// NOTE(review): whether From and To are inclusive bounds is not visible
// here — confirm against the fetcher before relying on it.
type BlockRange struct {
	From, To uint64
}

type ChainInfo

// ChainInfo describes a single chain: its identifier, its name, and the RPC
// information for it.
type ChainInfo struct {
	// ChainId identifies the chain.
	// It is a string, so an integer chain ID must be converted to its string form.
	ChainId string
	// Name is the name of the chain.
	Name string
	// RPC holds the RPC information of the chain.
	RPC rpc.RPC
}

type ChainStatus

// ChainStatus is a JSON-serializable status record for a single chain.
// ProcessorStatus.Chains holds one ChainStatus per map key.
//
// NOTE(review): the field meanings below are inferred from the field names
// only; the code that populates them is not visible here — confirm.
type ChainStatus struct {
	// Identity and run state of the chain.
	ChainId   string `json:"chain_id"`
	Name      string `json:"name"`
	IsRunning bool   `json:"is_running"`
	IsLive    bool   `json:"is_live"`

	// Cursor position and chain head (presumably the last processed block and
	// the latest known block — confirm).
	CursorBlock uint64 `json:"cursor_block"`
	CursorHash  string `json:"cursor_hash"`
	HeadBlock   uint64 `json:"head_block"`

	// Progress and throughput figures. ETA is a string; its format is not
	// visible here.
	BlocksBehind uint64  `json:"blocks_behind"`
	ProgressPct  float64 `json:"progress_pct"`
	BlocksPerSec float64 `json:"blocks_per_sec"`
	EventsTotal  uint64  `json:"events_total"`
	EventsPerSec float64 `json:"events_per_sec"`
	ETA          string  `json:"eta"`

	// Timestamps of the last progress and the last error, plus the error text.
	LastProgressAt time.Time `json:"last_progress_at"`
	LastError      string    `json:"last_error"`
	LastErrorAt    time.Time `json:"last_error_at"`

	// Configuration values; the names mirror the corresponding Options fields
	// (DecoderConcurrency has no visible counterpart in Options).
	ConfirmationDepth  uint64 `json:"confirmation_depth"`
	RangeSize          int    `json:"range_size"`
	FetcherConcurrency int    `json:"fetcher_concurrency"`
	DecoderConcurrency int    `json:"decoder_concurrency"`
}

type FetchMode

// FetchMode names the RPC method used for fetching logs.
type FetchMode string

// FetchModeLogs uses eth_getLogs for efficiency.
const FetchModeLogs FetchMode = "logs"

// FetchModeReceipts uses eth_getBlockReceipts for reliability.
const FetchModeReceipts FetchMode = "receipts"

type FetchResult

// FetchResult groups a block range with the logs and timestamps associated
// with it.
type FetchResult struct {
	// Range is the block range this result belongs to.
	Range BlockRange
	// Logs holds the logs for Range.
	Logs []types.Log
	// Timestamps maps uint64 keys to uint64 values.
	// NOTE(review): presumably block number -> block timestamp, and presumably
	// only filled when Options.EnableTimestamps is set — confirm.
	Timestamps map[uint64]uint64
}

type HealthReport

// HealthReport is the JSON-serializable result returned by Processor.Health.
//
// NOTE(review): how Healthy is derived and what Reasons contains are not
// visible here — confirm against the Health implementation.
type HealthReport struct {
	// Healthy is the overall health flag.
	Healthy bool `json:"healthy"`
	// IsRunning reports the running state.
	IsRunning bool `json:"is_running"`
	// Chains maps a string key (presumably the chain ID — confirm) to a
	// per-chain boolean.
	Chains map[string]bool `json:"chains"`
	// Reasons is a list of strings (presumably explanations for an unhealthy
	// report — confirm).
	Reasons []string `json:"reasons"`
}

type Options

// Options holds the per-chain configuration passed to Processor.AddChain.
type Options struct {
	// BatchSize controls how many decoded events are buffered and written to sinks at once.
	BatchSize int
	// RangeSize is the number of blocks requested per eth_getLogs window.
	// Larger ranges reduce round-trips but may exceed provider limits; tune per provider.
	RangeSize int

	// FetcherConcurrency is the number of goroutines spawned for the fetcher.
	// Set it to 1 for strictly serial fetching.
	FetcherConcurrency int
	// StartBlock is the inclusive block height to begin indexing from.
	// Use 0 to let the processor derive it (e.g., from a stored cursor).
	StartBlock uint64
	// ConfirmationDepth is the number of blocks to wait for confirmation.
	// Waiting for confirmations avoids most reorgs.
	// On Ethereum PoS, around 5-15 confirmations is considered "safe".
	ConfirmationDepth uint64
	// EnableTimestamps allows you to get a timestamp for each event.
	// Note that enabling this costs additional calls to the RPC.
	// Default: false
	EnableTimestamps bool

	// ReorgLookbackBlocks is the maximum number of blocks to walk back when detecting a reorg.
	// It is used to bound header lookups and the size of the stored window of hashes.
	// Default: 64 (a good starting point)
	ReorgLookbackBlocks uint64
	// Topics is the list of event topics the indexer listens for when fetching logs.
	Topics [][]string
	// Addresses is a list of whitelisted addresses to filter by.
	Addresses []string
	// FetchMode determines which RPC method to use for fetching logs:
	// - "logs": uses eth_getLogs (default, more efficient)
	// - "receipts": uses eth_getBlockReceipts (more reliable, higher bandwidth)
	FetchMode FetchMode
	// UseLogsForHistoricalSync determines whether to use eth_getLogs during historical sync.
	// Using eth_getLogs instead of eth_getBlockReceipts during historical sync can reduce RPC cost.
	// Default: true
	// NOTE(review): the Go zero value of a bool is false; how the documented
	// default of true is applied is not visible here — confirm.
	UseLogsForHistoricalSync bool
	// RetryConfig controls how retries are handled on retriable errors.
	// It is a pointer because it is nillable; default settings exist.
	RetryConfig *rpc.RetryConfig
}

type Processor

// Processor is created with NewProcessor from a metrics.Metrics and a
// sink.Sink. Chains are registered with AddChain and processing is started
// with Run; Status, Health and IsLive expose its state.
type Processor struct {
	// contains filtered or unexported fields
}

func NewProcessor

func NewProcessor(m metrics.Metrics, s sink.Sink) *Processor

func (*Processor) AddChain

func (p *Processor) AddChain(chain ChainInfo, opts *Options, router *decoder.DecoderRouter) error

func (*Processor) GetChain

func (p *Processor) GetChain(chainId string) ChainInfo

func (*Processor) Health

func (p *Processor) Health() HealthReport

func (*Processor) IsLive

func (p *Processor) IsLive(chainId string) (bool, error)

func (*Processor) Run

func (p *Processor) Run(ctx context.Context) error

func (*Processor) SetLogger

func (p *Processor) SetLogger(l *slog.Logger)

func (*Processor) Status

func (p *Processor) Status() ProcessorStatus

type ProcessorStatus

// ProcessorStatus is the JSON-serializable result returned by Processor.Status.
type ProcessorStatus struct {
	// IsRunning reports the running state.
	IsRunning bool `json:"is_running"`
	// Chains holds one ChainStatus per string key (presumably the chain ID —
	// confirm).
	Chains map[string]ChainStatus `json:"chains"`
	// StartTime is a timestamp (presumably when the processor started —
	// confirm).
	StartTime time.Time `json:"start_time"`
	// TotalEvents is an event counter (presumably summed across all chains —
	// confirm).
	TotalEvents uint64 `json:"total_events"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL