structlog

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2025 License: GPL-3.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ProcessorName            = "transaction_structlog"
	ProcessForwardsTaskType  = "transaction_structlog_process_forwards"
	ProcessBackwardsTaskType = "transaction_structlog_process_backwards"
	VerifyForwardsTaskType   = "transaction_structlog_verify_forwards"
	VerifyBackwardsTaskType  = "transaction_structlog_verify_backwards"
)

Variables

This section is empty.

Functions

func NewProcessBackwardsTask

func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error)

NewProcessBackwardsTask creates a new backwards process task.

func NewProcessForwardsTask

func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error)

NewProcessForwardsTask creates a new forwards process task.

func NewVerifyBackwardsTask

func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error)

NewVerifyBackwardsTask creates a new backwards verify task.

func NewVerifyForwardsTask

func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error)

NewVerifyForwardsTask creates a new forwards verify task.

Types

type BatchItem added in v0.1.1

type BatchItem struct {
	// contains filtered or unexported fields
}

BatchItem represents a single transaction's data within a batch.

type BatchManager added in v0.1.1

type BatchManager struct {
	// contains filtered or unexported fields
}

BatchManager accumulates small transactions and flushes them in batches.

func NewBatchManager added in v0.1.1

func NewBatchManager(processor *Processor, config *Config) *BatchManager

NewBatchManager creates a new batch manager.

func (*BatchManager) Add added in v0.1.1

func (bm *BatchManager) Add(structlogs []Structlog, task *asynq.Task, payload *ProcessPayload) error

Add adds a transaction's structlogs to the batch.

func (*BatchManager) Flush added in v0.1.1

func (bm *BatchManager) Flush()

Flush performs a batch flush (thread-safe).

func (*BatchManager) FlushIfNeeded added in v0.1.1

func (bm *BatchManager) FlushIfNeeded()

FlushIfNeeded checks if the batch should be flushed based on size threshold.

func (*BatchManager) Start added in v0.1.1

func (bm *BatchManager) Start() error

Start begins the batch manager's flush goroutine.

func (*BatchManager) Stop added in v0.1.1

func (bm *BatchManager) Stop()

Stop gracefully shuts down the batch manager.

type BigTransactionManager added in v0.1.1

type BigTransactionManager struct {
	// contains filtered or unexported fields
}

BigTransactionManager coordinates processing of large transactions to prevent OOM.

func NewBigTransactionManager added in v0.1.1

func NewBigTransactionManager(threshold int, log logrus.FieldLogger) *BigTransactionManager

NewBigTransactionManager creates a new manager for big transaction coordination.

func (*BigTransactionManager) GetThreshold added in v0.1.1

func (btm *BigTransactionManager) GetThreshold() int

GetThreshold returns the threshold for big transactions.

func (*BigTransactionManager) RegisterBigTransaction added in v0.1.1

func (btm *BigTransactionManager) RegisterBigTransaction(txHash string, processor *Processor)

RegisterBigTransaction marks a big transaction as actively processing.

func (*BigTransactionManager) ShouldPauseNewWork added in v0.1.1

func (btm *BigTransactionManager) ShouldPauseNewWork() bool

ShouldPauseNewWork returns true if new work should be paused.

func (*BigTransactionManager) UnregisterBigTransaction added in v0.1.1

func (btm *BigTransactionManager) UnregisterBigTransaction(txHash string)

UnregisterBigTransaction marks a big transaction as complete.

type ClickHouseTime added in v0.1.1

type ClickHouseTime struct {
	time.Time
}

ClickHouseTime wraps time.Time to provide custom JSON marshaling for ClickHouse.

func NewClickHouseTime added in v0.1.1

func NewClickHouseTime(t time.Time) ClickHouseTime

NewClickHouseTime creates a new ClickHouseTime from a time.Time.

func (ClickHouseTime) MarshalJSON added in v0.1.1

func (t ClickHouseTime) MarshalJSON() ([]byte, error)

MarshalJSON formats the time for ClickHouse DateTime format.

func (*ClickHouseTime) UnmarshalJSON added in v0.1.1

func (t *ClickHouseTime) UnmarshalJSON(data []byte) error

UnmarshalJSON parses the time from ClickHouse DateTime format.

type Config

type Config struct {
	clickhouse.Config `yaml:",inline"`
	Enabled           bool   `yaml:"enabled"`
	Table             string `yaml:"table"`

	// Big transaction handling
	BigTransactionThreshold int `yaml:"bigTransactionThreshold"` // Default: 500,000
	ChunkSize               int `yaml:"chunkSize"`               // Default: 10,000
	ChannelBufferSize       int `yaml:"channelBufferSize"`       // Default: 2
	ProgressLogThreshold    int `yaml:"progressLogThreshold"`    // Default: 100,000

	// Batch processing configuration
	// BatchInsertThreshold is the minimum number of structlogs to accumulate before a batch insert.
	// Transactions with more structlogs than this will bypass batching.
	BatchInsertThreshold int64 `yaml:"batchInsertThreshold" default:"50000"`

	// BatchFlushInterval is the maximum time to wait before flushing a batch
	BatchFlushInterval time.Duration `yaml:"batchFlushInterval" default:"5s"`

	// BatchMaxSize is the maximum number of structlogs to accumulate in a batch
	BatchMaxSize int64 `yaml:"batchMaxSize" default:"100000"`
}

Config holds configuration for the transaction structlog processor.

func (*Config) Validate

func (c *Config) Validate() error

type CountMismatchError added in v0.0.5

type CountMismatchError struct {
	Expected int
	Actual   int
	Message  string
}

CountMismatchError represents a structlog count mismatch between expected and actual counts.

func (*CountMismatchError) Error added in v0.0.5

func (e *CountMismatchError) Error() string

type Dependencies

type Dependencies struct {
	Log         logrus.FieldLogger
	Pool        *ethereum.Pool
	Network     *ethereum.Network
	State       *state.Manager
	AsynqClient *asynq.Client
	RedisPrefix string
}

Dependencies contains the dependencies needed for the processor.

type ProcessPayload

type ProcessPayload struct {
	BlockNumber      big.Int `json:"block_number"`
	TransactionHash  string  `json:"transaction_hash"`
	TransactionIndex uint32  `json:"transaction_index"`
	NetworkID        int32   `json:"network_id"`
	NetworkName      string  `json:"network_name"`
	Network          string  `json:"network"`         // Alias for NetworkName
	ProcessingMode   string  `json:"processing_mode"` // "forwards" or "backwards"
}

ProcessPayload represents the payload for processing a transaction.

func (*ProcessPayload) MarshalBinary

func (p *ProcessPayload) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler.

func (*ProcessPayload) UnmarshalBinary

func (p *ProcessPayload) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor handles transaction structlog processing.

func New

func New(ctx context.Context, deps *Dependencies, config *Config) (*Processor, error)

New creates a new transaction structlog processor.

func (*Processor) EnqueueTask

func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error

EnqueueTask enqueues the given task with the supplied asynq options (e.g. the target queue).

func (*Processor) EnqueueTransactionTasks added in v0.0.10

func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block *types.Block) (int, error)

EnqueueTransactionTasks enqueues transaction processing tasks for all transactions in the given block.

func (*Processor) ExtractStructlogs added in v0.0.7

func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error)

ExtractStructlogs extracts structlog data from a transaction without inserting to database.

func (*Processor) GetHandlers

func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc

GetHandlers returns the task handlers for this processor.

func (*Processor) GetQueues

func (p *Processor) GetQueues() []c.QueueInfo

GetQueues returns the queues used by this processor.

func (*Processor) Name

func (p *Processor) Name() string

Name returns the processor name.

func (*Processor) ProcessNextBlock

func (p *Processor) ProcessNextBlock(ctx context.Context) error

ProcessNextBlock processes the next available block.

func (*Processor) ProcessSingleTransaction

func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)

ProcessSingleTransaction processes a single transaction and inserts its structlogs directly to ClickHouse.

func (*Processor) ProcessTransaction added in v0.1.1

func (p *Processor) ProcessTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error)

ProcessTransaction processes a transaction using memory-efficient channel-based batching.

func (*Processor) SetProcessingMode

func (p *Processor) SetProcessingMode(mode string)

SetProcessingMode sets the processing mode for the processor.

func (*Processor) ShouldBatch added in v0.1.1

func (p *Processor) ShouldBatch(structlogCount int64) bool

ShouldBatch determines if a transaction should be batched based on structlog count.

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start starts the processor.

func (*Processor) Stop

func (p *Processor) Stop(ctx context.Context) error

Stop stops the processor.

func (*Processor) VerifyTransaction

func (p *Processor) VerifyTransaction(ctx context.Context, blockNumber *big.Int, transactionHash string, transactionIndex uint32, networkName string, insertedCount int) error

VerifyTransaction verifies that a transaction has been processed correctly.

type Structlog

type Structlog struct {
	UpdatedDateTime        ClickHouseTime `json:"updated_date_time"`
	BlockNumber            uint64         `json:"block_number"`
	TransactionHash        string         `json:"transaction_hash"`
	TransactionIndex       uint32         `json:"transaction_index"`
	TransactionGas         uint64         `json:"transaction_gas"`
	TransactionFailed      bool           `json:"transaction_failed"`
	TransactionReturnValue *string        `json:"transaction_return_value"`
	Index                  uint32         `json:"index"`
	ProgramCounter         uint32         `json:"program_counter"`
	Operation              string         `json:"operation"`
	Gas                    uint64         `json:"gas"`
	GasCost                uint64         `json:"gas_cost"`
	Depth                  uint64         `json:"depth"`
	ReturnData             *string        `json:"return_data"`
	Refund                 *uint64        `json:"refund"`
	Error                  *string        `json:"error"`
	CallToAddress          *string        `json:"call_to_address"`
	MetaNetworkID          int32          `json:"meta_network_id"`
	MetaNetworkName        string         `json:"meta_network_name"`
}

type VerifyPayload

type VerifyPayload struct {
	BlockNumber      big.Int `json:"block_number"`
	TransactionHash  string  `json:"transaction_hash"`
	TransactionIndex uint32  `json:"transaction_index"`
	NetworkID        int32   `json:"network_id"`
	NetworkName      string  `json:"network_name"`
	Network          string  `json:"network"`         // Alias for NetworkName
	ProcessingMode   string  `json:"processing_mode"` // "forwards" or "backwards"
	InsertedCount    int     `json:"inserted_count"`
}

VerifyPayload represents the payload for verifying a transaction.

func (*VerifyPayload) MarshalBinary

func (v *VerifyPayload) MarshalBinary() ([]byte, error)

MarshalBinary implements encoding.BinaryMarshaler.

func (*VerifyPayload) UnmarshalBinary

func (v *VerifyPayload) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL