ingestion

package
v0.43.1-rc.1.access-me...
Warning: This package is not in the latest version of its module.
Published: Sep 30, 2025 License: AGPL-3.0 Imports: 22 Imported by: 2

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CollectionSyncer added in v0.43.0

type CollectionSyncer struct {
	// contains filtered or unexported fields
}

The CollectionSyncer type provides mechanisms for syncing and indexing data from the Flow blockchain into local storage. Specifically, it handles the retrieval and processing of collections and transactions that may have been missed due to network delays, restarts, or gaps in finalization.

It is responsible for ensuring the local node has all collections associated with finalized blocks starting from the last fully synced height. It works by periodically scanning the finalized block range, identifying missing collections, and triggering requests to fetch them from the network. Once collections are retrieved, it ensures they are persisted in the local collection and transaction stores.

The syncer maintains a persistent, strictly monotonic counter (`lastFullBlockHeight`) to track the highest finalized block for which all collections have been fully indexed. It uses this information to avoid redundant processing and to measure catch-up progress.

It is meant to operate in a background goroutine as part of the node's ingestion pipeline.
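The catch-up behaviour described above can be illustrated with a small, self-contained sketch. It is not the package's implementation: `strictCounter` is a hypothetical in-memory stand-in for the persistent counter, and `advanceFullHeight` with its `missing` map is invented bookkeeping for this example only.

```go
package main

import (
	"errors"
	"fmt"
)

// strictCounter is a hypothetical in-memory stand-in for a persistent,
// strictly monotonic counter: it rejects any value that is not larger
// than the current one.
type strictCounter struct{ value uint64 }

var errNotIncreasing = errors.New("new value must be strictly greater than the current value")

// Set stores v only if it is strictly greater than the current value.
func (c *strictCounter) Set(v uint64) error {
	if v <= c.value {
		return errNotIncreasing
	}
	c.value = v
	return nil
}

// advanceFullHeight scans the heights in (c.value, finalized] and moves the
// counter to the last height before the first block with missing collections.
// missing maps a height to the number of collections not yet stored
// (an assumption made for this sketch).
func advanceFullHeight(c *strictCounter, finalized uint64, missing map[uint64]int) (uint64, error) {
	newFull := c.value
	for h := c.value + 1; h <= finalized; h++ {
		if missing[h] > 0 {
			break // the first incomplete block stops the advance
		}
		newFull = h
	}
	if newFull == c.value {
		return newFull, nil // nothing changed; skip the non-increasing Set
	}
	if err := c.Set(newFull); err != nil {
		return c.value, err
	}
	return newFull, nil
}

func main() {
	c := &strictCounter{value: 10}
	missing := map[uint64]int{13: 2} // block 13 still lacks two collections
	full, err := advanceFullHeight(c, 15, missing)
	fmt.Println(full, err) // 12 <nil>
}
```

Because the counter only moves forward, a later scan can start at the stored height plus one instead of re-checking blocks that are already complete.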

func NewCollectionSyncer added in v0.43.0

func NewCollectionSyncer(
	logger zerolog.Logger,
	collectionExecutedMetric module.CollectionExecutedMetric,
	requester module.Requester,
	state protocol.State,
	blocks storage.Blocks,
	collections storage.Collections,
	transactions storage.Transactions,
	lastFullBlockHeight *counters.PersistentStrictMonotonicCounter,
	lockManager storage.LockManager,
) *CollectionSyncer

NewCollectionSyncer creates a new CollectionSyncer responsible for requesting, tracking, and indexing missing collections.

func (*CollectionSyncer) OnCollectionDownloaded added in v0.43.0

func (s *CollectionSyncer) OnCollectionDownloaded(_ flow.Identifier, entity flow.Entity)

OnCollectionDownloaded indexes and persists a downloaded collection. This is a callback intended to be used with the requester engine.
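As a rough illustration of such a callback, the sketch below type-asserts the downloaded entity and indexes it in memory. Every name here (`entity`, `collection`, `header`, `store`, `onDownloaded`) is hypothetical and stands in for the flow-go types. Unlike the real method, this version returns an error so the rejection path is easy to observe.

```go
package main

import "fmt"

// entity is a hypothetical stand-in for a generic downloaded entity.
type entity interface{ ID() string }

// collection is a hypothetical collection holding its transaction IDs.
type collection struct {
	id    string
	txIDs []string
}

func (c *collection) ID() string { return c.id }

// header is an unrelated entity type, used to show the rejection path.
type header struct{}

func (header) ID() string { return "header" }

// store is a hypothetical in-memory replacement for the collection and
// transaction stores.
type store struct {
	collections    map[string][]string // collection ID -> transaction IDs
	txToCollection map[string]string   // transaction ID -> collection ID
}

func newStore() *store {
	return &store{
		collections:    make(map[string][]string),
		txToCollection: make(map[string]string),
	}
}

// onDownloaded persists a downloaded collection and indexes its transactions.
// The origin ID is ignored, mirroring the blank parameter in the signature above.
func (s *store) onDownloaded(_ string, e entity) error {
	c, ok := e.(*collection)
	if !ok {
		return fmt.Errorf("unexpected entity type %T", e)
	}
	s.collections[c.id] = c.txIDs
	for _, tx := range c.txIDs {
		s.txToCollection[tx] = c.id
	}
	return nil
}

func main() {
	s := newStore()
	err := s.onDownloaded("origin", &collection{id: "c1", txIDs: []string{"t1", "t2"}})
	fmt.Println(err, s.txToCollection["t2"]) // <nil> c1
	fmt.Println(s.onDownloaded("origin", header{}) != nil) // true
}
```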

func (*CollectionSyncer) RequestCollections added in v0.43.0

func (s *CollectionSyncer) RequestCollections(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)

RequestCollections continuously monitors and triggers collection sync operations. It handles collection catch-up on startup, periodic requests for missing collections, and updates to the full block height.
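A worker of this shape is commonly built around a `select` loop. The following is a minimal sketch under stated assumptions: a plain `context.Context` replaces `irrecoverable.SignalerContext`, a plain `func()` replaces `component.ReadyFunc`, ticks arrive on a channel so the example is deterministic, and running the catch-up before signalling readiness is a guess rather than documented behaviour. `runSyncLoop` and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runSyncLoop is a hypothetical worker: it performs a one-time catch-up,
// signals readiness, and then requests missing collections on every tick
// until the context is cancelled.
func runSyncLoop(ctx context.Context, ready func(), ticks <-chan time.Time, catchUp, requestMissing func()) {
	catchUp()
	ready()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticks:
			requestMissing()
		}
	}
}

// demo drives the loop with two manual ticks and returns the number of
// periodic requests that were issued.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	ticks := make(chan time.Time) // unbuffered: a send returns once the loop has received it
	done := make(chan struct{})
	requests := 0

	go func() {
		defer close(done)
		runSyncLoop(ctx, func() {}, ticks, func() {}, func() { requests++ })
	}()

	ticks <- time.Now()
	ticks <- time.Now()
	cancel()
	<-done // the loop has exited, so reading requests is safe
	return requests
}

func main() {
	fmt.Println("periodic requests:", demo()) // periodic requests: 2
}
```

In production code the tick channel would typically come from a `time.Ticker`, with a second ticker for the full block height update.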

func (*CollectionSyncer) RequestCollectionsForBlock added in v0.43.0

func (s *CollectionSyncer) RequestCollectionsForBlock(height uint64, missingCollections []*flow.CollectionGuarantee)

RequestCollectionsForBlock conditionally requests missing collections for a specific block height, skipping requests if the block is already below the known full block height.
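The skip condition can be sketched as follows. The `guarantee` type and the `requestForBlock` function are hypothetical, and treating a height equal to the full block height as already covered is an assumption of this example, not something the documentation states.

```go
package main

import "fmt"

// guarantee is a hypothetical stand-in for a collection guarantee,
// reduced to the collection ID.
type guarantee struct{ CollectionID string }

// requestForBlock requests the given collections unless the block height is
// already covered by lastFull. It returns the number of requests issued.
// Assumption: a height equal to lastFull counts as covered.
func requestForBlock(height, lastFull uint64, missing []guarantee, request func(id string)) int {
	if height <= lastFull {
		return 0 // all collections at or below lastFull are already indexed
	}
	for _, g := range missing {
		request(g.CollectionID)
	}
	return len(missing)
}

func main() {
	var requested []string
	request := func(id string) { requested = append(requested, id) }
	missing := []guarantee{{CollectionID: "c1"}, {CollectionID: "c2"}}

	fmt.Println(requestForBlock(90, 100, missing, request))  // 0
	fmt.Println(requestForBlock(101, 100, missing, request)) // 2
	fmt.Println(requested)                                   // [c1 c2]
}
```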

type Engine

type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

Engine represents the ingestion engine, which funnels data from other nodes to a centralized location that a user can query.

No errors are expected during normal operation.

func New

func New(
	log zerolog.Logger,
	net network.EngineRegistry,
	state protocol.State,
	me module.Local,
	blocks storage.Blocks,
	executionResults storage.ExecutionResults,
	executionReceipts storage.ExecutionReceipts,
	finalizedProcessedHeight storage.ConsumerProgressInitializer,
	collectionSyncer *CollectionSyncer,
	collectionExecutedMetric module.CollectionExecutedMetric,
	txErrorMessagesCore *tx_error_messages.TxErrorMessagesCore,
) (*Engine, error)

New creates a new access ingestion engine.

No errors are expected during normal operation.

func (*Engine) OnFinalizedBlock

func (e *Engine) OnFinalizedBlock(*model.Block)

OnFinalizedBlock is called by the follower engine after a block has been finalized and the state has been updated. It receives block finalization events from the finalization distributor and forwards them to the finalizedBlockConsumer.

func (*Engine) Process

func (e *Engine) Process(_ channels.Channel, originID flow.Identifier, event interface{}) error

Process handles the given event from the node with the given origin ID in a blocking manner. It returns any processing error once handling is complete.
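A blocking handler of this kind is often written as a type switch over the event. The sketch below is illustrative only: the documentation does not list which event types the engine accepts, so `executionReceipt`, `engine`, and its `process` method are hypothetical, and the channel argument is dropped for brevity.

```go
package main

import "fmt"

// executionReceipt is a hypothetical event type used only for this sketch.
type executionReceipt struct{ BlockID string }

// engine is a hypothetical stand-in that records the receipts it has handled.
type engine struct{ handled []string }

// process handles the event synchronously and returns any processing error.
// Events of an unknown type are rejected.
func (e *engine) process(originID string, event interface{}) error {
	switch ev := event.(type) {
	case *executionReceipt:
		e.handled = append(e.handled, ev.BlockID)
		return nil
	default:
		return fmt.Errorf("invalid event type %T from origin %s", event, originID)
	}
}

func main() {
	e := &engine{}
	fmt.Println(e.process("node-1", &executionReceipt{BlockID: "b1"})) // <nil>
	fmt.Println(e.process("node-1", "garbage") != nil)                 // true
	fmt.Println(e.handled)                                             // [b1]
}
```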
