optimistic_sync

package
v0.42.2-experimental-c... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2025 License: AGPL-3.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultTxResultErrMsgsRequestTimeout = 5 * time.Second

Variables

View Source
var (
	// ErrInvalidTransition is returned when a state transition is invalid.
	ErrInvalidTransition = errors.New("invalid state transition")
)

Functions

This section is empty.

Types

type Core

type Core interface {
	// Download retrieves all necessary data for processing.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// Expected errors:
	// - context.Canceled: if the provided context was canceled before completion
	// - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Download(ctx context.Context) error

	// Index processes the downloaded data and creates in-memory indexes.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// No errors are expected during normal operations
	Index() error

	// Persist stores the indexed data in permanent storage.
	// Concurrency safe - all operations will be executed sequentially.
	//
	// No errors are expected during normal operations
	Persist() error

	// Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted
	// and any data dropped.
	// Concurrency safe - all operations will be executed sequentially.
	// CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data.
	//
	// No errors are expected during normal operations
	Abandon() error
}

Core defines the interface for pipeline processing steps. Each implementation should handle an execution data and implement the three-phase processing: download, index, and persist. CAUTION: The Core instance should not be used after Abandon is called as it could cause panic due to cleared data. Core implementations must be - CONCURRENCY SAFE

type CoreFactory

type CoreFactory interface {
	NewCore(result *flow.ExecutionResult) Core
}

CoreFactory is a factory object for creating new Core instances.

type CoreImpl

type CoreImpl struct {
	// contains filtered or unexported fields
}

CoreImpl implements the Core interface for processing execution data. It coordinates the download, indexing, and persisting of execution data. Concurrency safe - all operations will be executed sequentially. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data.

func NewCoreImpl

func NewCoreImpl(
	logger zerolog.Logger,
	executionResult *flow.ExecutionResult,
	header *flow.Header,
	execDataRequester requester.ExecutionDataRequester,
	txResultErrMsgsRequester tx_error_messages.Requester,
	txResultErrMsgsRequestTimeout time.Duration,
	persistentRegisters storage.RegisterIndex,
	persistentEvents storage.Events,
	persistentCollections storage.Collections,
	persistentTransactions storage.Transactions,
	persistentResults storage.LightTransactionResults,
	persistentTxResultErrMsg storage.TransactionResultErrorMessages,
	latestPersistedSealedResult storage.LatestPersistedSealedResult,
	protocolDB storage.DB,
) *CoreImpl

NewCoreImpl creates a new CoreImpl with all necessary dependencies Concurrency safe - all operations will be executed sequentially.

func (*CoreImpl) Abandon

func (c *CoreImpl) Abandon() error

Abandon indicates that the protocol has abandoned this state. Hence processing will be aborted and any data dropped. Concurrency safe - all operations will be executed sequentially. CAUTION: The CoreImpl instance should not be used after Abandon is called as it could cause panic due to cleared data.

No errors are expected during normal operations

func (*CoreImpl) Download

func (c *CoreImpl) Download(ctx context.Context) error

Download downloads execution data and transaction results error for the block Concurrency safe - all operations will be executed sequentially.

Expected errors: - context.Canceled: if the provided context was canceled before completion - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)

func (*CoreImpl) Index

func (c *CoreImpl) Index() error

Index retrieves the downloaded execution data and transaction results error messages from the caches and indexes them into in-memory storage. Concurrency safe - all operations will be executed sequentially.

No errors are expected during normal operations

func (*CoreImpl) Persist

func (c *CoreImpl) Persist() error

Persist persists the indexed data to permanent storage atomically. Concurrency safe - all operations will be executed sequentially.

No errors are expected during normal operations

type Pipeline

type Pipeline interface {
	PipelineStateProvider

	// Run starts the pipeline processing and blocks until completion or context cancellation.
	// CAUTION: not concurrency safe! Run must only be called once.
	//
	// Expected Errors:
	//   - context.Canceled: when the context is canceled
	//   - All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)
	Run(ctx context.Context, core Core, parentState State) error

	// SetSealed marks the pipeline's result as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.
	SetSealed()

	// OnParentStateUpdated updates the pipeline's parent's state.
	OnParentStateUpdated(parentState State)

	// Abandon marks the pipeline as abandoned.
	Abandon()
}

Pipeline represents a processing pipelined state machine for a single ExecutionResult. The state machine is initialized in the Pending state.

The state machine is designed to be run in a single goroutine. The Run method must only be called once.

type PipelineFactory

type PipelineFactory interface {
	NewPipeline(result *flow.ExecutionResult, isSealed bool) Pipeline
}

PipelineFactory is a factory object for creating new Pipeline instances.

type PipelineImpl

type PipelineImpl struct {
	// contains filtered or unexported fields
}

PipelineImpl implements the Pipeline interface

func NewPipeline

func NewPipeline(
	log zerolog.Logger,
	executionResult *flow.ExecutionResult,
	isSealed bool,
	stateReceiver PipelineStateConsumer,
) *PipelineImpl

NewPipeline creates a new processing pipeline. The pipeline is initialized in the Pending state.

func (*PipelineImpl) Abandon

func (p *PipelineImpl) Abandon()

Abandon marks the pipeline as abandoned This will cause the pipeline to eventually transition to the Abandoned state and halt processing

func (*PipelineImpl) GetState

func (p *PipelineImpl) GetState() State

GetState returns the current state of the pipeline.

func (*PipelineImpl) OnParentStateUpdated

func (p *PipelineImpl) OnParentStateUpdated(parentState State)

OnParentStateUpdated updates the pipeline's state based on the provided parent state. If the parent state has changed, it will notify the state consumer and trigger a state change notification.

func (*PipelineImpl) Run

func (p *PipelineImpl) Run(ctx context.Context, core Core, parentState State) error

Run starts the pipeline processing and blocks until completion or context cancellation. CAUTION: not concurrency safe! Run must only be called once.

Expected Errors:

  • context.Canceled: when the context is canceled
  • All other errors are potential indicators of bugs or corrupted internal state (continuation impossible)

func (*PipelineImpl) SetSealed

func (p *PipelineImpl) SetSealed()

SetSealed marks the execution result as sealed. This will cause the pipeline to eventually transition to the StateComplete state when the parent finishes processing.

type PipelineStateConsumer

type PipelineStateConsumer interface {
	// OnStateUpdated is called when a pipeline's state changes to notify the receiver of the new state.
	// This method is will be called in the same goroutine that runs the pipeline, so it must not block.
	OnStateUpdated(newState State)
}

PipelineStateConsumer is a receiver of the pipeline state updates. PipelineStateConsumer implementations must be - NON-BLOCKING and consume the state updates without noteworthy delay

type PipelineStateProvider

type PipelineStateProvider interface {
	// GetState returns the current state of the pipeline.
	GetState() State
}

PipelineStateProvider is an interface that provides a pipeline's state.

type State

type State int32

State represents the state of the processing pipeline

const (
	// StatePending is the initial state after instantiation, before Run is called
	StatePending State = iota
	// StateProcessing represents the state where data processing (download and indexing) has been started
	StateProcessing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateAbandoned represents the state where processing was aborted
	StateAbandoned
)

func (State) IsTerminal

func (s State) IsTerminal() bool

IsTerminal returns true if the state is a terminal state (Complete or Abandoned).

func (State) String

func (s State) String() string

String representation of states for logging

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL