storage

package
v1.2.0-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 28, 2025 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
// Redis key prefixes for the cursor entries. As the trailing comments show,
// the full key is the prefix followed by ":{chainId}".
const (
	// Cursor keys for tracking positions
	KeyCursorReorg   = "cursor:reorg"   // String: cursor:reorg:{chainId}
	KeyCursorPublish = "cursor:publish" // String: cursor:publish:{chainId}
	KeyCursorCommit  = "cursor:commit"  // String: cursor:commit:{chainId}
)

Redis key namespace constants for better organization and maintainability

Variables

View Source
// DEFAULT_MAX_ROWS_PER_INSERT is, per its name, the default maximum number of
// rows per insert (100000). It is declared as a var, so it can be reassigned.
// NOTE(review): the code that reads this value is not visible here — confirm.
var DEFAULT_MAX_ROWS_PER_INSERT = 100000
View Source
// ZERO_BYTES_10 is a string made of 10 NUL (\x00) bytes.
var ZERO_BYTES_10 = strings.Repeat("\x00", 10)
View Source
// ZERO_BYTES_42 is a string made of 42 NUL (\x00) bytes.
var ZERO_BYTES_42 = strings.Repeat("\x00", 42)
View Source
// ZERO_BYTES_66 is a string made of 66 NUL (\x00) bytes.
var ZERO_BYTES_66 = strings.Repeat("\x00", 66)

Functions

This section is empty.

Types

type BadgerBlockBuffer

type BadgerBlockBuffer struct {
	// contains filtered or unexported fields
}

BadgerBlockBuffer manages buffering of block data using Badger as an ephemeral cache

func NewBadgerBlockBuffer

func NewBadgerBlockBuffer(maxSizeMB int64, maxBlocks int) (*BadgerBlockBuffer, error)

NewBadgerBlockBuffer creates a new Badger-backed block buffer with ephemeral storage

func (*BadgerBlockBuffer) Add

func (b *BadgerBlockBuffer) Add(blocks []common.BlockData) bool

Add adds blocks to the buffer and returns true if flush is needed

func (*BadgerBlockBuffer) Clear

func (b *BadgerBlockBuffer) Clear()

Clear empties the buffer without returning data

func (*BadgerBlockBuffer) Close

func (b *BadgerBlockBuffer) Close() error

Close closes the buffer and cleans up resources

func (*BadgerBlockBuffer) Flush

func (b *BadgerBlockBuffer) Flush() []common.BlockData

Flush removes all data from the buffer and returns it

func (*BadgerBlockBuffer) GetBlockByNumber

func (b *BadgerBlockBuffer) GetBlockByNumber(chainId *big.Int, blockNumber *big.Int) *common.BlockData

GetBlockByNumber returns a specific block from the buffer if it exists

func (*BadgerBlockBuffer) GetBlocksInRange

func (b *BadgerBlockBuffer) GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData

GetBlocksInRange returns blocks from the buffer that fall within the given range

func (*BadgerBlockBuffer) GetData

func (b *BadgerBlockBuffer) GetData() []common.BlockData

GetData returns a copy of the current buffer data

func (*BadgerBlockBuffer) GetMaxBlockNumber

func (b *BadgerBlockBuffer) GetMaxBlockNumber(chainId *big.Int) *big.Int

GetMaxBlockNumber returns the maximum block number for a chain in the buffer

func (*BadgerBlockBuffer) IsEmpty

func (b *BadgerBlockBuffer) IsEmpty() bool

IsEmpty returns true if the buffer is empty

func (*BadgerBlockBuffer) ShouldFlush

func (b *BadgerBlockBuffer) ShouldFlush() bool

ShouldFlush checks if the buffer should be flushed based on configured thresholds

func (*BadgerBlockBuffer) Size

func (b *BadgerBlockBuffer) Size() (int64, int)

Size returns the current buffer size in bytes and block count

func (*BadgerBlockBuffer) Stats

func (b *BadgerBlockBuffer) Stats() BufferStats

Stats returns statistics about the buffer

type BadgerConnector

type BadgerConnector struct {
	// contains filtered or unexported fields
}

func NewBadgerConnector

func NewBadgerConnector(cfg *config.BadgerConfig) (*BadgerConnector, error)

func (*BadgerConnector) Close

func (bc *BadgerConnector) Close() error

func (*BadgerConnector) DeleteBlockFailures

func (bc *BadgerConnector) DeleteBlockFailures(failures []common.BlockFailure) error

func (*BadgerConnector) DeleteStagingData

func (bc *BadgerConnector) DeleteStagingData(data []common.BlockData) error

func (*BadgerConnector) DeleteStagingDataOlderThan

func (bc *BadgerConnector) DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error

func (*BadgerConnector) GetBlockFailures

func (bc *BadgerConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)

IStagingStorage implementation (GetBlockFailures is declared on IStagingStorage, not IOrchestratorStorage)

func (*BadgerConnector) GetLastCommittedBlockNumber

func (bc *BadgerConnector) GetLastCommittedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*BadgerConnector) GetLastPublishedBlockNumber

func (bc *BadgerConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*BadgerConnector) GetLastReorgCheckedBlockNumber

func (bc *BadgerConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*BadgerConnector) GetLastStagedBlockNumber

func (bc *BadgerConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error)

func (*BadgerConnector) GetStagingData

func (bc *BadgerConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)

func (*BadgerConnector) InsertStagingData

func (bc *BadgerConnector) InsertStagingData(data []common.BlockData) error

IStagingStorage implementation

func (*BadgerConnector) SetLastCommittedBlockNumber

func (bc *BadgerConnector) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*BadgerConnector) SetLastPublishedBlockNumber

func (bc *BadgerConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*BadgerConnector) SetLastReorgCheckedBlockNumber

func (bc *BadgerConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*BadgerConnector) StoreBlockFailures

func (bc *BadgerConnector) StoreBlockFailures(failures []common.BlockFailure) error

type BalancesQueryFilter

// BalancesQueryFilter is the filter argument of the GetTokenBalances methods
// declared in this package.
// NOTE(review): how each field is translated into a query is not visible here.
type BalancesQueryFilter struct {
	ChainId      *big.Int
	TokenTypes   []string
	TokenAddress string
	Owner        string
	TokenIds     []*big.Int
	ZeroBalance  bool
	// Grouping, sorting and paging options; their exact semantics are defined
	// by the connector that consumes the filter.
	GroupBy      []string
	SortBy       string
	SortOrder    string
	Page         int
	Limit        int
	Offset       int
}

type BigInt

// BigInt wraps big.Int by embedding it; the package declares a MarshalJSON
// method on the BigInt value receiver.
// NOTE(review): the JSON form produced by MarshalJSON is not visible here.
type BigInt struct {
	big.Int
}

func (BigInt) MarshalJSON

func (b BigInt) MarshalJSON() ([]byte, error)

type BlockBuffer

type BlockBuffer struct {
	// contains filtered or unexported fields
}

BlockBuffer manages buffering of block data with size and count limits

func NewBlockBuffer

func NewBlockBuffer(maxSizeMB int64, maxBlocks int) *BlockBuffer

NewBlockBuffer creates a new in-memory block buffer

func (*BlockBuffer) Add

func (b *BlockBuffer) Add(blocks []common.BlockData) bool

Add adds blocks to the buffer and returns true if flush is needed

func (*BlockBuffer) Clear

func (b *BlockBuffer) Clear()

Clear empties the buffer without returning data

func (*BlockBuffer) Close

func (b *BlockBuffer) Close() error

Close closes the buffer (no-op for in-memory buffer)

func (*BlockBuffer) Flush

func (b *BlockBuffer) Flush() []common.BlockData

Flush removes all data from the buffer and returns it

func (*BlockBuffer) GetBlockByNumber

func (b *BlockBuffer) GetBlockByNumber(chainId *big.Int, blockNumber *big.Int) *common.BlockData

GetBlockByNumber returns a specific block from the buffer if it exists

func (*BlockBuffer) GetBlocksInRange

func (b *BlockBuffer) GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData

GetBlocksInRange returns blocks from the buffer that fall within the given range

func (*BlockBuffer) GetData

func (b *BlockBuffer) GetData() []common.BlockData

GetData returns a copy of the current buffer data

func (*BlockBuffer) GetMaxBlockNumber

func (b *BlockBuffer) GetMaxBlockNumber(chainId *big.Int) *big.Int

GetMaxBlockNumber returns the maximum block number for a chain in the buffer

func (*BlockBuffer) IsEmpty

func (b *BlockBuffer) IsEmpty() bool

IsEmpty returns true if the buffer is empty

func (*BlockBuffer) ShouldFlush

func (b *BlockBuffer) ShouldFlush() bool

ShouldFlush checks if the buffer should be flushed based on configured thresholds

func (*BlockBuffer) Size

func (b *BlockBuffer) Size() (int64, int)

Size returns the current buffer size in bytes and block count

func (*BlockBuffer) Stats

func (b *BlockBuffer) Stats() BufferStats

Stats returns statistics about the buffer

type BufferStats

// BufferStats contains statistics about the buffer. It is the return type of
// the Stats methods on the block buffers and of IBlockBuffer.Stats.
type BufferStats struct {
	BlockCount int
	SizeBytes  int64
	ChainCount int
	// Per-chain breakdown.
	// NOTE(review): the uint64 key is presumably the chain ID — confirm.
	ChainStats map[uint64]ChainStats
}

BufferStats contains statistics about the buffer

func (BufferStats) String

func (s BufferStats) String() string

String returns a string representation of buffer stats

type ChainMetadata

// ChainMetadata tracks per-chain statistics for fast lookups: a minimum and a
// maximum block number plus a block count.
type ChainMetadata struct {
	MinBlock   *big.Int
	MaxBlock   *big.Int
	BlockCount int
}

ChainMetadata tracks per-chain statistics for fast lookups

type ChainStats

// ChainStats contains per-chain statistics. It is the value type of the
// BufferStats.ChainStats map.
type ChainStats struct {
	BlockCount int
	MinBlock   *big.Int
	MaxBlock   *big.Int
}

ChainStats contains per-chain statistics

type ClickHouseConnector

type ClickHouseConnector struct {
	// contains filtered or unexported fields
}

func NewClickHouseConnector

func NewClickHouseConnector(cfg *config.ClickhouseConfig) (*ClickHouseConnector, error)

func (*ClickHouseConnector) Close

func (c *ClickHouseConnector) Close() error

Close closes the ClickHouse connection

func (*ClickHouseConnector) DeleteBlockFailures

func (c *ClickHouseConnector) DeleteBlockFailures(failures []common.BlockFailure) error

func (*ClickHouseConnector) DeleteStagingData

func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error

func (*ClickHouseConnector) DeleteStagingDataOlderThan

func (c *ClickHouseConnector) DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error

func (*ClickHouseConnector) FindMissingBlockNumbers

func (c *ClickHouseConnector) FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockNumbers []*big.Int, err error)

func (*ClickHouseConnector) GetAggregations

func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)

func (*ClickHouseConnector) GetBlockCount

func (c *ClickHouseConnector) GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockCount *big.Int, err error)

func (*ClickHouseConnector) GetBlockFailures

func (c *ClickHouseConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)

func (*ClickHouseConnector) GetBlockHeadersDescending

func (c *ClickHouseConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error)

func (*ClickHouseConnector) GetBlocks

func (c *ClickHouseConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)

func (*ClickHouseConnector) GetFullBlockData

func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error)

func (*ClickHouseConnector) GetLastCommittedBlockNumber

func (c *ClickHouseConnector) GetLastCommittedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*ClickHouseConnector) GetLastPublishedBlockNumber

func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*ClickHouseConnector) GetLastReorgCheckedBlockNumber

func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*ClickHouseConnector) GetLastStagedBlockNumber

func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)

func (*ClickHouseConnector) GetLogs

func (c *ClickHouseConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)

func (*ClickHouseConnector) GetMaxBlockNumber

func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)

func (*ClickHouseConnector) GetMaxBlockNumberInRange

func (c *ClickHouseConnector) GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (maxBlockNumber *big.Int, err error)

func (*ClickHouseConnector) GetStagingData

func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)

func (*ClickHouseConnector) GetTokenBalances

func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)

func (*ClickHouseConnector) GetTokenTransfers

func (c *ClickHouseConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)

func (*ClickHouseConnector) GetTraces

func (c *ClickHouseConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)

func (*ClickHouseConnector) GetTransactions

func (c *ClickHouseConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)

func (*ClickHouseConnector) GetValidationBlockData

func (c *ClickHouseConnector) GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blocks []common.BlockData, err error)

func (*ClickHouseConnector) InsertBlockData

func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error

func (*ClickHouseConnector) InsertStagingData

func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error

func (*ClickHouseConnector) ReplaceBlockData

func (c *ClickHouseConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error)

func (*ClickHouseConnector) SetLastCommittedBlockNumber

func (c *ClickHouseConnector) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*ClickHouseConnector) SetLastPublishedBlockNumber

func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*ClickHouseConnector) SetLastReorgCheckedBlockNumber

func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*ClickHouseConnector) StoreBlockFailures

func (c *ClickHouseConnector) StoreBlockFailures(failures []common.BlockFailure) error

func (*ClickHouseConnector) TestQueryGeneration

func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string

Helper function to test query generation

type DataFormatter

// DataFormatter is the interface for different file formats.
// ParquetFormatter is documented as an implementation of it.
type DataFormatter interface {
	// FormatBlockData turns the given block data into a byte slice, or
	// returns an error.
	FormatBlockData(data []common.BlockData) ([]byte, error)
	// GetFileExtension returns a string; per its name, the file extension
	// associated with the format.
	GetFileExtension() string
	// GetContentType returns a string; per its name, the content type
	// associated with the format.
	GetContentType() string
}

DataFormatter interface for different file formats

type IBlockBuffer

// IBlockBuffer defines the interface for block buffer implementations.
// The per-method notes below repeat what is documented on the BlockBuffer and
// BadgerBlockBuffer methods of the same name.
type IBlockBuffer interface {
	// Add adds blocks to the buffer and returns true if a flush is needed.
	Add(blocks []common.BlockData) bool
	// Flush removes all data from the buffer and returns it.
	Flush() []common.BlockData
	// ShouldFlush checks if the buffer should be flushed based on configured
	// thresholds.
	ShouldFlush() bool
	// Size returns the current buffer size in bytes and the block count.
	Size() (int64, int)
	// IsEmpty returns true if the buffer is empty.
	IsEmpty() bool
	// GetData returns a copy of the current buffer data.
	GetData() []common.BlockData
	// GetBlocksInRange returns blocks from the buffer that fall within the
	// given range.
	GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData
	// GetBlockByNumber returns a specific block from the buffer if it exists.
	GetBlockByNumber(chainId *big.Int, blockNumber *big.Int) *common.BlockData
	// GetMaxBlockNumber returns the maximum block number for a chain in the
	// buffer.
	GetMaxBlockNumber(chainId *big.Int) *big.Int
	// Clear empties the buffer without returning data.
	Clear()
	// Stats returns statistics about the buffer.
	Stats() BufferStats
	// Close closes the buffer.
	Close() error
}

IBlockBuffer defines the interface for block buffer implementations

func NewBlockBufferWithBadger

func NewBlockBufferWithBadger(maxSizeMB int64, maxBlocks int) (IBlockBuffer, error)

NewBlockBufferWithBadger creates a new Badger-backed block buffer for better memory management. This uses ephemeral storage with optimized settings for caching.

type IMainStorage

// IMainStorage is the main block-data storage interface; NewMainConnector
// returns a value of this type. ClickHouseConnector, KafkaConnector and
// S3Connector each declare methods with these names.
type IMainStorage interface {
	// Writes: insert new block data, or replace existing block data.
	// NOTE(review): what the slice returned by ReplaceBlockData contains is
	// not visible here — confirm against an implementation.
	InsertBlockData(data []common.BlockData) error
	ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error)

	// Filtered queries, each returning a QueryResult of the matching type.
	GetBlocks(qf QueryFilter, fields ...string) (blocks QueryResult[common.Block], err error)
	GetTransactions(qf QueryFilter, fields ...string) (transactions QueryResult[common.Transaction], err error)
	GetLogs(qf QueryFilter, fields ...string) (logs QueryResult[common.Log], err error)
	GetTraces(qf QueryFilter, fields ...string) (traces QueryResult[common.Trace], err error)
	GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
	GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
	GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)

	// Block-number lookups for a chain, optionally bounded by a block range.
	GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
	GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (maxBlockNumber *big.Int, err error)
	GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockCount *big.Int, err error)

	/**
	 * Get block headers ordered from latest to oldest.
	 */
	GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error)
	/**
	 * Gets only the data required for validation.
	 */
	GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blocks []common.BlockData, err error)
	/**
	 * Finds missing block numbers in a range. Block numbers should be sequential.
	 */
	FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockNumbers []*big.Int, err error)
	/**
	 * Gets full block data with transactions, logs and traces.
	 */
	GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error)

	// Close releases the storage; it returns an error on failure.
	Close() error
}

func NewMainConnector

func NewMainConnector(cfg *config.StorageMainConfig, orchestratorStorage *IOrchestratorStorage) (IMainStorage, error)

type IOrchestratorStorage

// IOrchestratorStorage is a persisted key/value store holding, per chain ID,
// three block-number cursors: last reorg-checked, last published and last
// committed. Each cursor has a getter and a setter.
type IOrchestratorStorage interface {
	// Last reorg-checked block number for the chain.
	GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
	SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
	// Last published block number for the chain.
	GetLastPublishedBlockNumber(chainId *big.Int) (blockNumber *big.Int, err error)
	SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
	// Last committed block number for the chain.
	GetLastCommittedBlockNumber(chainId *big.Int) (blockNumber *big.Int, err error)
	SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

	// Close releases the storage; it returns an error on failure.
	Close() error
}

The orchestrator storage is a persisted key/value store.

type IStagingStorage

// IStagingStorage is an ephemeral block data store. It covers two groups of
// operations: staged block data and block failures. NewStagingConnector
// returns a value of this type.
type IStagingStorage interface {
	// Staging block data
	InsertStagingData(data []common.BlockData) error
	GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
	GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
	DeleteStagingData(data []common.BlockData) error
	DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error

	// Block failures
	GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
	StoreBlockFailures(failures []common.BlockFailure) error
	DeleteBlockFailures(failures []common.BlockFailure) error

	// Close releases the storage; it returns an error on failure.
	Close() error
}

The staging storage is an ephemeral block data store.

func NewStagingConnector

func NewStagingConnector(cfg *config.StorageStagingConfig) (IStagingStorage, error)

type IStorage

// IStorage groups the three storage backends. Despite the "I" prefix it is a
// struct, not an interface. NewStorageConnector returns it, and its Close
// method closes all storage connections.
type IStorage struct {
	OrchestratorStorage IOrchestratorStorage
	MainStorage         IMainStorage
	StagingStorage      IStagingStorage
}

func NewStorageConnector

func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error)

func (*IStorage) Close

func (s *IStorage) Close() error

Close closes all storage connections

type InsertOptions

// InsertOptions holds options for an insert operation.
// NOTE(review): no exported function in this listing takes InsertOptions, so
// its consumer is not visible here.
type InsertOptions struct {
	// NOTE(review): presumably marks the inserted rows as deleted — confirm.
	AsDeleted bool
}

type KafkaConnector

type KafkaConnector struct {
	// contains filtered or unexported fields
}

KafkaConnector uses Redis for metadata storage and Kafka for block data delivery

func NewKafkaConnector

func NewKafkaConnector(cfg *config.KafkaConfig, orchestratorStorage *IOrchestratorStorage) (*KafkaConnector, error)

func (*KafkaConnector) Close

func (kr *KafkaConnector) Close() error

Close closes the Redis connection

func (*KafkaConnector) FindMissingBlockNumbers

func (kr *KafkaConnector) FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]*big.Int, error)

func (*KafkaConnector) GetAggregations

func (kr *KafkaConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)

func (*KafkaConnector) GetBlockCount

func (kr *KafkaConnector) GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (*big.Int, error)

func (*KafkaConnector) GetBlockHeadersDescending

func (kr *KafkaConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error)

func (*KafkaConnector) GetBlocks

func (kr *KafkaConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)

Query methods return errors as this is a write-only connector for streaming

func (*KafkaConnector) GetFullBlockData

func (kr *KafkaConnector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error)

func (*KafkaConnector) GetLogs

func (kr *KafkaConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)

func (*KafkaConnector) GetMaxBlockNumber

func (kr *KafkaConnector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error)

func (*KafkaConnector) GetMaxBlockNumberInRange

func (kr *KafkaConnector) GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (*big.Int, error)

func (*KafkaConnector) GetTokenBalances

func (kr *KafkaConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)

func (*KafkaConnector) GetTokenTransfers

func (kr *KafkaConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)

func (*KafkaConnector) GetTraces

func (kr *KafkaConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)

func (*KafkaConnector) GetTransactions

func (kr *KafkaConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)

func (*KafkaConnector) GetValidationBlockData

func (kr *KafkaConnector) GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]common.BlockData, error)

func (*KafkaConnector) InsertBlockData

func (kr *KafkaConnector) InsertBlockData(data []common.BlockData) error

InsertBlockData publishes block data to Kafka instead of storing in database

func (*KafkaConnector) ReplaceBlockData

func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error)

ReplaceBlockData handles reorg by publishing both old and new data to Kafka

type KafkaPublisher

type KafkaPublisher struct {
	// contains filtered or unexported fields
}

func NewKafkaPublisher

func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error)

NewKafkaPublisher is the public constructor of KafkaPublisher, intended for the storage connector.

func (*KafkaPublisher) Close

func (p *KafkaPublisher) Close() error

func (*KafkaPublisher) PublishBlockData

func (p *KafkaPublisher) PublishBlockData(blockData []common.BlockData) error

func (*KafkaPublisher) PublishReorg

func (p *KafkaPublisher) PublishReorg(oldData []common.BlockData, newData []common.BlockData) error

type MessageType

// MessageType is a string-based type. It is returned by
// PublishableData.GetType and stored in PublishableMessagePayload.Type.
type MessageType string

type ParquetBlockData

// ParquetBlockData represents the complete block data in Parquet format.
// The struct tags give the Parquet column name of each field.
type ParquetBlockData struct {
	ChainId        uint64 `parquet:"chain_id"`
	BlockNumber    uint64 `parquet:"block_number"` // Numeric for efficient min/max queries
	BlockHash      string `parquet:"block_hash"`
	BlockTimestamp int64  `parquet:"block_timestamp"`
	// The remaining fields are raw byte slices stored in *_json columns.
	// NOTE(review): presumably JSON-encoded payloads — confirm in the formatter.
	Block          []byte `parquet:"block_json"`
	Transactions   []byte `parquet:"transactions_json"`
	Logs           []byte `parquet:"logs_json"`
	Traces         []byte `parquet:"traces_json"`
}

ParquetBlockData represents the complete block data in Parquet format

type ParquetFormatter

type ParquetFormatter struct {
	// contains filtered or unexported fields
}

ParquetFormatter implements DataFormatter for Parquet format

func (*ParquetFormatter) FormatBlockData

func (f *ParquetFormatter) FormatBlockData(data []common.BlockData) ([]byte, error)

func (*ParquetFormatter) GetContentType

func (f *ParquetFormatter) GetContentType() string

func (*ParquetFormatter) GetFileExtension

func (f *ParquetFormatter) GetFileExtension() string

type PostgresConnector

type PostgresConnector struct {
	// contains filtered or unexported fields
}

func NewPostgresConnector

func NewPostgresConnector(cfg *config.PostgresConfig) (*PostgresConnector, error)

func (*PostgresConnector) Close

func (p *PostgresConnector) Close() error

Close closes the database connection

func (*PostgresConnector) DeleteBlockFailures

func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) error

func (*PostgresConnector) DeleteStagingData

func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error

func (*PostgresConnector) DeleteStagingDataOlderThan

func (p *PostgresConnector) DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error

func (*PostgresConnector) GetBlockFailures

func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)

func (*PostgresConnector) GetLastCommittedBlockNumber

func (p *PostgresConnector) GetLastCommittedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*PostgresConnector) GetLastPublishedBlockNumber

func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*PostgresConnector) GetLastReorgCheckedBlockNumber

func (p *PostgresConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*PostgresConnector) GetLastStagedBlockNumber

func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error)

func (*PostgresConnector) GetStagingData

func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)

func (*PostgresConnector) InsertStagingData

func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error

func (*PostgresConnector) SetLastCommittedBlockNumber

func (p *PostgresConnector) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*PostgresConnector) SetLastPublishedBlockNumber

func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*PostgresConnector) SetLastReorgCheckedBlockNumber

func (p *PostgresConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*PostgresConnector) StoreBlockFailures

func (p *PostgresConnector) StoreBlockFailures(failures []common.BlockFailure) error

type PublishableData

// PublishableData is the interface of values carried in
// PublishableMessagePayload.Data. PublishableMessageBlockData and
// PublishableMessageRevert both declare a GetType method.
type PublishableData interface {
	// GetType returns the MessageType of the value.
	GetType() MessageType
}

type PublishableMessageBlockData

// PublishableMessageBlockData embeds common.BlockData and adds the chain_id,
// is_deleted and insert_timestamp JSON fields. It declares a GetType method.
type PublishableMessageBlockData struct {
	common.BlockData
	ChainId         uint64    `json:"chain_id"`
	// NOTE(review): an int8 rather than a bool; presumably 0/1 — confirm.
	IsDeleted       int8      `json:"is_deleted"`
	InsertTimestamp time.Time `json:"insert_timestamp"`
}

func (PublishableMessageBlockData) GetType

type PublishableMessagePayload

// PublishableMessagePayload is a JSON-tagged envelope made of a
// PublishableData value, its MessageType and a timestamp.
type PublishableMessagePayload struct {
	Data      PublishableData `json:"data"`
	Type      MessageType     `json:"type"`
	Timestamp time.Time       `json:"timestamp"`
}

type PublishableMessageRevert

// PublishableMessageRevert identifies a block by chain ID and block number and
// carries is_deleted and insert_timestamp JSON fields. It declares a GetType
// method.
// NOTE(review): per its name it describes a reverted block — confirm with the
// publisher code.
type PublishableMessageRevert struct {
	ChainId         uint64    `json:"chain_id"`
	BlockNumber     uint64    `json:"block_number"`
	// NOTE(review): an int8 rather than a bool; presumably 0/1 — confirm.
	IsDeleted       int8      `json:"is_deleted"`
	InsertTimestamp time.Time `json:"insert_timestamp"`
}

func (PublishableMessageRevert) GetType

type QueryFilter

// QueryFilter is the filter argument of the general query methods in this
// package (GetBlocks, GetTransactions, GetLogs, GetTraces, GetAggregations,
// GetStagingData and GetBlockFailures).
// NOTE(review): how each field is translated into a query is not visible here.
type QueryFilter struct {
	ChainId             *big.Int
	BlockNumbers        []*big.Int
	StartBlock          *big.Int
	EndBlock            *big.Int
	FilterParams        map[string]string
	// Grouping, sorting and paging options; their exact semantics are defined
	// by the connector that consumes the filter.
	GroupBy             []string
	SortBy              string
	SortOrder           string
	Page                int
	Limit               int
	Offset              int
	Aggregates          []string // e.g., ["COUNT(*) AS count", "SUM(amount) AS total_amount"]
	FromAddress         string
	ContractAddress     string
	WalletAddress       string
	Signature           string
	ForceConsistentData bool
}

type QueryResult

// QueryResult is the generic result type of the query methods: Data holds the
// returned items of type T and Aggregates holds a slice of string-keyed maps.
// Both fields are JSON-tagged.
type QueryResult[T any] struct {
	// TODO: find out how to only allow Log/transaction arrays or split the result
	Data       []T                      `json:"data"`
	Aggregates []map[string]interface{} `json:"aggregates"`
}

type RedisConnector

type RedisConnector struct {
	// contains filtered or unexported fields
}

RedisConnector uses Redis for metadata storage

func NewRedisConnector

func NewRedisConnector(cfg *config.RedisConfig) (*RedisConnector, error)

func (*RedisConnector) Close

func (kr *RedisConnector) Close() error

Close closes the Redis connection

func (*RedisConnector) GetLastCommittedBlockNumber

func (kr *RedisConnector) GetLastCommittedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*RedisConnector) GetLastPublishedBlockNumber

func (kr *RedisConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error)

func (*RedisConnector) GetLastReorgCheckedBlockNumber

func (kr *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)

Orchestrator Storage Implementation

func (*RedisConnector) SetLastCommittedBlockNumber

func (kr *RedisConnector) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*RedisConnector) SetLastPublishedBlockNumber

func (kr *RedisConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

func (*RedisConnector) SetLastReorgCheckedBlockNumber

func (kr *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error

type S3Connector

type S3Connector struct {
	// contains filtered or unexported fields
}

func NewS3Connector

func NewS3Connector(cfg *config.S3StorageConfig) (*S3Connector, error)

func (*S3Connector) Close

func (s *S3Connector) Close() error

Close closes the S3 connector and flushes any remaining data

func (*S3Connector) FindMissingBlockNumbers

func (s *S3Connector) FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]*big.Int, error)

func (*S3Connector) Flush

func (s *S3Connector) Flush() error

Flush manually triggers a buffer flush and waits for completion

func (*S3Connector) GetAggregations

func (s *S3Connector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)

func (*S3Connector) GetBlockCount

func (s *S3Connector) GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (*big.Int, error)

func (*S3Connector) GetBlockHeadersDescending

func (s *S3Connector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error)

func (*S3Connector) GetBlocks

func (s *S3Connector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)

func (*S3Connector) GetFullBlockData

func (s *S3Connector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error)

func (*S3Connector) GetLogs

func (s *S3Connector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)

func (*S3Connector) GetMaxBlockNumber

func (s *S3Connector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error)

func (*S3Connector) GetMaxBlockNumberInRange

func (s *S3Connector) GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (*big.Int, error)

func (*S3Connector) GetTokenBalances

func (s *S3Connector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)

func (*S3Connector) GetTokenTransfers

func (s *S3Connector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)

func (*S3Connector) GetTraces

func (s *S3Connector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)

func (*S3Connector) GetTransactions

func (s *S3Connector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)

func (*S3Connector) GetValidationBlockData

func (s *S3Connector) GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]common.BlockData, error)

func (*S3Connector) InsertBlockData

func (s *S3Connector) InsertBlockData(data []common.BlockData) error

func (*S3Connector) ReplaceBlockData

func (s *S3Connector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error)

type TransfersQueryFilter

// TransfersQueryFilter is the filter argument of the GetTokenTransfers methods
// declared in this package.
// NOTE(review): how each field is translated into a query is not visible here.
type TransfersQueryFilter struct {
	ChainId          *big.Int
	TokenTypes       []string
	TokenAddress     string
	WalletAddress    string
	TokenIds         []*big.Int
	TransactionHash  string
	StartBlockNumber *big.Int
	EndBlockNumber   *big.Int
	// Grouping, sorting and paging options; their exact semantics are defined
	// by the connector that consumes the filter.
	GroupBy          []string
	SortBy           string
	SortOrder        string // "ASC" or "DESC"
	Page             int
	Limit            int
	Offset           int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL