Documentation
¶
Index ¶
- Constants
- Variables
- type BadgerBlockBuffer
- func (b *BadgerBlockBuffer) Add(blocks []common.BlockData) bool
- func (b *BadgerBlockBuffer) Clear()
- func (b *BadgerBlockBuffer) Close() error
- func (b *BadgerBlockBuffer) Flush() []common.BlockData
- func (b *BadgerBlockBuffer) GetBlockByNumber(chainId *big.Int, blockNumber *big.Int) *common.BlockData
- func (b *BadgerBlockBuffer) GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData
- func (b *BadgerBlockBuffer) GetData() []common.BlockData
- func (b *BadgerBlockBuffer) GetMaxBlockNumber(chainId *big.Int) *big.Int
- func (b *BadgerBlockBuffer) IsEmpty() bool
- func (b *BadgerBlockBuffer) ShouldFlush() bool
- func (b *BadgerBlockBuffer) Size() (int64, int)
- func (b *BadgerBlockBuffer) Stats() BufferStats
- type BadgerConnector
- func (bc *BadgerConnector) Close() error
- func (bc *BadgerConnector) DeleteBlockFailures(failures []common.BlockFailure) error
- func (bc *BadgerConnector) DeleteStagingData(data []common.BlockData) error
- func (bc *BadgerConnector) DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error
- func (bc *BadgerConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
- func (bc *BadgerConnector) GetLastCommittedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (bc *BadgerConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (bc *BadgerConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (bc *BadgerConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error)
- func (bc *BadgerConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)
- func (bc *BadgerConnector) InsertStagingData(data []common.BlockData) error
- func (bc *BadgerConnector) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (bc *BadgerConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (bc *BadgerConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (bc *BadgerConnector) StoreBlockFailures(failures []common.BlockFailure) error
- type BalancesQueryFilter
- type BigInt
- type BlockBuffer
- func (b *BlockBuffer) Add(blocks []common.BlockData) bool
- func (b *BlockBuffer) Clear()
- func (b *BlockBuffer) Close() error
- func (b *BlockBuffer) Flush() []common.BlockData
- func (b *BlockBuffer) GetBlockByNumber(chainId *big.Int, blockNumber *big.Int) *common.BlockData
- func (b *BlockBuffer) GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData
- func (b *BlockBuffer) GetData() []common.BlockData
- func (b *BlockBuffer) GetMaxBlockNumber(chainId *big.Int) *big.Int
- func (b *BlockBuffer) IsEmpty() bool
- func (b *BlockBuffer) ShouldFlush() bool
- func (b *BlockBuffer) Size() (int64, int)
- func (b *BlockBuffer) Stats() BufferStats
- type BufferStats
- type ChainMetadata
- type ChainStats
- type ClickHouseConnector
- func (c *ClickHouseConnector) Close() error
- func (c *ClickHouseConnector) DeleteBlockFailures(failures []common.BlockFailure) error
- func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error
- func (c *ClickHouseConnector) DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error
- func (c *ClickHouseConnector) FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockNumbers []*big.Int, err error)
- func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
- func (c *ClickHouseConnector) GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockCount *big.Int, err error)
- func (c *ClickHouseConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
- func (c *ClickHouseConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error)
- func (c *ClickHouseConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)
- func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error)
- func (c *ClickHouseConnector) GetLastCommittedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (c *ClickHouseConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
- func (c *ClickHouseConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)
- func (c *ClickHouseConnector) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error)
- func (c *ClickHouseConnector) GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (maxBlockNumber *big.Int, err error)
- func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)
- func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
- func (c *ClickHouseConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
- func (c *ClickHouseConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)
- func (c *ClickHouseConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)
- func (c *ClickHouseConnector) GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blocks []common.BlockData, err error)
- func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error
- func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error
- func (c *ClickHouseConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error)
- func (c *ClickHouseConnector) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (c *ClickHouseConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (c *ClickHouseConnector) StoreBlockFailures(failures []common.BlockFailure) error
- func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string
- type DataFormatter
- type IBlockBuffer
- type IMainStorage
- type IOrchestratorStorage
- type IStagingStorage
- type IStorage
- type InsertOptions
- type KafkaConnector
- func (kr *KafkaConnector) Close() error
- func (kr *KafkaConnector) FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]*big.Int, error)
- func (kr *KafkaConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
- func (kr *KafkaConnector) GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (*big.Int, error)
- func (kr *KafkaConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error)
- func (kr *KafkaConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)
- func (kr *KafkaConnector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error)
- func (kr *KafkaConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)
- func (kr *KafkaConnector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error)
- func (kr *KafkaConnector) GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (*big.Int, error)
- func (kr *KafkaConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
- func (kr *KafkaConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
- func (kr *KafkaConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)
- func (kr *KafkaConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)
- func (kr *KafkaConnector) GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]common.BlockData, error)
- func (kr *KafkaConnector) InsertBlockData(data []common.BlockData) error
- func (kr *KafkaConnector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error)
- type KafkaPublisher
- type MessageType
- type ParquetBlockData
- type ParquetFormatter
- type PostgresConnector
- func (p *PostgresConnector) Close() error
- func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) error
- func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error
- func (p *PostgresConnector) DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error
- func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
- func (p *PostgresConnector) GetLastCommittedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (p *PostgresConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error)
- func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)
- func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error
- func (p *PostgresConnector) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (p *PostgresConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (p *PostgresConnector) StoreBlockFailures(failures []common.BlockFailure) error
- type PublishableData
- type PublishableMessageBlockData
- type PublishableMessagePayload
- type PublishableMessageRevert
- type QueryFilter
- type QueryResult
- type RedisConnector
- func (kr *RedisConnector) Close() error
- func (kr *RedisConnector) GetLastCommittedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (kr *RedisConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (kr *RedisConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error)
- func (kr *RedisConnector) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (kr *RedisConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- func (kr *RedisConnector) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error
- type S3Connector
- func (s *S3Connector) Close() error
- func (s *S3Connector) FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]*big.Int, error)
- func (s *S3Connector) Flush() error
- func (s *S3Connector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
- func (s *S3Connector) GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (*big.Int, error)
- func (s *S3Connector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error)
- func (s *S3Connector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)
- func (s *S3Connector) GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) ([]common.BlockData, error)
- func (s *S3Connector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)
- func (s *S3Connector) GetMaxBlockNumber(chainId *big.Int) (*big.Int, error)
- func (s *S3Connector) GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (*big.Int, error)
- func (s *S3Connector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
- func (s *S3Connector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
- func (s *S3Connector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)
- func (s *S3Connector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)
- func (s *S3Connector) GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) ([]common.BlockData, error)
- func (s *S3Connector) InsertBlockData(data []common.BlockData) error
- func (s *S3Connector) ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error)
- type TransfersQueryFilter
Constants ¶
const (
	// Cursor keys for tracking positions
	KeyCursorReorg   = "cursor:reorg"   // String: cursor:reorg:{chainId}
	KeyCursorPublish = "cursor:publish" // String: cursor:publish:{chainId}
	KeyCursorCommit  = "cursor:commit"  // String: cursor:commit:{chainId}
)
Redis key namespace constants for better organization and maintainability
Variables ¶
var DEFAULT_MAX_ROWS_PER_INSERT = 100000
var ZERO_BYTES_10 = strings.Repeat("\x00", 10)
var ZERO_BYTES_42 = strings.Repeat("\x00", 42)
var ZERO_BYTES_66 = strings.Repeat("\x00", 66)
Functions ¶
This section is empty.
Types ¶
type BadgerBlockBuffer ¶
type BadgerBlockBuffer struct {
// contains filtered or unexported fields
}
BadgerBlockBuffer manages buffering of block data using Badger as an ephemeral cache
func NewBadgerBlockBuffer ¶
func NewBadgerBlockBuffer(maxSizeMB int64, maxBlocks int) (*BadgerBlockBuffer, error)
NewBadgerBlockBuffer creates a new Badger-backed block buffer with ephemeral storage
func (*BadgerBlockBuffer) Add ¶
func (b *BadgerBlockBuffer) Add(blocks []common.BlockData) bool
Add adds blocks to the buffer and returns true if flush is needed
func (*BadgerBlockBuffer) Clear ¶
func (b *BadgerBlockBuffer) Clear()
Clear empties the buffer without returning data
func (*BadgerBlockBuffer) Close ¶
func (b *BadgerBlockBuffer) Close() error
Close closes the buffer and cleans up resources
func (*BadgerBlockBuffer) Flush ¶
func (b *BadgerBlockBuffer) Flush() []common.BlockData
Flush removes all data from the buffer and returns it
func (*BadgerBlockBuffer) GetBlockByNumber ¶
func (b *BadgerBlockBuffer) GetBlockByNumber(chainId *big.Int, blockNumber *big.Int) *common.BlockData
GetBlockByNumber returns a specific block from the buffer if it exists
func (*BadgerBlockBuffer) GetBlocksInRange ¶
func (b *BadgerBlockBuffer) GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData
GetBlocksInRange returns blocks from the buffer that fall within the given range
func (*BadgerBlockBuffer) GetData ¶
func (b *BadgerBlockBuffer) GetData() []common.BlockData
GetData returns a copy of the current buffer data
func (*BadgerBlockBuffer) GetMaxBlockNumber ¶
func (b *BadgerBlockBuffer) GetMaxBlockNumber(chainId *big.Int) *big.Int
GetMaxBlockNumber returns the maximum block number for a chain in the buffer
func (*BadgerBlockBuffer) IsEmpty ¶
func (b *BadgerBlockBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty
func (*BadgerBlockBuffer) ShouldFlush ¶
func (b *BadgerBlockBuffer) ShouldFlush() bool
ShouldFlush checks if the buffer should be flushed based on configured thresholds
func (*BadgerBlockBuffer) Size ¶
func (b *BadgerBlockBuffer) Size() (int64, int)
Size returns the current buffer size in bytes and block count
func (*BadgerBlockBuffer) Stats ¶
func (b *BadgerBlockBuffer) Stats() BufferStats
Stats returns statistics about the buffer
type BadgerConnector ¶
type BadgerConnector struct {
// contains filtered or unexported fields
}
func NewBadgerConnector ¶
func NewBadgerConnector(cfg *config.BadgerConfig) (*BadgerConnector, error)
func (*BadgerConnector) Close ¶
func (bc *BadgerConnector) Close() error
func (*BadgerConnector) DeleteBlockFailures ¶
func (bc *BadgerConnector) DeleteBlockFailures(failures []common.BlockFailure) error
func (*BadgerConnector) DeleteStagingData ¶
func (bc *BadgerConnector) DeleteStagingData(data []common.BlockData) error
func (*BadgerConnector) DeleteStagingDataOlderThan ¶
func (*BadgerConnector) GetBlockFailures ¶
func (bc *BadgerConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
GetBlockFailures is part of the IStagingStorage implementation.
func (*BadgerConnector) GetLastCommittedBlockNumber ¶
func (*BadgerConnector) GetLastPublishedBlockNumber ¶
func (*BadgerConnector) GetLastReorgCheckedBlockNumber ¶
func (*BadgerConnector) GetLastStagedBlockNumber ¶
func (*BadgerConnector) GetStagingData ¶
func (bc *BadgerConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)
func (*BadgerConnector) InsertStagingData ¶
func (bc *BadgerConnector) InsertStagingData(data []common.BlockData) error
IStagingStorage implementation
func (*BadgerConnector) SetLastCommittedBlockNumber ¶
func (*BadgerConnector) SetLastPublishedBlockNumber ¶
func (*BadgerConnector) SetLastReorgCheckedBlockNumber ¶
func (*BadgerConnector) StoreBlockFailures ¶
func (bc *BadgerConnector) StoreBlockFailures(failures []common.BlockFailure) error
type BalancesQueryFilter ¶
type BigInt ¶
func (BigInt) MarshalJSON ¶
type BlockBuffer ¶
type BlockBuffer struct {
// contains filtered or unexported fields
}
BlockBuffer manages buffering of block data with size and count limits
func NewBlockBuffer ¶
func NewBlockBuffer(maxSizeMB int64, maxBlocks int) *BlockBuffer
NewBlockBuffer creates a new in-memory block buffer
func (*BlockBuffer) Add ¶
func (b *BlockBuffer) Add(blocks []common.BlockData) bool
Add adds blocks to the buffer and returns true if flush is needed
func (*BlockBuffer) Clear ¶
func (b *BlockBuffer) Clear()
Clear empties the buffer without returning data
func (*BlockBuffer) Close ¶
func (b *BlockBuffer) Close() error
Close closes the buffer (no-op for in-memory buffer)
func (*BlockBuffer) Flush ¶
func (b *BlockBuffer) Flush() []common.BlockData
Flush removes all data from the buffer and returns it
func (*BlockBuffer) GetBlockByNumber ¶
GetBlockByNumber returns a specific block from the buffer if it exists
func (*BlockBuffer) GetBlocksInRange ¶
func (b *BlockBuffer) GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData
GetBlocksInRange returns blocks from the buffer that fall within the given range
func (*BlockBuffer) GetData ¶
func (b *BlockBuffer) GetData() []common.BlockData
GetData returns a copy of the current buffer data
func (*BlockBuffer) GetMaxBlockNumber ¶
func (b *BlockBuffer) GetMaxBlockNumber(chainId *big.Int) *big.Int
GetMaxBlockNumber returns the maximum block number for a chain in the buffer
func (*BlockBuffer) IsEmpty ¶
func (b *BlockBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty
func (*BlockBuffer) ShouldFlush ¶
func (b *BlockBuffer) ShouldFlush() bool
ShouldFlush checks if the buffer should be flushed based on configured thresholds
func (*BlockBuffer) Size ¶
func (b *BlockBuffer) Size() (int64, int)
Size returns the current buffer size in bytes and block count
func (*BlockBuffer) Stats ¶
func (b *BlockBuffer) Stats() BufferStats
Stats returns statistics about the buffer
type BufferStats ¶
type BufferStats struct { BlockCount int SizeBytes int64 ChainCount int ChainStats map[uint64]ChainStats }
BufferStats contains statistics about the buffer
func (BufferStats) String ¶
func (s BufferStats) String() string
String returns a string representation of buffer stats
type ChainMetadata ¶
ChainMetadata tracks per-chain statistics for fast lookups
type ChainStats ¶
ChainStats contains per-chain statistics
type ClickHouseConnector ¶
type ClickHouseConnector struct {
// contains filtered or unexported fields
}
func NewClickHouseConnector ¶
func NewClickHouseConnector(cfg *config.ClickhouseConfig) (*ClickHouseConnector, error)
func (*ClickHouseConnector) Close ¶
func (c *ClickHouseConnector) Close() error
Close closes the ClickHouse connection
func (*ClickHouseConnector) DeleteBlockFailures ¶
func (c *ClickHouseConnector) DeleteBlockFailures(failures []common.BlockFailure) error
func (*ClickHouseConnector) DeleteStagingData ¶
func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error
func (*ClickHouseConnector) DeleteStagingDataOlderThan ¶
func (*ClickHouseConnector) FindMissingBlockNumbers ¶
func (*ClickHouseConnector) GetAggregations ¶
func (c *ClickHouseConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
func (*ClickHouseConnector) GetBlockCount ¶
func (*ClickHouseConnector) GetBlockFailures ¶
func (c *ClickHouseConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
func (*ClickHouseConnector) GetBlockHeadersDescending ¶
func (c *ClickHouseConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error)
func (*ClickHouseConnector) GetBlocks ¶
func (c *ClickHouseConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)
func (*ClickHouseConnector) GetFullBlockData ¶
func (*ClickHouseConnector) GetLastCommittedBlockNumber ¶
func (*ClickHouseConnector) GetLastPublishedBlockNumber ¶
func (*ClickHouseConnector) GetLastReorgCheckedBlockNumber ¶
func (*ClickHouseConnector) GetLastStagedBlockNumber ¶
func (*ClickHouseConnector) GetLogs ¶
func (c *ClickHouseConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)
func (*ClickHouseConnector) GetMaxBlockNumber ¶
func (*ClickHouseConnector) GetMaxBlockNumberInRange ¶
func (*ClickHouseConnector) GetStagingData ¶
func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)
func (*ClickHouseConnector) GetTokenBalances ¶
func (c *ClickHouseConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
func (*ClickHouseConnector) GetTokenTransfers ¶
func (c *ClickHouseConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
func (*ClickHouseConnector) GetTraces ¶
func (c *ClickHouseConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)
func (*ClickHouseConnector) GetTransactions ¶
func (c *ClickHouseConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)
func (*ClickHouseConnector) GetValidationBlockData ¶
func (*ClickHouseConnector) InsertBlockData ¶
func (c *ClickHouseConnector) InsertBlockData(data []common.BlockData) error
func (*ClickHouseConnector) InsertStagingData ¶
func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error
func (*ClickHouseConnector) ReplaceBlockData ¶
func (*ClickHouseConnector) SetLastCommittedBlockNumber ¶
func (*ClickHouseConnector) SetLastPublishedBlockNumber ¶
func (*ClickHouseConnector) SetLastReorgCheckedBlockNumber ¶
func (*ClickHouseConnector) StoreBlockFailures ¶
func (c *ClickHouseConnector) StoreBlockFailures(failures []common.BlockFailure) error
func (*ClickHouseConnector) TestQueryGeneration ¶
func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string
Helper function to test query generation
type DataFormatter ¶
type DataFormatter interface { FormatBlockData(data []common.BlockData) ([]byte, error) GetFileExtension() string GetContentType() string }
DataFormatter interface for different file formats
type IBlockBuffer ¶
type IBlockBuffer interface { Add(blocks []common.BlockData) bool Flush() []common.BlockData ShouldFlush() bool Size() (int64, int) IsEmpty() bool GetData() []common.BlockData GetBlocksInRange(chainId *big.Int, startBlock, endBlock *big.Int) []common.BlockData GetBlockByNumber(chainId *big.Int, blockNumber *big.Int) *common.BlockData GetMaxBlockNumber(chainId *big.Int) *big.Int Clear() Stats() BufferStats Close() error }
IBlockBuffer defines the interface for block buffer implementations
func NewBlockBufferWithBadger ¶
func NewBlockBufferWithBadger(maxSizeMB int64, maxBlocks int) (IBlockBuffer, error)
NewBlockBufferWithBadger creates a new Badger-backed block buffer for better memory management. This uses ephemeral storage with optimized settings for caching.
type IMainStorage ¶
type IMainStorage interface { InsertBlockData(data []common.BlockData) error ReplaceBlockData(data []common.BlockData) ([]common.BlockData, error) GetBlocks(qf QueryFilter, fields ...string) (blocks QueryResult[common.Block], err error) GetTransactions(qf QueryFilter, fields ...string) (transactions QueryResult[common.Transaction], err error) GetLogs(qf QueryFilter, fields ...string) (logs QueryResult[common.Log], err error) GetTraces(qf QueryFilter, fields ...string) (traces QueryResult[common.Trace], err error) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error) GetMaxBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) GetMaxBlockNumberInRange(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (maxBlockNumber *big.Int, err error) GetBlockCount(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockCount *big.Int, err error) /** * Get block headers ordered from latest to oldest. */ GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) (blockHeaders []common.BlockHeader, err error) /** * Gets only the data required for validation. */ GetValidationBlockData(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blocks []common.BlockData, err error) /** * Finds missing block numbers in a range. Block numbers should be sequential. */ FindMissingBlockNumbers(chainId *big.Int, startBlock *big.Int, endBlock *big.Int) (blockNumbers []*big.Int, err error) /** * Gets full block data with transactions, logs and traces. */ GetFullBlockData(chainId *big.Int, blockNumbers []*big.Int) (blocks []common.BlockData, err error) Close() error }
func NewMainConnector ¶
func NewMainConnector(cfg *config.StorageMainConfig, orchestratorStorage *IOrchestratorStorage) (IMainStorage, error)
type IOrchestratorStorage ¶
type IOrchestratorStorage interface { GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) SetLastReorgCheckedBlockNumber(chainId *big.Int, blockNumber *big.Int) error GetLastPublishedBlockNumber(chainId *big.Int) (blockNumber *big.Int, err error) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error GetLastCommittedBlockNumber(chainId *big.Int) (blockNumber *big.Int, err error) SetLastCommittedBlockNumber(chainId *big.Int, blockNumber *big.Int) error Close() error }
The orchestrator storage is a persisted key/value store.
func NewOrchestratorConnector ¶
func NewOrchestratorConnector(cfg *config.StorageOrchestratorConfig) (IOrchestratorStorage, error)
type IStagingStorage ¶
type IStagingStorage interface {
	// Staging block data
	InsertStagingData(data []common.BlockData) error
	GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
	GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
	DeleteStagingData(data []common.BlockData) error
	DeleteStagingDataOlderThan(chainId *big.Int, blockNumber *big.Int) error
	// Block failures
	GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
	StoreBlockFailures(failures []common.BlockFailure) error
	DeleteBlockFailures(failures []common.BlockFailure) error
	Close() error
}
The staging storage is an ephemeral block data store.
func NewStagingConnector ¶
func NewStagingConnector(cfg *config.StorageStagingConfig) (IStagingStorage, error)
type IStorage ¶
type IStorage struct { OrchestratorStorage IOrchestratorStorage MainStorage IMainStorage StagingStorage IStagingStorage }
func NewStorageConnector ¶
func NewStorageConnector(cfg *config.StorageConfig) (IStorage, error)
type InsertOptions ¶
type InsertOptions struct {
AsDeleted bool
}
type KafkaConnector ¶
type KafkaConnector struct {
// contains filtered or unexported fields
}
KafkaConnector uses Redis for metadata storage and Kafka for block data delivery
func NewKafkaConnector ¶
func NewKafkaConnector(cfg *config.KafkaConfig, orchestratorStorage *IOrchestratorStorage) (*KafkaConnector, error)
func (*KafkaConnector) Close ¶
func (kr *KafkaConnector) Close() error
Close closes the Redis connection
func (*KafkaConnector) FindMissingBlockNumbers ¶
func (*KafkaConnector) GetAggregations ¶
func (kr *KafkaConnector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
func (*KafkaConnector) GetBlockCount ¶
func (*KafkaConnector) GetBlockHeadersDescending ¶
func (kr *KafkaConnector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error)
func (*KafkaConnector) GetBlocks ¶
func (kr *KafkaConnector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)
Query methods return errors as this is a write-only connector for streaming
func (*KafkaConnector) GetFullBlockData ¶
func (*KafkaConnector) GetLogs ¶
func (kr *KafkaConnector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)
func (*KafkaConnector) GetMaxBlockNumber ¶
func (*KafkaConnector) GetMaxBlockNumberInRange ¶
func (*KafkaConnector) GetTokenBalances ¶
func (kr *KafkaConnector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
func (*KafkaConnector) GetTokenTransfers ¶
func (kr *KafkaConnector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
func (*KafkaConnector) GetTraces ¶
func (kr *KafkaConnector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)
func (*KafkaConnector) GetTransactions ¶
func (kr *KafkaConnector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)
func (*KafkaConnector) GetValidationBlockData ¶
func (*KafkaConnector) InsertBlockData ¶
func (kr *KafkaConnector) InsertBlockData(data []common.BlockData) error
InsertBlockData publishes block data to Kafka instead of storing in database
func (*KafkaConnector) ReplaceBlockData ¶
ReplaceBlockData handles reorg by publishing both old and new data to Kafka
type KafkaPublisher ¶
type KafkaPublisher struct {
// contains filtered or unexported fields
}
func NewKafkaPublisher ¶
func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error)
NewKafkaPublisher is the public constructor for a KafkaPublisher, for use by the storage connector.
func (*KafkaPublisher) Close ¶
func (p *KafkaPublisher) Close() error
func (*KafkaPublisher) PublishBlockData ¶
func (p *KafkaPublisher) PublishBlockData(blockData []common.BlockData) error
func (*KafkaPublisher) PublishReorg ¶
type MessageType ¶
type MessageType string
type ParquetBlockData ¶
type ParquetBlockData struct {
	ChainId        uint64 `parquet:"chain_id"`
	BlockNumber    uint64 `parquet:"block_number"` // Numeric for efficient min/max queries
	BlockHash      string `parquet:"block_hash"`
	BlockTimestamp int64  `parquet:"block_timestamp"`
	Block          []byte `parquet:"block_json"`
	Transactions   []byte `parquet:"transactions_json"`
	Logs           []byte `parquet:"logs_json"`
	Traces         []byte `parquet:"traces_json"`
}
ParquetBlockData represents the complete block data in Parquet format
type ParquetFormatter ¶
type ParquetFormatter struct {
// contains filtered or unexported fields
}
ParquetFormatter implements DataFormatter for Parquet format
func (*ParquetFormatter) FormatBlockData ¶
func (f *ParquetFormatter) FormatBlockData(data []common.BlockData) ([]byte, error)
func (*ParquetFormatter) GetContentType ¶
func (f *ParquetFormatter) GetContentType() string
func (*ParquetFormatter) GetFileExtension ¶
func (f *ParquetFormatter) GetFileExtension() string
type PostgresConnector ¶
type PostgresConnector struct {
// contains filtered or unexported fields
}
func NewPostgresConnector ¶
func NewPostgresConnector(cfg *config.PostgresConfig) (*PostgresConnector, error)
func (*PostgresConnector) Close ¶
func (p *PostgresConnector) Close() error
Close closes the database connection
func (*PostgresConnector) DeleteBlockFailures ¶
func (p *PostgresConnector) DeleteBlockFailures(failures []common.BlockFailure) error
func (*PostgresConnector) DeleteStagingData ¶
func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error
func (*PostgresConnector) DeleteStagingDataOlderThan ¶
func (*PostgresConnector) GetBlockFailures ¶
func (p *PostgresConnector) GetBlockFailures(qf QueryFilter) ([]common.BlockFailure, error)
func (*PostgresConnector) GetLastCommittedBlockNumber ¶
func (*PostgresConnector) GetLastPublishedBlockNumber ¶
func (*PostgresConnector) GetLastReorgCheckedBlockNumber ¶
func (*PostgresConnector) GetLastStagedBlockNumber ¶
func (*PostgresConnector) GetStagingData ¶
func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error)
func (*PostgresConnector) InsertStagingData ¶
func (p *PostgresConnector) InsertStagingData(data []common.BlockData) error
func (*PostgresConnector) SetLastCommittedBlockNumber ¶
func (*PostgresConnector) SetLastPublishedBlockNumber ¶
func (*PostgresConnector) SetLastReorgCheckedBlockNumber ¶
func (*PostgresConnector) StoreBlockFailures ¶
func (p *PostgresConnector) StoreBlockFailures(failures []common.BlockFailure) error
type PublishableData ¶
type PublishableData interface {
GetType() MessageType
}
type PublishableMessageBlockData ¶
type PublishableMessageBlockData struct { common.BlockData ChainId uint64 `json:"chain_id"` IsDeleted int8 `json:"is_deleted"` InsertTimestamp time.Time `json:"insert_timestamp"` }
func (PublishableMessageBlockData) GetType ¶
func (b PublishableMessageBlockData) GetType() MessageType
type PublishableMessagePayload ¶
type PublishableMessagePayload struct { Data PublishableData `json:"data"` Type MessageType `json:"type"` Timestamp time.Time `json:"timestamp"` }
type PublishableMessageRevert ¶
type PublishableMessageRevert struct { ChainId uint64 `json:"chain_id"` BlockNumber uint64 `json:"block_number"` IsDeleted int8 `json:"is_deleted"` InsertTimestamp time.Time `json:"insert_timestamp"` }
func (PublishableMessageRevert) GetType ¶
func (b PublishableMessageRevert) GetType() MessageType
type QueryFilter ¶
type QueryFilter struct { ChainId *big.Int BlockNumbers []*big.Int StartBlock *big.Int EndBlock *big.Int FilterParams map[string]string GroupBy []string SortBy string SortOrder string Page int Limit int Offset int Aggregates []string // e.g., ["COUNT(*) AS count", "SUM(amount) AS total_amount"] FromAddress string ContractAddress string WalletAddress string Signature string ForceConsistentData bool }
type QueryResult ¶
type RedisConnector ¶
type RedisConnector struct {
// contains filtered or unexported fields
}
RedisConnector uses Redis for metadata storage
func NewRedisConnector ¶
func NewRedisConnector(cfg *config.RedisConfig) (*RedisConnector, error)
func (*RedisConnector) Close ¶
func (kr *RedisConnector) Close() error
Close closes the Redis connection
func (*RedisConnector) GetLastCommittedBlockNumber ¶
func (*RedisConnector) GetLastPublishedBlockNumber ¶
func (*RedisConnector) GetLastReorgCheckedBlockNumber ¶
Orchestrator Storage Implementation
func (*RedisConnector) SetLastCommittedBlockNumber ¶
func (*RedisConnector) SetLastPublishedBlockNumber ¶
func (*RedisConnector) SetLastReorgCheckedBlockNumber ¶
type S3Connector ¶
type S3Connector struct {
// contains filtered or unexported fields
}
func NewS3Connector ¶
func NewS3Connector(cfg *config.S3StorageConfig) (*S3Connector, error)
func (*S3Connector) Close ¶
func (s *S3Connector) Close() error
Close closes the S3 connector and flushes any remaining data
func (*S3Connector) FindMissingBlockNumbers ¶
func (*S3Connector) Flush ¶
func (s *S3Connector) Flush() error
Flush manually triggers a buffer flush and waits for completion
func (*S3Connector) GetAggregations ¶
func (s *S3Connector) GetAggregations(table string, qf QueryFilter) (QueryResult[interface{}], error)
func (*S3Connector) GetBlockCount ¶
func (*S3Connector) GetBlockHeadersDescending ¶
func (s *S3Connector) GetBlockHeadersDescending(chainId *big.Int, from *big.Int, to *big.Int) ([]common.BlockHeader, error)
func (*S3Connector) GetBlocks ¶
func (s *S3Connector) GetBlocks(qf QueryFilter, fields ...string) (QueryResult[common.Block], error)
func (*S3Connector) GetFullBlockData ¶
func (*S3Connector) GetLogs ¶
func (s *S3Connector) GetLogs(qf QueryFilter, fields ...string) (QueryResult[common.Log], error)
func (*S3Connector) GetMaxBlockNumber ¶
func (*S3Connector) GetMaxBlockNumberInRange ¶
func (*S3Connector) GetTokenBalances ¶
func (s *S3Connector) GetTokenBalances(qf BalancesQueryFilter, fields ...string) (QueryResult[common.TokenBalance], error)
func (*S3Connector) GetTokenTransfers ¶
func (s *S3Connector) GetTokenTransfers(qf TransfersQueryFilter, fields ...string) (QueryResult[common.TokenTransfer], error)
func (*S3Connector) GetTraces ¶
func (s *S3Connector) GetTraces(qf QueryFilter, fields ...string) (QueryResult[common.Trace], error)
func (*S3Connector) GetTransactions ¶
func (s *S3Connector) GetTransactions(qf QueryFilter, fields ...string) (QueryResult[common.Transaction], error)
func (*S3Connector) GetValidationBlockData ¶
func (*S3Connector) InsertBlockData ¶
func (s *S3Connector) InsertBlockData(data []common.BlockData) error
func (*S3Connector) ReplaceBlockData ¶
type TransfersQueryFilter ¶
type TransfersQueryFilter struct { ChainId *big.Int TokenTypes []string TokenAddress string WalletAddress string TokenIds []*big.Int TransactionHash string StartBlockNumber *big.Int EndBlockNumber *big.Int GroupBy []string SortBy string SortOrder string // "ASC" or "DESC" Page int Limit int Offset int }