Documentation
¶
Index ¶
- Constants
- func ComputeGasUsed(structlogs []execution.StructLog) []uint64
- func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)
- func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)
- func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)
- func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)
- type BatchItem
- type BatchManager
- type BigTransactionManager
- type ClickHouseTime
- type Config
- type CountMismatchError
- type Dependencies
- type ProcessPayload
- type Processor
- func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error
- func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block *types.Block) (int, error)
- func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)
- func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
- func (p *Processor) GetQueues() []c.QueueInfo
- func (p *Processor) Name() string
- func (p *Processor) ProcessNextBlock(ctx context.Context) error
- func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)
- func (p *Processor) ProcessTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)
- func (p *Processor) SetProcessingMode(mode string)
- func (p *Processor) ShouldBatch(structlogCount int64) bool
- func (p *Processor) Start(ctx context.Context) error
- func (p *Processor) Stop(ctx context.Context) error
- func (p *Processor) VerifyTransaction(ctx context.Context, blockNumber *big.Int, transactionHash string, ...) error
- type Structlog
- type VerifyPayload
Constants ¶
const ( ProcessorName = "transaction_structlog" ProcessForwardsTaskType = "transaction_structlog_process_forwards" ProcessBackwardsTaskType = "transaction_structlog_process_backwards" VerifyForwardsTaskType = "transaction_structlog_verify_forwards" VerifyBackwardsTaskType = "transaction_structlog_verify_backwards" )
Variables ¶
This section is empty.
Functions ¶
func ComputeGasUsed ¶ added in v0.1.3
ComputeGasUsed calculates the actual gas consumed for each structlog using the difference between consecutive gas values at the same depth level.
Returns a slice of gasUsed values corresponding to each structlog index. For opcodes that are the last in their call context (before returning to parent), the pre-calculated GasCost is returned since we cannot compute actual cost across call boundaries.
func NewProcessBackwardsTask ¶
func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)
NewProcessBackwardsTask creates a new backwards process task.
func NewProcessForwardsTask ¶
func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)
NewProcessForwardsTask creates a new forwards process task.
func NewVerifyBackwardsTask ¶
func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)
NewVerifyBackwardsTask creates a new backwards verify task.
func NewVerifyForwardsTask ¶
func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)
NewVerifyForwardsTask creates a new forwards verify task.
Types ¶
type BatchItem ¶ added in v0.1.1
type BatchItem struct {
// contains filtered or unexported fields
}
BatchItem represents a single transaction's data within a batch.
type BatchManager ¶ added in v0.1.1
type BatchManager struct {
// contains filtered or unexported fields
}
BatchManager accumulates small transactions and flushes them in batches.
func NewBatchManager ¶ added in v0.1.1
func NewBatchManager(processor *Processor, config *Config) *BatchManager
NewBatchManager creates a new batch manager.
func (*BatchManager) Add ¶ added in v0.1.1
func (bm *BatchManager) Add(structlogs []Structlog, task *asynq.Task, payload *ProcessPayload) error
Add adds a transaction's structlogs to the batch.
func (*BatchManager) Flush ¶ added in v0.1.1
func (bm *BatchManager) Flush()
Flush performs a batch flush (thread-safe).
func (*BatchManager) FlushIfNeeded ¶ added in v0.1.1
func (bm *BatchManager) FlushIfNeeded()
FlushIfNeeded checks if the batch should be flushed based on size threshold.
func (*BatchManager) Start ¶ added in v0.1.1
func (bm *BatchManager) Start() error
Start begins the batch manager's flush goroutine.
func (*BatchManager) Stop ¶ added in v0.1.1
func (bm *BatchManager) Stop()
Stop gracefully shuts down the batch manager.
type BigTransactionManager ¶ added in v0.1.1
type BigTransactionManager struct {
// contains filtered or unexported fields
}
BigTransactionManager coordinates processing of large transactions to prevent OOM.
func NewBigTransactionManager ¶ added in v0.1.1
func NewBigTransactionManager(threshold int, log logrus.FieldLogger) *BigTransactionManager
NewBigTransactionManager creates a new manager for big transaction coordination.
func (*BigTransactionManager) GetThreshold ¶ added in v0.1.1
func (btm *BigTransactionManager) GetThreshold() int
GetThreshold returns the threshold for big transactions.
func (*BigTransactionManager) RegisterBigTransaction ¶ added in v0.1.1
func (btm *BigTransactionManager) RegisterBigTransaction(txHash string, processor *Processor)
RegisterBigTransaction marks a big transaction as actively processing.
func (*BigTransactionManager) ShouldPauseNewWork ¶ added in v0.1.1
func (btm *BigTransactionManager) ShouldPauseNewWork() bool
ShouldPauseNewWork returns true if new work should be paused.
func (*BigTransactionManager) UnregisterBigTransaction ¶ added in v0.1.1
func (btm *BigTransactionManager) UnregisterBigTransaction(txHash string)
UnregisterBigTransaction marks a big transaction as complete.
type ClickHouseTime ¶ added in v0.1.1
ClickHouseTime wraps time.Time to provide custom JSON marshaling for ClickHouse.
func NewClickHouseTime ¶ added in v0.1.1
func NewClickHouseTime(t time.Time) ClickHouseTime
NewClickHouseTime creates a new ClickHouseTime from a time.Time.
func (ClickHouseTime) MarshalJSON ¶ added in v0.1.1
func (t ClickHouseTime) MarshalJSON() ([]byte, error)
MarshalJSON formats the time for ClickHouse DateTime format.
func (*ClickHouseTime) UnmarshalJSON ¶ added in v0.1.1
func (t *ClickHouseTime) UnmarshalJSON(data []byte) error
UnmarshalJSON parses the time from ClickHouse DateTime format.
type Config ¶
type Config struct {
clickhouse.Config `yaml:",inline"`
Enabled bool `yaml:"enabled"`
Table string `yaml:"table"`
// Big transaction handling
BigTransactionThreshold int `yaml:"bigTransactionThreshold"` // Default: 500,000
ChunkSize int `yaml:"chunkSize"` // Default: 10,000
ChannelBufferSize int `yaml:"channelBufferSize"` // Default: 2
ProgressLogThreshold int `yaml:"progressLogThreshold"` // Default: 100,000
// Batch processing configuration
// BatchInsertThreshold is the minimum number of structlogs to accumulate before batch insert
// Transactions with more structlogs than this will bypass batching
BatchInsertThreshold int64 `yaml:"batchInsertThreshold" default:"50000"`
// BatchFlushInterval is the maximum time to wait before flushing a batch
BatchFlushInterval time.Duration `yaml:"batchFlushInterval" default:"5s"`
// BatchMaxSize is the maximum number of structlogs to accumulate in a batch
BatchMaxSize int64 `yaml:"batchMaxSize" default:"100000"`
}
Config holds configuration for transaction structlog processor.
type CountMismatchError ¶ added in v0.0.5
CountMismatchError represents a structlog count mismatch between expected and actual counts.
func (*CountMismatchError) Error ¶ added in v0.0.5
func (e *CountMismatchError) Error() string
type Dependencies ¶
type Dependencies struct {
Log logrus.FieldLogger
Pool *ethereum.Pool
Network *ethereum.Network
State *state.Manager
AsynqClient *asynq.Client
RedisPrefix string
}
Dependencies contains the dependencies needed for the processor.
type ProcessPayload ¶
type ProcessPayload struct {
BlockNumber big.Int `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
NetworkID int32 `json:"network_id"`
NetworkName string `json:"network_name"`
Network string `json:"network"` // Alias for NetworkName
ProcessingMode string `json:"processing_mode"` // "forwards" or "backwards"
}
ProcessPayload represents the payload for processing a transaction.
func (*ProcessPayload) MarshalBinary ¶
func (p *ProcessPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler.
func (*ProcessPayload) UnmarshalBinary ¶
func (p *ProcessPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor handles transaction structlog processing.
func (*Processor) EnqueueTask ¶
EnqueueTask enqueues a task to the specified queue.
func (*Processor) EnqueueTransactionTasks ¶ added in v0.0.10
enqueueTransactionTasks enqueues tasks for all transactions in a block. EnqueueTransactionTasks enqueues transaction processing tasks for a given block.
func (*Processor) ExtractStructlogs ¶ added in v0.0.7
func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)
ExtractStructlogs extracts structlog data from a transaction without inserting to database.
func (*Processor) GetHandlers ¶
func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc
GetHandlers returns the task handlers for this processor.
func (*Processor) ProcessNextBlock ¶
ProcessNextBlock processes the next available block.
func (*Processor) ProcessSingleTransaction ¶
func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)
ProcessSingleTransaction processes a single transaction and inserts its structlogs directly to ClickHouse.
func (*Processor) ProcessTransaction ¶ added in v0.1.1
func (p *Processor) ProcessTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)
ProcessTransaction processes a transaction using memory-efficient channel-based batching.
func (*Processor) SetProcessingMode ¶
SetProcessingMode sets the processing mode for the processor.
func (*Processor) ShouldBatch ¶ added in v0.1.1
ShouldBatch determines if a transaction should be batched based on structlog count.
type Structlog ¶
type Structlog struct {
UpdatedDateTime ClickHouseTime `json:"updated_date_time"`
BlockNumber uint64 `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
TransactionGas uint64 `json:"transaction_gas"`
TransactionFailed bool `json:"transaction_failed"`
TransactionReturnValue *string `json:"transaction_return_value"`
Index uint32 `json:"index"`
ProgramCounter uint32 `json:"program_counter"`
Operation string `json:"operation"`
Gas uint64 `json:"gas"`
GasCost uint64 `json:"gas_cost"`
GasUsed uint64 `json:"gas_used"`
Depth uint64 `json:"depth"`
ReturnData *string `json:"return_data"`
Refund *uint64 `json:"refund"`
Error *string `json:"error"`
CallToAddress *string `json:"call_to_address"`
MetaNetworkID int32 `json:"meta_network_id"`
MetaNetworkName string `json:"meta_network_name"`
}
type VerifyPayload ¶
type VerifyPayload struct {
BlockNumber big.Int `json:"block_number"`
TransactionHash string `json:"transaction_hash"`
TransactionIndex uint32 `json:"transaction_index"`
NetworkID int32 `json:"network_id"`
NetworkName string `json:"network_name"`
Network string `json:"network"` // Alias for NetworkName
ProcessingMode string `json:"processing_mode"` // "forwards" or "backwards"
InsertedCount int `json:"inserted_count"`
}
VerifyPayload represents the payload for verifying a transaction.
func (*VerifyPayload) MarshalBinary ¶
func (v *VerifyPayload) MarshalBinary() ([]byte, error)
MarshalBinary implements encoding.BinaryMarshaler.
func (*VerifyPayload) UnmarshalBinary ¶
func (v *VerifyPayload) UnmarshalBinary(data []byte) error
UnmarshalBinary implements encoding.BinaryUnmarshaler.