pipeline

package
v0.42.4-access-txerr-b...
Warning

This package is not in the latest version of its module.

Published: Sep 18, 2025 License: AGPL-3.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Core

type Core interface {
	// Download retrieves all necessary data for processing.
	Download(ctx context.Context) error

	// Index processes the downloaded data and creates in-memory indexes.
	Index(ctx context.Context) error

	// Persist stores the indexed data in permanent storage.
	Persist(ctx context.Context) error
}

Core defines the interface for pipeline processing steps. Each implementation should handle the execution data and implement the three processing phases: download, index, and persist.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline represents a generic processing pipeline with state transitions. It processes data through the sequential states Ready -> Downloading -> Indexing -> WaitingPersist -> Persisting -> Complete, and each transition is subject to its own conditions. Processing that is aborted ends in the Canceled state instead.
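As a rough illustration of the documented state order, the following sketch models only the happy path. The State constants mirror those declared in this package; the next function is hypothetical, and the only transition condition it models is the sealed gate between WaitingPersist and Persisting described for SetSealed. The real pipeline's other conditions are not documented here and are not reproduced.

```go
package main

import "fmt"

// State and its constants mirror the declarations documented in this package.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

// next is a hypothetical helper that returns the documented successor of s.
// It reports false when no transition is possible: either s is terminal,
// or s is StateWaitingPersist and the result is not yet sealed.
func next(s State, sealed bool) (State, bool) {
	switch s {
	case StateReady:
		return StateDownloading, true
	case StateDownloading:
		return StateIndexing, true
	case StateIndexing:
		return StateWaitingPersist, true
	case StateWaitingPersist:
		if sealed {
			return StatePersisting, true
		}
		return s, false
	case StatePersisting:
		return StateComplete, true
	default:
		// StateComplete and StateCanceled are terminal.
		return s, false
	}
}

func main() {
	s := StateReady
	for {
		n, ok := next(s, true)
		if !ok {
			break
		}
		s = n
	}
	fmt.Println(s == StateComplete) // prints "true"
}
```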

func NewPipeline

func NewPipeline(
	logger zerolog.Logger,
	isSealed bool,
	executionResult *flow.ExecutionResult,
	core Core,
	stateUpdatePublisher StateUpdatePublisher,
) *Pipeline

NewPipeline creates a new processing pipeline. Pipelines must only be created for ExecutionResults that descend from the latest persisted sealed result. The pipeline is initialized in the Ready state.

Parameters:

  • logger: the logger to use for the pipeline
  • isSealed: indicates if the pipeline's ExecutionResult is sealed
  • executionResult: the execution result to process
  • core: implements the processing logic for the pipeline
  • stateUpdatePublisher: called when the pipeline needs to broadcast state updates

Returns:

  • new pipeline object

func (*Pipeline) GetState

func (p *Pipeline) GetState() State

GetState returns the current state of the pipeline.

func (*Pipeline) Run

func (p *Pipeline) Run(parentCtx context.Context) error

Run starts the pipeline processing and blocks until completion or context cancellation.

This function handles the progression through the pipeline states, executing the appropriate processing functions at each step.

When the pipeline reaches a terminal state (StateComplete or StateCanceled), the function returns. The function will also return if the provided context is canceled.

Returns an error if any processing step fails with an irrecoverable error. Returns nil if processing completes successfully, if the pipeline reaches a terminal state, or if either the parent context or the pipeline's own context is canceled.

func (*Pipeline) SetSealed

func (p *Pipeline) SetSealed()

SetSealed marks the data as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.

func (*Pipeline) UpdateState

func (p *Pipeline) UpdateState(update StateUpdate)

UpdateState updates the pipeline's state based on the provided state update.

type State

type State int

State represents the state of the processing pipeline.

const (
	// StateReady is the initial state after instantiation and before downloading has begun
	StateReady State = iota
	// StateDownloading represents the state where data download is in progress
	StateDownloading
	// StateIndexing represents the state where data is being indexed
	StateIndexing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StatePersisting represents the state where the indexed data is being persisted to storage
	StatePersisting
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateCanceled represents the state where processing was aborted
	StateCanceled
)

func (State) String

func (s State) String() string

String returns the string representation of the state, for use in logging.

type StateUpdate

type StateUpdate struct {
	// DescendsFromLastPersistedSealed indicates if this pipeline descends from
	// the last persisted sealed result
	DescendsFromLastPersistedSealed bool
	// ParentState contains the state information from the parent pipeline
	ParentState State
}

StateUpdate contains the state information passed to a pipeline through UpdateState.

type StateUpdatePublisher

type StateUpdatePublisher func(update StateUpdate)

StateUpdatePublisher is a function that publishes state updates.
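Since StateUpdatePublisher is a plain function type, any closure with a matching signature can serve as one. The following sketch redeclares the relevant types locally and shows two hypothetical helpers: updateLog, which records updates, and fanOut, which forwards each update to several receivers (for example, the UpdateState methods of child pipelines). Neither helper exists in this package, and how the real pipeline wires its publisher is not specified here.

```go
package main

import (
	"fmt"
	"sync"
)

// The declarations below mirror the types documented in this package.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
)

type StateUpdate struct {
	DescendsFromLastPersistedSealed bool
	ParentState                     State
}

type StateUpdatePublisher func(update StateUpdate)

// updateLog is a hypothetical collector that records published updates.
type updateLog struct {
	mu      sync.Mutex
	updates []StateUpdate
}

// Publisher returns a StateUpdatePublisher that appends to the log.
func (l *updateLog) Publisher() StateUpdatePublisher {
	return func(update StateUpdate) {
		l.mu.Lock()
		defer l.mu.Unlock()
		l.updates = append(l.updates, update)
	}
}

// fanOut is a hypothetical helper that forwards each update to every receiver.
func fanOut(receivers ...func(StateUpdate)) StateUpdatePublisher {
	return func(update StateUpdate) {
		for _, receive := range receivers {
			receive(update)
		}
	}
}

func main() {
	var a, b updateLog
	publish := fanOut(a.Publisher(), b.Publisher())
	publish(StateUpdate{DescendsFromLastPersistedSealed: true, ParentState: StateIndexing})
	fmt.Println(len(a.updates), len(b.updates)) // prints "1 1"
}
```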
