structlog

package
v0.0.11 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2025 License: GPL-3.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ProcessorName            = "transaction_structlog"
	ProcessForwardsTaskType  = "transaction_structlog_process_forwards"
	ProcessBackwardsTaskType = "transaction_structlog_process_backwards"
	VerifyForwardsTaskType   = "transaction_structlog_verify_forwards"
	VerifyBackwardsTaskType  = "transaction_structlog_verify_backwards"
)

Variables

View Source
var ErrChannelFull = fmt.Errorf("batch collector channel is full")

ErrChannelFull is the error reported when the batch collector channel is full

Functions

func NewProcessBackwardsTask

func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)

NewProcessBackwardsTask creates a new backwards process task

func NewProcessForwardsTask

func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)

NewProcessForwardsTask creates a new forwards process task

func NewVerifyBackwardsTask

func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)

NewVerifyBackwardsTask creates a new backwards verify task

func NewVerifyForwardsTask

func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)

NewVerifyForwardsTask creates a new forwards verify task

Types

type BatchCollector added in v0.0.7

type BatchCollector struct {
	// contains filtered or unexported fields
}

BatchCollector aggregates structlog rows from multiple tasks and flushes them in batches

func NewBatchCollector added in v0.0.7

func NewBatchCollector(processor *Processor, config BatchConfig) *BatchCollector

NewBatchCollector creates a new batch collector

func (*BatchCollector) Start added in v0.0.7

func (bc *BatchCollector) Start(ctx context.Context) error

Start begins the batch collection process

func (*BatchCollector) Stop added in v0.0.7

func (bc *BatchCollector) Stop(ctx context.Context) error

Stop gracefully shuts down the batch collector

func (*BatchCollector) SubmitBatch added in v0.0.7

func (bc *BatchCollector) SubmitBatch(taskBatch TaskBatch) error

SubmitBatch submits a task batch for processing

type BatchConfig added in v0.0.7

type BatchConfig struct {
	Enabled           bool          `yaml:"enabled"`           // Enable batch aggregation
	MaxRows           int           `yaml:"maxRows"`           // Max rows before forced flush
	FlushInterval     time.Duration `yaml:"flushInterval"`     // Max time to wait before flush
	ChannelBufferSize int           `yaml:"channelBufferSize"` // Max tasks that can be queued
	FlushTimeout      time.Duration `yaml:"flushTimeout"`      // Timeout for ClickHouse flush operations
}

BatchConfig holds configuration for the batch aggregation system

type Config

type Config struct {
	clickhouse.Config      `yaml:",inline"`
	Enabled                bool                    `yaml:"enabled"`
	Table                  string                  `yaml:"table"`
	BatchConfig            BatchConfig             `yaml:"batchConfig"`
	LargeTransactionConfig *LargeTransactionConfig `yaml:"largeTransactionConfig"`
}

Config holds configuration for the transaction structlog processor

func (*Config) Validate

func (c *Config) Validate() error

type CountMismatchError added in v0.0.5

type CountMismatchError struct {
	Expected int
	Actual   int
	Message  string
}

CountMismatchError represents a structlog count mismatch between expected and actual counts

func (*CountMismatchError) Error added in v0.0.5

func (e *CountMismatchError) Error() string

type Dependencies

type Dependencies struct {
	Log         logrus.FieldLogger
	Pool        *ethereum.Pool
	Network     *ethereum.Network
	State       *state.Manager
	AsynqClient *asynq.Client
	RedisPrefix string
}

Dependencies contains the dependencies needed for the processor

type LargeTransactionConfig added in v0.0.11

type LargeTransactionConfig struct {
	Enabled              bool          `yaml:"enabled"`              // Enable large transaction handling
	StructlogThreshold   int           `yaml:"structlogThreshold"`   // Number of structlogs to consider a transaction "large"
	WorkerWaitTimeout    time.Duration `yaml:"workerWaitTimeout"`    // How long workers wait before returning task to queue
	MaxProcessingTime    time.Duration `yaml:"maxProcessingTime"`    // Maximum time allowed for processing a large transaction
	EnableSequentialMode bool          `yaml:"enableSequentialMode"` // If true, large transactions are processed one at a time
}

LargeTransactionConfig holds configuration for handling large transactions

type LargeTxLockManager added in v0.0.11

type LargeTxLockManager struct {
	// contains filtered or unexported fields
}

LargeTxLockManager manages sequential processing of large transactions

func NewLargeTxLockManager added in v0.0.11

func NewLargeTxLockManager(log logrus.FieldLogger, config *LargeTransactionConfig) *LargeTxLockManager

NewLargeTxLockManager creates a new large transaction lock manager

func (*LargeTxLockManager) AcquireLock added in v0.0.11

func (m *LargeTxLockManager) AcquireLock(ctx context.Context, txHash string, size int, operation string) error

AcquireLock attempts to acquire the lock for processing a large transaction

func (*LargeTxLockManager) GetStatus added in v0.0.11

func (m *LargeTxLockManager) GetStatus() map[string]interface{}

GetStatus returns current status of the lock manager

func (*LargeTxLockManager) IsLargeTransaction added in v0.0.11

func (m *LargeTxLockManager) IsLargeTransaction(structlogCount int) bool

IsLargeTransaction checks if a transaction qualifies as large

func (*LargeTxLockManager) ReleaseLock added in v0.0.11

func (m *LargeTxLockManager) ReleaseLock(txHash string)

ReleaseLock releases the lock after processing a large transaction

func (*LargeTxLockManager) WaitForLargeTransaction added in v0.0.11

func (m *LargeTxLockManager) WaitForLargeTransaction(ctx context.Context, workerTask string) error

WaitForLargeTransaction waits for any active large transaction to complete

type ProcessPayload

type ProcessPayload struct {
	BlockNumber      big.Int `json:"block_number"`
	TransactionHash  string  `json:"transaction_hash"`
	TransactionIndex uint32  `json:"transaction_index"`
	NetworkID        int32   `json:"network_id"`
	NetworkName      string  `json:"network_name"`
	Network          string  `json:"network"`         // Alias for NetworkName
	ProcessingMode   string  `json:"processing_mode"` // "forwards" or "backwards"
}

ProcessPayload represents the payload for processing a transaction

func (*ProcessPayload) MarshalBinary

func (p *ProcessPayload) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*ProcessPayload) UnmarshalBinary

func (p *ProcessPayload) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor handles transaction structlog processing

func New

func New(ctx context.Context, deps *Dependencies, config *Config) (*Processor, error)

New creates a new transaction structlog processor

func (*Processor) EnqueueTask

func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error

EnqueueTask enqueues a task to the specified queue

func (*Processor) EnqueueTransactionTasks added in v0.0.10

func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block *types.Block) (int, error)

EnqueueTransactionTasks enqueues transaction processing tasks for all transactions in a given block

func (*Processor) ExtractStructlogs added in v0.0.7

func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)

ExtractStructlogs extracts structlog data from a transaction without inserting to database

func (*Processor) GetHandlers

func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc

GetHandlers returns the task handlers for this processor

func (*Processor) GetLargeTxStatus added in v0.0.11

func (p *Processor) GetLargeTxStatus() map[string]interface{}

GetLargeTxStatus returns the current status of large transaction processing

func (*Processor) GetQueues

func (p *Processor) GetQueues() []c.QueueInfo

GetQueues returns the queues used by this processor

func (*Processor) Name

func (p *Processor) Name() string

Name returns the processor name

func (*Processor) ProcessNextBlock

func (p *Processor) ProcessNextBlock(ctx context.Context) error

ProcessNextBlock processes the next available block

func (*Processor) ProcessSingleTransaction

func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)

ProcessSingleTransaction processes a single transaction using the batch collector (exposed for worker handlers)

func (*Processor) SetProcessingMode

func (p *Processor) SetProcessingMode(mode string)

SetProcessingMode sets the processing mode for the processor

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start starts the processor

func (*Processor) Stop

func (p *Processor) Stop(ctx context.Context) error

Stop stops the processor

func (*Processor) VerifyTransaction

func (p *Processor) VerifyTransaction(ctx context.Context, blockNumber *big.Int, transactionHash string, transactionIndex uint32, networkName string, insertedCount int) error

VerifyTransaction verifies that a transaction has been processed correctly

type Structlog

type Structlog struct {
	UpdatedDateTime        time.Time `json:"updated_date_time"`
	BlockNumber            uint64    `json:"block_number"`
	TransactionHash        string    `json:"transaction_hash"`
	TransactionIndex       uint32    `json:"transaction_index"`
	TransactionGas         uint64    `json:"transaction_gas"`
	TransactionFailed      bool      `json:"transaction_failed"`
	TransactionReturnValue *string   `json:"transaction_return_value"`
	Index                  uint32    `json:"index"`
	ProgramCounter         uint32    `json:"program_counter"`
	Operation              string    `json:"operation"`
	Gas                    uint64    `json:"gas"`
	GasCost                uint64    `json:"gas_cost"`
	Depth                  uint64    `json:"depth"`
	ReturnData             *string   `json:"return_data"`
	Refund                 *uint64   `json:"refund"`
	Error                  *string   `json:"error"`
	CallToAddress          *string   `json:"call_to_address"`
	MetaNetworkID          int32     `json:"meta_network_id"`
	MetaNetworkName        string    `json:"meta_network_name"`
}

type TaskBatch added in v0.0.7

type TaskBatch struct {
	Rows         []Structlog // All rows from one task
	ResponseChan chan error  // Channel to send result back to task
	TaskID       string      // Unique identifier for this task
}

TaskBatch represents a batch of structlog rows from a single task

type VerifyPayload

type VerifyPayload struct {
	BlockNumber      big.Int `json:"block_number"`
	TransactionHash  string  `json:"transaction_hash"`
	TransactionIndex uint32  `json:"transaction_index"`
	NetworkID        int32   `json:"network_id"`
	NetworkName      string  `json:"network_name"`
	Network          string  `json:"network"`         // Alias for NetworkName
	ProcessingMode   string  `json:"processing_mode"` // "forwards" or "backwards"
	InsertedCount    int     `json:"inserted_count"`
}

VerifyPayload represents the payload for verifying a transaction

func (*VerifyPayload) MarshalBinary

func (v *VerifyPayload) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*VerifyPayload) UnmarshalBinary

func (v *VerifyPayload) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL