optimistic_sync

package
v0.42.5-cadence-compil...
Published: Sep 10, 2025 License: AGPL-3.0 Imports: 24 Imported by: 0

Documentation

Constants

const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second

Variables

var (
	// ErrInvalidTransition is returned when a state transition is invalid.
	ErrInvalidTransition = errors.New("invalid state transition")
)
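Because a sentinel error like this may be wrapped with extra context before it reaches the caller, checking it with `errors.Is` is usually safer than comparing with `==`. The following is a minimal sketch, not the package's implementation: `errInvalidTransition` is a local stand-in for `ErrInvalidTransition`, and `transition` is a hypothetical helper invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidTransition is a local stand-in for optimistic_sync.ErrInvalidTransition.
var errInvalidTransition = errors.New("invalid state transition")

// transition is a hypothetical helper (not part of the package): it rejects
// moving to a lower-numbered state and wraps the sentinel with context.
func transition(from, to int) error {
	if to < from {
		return fmt.Errorf("cannot move from %d to %d: %w", from, to, errInvalidTransition)
	}
	return nil
}

// isInvalidTransition reports whether err is, or wraps, the sentinel.
func isInvalidTransition(err error) bool {
	return errors.Is(err, errInvalidTransition)
}

func main() {
	fmt.Println(isInvalidTransition(transition(2, 1))) // true
	fmt.Println(isInvalidTransition(transition(1, 2))) // false
}
```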

Functions

func NewMockStateConsumer

func NewMockStateConsumer() *mockStateConsumer

NewMockStateConsumer creates a new instance of mockStateConsumer with a buffered channel.

func NewMockStateProvider

func NewMockStateProvider() *mockStateProvider

NewMockStateProvider initializes a mockStateProvider with the default state StatePending.

Types

type Core

type Core interface {
	// Download retrieves all necessary data for processing.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// Expected errors:
	// - context.Canceled: if the provided context was canceled before completion
	// - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Download(ctx context.Context) error

	// Index processes the downloaded data and creates in-memory indexes.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// No errors are expected during normal operations
	Index() error

	// Persist stores the indexed data in permanent storage.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// No errors are expected during normal operations
	Persist() error

	// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
	// and any data dropped.
	// Concurrency safe - all operations will be executed sequentially.
	// CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data.
	//
	// No errors are expected during normal operations
	Abandon() error
}

Core defines the interface for pipeline processing steps. Each implementation should handle the execution data for a single ExecutionResult and implement the three-phase processing: download, index, and persist. CAUTION: The Core instance should not be used after Abandon is called, as it could cause a panic due to cleared data. Core implementations must be CONCURRENCY SAFE.
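The three phases are meant to run in order. As a rough sketch of that ordering, the example below copies the `Core` method set into a local interface and drives it with a hypothetical `process` function and a hypothetical `recordingCore` test double. Neither exists in the package, and in the real system a `Pipeline` decides when `Persist` may run rather than calling it immediately after `Index`.

```go
package main

import (
	"context"
	"fmt"
)

// Core mirrors the method set of the documented interface.
type Core interface {
	Download(ctx context.Context) error
	Index() error
	Persist() error
	Abandon() error
}

// recordingCore is a hypothetical test double that records the call order.
type recordingCore struct{ calls []string }

func (c *recordingCore) Download(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err // honor cancellation before doing any work
	}
	c.calls = append(c.calls, "download")
	return nil
}

func (c *recordingCore) Index() error   { c.calls = append(c.calls, "index"); return nil }
func (c *recordingCore) Persist() error { c.calls = append(c.calls, "persist"); return nil }
func (c *recordingCore) Abandon() error { c.calls = nil; return nil }

// process is a hypothetical driver: it runs the phases in order and stops
// at the first error.
func process(ctx context.Context, core Core) error {
	if err := core.Download(ctx); err != nil {
		return fmt.Errorf("download failed: %w", err)
	}
	if err := core.Index(); err != nil {
		return fmt.Errorf("index failed: %w", err)
	}
	if err := core.Persist(); err != nil {
		return fmt.Errorf("persist failed: %w", err)
	}
	return nil
}

// runOnce drives a fresh recordingCore and returns the recorded calls.
func runOnce() []string {
	core := &recordingCore{}
	if err := process(context.Background(), core); err != nil {
		return nil
	}
	return core.calls
}

func main() {
	fmt.Println(runOnce()) // [download index persist]
}
```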

type CoreFactory

type CoreFactory interface {
	NewCore(result *flow.ExecutionResult) Core
}

CoreFactory is a factory object for creating new Core instances.

type CoreImpl

type CoreImpl struct {
	// contains filtered or unexported fields
}

CoreImpl implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data. Concurrency safe - all operations will be executed sequentially. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data.

func NewCoreImpl

func NewCoreImpl(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	header *flow.Header,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
	lockManager storage.LockManager,
) *CoreImpl

NewCoreImpl creates a new CoreImpl with all necessary dependencies. Concurrency safe - all operations will be executed sequentially.

func (*CoreImpl) Abandon

func (c *CoreImpl) Abandon() error

Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. Concurrency safe - all operations will be executed sequentially. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data.

No errors are expected during normal operations

func (*CoreImpl) Download

func (c *CoreImpl) Download(ctx context.Context) error

Download downloads the execution data and transaction result error messages for the block. Concurrency safe - all operations will be executed sequentially.

Expected errors:

  • context.Canceled: if the provided context was canceled before completion
  • All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
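The error contract above suggests that callers separate a benign cancellation from everything else. A minimal sketch of that classification follows; `downloadOutcome`, `classify`, `canceledDownload`, and `brokenDownload` are all hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// downloadOutcome is a hypothetical classification of a Download result.
type downloadOutcome string

const (
	outcomeOK       downloadOutcome = "ok"
	outcomeCanceled downloadOutcome = "canceled" // expected: the context was canceled
	outcomeFatal    downloadOutcome = "fatal"    // unexpected: possible bug or corrupted state
)

// classify maps an error returned by Download to an outcome.
func classify(err error) downloadOutcome {
	switch {
	case err == nil:
		return outcomeOK
	case errors.Is(err, context.Canceled):
		return outcomeCanceled
	default:
		return outcomeFatal
	}
}

// canceledDownload simulates a Download aborted by context cancellation.
func canceledDownload() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return fmt.Errorf("download: %w", ctx.Err())
}

// brokenDownload simulates an unexpected failure.
func brokenDownload() error {
	return errors.New("corrupted internal state")
}

func main() {
	fmt.Println(classify(nil))                // ok
	fmt.Println(classify(canceledDownload())) // canceled
	fmt.Println(classify(brokenDownload()))   // fatal
}
```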

func (*CoreImpl) Index

func (c *CoreImpl) Index() error

Index retrieves the downloaded execution data and transaction result error messages from the caches and indexes them into in-memory storage. Concurrency safe - all operations will be executed sequentially.

No errors are expected during normal operations

func (*CoreImpl) Persist

func (c *CoreImpl) Persist() error

Persist persists the indexed data to permanent storage atomically. Concurrency safe - all operations will be executed sequentially.

No errors are expected during normal operations

type Criteria

type Criteria struct {
	// AgreeingExecutorsCount is the number of receipts including the same ExecutionResult
	AgreeingExecutorsCount uint
	// RequiredExecutors is the list of EN node IDs, one of which must have produced the result
	RequiredExecutors flow.IdentifierList
}

Criteria defines the filtering criteria for execution result queries. It specifies requirements for execution result selection, including the number of agreeing executors and the required executor nodes.

func (*Criteria) OverrideWith

func (c *Criteria) OverrideWith(override Criteria) Criteria

OverrideWith overrides the original criteria with the incoming criteria, returning a new Criteria object. Fields from `override` criteria take precedence when set.
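The documentation does not spell out what counts as "set". The sketch below assumes it means a non-zero `AgreeingExecutorsCount` and a non-empty `RequiredExecutors` list; the real method may differ. The `criteria` type is a simplified stand-in that uses plain strings in place of `flow.Identifier`.

```go
package main

import "fmt"

// criteria is a simplified stand-in for Criteria; executor IDs are plain
// strings here instead of flow.Identifier.
type criteria struct {
	AgreeingExecutorsCount uint
	RequiredExecutors      []string
}

// overrideWith returns a new value in which fields of override replace those
// of c. Assumption: a field is "set" when it is non-zero or non-empty.
func (c criteria) overrideWith(override criteria) criteria {
	out := c
	if override.AgreeingExecutorsCount > 0 {
		out.AgreeingExecutorsCount = override.AgreeingExecutorsCount
	}
	if len(override.RequiredExecutors) > 0 {
		out.RequiredExecutors = override.RequiredExecutors
	}
	return out
}

func main() {
	defaults := criteria{AgreeingExecutorsCount: 2, RequiredExecutors: []string{"en-1"}}
	merged := defaults.overrideWith(criteria{AgreeingExecutorsCount: 3})
	fmt.Println(merged.AgreeingExecutorsCount, merged.RequiredExecutors) // 3 [en-1]
}
```

Returning a new value instead of mutating the receiver matches the documented behavior of "returning a new Criteria object", so shared defaults stay untouched.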

type ExecutionResultQueryProvider

type ExecutionResultQueryProvider interface {
	// ExecutionResultQuery retrieves execution results and associated execution nodes for a given block ID
	// based on the provided criteria. It returns a Query containing the execution result and
	// the execution nodes that produced it.
	//
	// Expected errors during normal operations:
	//   - backend.InsufficientExecutionReceipts - found insufficient receipts for given block ID.
	ExecutionResultQuery(blockID flow.Identifier, criteria Criteria) (*Query, error)
}

ExecutionResultQueryProvider provides execution results and execution nodes based on criteria. It allows querying for execution results by block ID with specific filtering criteria to ensure consistency and reliability of execution results.
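To illustrate how such criteria might narrow down candidate results, here is a hedged sketch that groups receipts by result and picks the first group satisfying both conditions. Everything in it is an assumption made for illustration: the `receipt` and `criteria` types, the `selectResult` function, the string IDs, the first-seen ordering, and the local `errInsufficientReceipts` error standing in for `backend.InsufficientExecutionReceipts`. It also assumes at most one receipt per executor per result.

```go
package main

import (
	"errors"
	"fmt"
)

// receipt is a hypothetical, simplified execution receipt.
type receipt struct {
	ResultID   string
	ExecutorID string
}

// criteria is a simplified stand-in for Criteria using string IDs.
type criteria struct {
	AgreeingExecutorsCount uint
	RequiredExecutors      []string
}

// errInsufficientReceipts is a local stand-in for the documented error.
var errInsufficientReceipts = errors.New("insufficient execution receipts")

// containsAny reports whether any element of wanted appears in have.
func containsAny(have, wanted []string) bool {
	for _, w := range wanted {
		for _, h := range have {
			if h == w {
				return true
			}
		}
	}
	return false
}

// selectResult returns the first result (in first-seen order) that enough
// executors agree on and that at least one required executor produced.
func selectResult(receipts []receipt, c criteria) (string, []string, error) {
	executorsByResult := make(map[string][]string)
	var order []string // first-seen order keeps the selection deterministic
	for _, r := range receipts {
		if _, seen := executorsByResult[r.ResultID]; !seen {
			order = append(order, r.ResultID)
		}
		executorsByResult[r.ResultID] = append(executorsByResult[r.ResultID], r.ExecutorID)
	}
	for _, id := range order {
		executors := executorsByResult[id]
		if uint(len(executors)) < c.AgreeingExecutorsCount {
			continue
		}
		if len(c.RequiredExecutors) > 0 && !containsAny(executors, c.RequiredExecutors) {
			continue
		}
		return id, executors, nil
	}
	return "", nil, errInsufficientReceipts
}

func main() {
	receipts := []receipt{{"r1", "en-1"}, {"r2", "en-2"}, {"r2", "en-3"}}
	id, executors, err := selectResult(receipts, criteria{AgreeingExecutorsCount: 2})
	fmt.Println(id, executors, err) // r2 [en-2 en-3] <nil>
}
```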

type ExecutionStateCache

type ExecutionStateCache interface {
	// Snapshot returns a view of the execution state as of the provided ExecutionResult.
	// The returned Snapshot provides access to execution state data for the fork ending
	// on the provided ExecutionResult which extends from the latest sealed result.
	// The result may be sealed or unsealed. Only data for finalized blocks is available.
	//
	// Expected errors during normal operation:
	//   - storage.ErrNotFound - result is not available, not ready for querying, or does not descend from the latest sealed result.
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Snapshot(executionResultID flow.Identifier) (Snapshot, error)
}

ExecutionStateCache provides access to execution state snapshots for querying data at specific ExecutionResults.

type Pipeline

type Pipeline interface {
	PipelineStateProvider

	// Run starts the pipeline processing and blocks until completion or context cancellation.
	// CAUTION: not concurrency safe! Run must only be called once.
	//
	// Expected Errors:
	//   - context.Canceled: when the context is canceled
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Run(ctx context.Context, core Core, parentState State) error

	// SetSealed marks the pipeline's result as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.
	SetSealed()

	// OnParentStateUpdated updates the pipeline's parent's state.
	OnParentStateUpdated(parentState State)

	// Abandon marks the pipeline as abandoned.
	Abandon()
}

Pipeline represents a pipelined processing state machine for a single ExecutionResult. The state machine is initialized in the Pending state.

The state machine is designed to be run in a single goroutine. The Run method must only be called once.
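The exact transition rules are not listed in this documentation. The sketch below is one plausible reading, pieced together from the method and state comments: a pipeline moves from pending to processing to waiting-to-persist, completes only once it is sealed and its parent is complete, and moves to abandoned from any non-terminal state. The `inputs` type and `next` function are hypothetical, and the sketch ignores any parent-state precondition for starting to process.

```go
package main

import "fmt"

// State mirrors the State constants documented for this package.
type State int32

const (
	StatePending State = iota
	StateProcessing
	StateWaitingPersist
	StateComplete
	StateAbandoned
)

// inputs is a hypothetical bundle of the signals a pipeline reacts to.
type inputs struct {
	sealed    bool  // set by SetSealed
	abandoned bool  // set by Abandon
	parent    State // last value passed to OnParentStateUpdated
}

// next returns the state that follows current under the assumed rules.
func next(current State, in inputs) State {
	switch {
	case current == StateComplete || current == StateAbandoned:
		return current // terminal states never change
	case in.abandoned:
		return StateAbandoned
	case current == StatePending:
		return StateProcessing
	case current == StateProcessing:
		return StateWaitingPersist
	case current == StateWaitingPersist && in.sealed && in.parent == StateComplete:
		return StateComplete
	default:
		return current // conditions to persist are not met yet
	}
}

func main() {
	s := StatePending
	in := inputs{parent: StateProcessing}
	s = next(s, in) // processing
	s = next(s, in) // waiting to persist
	s = next(s, in) // still waiting: not sealed, parent not complete
	fmt.Println(s == StateWaitingPersist) // true

	in.sealed, in.parent = true, StateComplete
	fmt.Println(next(s, in) == StateComplete) // true
}
```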

type PipelineFactory

type PipelineFactory interface {
	NewPipeline(result *flow.ExecutionResult, isSealed bool) Pipeline
}

PipelineFactory is a factory object for creating new Pipeline instances.

type PipelineImpl

type PipelineImpl struct {
	// contains filtered or unexported fields
}

PipelineImpl implements the Pipeline interface.

func NewPipeline

func NewPipeline(
	log zerolog.Logger,
	executionResult *flow.ExecutionResult,
	isSealed bool,
	stateReceiver PipelineStateConsumer,
) *PipelineImpl

NewPipeline creates a new processing pipeline. The pipeline is initialized in the Pending state.

func (*PipelineImpl) Abandon

func (p *PipelineImpl) Abandon()

Abandon marks the pipeline as abandoned. This will cause the pipeline to eventually transition to the Abandoned state and halt processing.

func (*PipelineImpl) GetState

func (p *PipelineImpl) GetState() State

GetState returns the current state of the pipeline.

func (*PipelineImpl) OnParentStateUpdated

func (p *PipelineImpl) OnParentStateUpdated(parentState State)

OnParentStateUpdated updates the pipeline's state based on the provided parent state. If the parent state has changed, it will notify the state consumer and trigger a state change notification.

func (*PipelineImpl) Run

func (p *PipelineImpl) Run(ctx context.Context, core Core, parentState State) error

Run starts the pipeline processing and blocks until completion or context cancellation. CAUTION: not concurrency safe! Run must only be called once.

Expected Errors:

  • context.Canceled: when the context is canceled
  • All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)

func (*PipelineImpl) SetSealed

func (p *PipelineImpl) SetSealed()

SetSealed marks the execution result as sealed. This will cause the pipeline to eventually transition to the StateComplete state when the parent finishes processing.

type PipelineStateConsumer

type PipelineStateConsumer interface {
	// OnStateUpdated is called when a pipeline's state changes to notify the receiver of the new state.
	// This method will be called in the same goroutine that runs the pipeline, so it must not block.
	OnStateUpdated(newState State)
}

PipelineStateConsumer is a receiver of the pipeline state updates. PipelineStateConsumer implementations must be NON-BLOCKING and consume the state updates without noteworthy delay.
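One common way to satisfy a non-blocking requirement like this is to hand updates to a buffered channel and fall through when the buffer is full. The sketch below shows that pattern with a hypothetical `channelConsumer`; whether dropping an update is acceptable depends on the caller, and the real consumers in this package may behave differently.

```go
package main

import "fmt"

// State is a local stand-in for the package's State type.
type State int32

// channelConsumer is a hypothetical consumer that never blocks the caller.
// It is meant to be called from a single goroutine (the one running the
// pipeline), so the dropped counter is not synchronized.
type channelConsumer struct {
	updates chan State
	dropped int
}

// newChannelConsumer creates a consumer with the given buffer size.
func newChannelConsumer(size int) *channelConsumer {
	return &channelConsumer{updates: make(chan State, size)}
}

// OnStateUpdated enqueues the update if there is room and otherwise drops it
// instead of blocking the pipeline goroutine.
func (c *channelConsumer) OnStateUpdated(newState State) {
	select {
	case c.updates <- newState:
	default:
		c.dropped++ // buffer full: record the drop rather than block
	}
}

func main() {
	c := newChannelConsumer(1)
	c.OnStateUpdated(1) // fits in the buffer
	c.OnStateUpdated(2) // buffer full, dropped
	fmt.Println(<-c.updates, c.dropped) // 1 1
}
```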

type PipelineStateProvider

type PipelineStateProvider interface {
	// GetState returns the current state of the pipeline.
	GetState() State
}

PipelineStateProvider is an interface that provides a pipeline's state.

type Query

type Query struct {
	// ExecutionResult is the execution result for the queried block
	ExecutionResult *flow.ExecutionResult
	// ExecutionNodes is the list of execution node identities that produced the result
	ExecutionNodes flow.IdentitySkeletonList
}

Query contains the result of an execution result query. It includes both the execution result and the execution nodes that produced it.

type Snapshot

type Snapshot interface {
	// Events returns a reader for querying event data.
	Events() storage.EventsReader

	// Collections returns a reader for querying collection data.
	Collections() storage.CollectionsReader

	// Transactions returns a reader for querying transaction data.
	Transactions() storage.TransactionsReader

	// LightTransactionResults returns a reader for querying light transaction result data.
	LightTransactionResults() storage.LightTransactionResultsReader

	// TransactionResultErrorMessages returns a reader for querying transaction error message data.
	TransactionResultErrorMessages() storage.TransactionResultErrorMessagesReader

	// Registers returns a reader for querying register data.
	Registers() storage.RegisterIndexReader
}

Snapshot provides access to execution data readers for querying various data types from a specific ExecutionResult.

type State

type State int32

State represents the state of the processing pipeline

const (
	// StatePending is the initial state after instantiation, before Run is called
	StatePending State = iota
	// StateProcessing represents the state where data processing (download and indexing) has been started
	StateProcessing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateAbandoned represents the state where processing was aborted
	StateAbandoned
)

func (State) IsTerminal

func (s State) IsTerminal() bool

IsTerminal returns true if the state is a terminal state (Complete or Abandoned).

func (State) String

func (s State) String() string

String returns the string representation of the state, for use in logging.
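As a small illustration of how the two methods might look, the sketch below re-declares the `State` constants locally and implements `IsTerminal` and `String` on them. The label strings ("pending", "processing", and so on) are assumptions; the documentation does not list the actual values returned by `String`.

```go
package main

import "fmt"

// State mirrors the State constants documented above.
type State int32

const (
	StatePending State = iota
	StateProcessing
	StateWaitingPersist
	StateComplete
	StateAbandoned
)

// IsTerminal reports whether no further transitions are possible.
func (s State) IsTerminal() bool {
	return s == StateComplete || s == StateAbandoned
}

// String returns a label for logging. The label text is an assumption.
func (s State) String() string {
	switch s {
	case StatePending:
		return "pending"
	case StateProcessing:
		return "processing"
	case StateWaitingPersist:
		return "waiting_persist"
	case StateComplete:
		return "complete"
	case StateAbandoned:
		return "abandoned"
	default:
		return fmt.Sprintf("unknown(%d)", int32(s))
	}
}

func main() {
	for s := StatePending; s <= StateAbandoned; s++ {
		fmt.Printf("%s terminal=%t\n", s, s.IsTerminal())
	}
}
```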
