Documentation
Index ¶
- Constants
- Variables
- func NewMockStateConsumer() *mockStateConsumer
- func NewMockStateProvider() *mockStateProvider
- type Core
- type CoreFactory
- type CoreImpl
- type Criteria
- type ExecutionResultQueryProvider
- type ExecutionStateCache
- type Pipeline
- type PipelineFactory
- type PipelineImpl
- type PipelineStateConsumer
- type PipelineStateProvider
- type Query
- type Snapshot
- type State
Constants ¶
const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second
Variables ¶
var (
	// ErrInvalidTransition is returned when a state transition is invalid.
	ErrInvalidTransition = errors.New("invalid state transition")
)
Functions ¶
func NewMockStateConsumer ¶
func NewMockStateConsumer() *mockStateConsumer
NewMockStateConsumer creates a new instance of mockStateConsumer with a buffered channel.
func NewMockStateProvider ¶
func NewMockStateProvider() *mockStateProvider
NewMockStateProvider initializes a mockStateProvider with the default state StatePending.
Types ¶
type Core ¶
type Core interface {
	// Download retrieves all necessary data for processing.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// Expected errors:
	//   - context.Canceled: if the provided context was canceled before completion
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Download(ctx context.Context) error

	// Index processes the downloaded data and creates in-memory indexes.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// No errors are expected during normal operations
	Index() error

	// Persist stores the indexed data in permanent storage.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// No errors are expected during normal operations
	Persist() error

	// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
	// and any data dropped.
	// Concurrency safe - all operations will be executed sequentially.
	// CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data.
	//
	// No errors are expected during normal operations
	Abandon() error
}
Core defines the interface for pipeline processing steps. Each implementation should handle execution data and implement the three-phase processing: download, index, and persist. CAUTION: The Core instance should not be used after Abandon is called, as it could cause a panic due to cleared data. Core implementations must be CONCURRENCY SAFE.
type CoreFactory ¶
type CoreFactory interface {
NewCore(result *flow.ExecutionResult) Core
}
CoreFactory is a factory object for creating new Core instances.
type CoreImpl ¶
type CoreImpl struct {
// contains filtered or unexported fields
}
CoreImpl implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data. Concurrency safe - all operations will be executed sequentially. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data.
func NewCoreImpl ¶
func NewCoreImpl(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	header *flow.Header,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
	lockManager storage.LockManager,
) *CoreImpl
NewCoreImpl creates a new CoreImpl with all necessary dependencies. Concurrency safe - all operations will be executed sequentially.
func (*CoreImpl) Abandon ¶
Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. Concurrency safe - all operations will be executed sequentially. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data.
No errors are expected during normal operations
func (*CoreImpl) Download ¶
Download downloads execution data and transaction result error messages for the block. Concurrency safe - all operations will be executed sequentially.
Expected errors:
- context.Canceled: if the provided context was canceled before completion
- All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
type Criteria ¶
type Criteria struct {
	// AgreeingExecutorsCount is the number of receipts including the same ExecutionResult
	AgreeingExecutorsCount uint

	// RequiredExecutors is the list of EN node IDs, one of which must have produced the result
	RequiredExecutors flow.IdentifierList
}
Criteria defines the filtering criteria for execution result queries. It specifies requirements for execution result selection, including the number of agreeing executors and the required executor nodes.
func (*Criteria) OverrideWith ¶
OverrideWith overrides the original criteria with the incoming criteria, returning a new Criteria object. Fields from `override` criteria take precedence when set.
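The override semantics can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: the method signature is not shown on this page, `Identifier` is a hypothetical stand-in for flow.Identifier, and a field is assumed to count as "set" when it is non-zero (count) or non-empty (list).

```go
package main

import "fmt"

// Identifier is a hypothetical stand-in for flow.Identifier.
type Identifier string

// Criteria is a local copy of the documented struct, using the stand-in ID type.
type Criteria struct {
	AgreeingExecutorsCount uint
	RequiredExecutors      []Identifier
}

// OverrideWith returns a new Criteria in which fields set on override win.
// Assumption: "set" means a non-zero count or a non-empty executor list.
func (c *Criteria) OverrideWith(override Criteria) Criteria {
	merged := Criteria{
		AgreeingExecutorsCount: c.AgreeingExecutorsCount,
		RequiredExecutors:      c.RequiredExecutors,
	}
	if override.AgreeingExecutorsCount > 0 {
		merged.AgreeingExecutorsCount = override.AgreeingExecutorsCount
	}
	if len(override.RequiredExecutors) > 0 {
		merged.RequiredExecutors = override.RequiredExecutors
	}
	return merged
}

func main() {
	base := Criteria{AgreeingExecutorsCount: 2, RequiredExecutors: []Identifier{"en-1"}}
	// Only the count is set on the override, so the executor list is kept from base.
	merged := base.OverrideWith(Criteria{AgreeingExecutorsCount: 3})
	fmt.Println(merged.AgreeingExecutorsCount, merged.RequiredExecutors)
}
```

Returning a new value rather than mutating the receiver matches the description above and leaves the original criteria reusable as defaults.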
type ExecutionResultQueryProvider ¶
type ExecutionResultQueryProvider interface {
	// ExecutionResultQuery retrieves execution results and associated execution nodes for a given block ID
	// based on the provided criteria. It returns a Query containing the execution result and
	// the execution nodes that produced it.
	//
	// Expected errors during normal operations:
	//   - backend.InsufficientExecutionReceipts - found insufficient receipts for given block ID.
	ExecutionResultQuery(blockID flow.Identifier, criteria Criteria) (*Query, error)
}
ExecutionResultQueryProvider provides execution results and execution nodes based on criteria. It allows querying for execution results by block ID with specific filtering criteria to ensure consistency and reliability of execution results.
type ExecutionStateCache ¶
type ExecutionStateCache interface {
	// Snapshot returns a view of the execution state as of the provided ExecutionResult.
	// The returned Snapshot provides access to execution state data for the fork ending
	// on the provided ExecutionResult which extends from the latest sealed result.
	// The result may be sealed or unsealed. Only data for finalized blocks is available.
	//
	// Expected errors during normal operation:
	//   - storage.ErrNotFound - result is not available, not ready for querying, or does not descend from the latest sealed result.
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Snapshot(executionResultID flow.Identifier) (Snapshot, error)
}
ExecutionStateCache provides access to execution state snapshots for querying data at specific ExecutionResults.
type Pipeline ¶
type Pipeline interface {
	PipelineStateProvider

	// Run starts the pipeline processing and blocks until completion or context cancellation.
	// CAUTION: not concurrency safe! Run must only be called once.
	//
	// Expected Errors:
	//   - context.Canceled: when the context is canceled
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Run(ctx context.Context, core Core, parentState State) error

	// SetSealed marks the pipeline's result as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.
	SetSealed()

	// OnParentStateUpdated updates the pipeline's parent's state.
	OnParentStateUpdated(parentState State)

	// Abandon marks the pipeline as abandoned.
	Abandon()
}
Pipeline represents a pipelined processing state machine for a single ExecutionResult. The state machine is initialized in the Pending state.
The state machine is designed to be run in a single goroutine. The Run method must only be called once.
type PipelineFactory ¶
type PipelineFactory interface {
NewPipeline(result *flow.ExecutionResult, isSealed bool) Pipeline
}
PipelineFactory is a factory object for creating new Pipeline instances.
type PipelineImpl ¶
type PipelineImpl struct {
// contains filtered or unexported fields
}
PipelineImpl implements the Pipeline interface
func NewPipeline ¶
func NewPipeline(
	log zerolog.Logger,
	executionResult *flow.ExecutionResult,
	isSealed bool,
	stateReceiver PipelineStateConsumer,
) *PipelineImpl
NewPipeline creates a new processing pipeline. The pipeline is initialized in the Pending state.
func (*PipelineImpl) Abandon ¶
func (p *PipelineImpl) Abandon()
Abandon marks the pipeline as abandoned. This will cause the pipeline to eventually transition to the Abandoned state and halt processing.
func (*PipelineImpl) GetState ¶
func (p *PipelineImpl) GetState() State
GetState returns the current state of the pipeline.
func (*PipelineImpl) OnParentStateUpdated ¶
func (p *PipelineImpl) OnParentStateUpdated(parentState State)
OnParentStateUpdated updates the pipeline's state based on the provided parent state. If the parent state has changed, it will notify the state consumer and trigger a state change notification.
func (*PipelineImpl) Run ¶
Run starts the pipeline processing and blocks until completion or context cancellation. CAUTION: not concurrency safe! Run must only be called once.
Expected Errors:
- context.Canceled: when the context is canceled
- All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
func (*PipelineImpl) SetSealed ¶
func (p *PipelineImpl) SetSealed()
SetSealed marks the execution result as sealed. This will cause the pipeline to eventually transition to the StateComplete state when the parent finishes processing.
type PipelineStateConsumer ¶
type PipelineStateConsumer interface {
	// OnStateUpdated is called when a pipeline's state changes to notify the receiver of the new state.
	// This method will be called in the same goroutine that runs the pipeline, so it must not block.
	OnStateUpdated(newState State)
}
PipelineStateConsumer is a receiver of pipeline state updates. PipelineStateConsumer implementations must be NON-BLOCKING and consume state updates without noteworthy delay.
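One way to satisfy the non-blocking requirement is a buffered channel combined with a select/default send. The sketch below is an assumption-laden illustration: `chanConsumer` and `newChanConsumer` are hypothetical names, and dropping updates when the buffer is full is a design choice made here for brevity, not something this package is documented to do.

```go
package main

import "fmt"

// State is a local copy of the documented State type.
type State int32

// chanConsumer is a hypothetical consumer that forwards updates to a buffered channel.
type chanConsumer struct {
	updates chan State
	dropped int // updates discarded because the buffer was full
}

func newChanConsumer(size int) *chanConsumer {
	return &chanConsumer{updates: make(chan State, size)}
}

// OnStateUpdated never blocks: if the buffer is full, the update is counted as dropped.
// It is only called from the pipeline's goroutine, so dropped needs no locking here.
func (c *chanConsumer) OnStateUpdated(newState State) {
	select {
	case c.updates <- newState:
	default:
		c.dropped++
	}
}

func main() {
	c := newChanConsumer(1)
	c.OnStateUpdated(1) // fits in the buffer
	c.OnStateUpdated(2) // buffer full, dropped instead of blocking
	fmt.Println(len(c.updates), c.dropped)
}
```

A consumer that cannot tolerate lost updates would need a different strategy, such as keeping only the latest state.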
type PipelineStateProvider ¶
type PipelineStateProvider interface {
	// GetState returns the current state of the pipeline.
	GetState() State
}
PipelineStateProvider is an interface that provides a pipeline's state.
type Query ¶
type Query struct {
	// ExecutionResult is the execution result for the queried block
	ExecutionResult *flow.ExecutionResult

	// ExecutionNodes is the list of execution node identities that produced the result
	ExecutionNodes flow.IdentitySkeletonList
}
Query contains the result of an execution result query. It includes both the execution result and the execution nodes that produced it.
type Snapshot ¶
type Snapshot interface {
	// Events returns a reader for querying event data.
	Events() storage.EventsReader

	// Collections returns a reader for querying collection data.
	Collections() storage.CollectionsReader

	// Transactions returns a reader for querying transaction data.
	Transactions() storage.TransactionsReader

	// LightTransactionResults returns a reader for querying light transaction result data.
	LightTransactionResults() storage.LightTransactionResultsReader

	// TransactionResultErrorMessages returns a reader for querying transaction error message data.
	TransactionResultErrorMessages() storage.TransactionResultErrorMessagesReader

	// Registers returns a reader for querying register data.
	Registers() storage.RegisterIndexReader
}
Snapshot provides access to execution data readers for querying various data types from a specific ExecutionResult.
type State ¶
type State int32
State represents the state of the processing pipeline.
const (
	// StatePending is the initial state after instantiation, before Run is called
	StatePending State = iota
	// StateProcessing represents the state where data processing (download and indexing) has been started
	StateProcessing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateAbandoned represents the state where processing was aborted
	StateAbandoned
)
func (State) IsTerminal ¶
IsTerminal returns true if the state is a terminal state (Complete or Abandoned).
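The terminal-state check described above can be sketched against a local copy of the State constants. The method body is an assumption derived from the one-line description, since the package's own implementation is not shown here.

```go
package main

import "fmt"

// State and its constants are local copies of the documented declarations.
type State int32

const (
	StatePending State = iota
	StateProcessing
	StateWaitingPersist
	StateComplete
	StateAbandoned
)

// IsTerminal reports whether no further transitions are expected from s.
// Assumed body: only Complete and Abandoned are terminal, per the description above.
func (s State) IsTerminal() bool {
	return s == StateComplete || s == StateAbandoned
}

func main() {
	for s := StatePending; s <= StateAbandoned; s++ {
		fmt.Printf("state %d terminal=%t\n", s, s.IsTerminal())
	}
}
```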