Documentation
¶
Overview ¶
Package ingestion2 implements a modular ingestion engine responsible for orchestrating the processing of finalized blockchain data and receiving execution receipts from the network.
The Engine coordinates several internal workers, each dedicated to a specific task:
- Receiving and persisting execution receipts from the network.
- Subscribing to finalized block events.
- Synchronizing collections associated with finalized blocks.
Index ¶
- type CollectionSyncer
- func (s *CollectionSyncer) OnCollectionDownloaded(id flow.Identifier, entity flow.Entity)
- func (s *CollectionSyncer) RequestCollectionsForBlock(height uint64, missingCollections []*flow.CollectionGuarantee)
- func (s *CollectionSyncer) StartWorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)
- type Engine
- type FinalizedBlockProcessor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CollectionSyncer ¶
type CollectionSyncer struct {
// contains filtered or unexported fields
}
The CollectionSyncer type provides mechanisms for syncing and indexing data from the Flow blockchain into local storage. Specifically, it handles the retrieval and processing of collections and transactions that may have been missed due to network delays, restarts, or gaps in finalization.
It is responsible for ensuring the local node has all collections associated with finalized blocks starting from the last fully synced height. It works by periodically scanning the finalized block range, identifying missing collections, and triggering requests to fetch them from the network. Once collections are retrieved, it ensures they are persisted in the local collection and transaction stores.
The syncer maintains a persistent, strictly monotonic counter (`lastFullBlockHeight`) to track the highest finalized block for which all collections have been fully indexed. It uses this information to avoid redundant processing and to measure catch-up progress.
It is meant to operate in a background goroutine as part of the node's ingestion pipeline.
func NewCollectionSyncer ¶
func NewCollectionSyncer( logger zerolog.Logger, collectionExecutedMetric module.CollectionExecutedMetric, requester module.Requester, state protocol.State, blocks storage.Blocks, collections storage.Collections, transactions storage.Transactions, lastFullBlockHeight *counters.PersistentStrictMonotonicCounter, lockManager storage.LockManager, ) (*CollectionSyncer, error)
NewCollectionSyncer creates a new CollectionSyncer responsible for requesting, tracking, and indexing missing collections.
func (*CollectionSyncer) OnCollectionDownloaded ¶
func (s *CollectionSyncer) OnCollectionDownloaded(id flow.Identifier, entity flow.Entity)
OnCollectionDownloaded indexes and persists a downloaded collection. This function is a callback intended to be used by the requester engine.
func (*CollectionSyncer) RequestCollectionsForBlock ¶
func (s *CollectionSyncer) RequestCollectionsForBlock(height uint64, missingCollections []*flow.CollectionGuarantee)
RequestCollectionsForBlock conditionally requests missing collections for a specific block height, skipping requests if the block is already below the known full block height.
func (*CollectionSyncer) StartWorkerLoop ¶
func (s *CollectionSyncer) StartWorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)
StartWorkerLoop continuously monitors and triggers collection sync operations. It handles on startup collection catchup, periodic missing collection requests, and full block height updates.
type Engine ¶
type Engine struct { *component.ComponentManager // contains filtered or unexported fields }
func New ¶
func New( log zerolog.Logger, net network.EngineRegistry, finalizedBlockProcessor *FinalizedBlockProcessor, collectionSyncer *CollectionSyncer, receipts storage.ExecutionReceipts, collectionExecutedMetric module.CollectionExecutedMetric, ) (*Engine, error)
func (*Engine) OnFinalizedBlock ¶
OnFinalizedBlock is called by the follower engine after a block has been finalized and the state has been updated. Receives block finalized events from the finalization distributor and forwards them to the consumer.
func (*Engine) Process ¶
func (e *Engine) Process(chanName channels.Channel, originID flow.Identifier, event interface{}) error
Process processes the given event from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.
No errors are expected during normal operations.
type FinalizedBlockProcessor ¶
type FinalizedBlockProcessor struct {
// contains filtered or unexported fields
}
FinalizedBlockProcessor handles processing of finalized blocks, including indexing and syncing of related collections and execution results.
FinalizedBlockProcessor is designed to handle the ingestion of finalized Flow blocks in a scalable and decoupled manner. It uses a jobqueue.ComponentConsumer to consume and process finalized block jobs asynchronously. This design enables the processor to handle high-throughput block finalization events without blocking other parts of the system.
The processor relies on a notifier (engine.Notifier) to signal when a new finalized block is available, which triggers the job consumer to process it. The actual processing involves indexing block-to-collection and block-to-execution-result mappings, as well as requesting the associated collections.
func NewFinalizedBlockProcessor ¶
func NewFinalizedBlockProcessor( log zerolog.Logger, state protocol.State, blocks storage.Blocks, executionResults storage.ExecutionResults, finalizedProcessedHeight storage.ConsumerProgressInitializer, syncer *CollectionSyncer, collectionExecutedMetric module.CollectionExecutedMetric, ) (*FinalizedBlockProcessor, error)
NewFinalizedBlockProcessor creates and initializes a new FinalizedBlockProcessor, setting up job consumer infrastructure to handle finalized block processing.
No errors are expected during normal operations.
func (*FinalizedBlockProcessor) Notify ¶
func (p *FinalizedBlockProcessor) Notify()
Notify notifies the processor that a new finalized block is available for processing.
func (*FinalizedBlockProcessor) StartWorkerLoop ¶
func (p *FinalizedBlockProcessor) StartWorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)
StartWorkerLoop begins processing of finalized blocks and signals readiness when initialization is complete.