Documentation
Constants
const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second
Variables
var (
	// ErrInvalidTransition is returned when a state transition is invalid.
	ErrInvalidTransition = errors.New("invalid state transition")
)
Functions
This section is empty.
Types
type Core
type Core interface {
	// Download retrieves all necessary data for processing.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// Expected errors:
	//   - context.Canceled: if the provided context was canceled before completion
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Download(ctx context.Context) error

	// Index processes the downloaded data and creates in-memory indexes.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// No errors are expected during normal operations
	Index() error

	// Persist stores the indexed data in permanent storage.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// No errors are expected during normal operations
	Persist() error

	// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
	// and any data dropped.
	// Concurrency safe - all operations will be executed sequentially.
	// CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data.
	//
	// No errors are expected during normal operations
	Abandon() error
}
Core defines the interface for pipeline processing steps. Each implementation should handle the execution data of a single execution result and implement the three-phase processing: download, index, and persist. CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data.
Core implementations must be
- CONCURRENCY SAFE
type CoreFactory
type CoreFactory interface {
NewCore(result *flow.ExecutionResult) Core
}
CoreFactory is a factory object for creating new Core instances.
type CoreImpl
type CoreImpl struct {
// contains filtered or unexported fields
}
CoreImpl implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data. Concurrency safe - all operations will be executed sequentially. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data.
func NewCoreImpl
func NewCoreImpl(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	header *flow.Header,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentTransactions storage.Transactions,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
) *CoreImpl
NewCoreImpl creates a new CoreImpl with all necessary dependencies. Concurrency safe - all operations will be executed sequentially.
func (*CoreImpl) Abandon
Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. Concurrency safe - all operations will be executed sequentially. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data.
No errors are expected during normal operations.
func (*CoreImpl) Download
Download downloads execution data and transaction result error messages for the block. Concurrency safe - all operations will be executed sequentially.
Expected errors:
- context.Canceled: if the provided context was canceled before completion
- All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
type Pipeline
type Pipeline interface {
	PipelineStateProvider

	// Run starts the pipeline processing and blocks until completion or context cancellation.
	// CAUTION: not concurrency safe! Run must only be called once.
	//
	// Expected Errors:
	//   - context.Canceled: when the context is canceled
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Run(ctx context.Context, core Core, parentState State) error

	// SetSealed marks the pipeline's result as sealed, which enables the pipeline to leave StateWaitingPersist and persist its data.
	SetSealed()

	// OnParentStateUpdated updates the pipeline's parent's state.
	OnParentStateUpdated(parentState State)

	// Abandon marks the pipeline as abandoned.
	Abandon()
}
Pipeline represents a pipelined state machine that processes a single ExecutionResult. The state machine is initialized in the Pending state.
The state machine is designed to be run in a single goroutine. The Run method must only be called once.
type PipelineFactory
type PipelineFactory interface {
NewPipeline(result *flow.ExecutionResult, isSealed bool) Pipeline
}
PipelineFactory is a factory object for creating new Pipeline instances.
type PipelineImpl
type PipelineImpl struct {
// contains filtered or unexported fields
}
PipelineImpl implements the Pipeline interface.
func NewPipeline
func NewPipeline(
	log zerolog.Logger,
	executionResult *flow.ExecutionResult,
	isSealed bool,
	stateReceiver PipelineStateConsumer,
) *PipelineImpl
NewPipeline creates a new processing pipeline. The pipeline is initialized in the Pending state.
func (*PipelineImpl) Abandon
func (p *PipelineImpl) Abandon()
Abandon marks the pipeline as abandoned. This will cause the pipeline to eventually transition to the Abandoned state and halt processing.
func (*PipelineImpl) GetState
func (p *PipelineImpl) GetState() State
GetState returns the current state of the pipeline.
func (*PipelineImpl) OnParentStateUpdated
func (p *PipelineImpl) OnParentStateUpdated(parentState State)
OnParentStateUpdated updates the pipeline's state based on the provided parent state. If the parent state has changed, it will notify the state consumer and trigger a state change notification.
func (*PipelineImpl) Run
Run starts the pipeline processing and blocks until completion or context cancellation. CAUTION: not concurrency safe! Run must only be called once.
Expected Errors:
- context.Canceled: when the context is canceled
- All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
func (*PipelineImpl) SetSealed
func (p *PipelineImpl) SetSealed()
SetSealed marks the execution result as sealed. This will cause the pipeline to eventually transition to the StateComplete state when the parent finishes processing.
type PipelineStateConsumer
type PipelineStateConsumer interface {
	// OnStateUpdated is called when a pipeline's state changes to notify the receiver of the new state.
	// This method will be called in the same goroutine that runs the pipeline, so it must not block.
	OnStateUpdated(newState State)
}
PipelineStateConsumer is a receiver of pipeline state updates. PipelineStateConsumer implementations must be
- NON-BLOCKING and consume the state updates without noteworthy delay
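One way to meet the non-blocking requirement is to store only the latest state and signal a waiting goroutine through a channel of capacity one. The sketch below is illustrative: `latestStateConsumer`, `newLatestStateConsumer`, and `Latest` are hypothetical names, `State` is redeclared locally so the example compiles on its own, and keeping only the most recent state is an assumption that may not suit every consumer.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// State mirrors the package's State type so the sketch is self-contained.
type State int32

// latestStateConsumer keeps the most recent state and raises at most one
// pending notification, so OnStateUpdated never blocks the pipeline goroutine.
type latestStateConsumer struct {
	latest atomic.Int32
	notify chan struct{} // capacity 1
}

func newLatestStateConsumer() *latestStateConsumer {
	return &latestStateConsumer{notify: make(chan struct{}, 1)}
}

// OnStateUpdated stores the new state and signals without blocking.
func (c *latestStateConsumer) OnStateUpdated(newState State) {
	c.latest.Store(int32(newState))
	select {
	case c.notify <- struct{}{}:
	default: // a notification is already pending; nothing more to do
	}
}

// Latest returns the most recently reported state.
func (c *latestStateConsumer) Latest() State {
	return State(c.latest.Load())
}

func main() {
	c := newLatestStateConsumer()

	// Three updates arrive while no one is reading; none of them blocks.
	for s := State(1); s <= 3; s++ {
		c.OnStateUpdated(s)
	}

	<-c.notify              // a single pending notification
	fmt.Println(c.Latest()) // prints 3
}
```

The slow work happens in whichever goroutine drains `notify`, which keeps the callback itself cheap.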
type PipelineStateProvider
type PipelineStateProvider interface {
	// GetState returns the current state of the pipeline.
	GetState() State
}
PipelineStateProvider is an interface that provides a pipeline's state.
type State
type State int32
State represents the state of the processing pipeline.
const (
	// StatePending is the initial state after instantiation, before Run is called
	StatePending State = iota
	// StateProcessing represents the state where data processing (download and indexing) has been started
	StateProcessing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateAbandoned represents the state where processing was aborted
	StateAbandoned
)
func (State) IsTerminal
IsTerminal returns true if the state is a terminal state (Complete or Abandoned).
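The state constants and the terminal check can be exercised in isolation with the sketch below. The declarations mirror the ones documented above so the example compiles on its own. `checkTransition` is a hypothetical helper: the rule that states advance one step at a time and that any non-terminal state may move to Abandoned is an assumption inferred from the state descriptions, not the package's actual transition logic.

```go
package main

import (
	"errors"
	"fmt"
)

// State and its constants mirror the declarations documented above.
type State int32

const (
	StatePending State = iota
	StateProcessing
	StateWaitingPersist
	StateComplete
	StateAbandoned
)

// ErrInvalidTransition mirrors the package-level sentinel error.
var ErrInvalidTransition = errors.New("invalid state transition")

// IsTerminal reports whether s is Complete or Abandoned.
func (s State) IsTerminal() bool {
	return s == StateComplete || s == StateAbandoned
}

// checkTransition is a hypothetical validator. Assumed rules:
//   - no transition leaves a terminal state
//   - any non-terminal state may move to StateAbandoned
//   - otherwise a state may only advance to its direct successor
func checkTransition(from, to State) error {
	if from.IsTerminal() {
		return fmt.Errorf("%d -> %d: %w", from, to, ErrInvalidTransition)
	}
	if to == StateAbandoned || to == from+1 {
		return nil
	}
	return fmt.Errorf("%d -> %d: %w", from, to, ErrInvalidTransition)
}

func main() {
	fmt.Println(StateComplete.IsTerminal(), StateProcessing.IsTerminal()) // true false
	fmt.Println(checkTransition(StatePending, StateProcessing))           // <nil>

	err := checkTransition(StateComplete, StateAbandoned)
	fmt.Println(errors.Is(err, ErrInvalidTransition)) // true
}
```

Wrapping the sentinel with `%w` lets callers test for it with `errors.Is` while still seeing which transition was rejected.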