Documentation
¶
Index ¶
- Constants
- Variables
- func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)
- func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)
- func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)
- func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)
- type BatchCollector
- type BatchConfig
- type Config
- type CountMismatchError
- type Dependencies
- type ProcessPayload
- type Processor
- func (p *Processor) BatchInsertStructlogs(ctx context.Context, blockNumber uint64, transactionHash string, ...) error
- func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
- func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)
- func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
- func (p *Processor) GetQueues() []c.QueueInfo
- func (p *Processor) Name() string
- func (p *Processor) ProcessNextBlock(ctx context.Context) error
- func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)
- func (p *Processor) SetProcessingMode(mode string)
- func (p *Processor) Start(ctx context.Context) error
- func (p *Processor) Stop(ctx context.Context) error
- func (p *Processor) VerifyTransaction(ctx context.Context, blockNumber *big.Int, transactionHash string, ...) error
- type Structlog
- type TaskBatch
- type VerifyPayload
Constants ¶
const ( ProcessorName = "transaction_structlog" ProcessForwardsTaskType = "transaction_structlog_process_forwards" ProcessBackwardsTaskType = "transaction_structlog_process_backwards" VerifyForwardsTaskType = "transaction_structlog_verify_forwards" VerifyBackwardsTaskType = "transaction_structlog_verify_backwards" )
Variables ¶
var ErrChannelFull = fmt.Errorf("batch collector channel is full")
ErrChannelFull indicates that the batch collector channel is full.
Functions ¶
func NewProcessBackwardsTask ¶
func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)
NewProcessBackwardsTask creates a new backwards process task
func NewProcessForwardsTask ¶
func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)
NewProcessForwardsTask creates a new forwards process task
func NewVerifyBackwardsTask ¶
func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)
NewVerifyBackwardsTask creates a new backwards verify task
func NewVerifyForwardsTask ¶
func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)
NewVerifyForwardsTask creates a new forwards verify task
Types ¶
type BatchCollector ¶ added in v0.0.7
type BatchCollector struct {
// contains filtered or unexported fields
}
BatchCollector aggregates structlog rows from multiple tasks and flushes them in batches
func NewBatchCollector ¶ added in v0.0.7
func NewBatchCollector(processor *Processor, config BatchConfig) *BatchCollector
NewBatchCollector creates a new batch collector
func (*BatchCollector) Start ¶ added in v0.0.7
func (bc *BatchCollector) Start(ctx context.Context) error
Start begins the batch collection process
func (*BatchCollector) Stop ¶ added in v0.0.7
func (bc *BatchCollector) Stop(ctx context.Context) error
Stop gracefully shuts down the batch collector
func (*BatchCollector) SubmitBatch ¶ added in v0.0.7
func (bc *BatchCollector) SubmitBatch(taskBatch TaskBatch) error
SubmitBatch submits a task batch for processing
type BatchConfig ¶ added in v0.0.7
type BatchConfig struct {
Enabled bool `yaml:"enabled"` // Enable batch aggregation
MaxRows int `yaml:"maxRows"` // Max rows before forced flush
FlushInterval time.Duration `yaml:"flushInterval"` // Max time to wait before flush
ChannelBufferSize int `yaml:"channelBufferSize"` // Max tasks that can be queued
ChunkSize int `yaml:"chunkSize"` // Chunk size for large single transactions (fallback)
}
BatchConfig holds configuration for the batch aggregation system
type Config ¶
type Config struct {
clickhouse.Config `yaml:",inline"`
Enabled bool `yaml:"enabled"`
Table string `yaml:"table"`
BatchConfig BatchConfig `yaml:"batchConfig"`
}
Config holds configuration for the transaction structlog processor
type CountMismatchError ¶ added in v0.0.5
CountMismatchError represents a structlog count mismatch between expected and actual counts
func (*CountMismatchError) Error ¶ added in v0.0.5
func (e *CountMismatchError) Error() string
type Dependencies ¶
type Dependencies struct {
Log logrus.FieldLogger
Pool *ethereum.Pool
Network *ethereum.Network
State *state.Manager
AsynqClient *asynq.Client
RedisPrefix string
}
Dependencies contains the dependencies needed for the processor
type ProcessPayload ¶
type ProcessPayload struct {
BlockNumber big.Int `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
NetworkID int32 `json:"network_id"`
NetworkName string `json:"network_name"`
Network string `json:"network"` // Alias for NetworkName
ProcessingMode string `json:"processing_mode"` // "forwards" or "backwards"
}
ProcessPayload represents the payload for processing a transaction
func (*ProcessPayload) MarshalBinary ¶
func (p *ProcessPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*ProcessPayload) UnmarshalBinary ¶
func (p *ProcessPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor handles transaction structlog processing
func (*Processor) BatchInsertStructlogs ¶
func (p *Processor) BatchInsertStructlogs(ctx context.Context, blockNumber uint64, transactionHash string, transactionIndex uint32, structlogs []Structlog) error
BatchInsertStructlogs inserts structlog data into ClickHouse as a batch
func (*Processor) EnqueueTask ¶
func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
EnqueueTask enqueues a task to the specified queue
func (*Processor) ExtractStructlogs ¶ added in v0.0.7
func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)
ExtractStructlogs extracts structlog data from a transaction without inserting to database
func (*Processor) GetHandlers ¶
func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
GetHandlers returns the task handlers for this processor
func (*Processor) ProcessNextBlock ¶
func (p *Processor) ProcessNextBlock(ctx context.Context) error
ProcessNextBlock processes the next available block
func (*Processor) ProcessSingleTransaction ¶
func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)
ProcessSingleTransaction processes a single transaction using the batch collector (exposed for worker handlers)
func (*Processor) SetProcessingMode ¶
func (p *Processor) SetProcessingMode(mode string)
SetProcessingMode sets the processing mode for the processor
type Structlog ¶
type Structlog struct {
UpdatedDateTime time.Time `json:"updated_date_time"`
BlockNumber uint64 `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
TransactionGas uint64 `json:"transaction_gas"`
TransactionFailed bool `json:"transaction_failed"`
TransactionReturnValue *string `json:"transaction_return_value"`
Index uint32 `json:"index"`
ProgramCounter uint32 `json:"program_counter"`
Operation string `json:"operation"`
Gas uint64 `json:"gas"`
GasCost uint64 `json:"gas_cost"`
Depth uint64 `json:"depth"`
ReturnData *string `json:"return_data"`
Refund *uint64 `json:"refund"`
Error *string `json:"error"`
MetaNetworkID int32 `json:"meta_network_id"`
MetaNetworkName string `json:"meta_network_name"`
}
type TaskBatch ¶ added in v0.0.7
type TaskBatch struct {
Rows []Structlog // All rows from one task
ResponseChan chan error // Channel to send result back to task
TaskID string // Unique identifier for this task
}
TaskBatch represents a batch of structlog rows from a single task
type VerifyPayload ¶
type VerifyPayload struct {
BlockNumber big.Int `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
NetworkID int32 `json:"network_id"`
NetworkName string `json:"network_name"`
Network string `json:"network"` // Alias for NetworkName
ProcessingMode string `json:"processing_mode"` // "forwards" or "backwards"
InsertedCount int `json:"inserted_count"`
}
VerifyPayload represents the payload for verifying a transaction
func (*VerifyPayload) MarshalBinary ¶
func (v *VerifyPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler
func (*VerifyPayload) UnmarshalBinary ¶
func (v *VerifyPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler