tracker

package
v0.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: GPL-3.0 Imports: 13 Imported by: 0

Documentation

Overview

Package tracker provides block processing coordination and tracking.

Processing Pipeline

The execution-processor uses a multi-stage pipeline for block processing:

┌─────────────────┐
│  Block Source   │  Execution node provides blocks
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ ProcessNextBlock│  Manager calls processor to discover next block
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Task Creation  │  Processor creates tasks for each transaction
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Redis/Asynq    │  Tasks enqueued to distributed queue
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Worker Handler │  Asynq workers process tasks (may be distributed)
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│   ClickHouse    │  Data inserted using columnar protocol
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│Block Completion │  Pending tracker marks block complete when all tasks finish
└─────────────────┘

Processing Modes

  • FORWARDS_MODE: Process blocks from current head forward (real-time)
  • BACKWARDS_MODE: Process blocks from start point backward (backfill)

Backpressure Control

The pipeline uses MaxPendingBlockRange to control backpressure:

  • Limits concurrent incomplete blocks
  • Prevents memory exhaustion during slow ClickHouse inserts
  • Uses Redis to track pending tasks per block

Index

Constants

View Source
const (
	// DefaultBlockMetaTTL is the default TTL for block tracking keys.
	DefaultBlockMetaTTL = 30 * time.Minute

	// DefaultStaleThreshold is the default time after which a block is considered stale.
	DefaultStaleThreshold = 5 * time.Minute
)

Default configuration values for BlockCompletionTracker.

View Source
const (
	// DefaultMaxPendingBlockRange is the maximum distance between the oldest
	// incomplete block and the current block before blocking new block processing.
	DefaultMaxPendingBlockRange = 2

	// DefaultClickHouseTimeout is the default timeout for ClickHouse operations.
	DefaultClickHouseTimeout = 30 * time.Second

	// DefaultTraceTimeout is the default timeout for trace fetching operations.
	DefaultTraceTimeout = 30 * time.Second
)

Default configuration values for processors. These provide a single source of truth for default configuration.

View Source
const (
	BACKWARDS_MODE = "backwards"
	FORWARDS_MODE  = "forwards"
)

Variables

This section is empty.

Functions

func GenerateTaskID

func GenerateTaskID(processor, network string, blockNum uint64, identifier string) string

GenerateTaskID creates a deterministic task ID for deduplication. Format: {processor}:{network}:{blockNum}:{identifier} For transaction-based processors: identifier = txHash. For block-based processors: identifier = "block".

func IsBlockNotFoundError

func IsBlockNotFoundError(err error) bool

IsBlockNotFoundError checks if an error indicates a block was not found. Uses errors.Is for sentinel errors, with fallback to string matching for wrapped errors.

func PrefixedProcessBackwardsQueue

func PrefixedProcessBackwardsQueue(processorName, prefix string) string

PrefixedProcessBackwardsQueue returns the backwards process queue name with prefix.

func PrefixedProcessForwardsQueue

func PrefixedProcessForwardsQueue(processorName, prefix string) string

PrefixedProcessForwardsQueue returns the forwards process queue name with prefix.

func PrefixedProcessReprocessBackwardsQueue

func PrefixedProcessReprocessBackwardsQueue(processorName, prefix string) string

PrefixedProcessReprocessBackwardsQueue returns the reprocess backwards queue name with prefix.

func PrefixedProcessReprocessForwardsQueue

func PrefixedProcessReprocessForwardsQueue(processorName, prefix string) string

PrefixedProcessReprocessForwardsQueue returns the reprocess forwards queue name with prefix.

func ProcessBackwardsQueue

func ProcessBackwardsQueue(processorName string) string

ProcessBackwardsQueue returns the backwards process queue name for a processor.

func ProcessForwardsQueue

func ProcessForwardsQueue(processorName string) string

ProcessForwardsQueue returns the forwards process queue name for a processor.

func ProcessQueue

func ProcessQueue(processorName string) string

ProcessQueue returns the process queue name for a processor (deprecated - use mode-specific queues).

func ProcessReprocessBackwardsQueue

func ProcessReprocessBackwardsQueue(processorName string) string

ProcessReprocessBackwardsQueue returns the reprocess backwards queue name for a processor.

func ProcessReprocessForwardsQueue

func ProcessReprocessForwardsQueue(processorName string) string

ProcessReprocessForwardsQueue returns the reprocess forwards queue name for a processor.

Types

type BlockCompletionTracker

type BlockCompletionTracker struct {
	// contains filtered or unexported fields
}

BlockCompletionTracker tracks block completion using Redis SETs for task deduplication. This replaces the counter-based PendingTracker with a SET-based approach that: - Uses asynq.TaskID() for deterministic task deduplication - Tracks completed taskIDs in a Redis SET (idempotent SADD) - Stores expected count and enqueued_at metadata - Supports stale block detection for auto-retry.

func NewBlockCompletionTracker

func NewBlockCompletionTracker(
	redisClient *redis.Client,
	prefix string,
	log logrus.FieldLogger,
	stateProvider StateProvider,
	config BlockCompletionTrackerConfig,
) *BlockCompletionTracker

NewBlockCompletionTracker creates a new BlockCompletionTracker.

func (*BlockCompletionTracker) ClearBlock

func (t *BlockCompletionTracker) ClearBlock(
	ctx context.Context,
	blockNum uint64,
	network, processor, mode string,
) error

ClearBlock removes all tracking data for a block (used when retrying).

func (*BlockCompletionTracker) ClearStaleBlocks

func (t *BlockCompletionTracker) ClearStaleBlocks(
	ctx context.Context,
	network, processor, mode string,
) (int, error)

ClearStaleBlocks removes all stale blocks and their associated tracking data. Uses ZRANGEBYSCORE to identify stale blocks and a pipeline to efficiently delete all related keys. Returns the number of blocks cleared.

func (*BlockCompletionTracker) GetBlockStatus

func (t *BlockCompletionTracker) GetBlockStatus(
	ctx context.Context,
	blockNum uint64,
	network, processor, mode string,
) (completed int64, expected int64, enqueuedAt time.Time, err error)

GetBlockStatus returns the completion status of a block.

func (*BlockCompletionTracker) GetStaleBlocks

func (t *BlockCompletionTracker) GetStaleBlocks(
	ctx context.Context,
	network, processor, mode string,
) ([]uint64, error)

GetStaleBlocks returns blocks that have been processing longer than the stale threshold. Uses ZRANGEBYSCORE on the pending blocks sorted set for O(log N + M) complexity.

func (*BlockCompletionTracker) HasBlockTracking

func (t *BlockCompletionTracker) HasBlockTracking(
	ctx context.Context,
	blockNum uint64,
	network, processor, mode string,
) (bool, error)

HasBlockTracking checks if a block has Redis tracking data. Returns true if block_meta key exists (block is being tracked). Used to detect orphaned blocks that are in ClickHouse (complete=0) but have no Redis tracking.

func (*BlockCompletionTracker) MarkBlockComplete

func (t *BlockCompletionTracker) MarkBlockComplete(
	ctx context.Context,
	blockNum uint64,
	network, processor, mode string,
) error

MarkBlockComplete writes to ClickHouse and cleans up Redis.

func (*BlockCompletionTracker) RegisterBlock

func (t *BlockCompletionTracker) RegisterBlock(
	ctx context.Context,
	blockNum uint64,
	expectedCount int,
	network, processor, mode, queue string,
) error

RegisterBlock initializes tracking for a new block. Clears any existing completion data (safe for retries). This should be called AFTER MarkBlockEnqueued to ensure ClickHouse has the record.

func (*BlockCompletionTracker) TrackTaskCompletion

func (t *BlockCompletionTracker) TrackTaskCompletion(
	ctx context.Context,
	taskID string,
	blockNum uint64,
	network, processor, mode string,
) (bool, error)

TrackTaskCompletion records a task completion and checks if block is done. Returns true if all tasks are now complete. Uses a Lua script for atomic completion tracking in a single round trip.

type BlockCompletionTrackerConfig

type BlockCompletionTrackerConfig struct {
	// StaleThreshold is the time after which a block is considered stale.
	StaleThreshold time.Duration

	// AutoRetryStale enables automatic retry of stale blocks.
	AutoRetryStale bool
}

BlockCompletionTrackerConfig holds configuration for the BlockCompletionTracker.

type BlockProcessor

type BlockProcessor interface {
	Processor

	// ProcessNextBlock discovers and enqueues tasks for the next block.
	// Returns an error if no block is available or processing fails.
	ProcessNextBlock(ctx context.Context) error

	// GetQueues returns the Asynq queues used by this processor.
	// Queues have priorities to control processing order.
	GetQueues() []QueueInfo

	// GetHandlers returns task handlers for Asynq worker registration.
	// These handlers process the distributed tasks.
	GetHandlers() map[string]asynq.HandlerFunc

	// EnqueueTask adds a task to the distributed queue.
	// Uses infinite retries to ensure eventual processing.
	EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error

	// SetProcessingMode configures forwards or backwards processing.
	SetProcessingMode(mode string)

	// ReprocessBlock re-enqueues tasks for an orphaned block.
	// Used when a block is in ClickHouse (complete=0) but has no Redis tracking.
	// This can happen due to Redis TTL expiry, Redis restart, or crashes.
	ReprocessBlock(ctx context.Context, blockNum uint64) error

	// ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks.
	// This is used for gap filling of missing blocks (blocks with no row in DB).
	ProcessBlock(ctx context.Context, block execution.Block) error

	// GetCompletionTracker returns the block completion tracker for checking tracking status.
	GetCompletionTracker() *BlockCompletionTracker
}

BlockProcessor extends Processor with block-level processing capabilities. It coordinates the full pipeline from block discovery through task completion.

Pipeline stages managed by BlockProcessor:

  1. Block Discovery: ProcessNextBlock identifies the next block to process
  2. Task Creation: Creates distributed tasks for each unit of work
  3. Queue Management: Manages Asynq queues for forwards/backwards processing
  4. Task Handling: Worker handlers process individual tasks
  5. Completion Tracking: Marks blocks complete when all tasks finish

type GapResult

type GapResult struct {
	Incomplete   []uint64      // Blocks with row but complete=0
	Missing      []uint64      // Blocks with no row at all
	ScanDuration time.Duration // Time taken to perform the scan
}

GapResult contains the results of a gap scan.

type GapStateProvider

type GapStateProvider interface {
	StateProvider
	GetIncompleteBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error)
	GetMissingBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error)
	GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (*big.Int, *big.Int, error)
}

GapStateProvider extends StateProvider with gap detection capabilities.

type Limiter

type Limiter struct {
	// contains filtered or unexported fields
}

Limiter provides shared blocking and completion functionality for processors.

func NewLimiter

func NewLimiter(deps *LimiterDeps, config LimiterConfig) *Limiter

NewLimiter creates a new Limiter.

func (*Limiter) GetAvailableCapacity

func (l *Limiter) GetAvailableCapacity(ctx context.Context, nextBlock uint64, mode string) (int, error)

GetAvailableCapacity returns how many more blocks can be enqueued before hitting the maxPendingBlockRange limit. Returns 0 if at or over capacity.

func (*Limiter) GetGaps

func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) (*GapResult, error)

GetGaps returns both incomplete and missing blocks outside the maxPendingBlockRange window. If lookbackRange is 0, scans from the oldest stored block. This performs a full-range scan for gap detection, excluding the recent window that is already handled by IsBlockedByIncompleteBlocks. Returns a GapResult containing:

  • Incomplete: blocks with a row in DB but complete=0
  • Missing: blocks with no row in DB at all

func (*Limiter) IsBlockedByIncompleteBlocks

func (l *Limiter) IsBlockedByIncompleteBlocks(
	ctx context.Context,
	nextBlock uint64,
	mode string,
) (bool, *uint64, error)

IsBlockedByIncompleteBlocks checks if processing should be blocked based on distance from the oldest/newest incomplete block (depending on processing mode). Returns: blocked status, blocking block number (if blocked), error. The blocking block number can be used to check if the block is orphaned (no Redis tracking).

func (*Limiter) ValidateBatchWithinLeash

func (l *Limiter) ValidateBatchWithinLeash(ctx context.Context, startBlock uint64, count int, mode string) error

ValidateBatchWithinLeash ensures a batch of blocks won't exceed the maxPendingBlockRange. Returns an error if the batch would violate the constraint.

type LimiterConfig

type LimiterConfig struct {
	MaxPendingBlockRange int
}

LimiterConfig holds configuration for the Limiter.

type LimiterDeps

type LimiterDeps struct {
	Log           logrus.FieldLogger
	StateProvider StateProvider
	Network       string
	Processor     string
}

LimiterDeps holds dependencies for the Limiter.

type Processor

type Processor interface {
	// Start initializes the processor and its dependencies (e.g., ClickHouse).
	Start(ctx context.Context) error

	// Stop gracefully shuts down the processor.
	Stop(ctx context.Context) error

	// Name returns the unique identifier for this processor.
	Name() string
}

Processor defines the base interface for all processors. Processors are responsible for transforming blockchain data into a format suitable for storage and analysis.

type QueueInfo

type QueueInfo struct {
	Name     string
	Priority int
}

QueueInfo contains information about a processor queue.

type StateProvider

type StateProvider interface {
	GetOldestIncompleteBlock(ctx context.Context, network, processor string, minBlockNumber uint64) (*uint64, error)
	GetNewestIncompleteBlock(ctx context.Context, network, processor string, maxBlockNumber uint64) (*uint64, error)
	MarkBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) error
}

StateProvider defines the state manager methods needed by Limiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL