simple

package
v0.1.7 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 | License: GPL-3.0 | Imports: 19 | Imported by: 0

Documentation

Index

Constants

View Source
// Default buffer configuration values. They match the defaults quoted in the
// comments on Config.BufferMaxRows and Config.BufferFlushInterval.
const (
	DefaultBufferMaxRows       = 100000      // default max rows before flush
	DefaultBufferFlushInterval = time.Second // default max time before flush
)

Default buffer configuration values.

View Source
// Task type names for this processor's forwards and backwards processing
// tasks. Both strings start with the same text as ProcessorName.
const (
	// ProcessForwardsTaskType is the task type for forwards processing.
	ProcessForwardsTaskType = "transaction_simple_process_forwards"
	// ProcessBackwardsTaskType is the task type for backwards processing.
	ProcessBackwardsTaskType = "transaction_simple_process_backwards"
)
View Source
// ProcessorName is the name of the simple transaction processor.
const ProcessorName = "transaction_simple"

ProcessorName is the name of the simple transaction processor.

Variables

View Source
// ErrTaskIDConflict is returned when a task with the same ID already exists.
// It is the same error value as asynq.ErrTaskIDConflict, re-exported under
// this package's name.
var ErrTaskIDConflict = asynq.ErrTaskIDConflict

ErrTaskIDConflict is returned when a task with the same ID already exists.

Functions

func GenerateTaskID added in v0.1.5

func GenerateTaskID(network string, blockNumber uint64) string

GenerateTaskID creates a deterministic task ID for deduplication. Format: {processor}:{network}:{blockNum}:block. For block-based processors, we use "block" as the identifier since there's one task per block.

func NewProcessBackwardsTask

func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)

NewProcessBackwardsTask creates a new backwards process task. Returns the task, taskID for deduplication, and any error.

func NewProcessForwardsTask

func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error)

NewProcessForwardsTask creates a new forwards process task. Returns the task, taskID for deduplication, and any error.

Types

type ClickHouseDateTime added in v0.1.5

// ClickHouseDateTime is a time.Time wrapper that formats correctly for
// ClickHouse JSON. It is a defined type, not an alias, so it does not carry
// time.Time's method set; convert with time.Time(t) to use those methods.
type ClickHouseDateTime time.Time

ClickHouseDateTime is a time.Time wrapper that formats correctly for ClickHouse JSON.

func (ClickHouseDateTime) MarshalJSON added in v0.1.5

func (t ClickHouseDateTime) MarshalJSON() ([]byte, error)

MarshalJSON formats time for ClickHouse DateTime column.

type Columns added in v0.1.5

// Columns holds all columns for transaction batch insert using ch-go columnar
// protocol.
//
// There is one field per column, and the field names match the fields of
// Transaction one-for-one and in the same order. The nullable and array
// columns (To, GasTipCap, GasFeeCap, BlobGas, BlobGasFeeCap, BlobHashes) are
// pointer-typed, so they are nil in a zero-value Columns; NewColumns is
// documented as the constructor that initializes them.
type Columns struct {
	UpdatedDateTime    proto.ColDateTime
	BlockNumber        proto.ColUInt64
	BlockHash          proto.ColStr
	ParentHash         proto.ColStr
	Position           proto.ColUInt32
	Hash               proto.ColStr
	From               proto.ColStr
	To                 *proto.ColNullable[string]
	Nonce              proto.ColUInt64
	GasPrice           proto.ColUInt128
	Gas                proto.ColUInt64
	GasTipCap          *proto.ColNullable[proto.UInt128]
	GasFeeCap          *proto.ColNullable[proto.UInt128]
	Value              proto.ColUInt128
	Type               proto.ColUInt8
	Size               proto.ColUInt32
	CallDataSize       proto.ColUInt32
	BlobGas            *proto.ColNullable[uint64]
	BlobGasFeeCap      *proto.ColNullable[proto.UInt128]
	BlobHashes         *proto.ColArr[string]
	Success            proto.ColBool
	NInputBytes        proto.ColUInt32
	NInputZeroBytes    proto.ColUInt32
	NInputNonzeroBytes proto.ColUInt32
	MetaNetworkName    proto.ColStr
}

Columns holds all columns for transaction batch insert using ch-go columnar protocol.

func NewColumns added in v0.1.5

func NewColumns() *Columns

NewColumns creates a new Columns instance with all nullable and array columns initialized.

func (*Columns) Append added in v0.1.5

func (c *Columns) Append(tx Transaction)

Append adds a Transaction row to all columns.

func (*Columns) Input added in v0.1.5

func (c *Columns) Input() proto.Input

Input returns the proto.Input for inserting data.

func (*Columns) Reset added in v0.1.5

func (c *Columns) Reset()

Reset clears all columns for reuse.

func (*Columns) Rows added in v0.1.5

func (c *Columns) Rows() int

Rows returns the number of rows in the columns.

type Config

// Config holds configuration for the simple transaction processor.
//
// The embedded clickhouse.Config carries the `yaml:",inline"` tag, so its keys
// are read from the same YAML mapping as the fields declared here rather than
// from a nested section. The defaults quoted for BufferMaxRows and
// BufferFlushInterval match DefaultBufferMaxRows and
// DefaultBufferFlushInterval.
//
// NOTE(review): Enabled presumably toggles this processor and Table presumably
// names the destination ClickHouse table — confirm against New/Validate.
type Config struct {
	clickhouse.Config `yaml:",inline"`
	Enabled           bool   `yaml:"enabled"`
	Table             string `yaml:"table"`

	// Row buffer settings for batched ClickHouse inserts
	BufferMaxRows       int           `yaml:"bufferMaxRows"`       // Max rows before flush. Default: 100000
	BufferFlushInterval time.Duration `yaml:"bufferFlushInterval"` // Max time before flush. Default: 1s

	// Block completion tracking
	MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2
}

Config holds configuration for the simple transaction processor.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates the configuration.

type Dependencies

// Dependencies contains the dependencies needed for the processor. A pointer
// to it is passed to New together with a *Config.
//
// NOTE(review): which of these fields may be nil is not visible from the
// declaration — confirm against New before relying on optional fields.
type Dependencies struct {
	Log            logrus.FieldLogger
	Pool           *ethereum.Pool
	Network        *ethereum.Network
	State          *state.Manager
	AsynqClient    *asynq.Client
	AsynqInspector *asynq.Inspector
	RedisClient    *redis.Client
	RedisPrefix    string
}

Dependencies contains the dependencies needed for the processor.

type ProcessPayload

// ProcessPayload represents the payload for processing a block. The struct
// tags give the JSON key used for each field.
//
// NOTE(review): BlockNumber is a big.Int held by value. The math/big
// documentation states that shallow copies of Int are not supported, so copying
// a ProcessPayload by value also copies the big.Int shallowly. Prefer passing
// *ProcessPayload (as NewProcessForwardsTask and NewProcessBackwardsTask do) —
// confirm no caller copies the struct.
type ProcessPayload struct {
	BlockNumber    big.Int `json:"block_number"`
	NetworkName    string  `json:"network_name"`
	ProcessingMode string  `json:"processing_mode"`
}

ProcessPayload represents the payload for processing a block.

func (*ProcessPayload) MarshalBinary

func (p *ProcessPayload) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler.

func (*ProcessPayload) UnmarshalBinary

func (p *ProcessPayload) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler.

type Processor

// Processor handles simple transaction processing.
//
// It embeds *tracker.Limiter, so the limiter's exported methods are promoted
// onto Processor in addition to the methods declared in this package.
// Construct it with New; the unexported fields cannot be set from outside the
// package.
type Processor struct {

	// Embedded limiter for shared blocking/completion logic
	*tracker.Limiter
	// contains filtered or unexported fields
}

Processor handles simple transaction processing.

func New

func New(deps *Dependencies, config *Config) (*Processor, error)

New creates a new simple transaction processor.

func (*Processor) EnqueueTask

func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error

EnqueueTask enqueues a task to the specified queue with infinite retries.

func (*Processor) GetCompletionTracker added in v0.1.5

func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker

GetCompletionTracker returns the block completion tracker.

func (*Processor) GetHandlers

func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc

GetHandlers returns the task handlers for this processor.

func (*Processor) GetQueues

func (p *Processor) GetQueues() []tracker.QueueInfo

GetQueues returns the queues used by this processor.

func (*Processor) Name

func (p *Processor) Name() string

Name returns the processor name.

func (*Processor) ProcessBlock added in v0.1.5

func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) error

ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. This is used for both normal processing and gap filling of missing blocks.

func (*Processor) ProcessNextBlock

func (p *Processor) ProcessNextBlock(ctx context.Context) error

ProcessNextBlock processes the next available block(s). In zero-interval mode, this attempts to fetch and process multiple blocks up to the available capacity for improved throughput.

func (*Processor) ReprocessBlock added in v0.1.5

func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error

ReprocessBlock re-enqueues tasks for an orphaned block. Used when a block is in ClickHouse (complete=0) but has no Redis tracking. TaskID deduplication ensures no duplicate tasks are created.

func (*Processor) SetProcessingMode

func (p *Processor) SetProcessingMode(mode string)

SetProcessingMode sets the processing mode for the processor.

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start starts the processor.

func (*Processor) Stop

func (p *Processor) Stop(ctx context.Context) error

Stop stops the processor.

type Transaction

// Transaction represents a row in the execution_transaction table.
//
// The struct tags give the JSON key for each field. Fields commented as
// UInt128 are carried as strings. Pointer-typed fields (To, GasTipCap,
// GasFeeCap, BlobGas, BlobGasFeeCap) are the nullable ones: each corresponds
// to a proto.ColNullable column in Columns, which has the same field names in
// the same order.
type Transaction struct {
	UpdatedDateTime    ClickHouseDateTime `json:"updated_date_time"`
	BlockNumber        uint64             `json:"block_number"`
	BlockHash          string             `json:"block_hash"`
	ParentHash         string             `json:"parent_hash"`
	Position           uint32             `json:"position"`
	Hash               string             `json:"hash"`
	From               string             `json:"from"`
	To                 *string            `json:"to"`
	Nonce              uint64             `json:"nonce"`
	GasPrice           string             `json:"gas_price"`        // Effective gas price as UInt128 string
	Gas                uint64             `json:"gas"`              // Gas limit
	GasTipCap          *string            `json:"gas_tip_cap"`      // Nullable UInt128 string
	GasFeeCap          *string            `json:"gas_fee_cap"`      // Nullable UInt128 string
	Value              string             `json:"value"`            // UInt128 string
	Type               uint8              `json:"type"`             // Transaction type
	Size               uint32             `json:"size"`             // Transaction size in bytes
	CallDataSize       uint32             `json:"call_data_size"`   // Size of call data
	BlobGas            *uint64            `json:"blob_gas"`         // Nullable - for type 3 txs
	BlobGasFeeCap      *string            `json:"blob_gas_fee_cap"` // Nullable UInt128 string
	BlobHashes         []string           `json:"blob_hashes"`      // Array of versioned hashes
	Success            bool               `json:"success"`
	NInputBytes        uint32             `json:"n_input_bytes"`
	NInputZeroBytes    uint32             `json:"n_input_zero_bytes"`
	NInputNonzeroBytes uint32             `json:"n_input_nonzero_bytes"`
	MetaNetworkName    string             `json:"meta_network_name"`
}

Transaction represents a row in the execution_transaction table.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL