Documentation
¶
Index ¶
- Constants
- Variables
- func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)
- func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)
- func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)
- func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)
- type BatchCollector
- type BatchConfig
- type Config
- type CountMismatchError
- type Dependencies
- type LargeTransactionConfig
- type LargeTxLockManager
- func (m *LargeTxLockManager) AcquireLock(ctx context.Context, txHash string, size int, operation string) error
- func (m *LargeTxLockManager) GetStatus() map[string]interface{}
- func (m *LargeTxLockManager) IsLargeTransaction(structlogCount int) bool
- func (m *LargeTxLockManager) ReleaseLock(txHash string)
- func (m *LargeTxLockManager) WaitForLargeTransaction(ctx context.Context, workerTask string) error
- type ProcessPayload
- type Processor
- func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
- func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block *types.Block) (int, error)
- func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)
- func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
- func (p *Processor) GetLargeTxStatus() map[string]interface{}
- func (p *Processor) GetQueues() []c.QueueInfo
- func (p *Processor) Name() string
- func (p *Processor) ProcessNextBlock(ctx context.Context) error
- func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)
- func (p *Processor) SetProcessingMode(mode string)
- func (p *Processor) Start(ctx context.Context) error
- func (p *Processor) Stop(ctx context.Context) error
- func (p *Processor) VerifyTransaction(ctx context.Context, blockNumber *big.Int, transactionHash string, ...) error
- type Structlog
- type TaskBatch
- type VerifyPayload
Constants ¶
const ( ProcessorName = "transaction_structlog" ProcessForwardsTaskType = "transaction_structlog_process_forwards" ProcessBackwardsTaskType = "transaction_structlog_process_backwards" VerifyForwardsTaskType = "transaction_structlog_verify_forwards" VerifyBackwardsTaskType = "transaction_structlog_verify_backwards" )
Variables ¶
var ErrChannelFull = fmt.Errorf("batch collector channel is full")
ErrChannelFull is the error returned when the batch collector channel is full
Functions ¶
func NewProcessBackwardsTask ¶
func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)
NewProcessBackwardsTask creates a new backwards process task
func NewProcessForwardsTask ¶
func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)
NewProcessForwardsTask creates a new forwards process task
func NewVerifyBackwardsTask ¶
func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)
NewVerifyBackwardsTask creates a new backwards verify task
func NewVerifyForwardsTask ¶
func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)
NewVerifyForwardsTask creates a new forwards verify task
Types ¶
type BatchCollector ¶ added in v0.0.7
type BatchCollector struct {
// contains filtered or unexported fields
}
BatchCollector aggregates structlog rows from multiple tasks and flushes them in batches
func NewBatchCollector ¶ added in v0.0.7
func NewBatchCollector(processor *Processor, config BatchConfig) *BatchCollector
NewBatchCollector creates a new batch collector
func (*BatchCollector) Start ¶ added in v0.0.7
func (bc *BatchCollector) Start(ctx context.Context) error
Start begins the batch collection process
func (*BatchCollector) Stop ¶ added in v0.0.7
func (bc *BatchCollector) Stop(ctx context.Context) error
Stop gracefully shuts down the batch collector
func (*BatchCollector) SubmitBatch ¶ added in v0.0.7
func (bc *BatchCollector) SubmitBatch(taskBatch TaskBatch) error
SubmitBatch submits a task batch for processing
type BatchConfig ¶ added in v0.0.7
type BatchConfig struct {
Enabled bool `yaml:"enabled"` // Enable batch aggregation
MaxRows int `yaml:"maxRows"` // Max rows before forced flush
FlushInterval time.Duration `yaml:"flushInterval"` // Max time to wait before flush
ChannelBufferSize int `yaml:"channelBufferSize"` // Max tasks that can be queued
FlushTimeout time.Duration `yaml:"flushTimeout"` // Timeout for ClickHouse flush operations
}
BatchConfig holds configuration for the batch aggregation system
type Config ¶
type Config struct {
clickhouse.Config `yaml:",inline"`
Enabled bool `yaml:"enabled"`
Table string `yaml:"table"`
BatchConfig BatchConfig `yaml:"batchConfig"`
LargeTransactionConfig *LargeTransactionConfig `yaml:"largeTransactionConfig"`
}
Config holds configuration for the transaction structlog processor
type CountMismatchError ¶ added in v0.0.5
CountMismatchError represents a structlog count mismatch between expected and actual counts
func (*CountMismatchError) Error ¶ added in v0.0.5
func (e *CountMismatchError) Error() string
type Dependencies ¶
type Dependencies struct {
Log logrus.FieldLogger
Pool *ethereum.Pool
Network *ethereum.Network
State *state.Manager
AsynqClient *asynq.Client
RedisPrefix string
}
Dependencies contains the dependencies needed for the processor
type LargeTransactionConfig ¶ added in v0.0.11
type LargeTransactionConfig struct {
Enabled bool `yaml:"enabled"` // Enable large transaction handling
StructlogThreshold int `yaml:"structlogThreshold"` // Number of structlogs to consider a transaction "large"
WorkerWaitTimeout time.Duration `yaml:"workerWaitTimeout"` // How long workers wait before returning task to queue
MaxProcessingTime time.Duration `yaml:"maxProcessingTime"` // Maximum time allowed for processing a large transaction
EnableSequentialMode bool `yaml:"enableSequentialMode"` // If true, large transactions are processed one at a time
}
LargeTransactionConfig holds configuration for handling large transactions
type LargeTxLockManager ¶ added in v0.0.11
type LargeTxLockManager struct {
// contains filtered or unexported fields
}
LargeTxLockManager manages sequential processing of large transactions
func NewLargeTxLockManager ¶ added in v0.0.11
func NewLargeTxLockManager(log logrus.FieldLogger, config *LargeTransactionConfig) *LargeTxLockManager
NewLargeTxLockManager creates a new large transaction lock manager
func (*LargeTxLockManager) AcquireLock ¶ added in v0.0.11
func (m *LargeTxLockManager) AcquireLock(ctx context.Context, txHash string, size int, operation string) error
AcquireLock attempts to acquire the lock for processing a large transaction
func (*LargeTxLockManager) GetStatus ¶ added in v0.0.11
func (m *LargeTxLockManager) GetStatus() map[string]interface{}
GetStatus returns current status of the lock manager
func (*LargeTxLockManager) IsLargeTransaction ¶ added in v0.0.11
func (m *LargeTxLockManager) IsLargeTransaction(structlogCount int) bool
IsLargeTransaction checks if a transaction qualifies as large
func (*LargeTxLockManager) ReleaseLock ¶ added in v0.0.11
func (m *LargeTxLockManager) ReleaseLock(txHash string)
ReleaseLock releases the lock after processing a large transaction
func (*LargeTxLockManager) WaitForLargeTransaction ¶ added in v0.0.11
func (m *LargeTxLockManager) WaitForLargeTransaction(ctx context.Context, workerTask string) error
WaitForLargeTransaction waits for any active large transaction to complete
type ProcessPayload ¶
type ProcessPayload struct {
BlockNumber big.Int `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
NetworkID int32 `json:"network_id"`
NetworkName string `json:"network_name"`
Network string `json:"network"` // Alias for NetworkName
ProcessingMode string `json:"processing_mode"` // "forwards" or "backwards"
}
ProcessPayload represents the payload for processing a transaction
func (*ProcessPayload) MarshalBinary ¶
func (p *ProcessPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*ProcessPayload) UnmarshalBinary ¶
func (p *ProcessPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor handles transaction structlog processing
func (*Processor) EnqueueTask ¶
EnqueueTask enqueues a task to the specified queue
func (*Processor) EnqueueTransactionTasks ¶ added in v0.0.10
EnqueueTransactionTasks enqueues transaction processing tasks for all transactions in a given block
func (*Processor) ExtractStructlogs ¶ added in v0.0.7
func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)
ExtractStructlogs extracts structlog data from a transaction without inserting to database
func (*Processor) GetHandlers ¶
func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
GetHandlers returns the task handlers for this processor
func (*Processor) GetLargeTxStatus ¶ added in v0.0.11
GetLargeTxStatus returns the current status of large transaction processing
func (*Processor) ProcessNextBlock ¶
ProcessNextBlock processes the next available block
func (*Processor) ProcessSingleTransaction ¶
func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)
ProcessSingleTransaction processes a single transaction using batch collector (exposed for worker handlers)
func (*Processor) SetProcessingMode ¶
SetProcessingMode sets the processing mode for the processor
type Structlog ¶
type Structlog struct {
UpdatedDateTime time.Time `json:"updated_date_time"`
BlockNumber uint64 `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
TransactionGas uint64 `json:"transaction_gas"`
TransactionFailed bool `json:"transaction_failed"`
TransactionReturnValue *string `json:"transaction_return_value"`
Index uint32 `json:"index"`
ProgramCounter uint32 `json:"program_counter"`
Operation string `json:"operation"`
Gas uint64 `json:"gas"`
GasCost uint64 `json:"gas_cost"`
Depth uint64 `json:"depth"`
ReturnData *string `json:"return_data"`
Refund *uint64 `json:"refund"`
Error *string `json:"error"`
CallToAddress *string `json:"call_to_address"`
MetaNetworkID int32 `json:"meta_network_id"`
MetaNetworkName string `json:"meta_network_name"`
}
type TaskBatch ¶ added in v0.0.7
type TaskBatch struct {
Rows []Structlog // All rows from one task
ResponseChan chan error // Channel to send result back to task
TaskID string // Unique identifier for this task
}
TaskBatch represents a batch of structlog rows from a single task
type VerifyPayload ¶
type VerifyPayload struct {
BlockNumber big.Int `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
NetworkID int32 `json:"network_id"`
NetworkName string `json:"network_name"`
Network string `json:"network"` // Alias for NetworkName
ProcessingMode string `json:"processing_mode"` // "forwards" or "backwards"
InsertedCount int `json:"inserted_count"`
}
VerifyPayload represents the payload for verifying a transaction
func (*VerifyPayload) MarshalBinary ¶
func (v *VerifyPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*VerifyPayload) UnmarshalBinary ¶
func (v *VerifyPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler