structlog

package
v0.0.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2025 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Identifiers for the transaction structlog processor and its task types.
const (
	// ProcessorName is the processor's identifier; it is also the common
	// prefix of every task type string below.
	ProcessorName            = "transaction_structlog"
	// Task type strings, one per operation (process / verify) and direction
	// (forwards / backwards).
	// NOTE(review): presumably the asynq task types used by the matching
	// New*Task constructors and registered via GetHandlers — confirm.
	ProcessForwardsTaskType  = "transaction_structlog_process_forwards"
	ProcessBackwardsTaskType = "transaction_structlog_process_backwards"
	VerifyForwardsTaskType   = "transaction_structlog_verify_forwards"
	VerifyBackwardsTaskType  = "transaction_structlog_verify_backwards"
)

Variables

View Source
// ErrChannelFull is the sentinel error reporting that the batch collector
// channel is full.
// NOTE(review): the message contains no format verbs, so errors.New would be
// the idiomatic constructor for this sentinel.
var ErrChannelFull = fmt.Errorf("batch collector channel is full")

ErrChannelFull is the error used when the batch collector channel is full.

Functions

func NewProcessBackwardsTask

func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)

NewProcessBackwardsTask creates a new backwards process task

func NewProcessForwardsTask

func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)

NewProcessForwardsTask creates a new forwards process task

func NewVerifyBackwardsTask

func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)

NewVerifyBackwardsTask creates a new backwards verify task

func NewVerifyForwardsTask

func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)

NewVerifyForwardsTask creates a new forwards verify task

Types

type BatchCollector added in v0.0.7

// BatchCollector aggregates structlog rows from multiple tasks and flushes
// them in batches. It is created with NewBatchCollector from a *Processor and
// a BatchConfig; Start begins collection, SubmitBatch hands it a TaskBatch,
// and Stop shuts it down gracefully.
type BatchCollector struct {
	// contains filtered or unexported fields
}

BatchCollector aggregates structlog rows from multiple tasks and flushes them in batches

func NewBatchCollector added in v0.0.7

func NewBatchCollector(processor *Processor, config BatchConfig) *BatchCollector

NewBatchCollector creates a new batch collector

func (*BatchCollector) Start added in v0.0.7

func (bc *BatchCollector) Start(ctx context.Context) error

Start begins the batch collection process

func (*BatchCollector) Stop added in v0.0.7

func (bc *BatchCollector) Stop(ctx context.Context) error

Stop gracefully shuts down the batch collector

func (*BatchCollector) SubmitBatch added in v0.0.7

func (bc *BatchCollector) SubmitBatch(taskBatch TaskBatch) error

SubmitBatch submits a task batch for processing

type BatchConfig added in v0.0.7

// BatchConfig holds configuration for the batch aggregation system.
// Config carries it in its BatchConfig field (YAML key "batchConfig"), and
// NewBatchCollector takes it by value.
type BatchConfig struct {
	Enabled           bool          `yaml:"enabled"`           // Enable batch aggregation
	MaxRows           int           `yaml:"maxRows"`           // Max rows before forced flush
	FlushInterval     time.Duration `yaml:"flushInterval"`     // Max time to wait before flush
	ChannelBufferSize int           `yaml:"channelBufferSize"` // Max tasks that can be queued
	ChunkSize         int           `yaml:"chunkSize"`         // Chunk size for large single transactions (fallback)
}

BatchConfig holds configuration for the batch aggregation system

type Config

// Config holds configuration for the transaction structlog processor.
type Config struct {
	// Embedded ClickHouse configuration; the ",inline" tag places its keys at
	// the same YAML level as the fields below.
	clickhouse.Config `yaml:",inline"`
	// Enabled is read from the YAML key "enabled".
	// NOTE(review): presumably toggles the whole processor — confirm.
	Enabled           bool        `yaml:"enabled"`
	// Table is read from the YAML key "table".
	// NOTE(review): presumably the ClickHouse table structlogs are written to — confirm.
	Table             string      `yaml:"table"`
	// BatchConfig configures the batch aggregation system (YAML key "batchConfig").
	BatchConfig       BatchConfig `yaml:"batchConfig"`
}

Config holds configuration for the transaction structlog processor.

func (*Config) Validate

func (c *Config) Validate() error

type CountMismatchError added in v0.0.5

// CountMismatchError represents a structlog count mismatch between expected
// and actual counts. It satisfies the error interface through its
// pointer-receiver Error method.
type CountMismatchError struct {
	Expected int    // expected structlog count
	Actual   int    // actual structlog count
	Message  string // NOTE(review): presumably extra context included by Error() — confirm
}

CountMismatchError represents a structlog count mismatch between expected and actual counts

func (*CountMismatchError) Error added in v0.0.5

func (e *CountMismatchError) Error() string

type Dependencies

// Dependencies contains the dependencies needed for the processor.
// A pointer to it is passed to New together with a *Config.
type Dependencies struct {
	Log         logrus.FieldLogger // structured logger
	Pool        *ethereum.Pool     // NOTE(review): presumably the pool of Ethereum nodes used for tracing — confirm
	Network     *ethereum.Network  // NOTE(review): presumably the source of the network ID/name in payloads and rows — confirm
	State       *state.Manager     // NOTE(review): presumably tracks which blocks have been processed — confirm
	AsynqClient *asynq.Client      // asynq client; NOTE(review): presumably what EnqueueTask uses — confirm
	RedisPrefix string             // NOTE(review): presumably a key/queue-name prefix in Redis — confirm
}

Dependencies contains the dependencies needed for the processor

type ProcessPayload

// ProcessPayload represents the payload for processing a transaction.
// It is the argument to NewProcessForwardsTask and NewProcessBackwardsTask,
// and it provides MarshalBinary / UnmarshalBinary (encoding.BinaryMarshaler /
// encoding.BinaryUnmarshaler).
type ProcessPayload struct {
	BlockNumber      big.Int `json:"block_number"`
	TransactionHash  string  `json:"transaction_hash"`
	TransactionIndex uint32  `json:"transaction_index"`
	NetworkID        int32   `json:"network_id"`
	NetworkName      string  `json:"network_name"`
	Network          string  `json:"network"`         // Alias for NetworkName
	ProcessingMode   string  `json:"processing_mode"` // "forwards" or "backwards"
}

ProcessPayload represents the payload for processing a transaction

func (*ProcessPayload) MarshalBinary

func (p *ProcessPayload) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*ProcessPayload) UnmarshalBinary

func (p *ProcessPayload) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

type Processor

// Processor handles transaction structlog processing. It is constructed with
// New from a *Dependencies and a *Config, and is controlled through its Start
// and Stop methods.
type Processor struct {
	// contains filtered or unexported fields
}

Processor handles transaction structlog processing

func New

func New(ctx context.Context, deps *Dependencies, config *Config) (*Processor, error)

New creates a new transaction structlog processor

func (*Processor) BatchInsertStructlogs

func (p *Processor) BatchInsertStructlogs(ctx context.Context, blockNumber uint64, transactionHash string, transactionIndex uint32, structlogs []Structlog) error

BatchInsertStructlogs inserts structlog data in batch to ClickHouse

func (*Processor) EnqueueTask

func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error

EnqueueTask enqueues a task to the specified queue

func (*Processor) ExtractStructlogs added in v0.0.7

func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)

ExtractStructlogs extracts structlog data from a transaction without inserting to database

func (*Processor) GetHandlers

func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc

GetHandlers returns the task handlers for this processor

func (*Processor) GetQueues

func (p *Processor) GetQueues() []c.QueueInfo

GetQueues returns the queues used by this processor

func (*Processor) Name

func (p *Processor) Name() string

Name returns the processor name

func (*Processor) ProcessNextBlock

func (p *Processor) ProcessNextBlock(ctx context.Context) error

ProcessNextBlock processes the next available block

func (*Processor) ProcessSingleTransaction

func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)

ProcessSingleTransaction processes a single transaction using batch collector (exposed for worker handlers)

func (*Processor) SetProcessingMode

func (p *Processor) SetProcessingMode(mode string)

SetProcessingMode sets the processing mode for the processor

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start starts the processor

func (*Processor) Stop

func (p *Processor) Stop(ctx context.Context) error

Stop stops the processor

func (*Processor) VerifyTransaction

func (p *Processor) VerifyTransaction(ctx context.Context, blockNumber *big.Int, transactionHash string, transactionIndex uint32, networkName string, insertedCount int) error

VerifyTransaction verifies that a transaction has been processed correctly

type Structlog

// Structlog is one structlog row for a transaction. Slices of Structlog are
// returned by ExtractStructlogs, accepted by BatchInsertStructlogs, and carried
// in TaskBatch.Rows.
//
// Pointer-typed fields (TransactionReturnValue, ReturnData, Refund, Error) can
// be nil, in which case they encode as JSON null.
type Structlog struct {
	UpdatedDateTime        time.Time `json:"updated_date_time"`
	// Transaction-level fields.
	BlockNumber            uint64    `json:"block_number"`
	TransactionHash        string    `json:"transaction_hash"`
	TransactionIndex       uint32    `json:"transaction_index"`
	TransactionGas         uint64    `json:"transaction_gas"`
	TransactionFailed      bool      `json:"transaction_failed"`
	TransactionReturnValue *string   `json:"transaction_return_value"`
	// Per-row fields.
	// NOTE(review): presumably one row per executed EVM opcode, with Index as
	// its position within the transaction's trace — confirm.
	Index                  uint32    `json:"index"`
	ProgramCounter         uint32    `json:"program_counter"`
	Operation              string    `json:"operation"`
	Gas                    uint64    `json:"gas"`
	GasCost                uint64    `json:"gas_cost"`
	Depth                  uint64    `json:"depth"`
	ReturnData             *string   `json:"return_data"`
	Refund                 *uint64   `json:"refund"`
	Error                  *string   `json:"error"`
	// Network metadata; same types as NetworkID / NetworkName on the payloads.
	MetaNetworkID          int32     `json:"meta_network_id"`
	MetaNetworkName        string    `json:"meta_network_name"`
}

type TaskBatch added in v0.0.7

// TaskBatch represents a batch of structlog rows from a single task.
// It is passed by value to BatchCollector.SubmitBatch.
type TaskBatch struct {
	Rows         []Structlog // All rows from one task
	ResponseChan chan error  // Channel to send result back to task
	TaskID       string      // Unique identifier for this task
}

TaskBatch represents a batch of structlog rows from a single task

type VerifyPayload

// VerifyPayload represents the payload for verifying a transaction.
// It is the argument to NewVerifyForwardsTask and NewVerifyBackwardsTask, and
// it provides MarshalBinary / UnmarshalBinary (encoding.BinaryMarshaler /
// encoding.BinaryUnmarshaler). Its fields are those of ProcessPayload plus
// InsertedCount.
type VerifyPayload struct {
	BlockNumber      big.Int `json:"block_number"`
	TransactionHash  string  `json:"transaction_hash"`
	TransactionIndex uint32  `json:"transaction_index"`
	NetworkID        int32   `json:"network_id"`
	NetworkName      string  `json:"network_name"`
	Network          string  `json:"network"`         // Alias for NetworkName
	ProcessingMode   string  `json:"processing_mode"` // "forwards" or "backwards"
	// NOTE(review): presumably the row count handed to VerifyTransaction as
	// insertedCount — confirm.
	InsertedCount    int     `json:"inserted_count"`
}

VerifyPayload represents the payload for verifying a transaction

func (*VerifyPayload) MarshalBinary

func (v *VerifyPayload) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler

func (*VerifyPayload) UnmarshalBinary

func (v *VerifyPayload) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL