Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Core ¶
type Core interface {
	// Download retrieves all necessary data for processing.
	Download(ctx context.Context) error

	// Index processes the downloaded data and creates in-memory indexes.
	Index(ctx context.Context) error

	// Persist stores the indexed data in permanent storage.
	Persist(ctx context.Context) error
}
Core defines the interface for pipeline processing steps. Each implementation handles execution data and implements three-phase processing: download, index, and persist.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline represents a generic processing pipeline with state transitions. It processes data through sequential states: Ready -> Downloading -> Indexing -> WaitingPersist -> Persisting -> Complete, with conditions for each transition.
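The sequential order described above can be sketched as a small transition function. This is an illustration only: nextState is a hypothetical helper, the State constants are re-declared locally so the program is self-contained, and the per-transition conditions are omitted because this page does not spell them out.

```go
package main

import "fmt"

// State and its constants mirror the declarations documented for this package.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

// nextState returns the state that follows s on the success path.
// The second result is false when s is terminal (complete or canceled).
func nextState(s State) (State, bool) {
	switch s {
	case StateReady, StateDownloading, StateIndexing, StateWaitingPersist, StatePersisting:
		return s + 1, true
	default:
		return s, false
	}
}

func main() {
	s := StateReady
	steps := 0
	for {
		next, ok := nextState(s)
		if !ok {
			break
		}
		s = next
		steps++
	}
	fmt.Println(steps, s == StateComplete)
}
```

StateCanceled is deliberately not reachable through nextState, since cancellation is an abort rather than a step in the sequence.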
func NewPipeline ¶
func NewPipeline(
	logger zerolog.Logger,
	isSealed bool,
	executionResult *flow.ExecutionResult,
	core Core,
	stateUpdatePublisher StateUpdatePublisher,
) *Pipeline
NewPipeline creates a new processing pipeline. Pipelines must only be created for ExecutionResults that descend from the latest persisted sealed result. The pipeline is initialized in the Ready state.
Parameters:
- logger: the logger to use for the pipeline
- isSealed: indicates if the pipeline's ExecutionResult is sealed
- executionResult: the execution result that this pipeline processes
- core: implements the processing logic for the pipeline
- stateUpdatePublisher: called when the pipeline needs to broadcast state updates
Returns:
- new pipeline object
func (*Pipeline) Run ¶
Run starts the pipeline processing and blocks until completion or context cancellation.
This function handles the progression through the pipeline states, executing the appropriate processing functions at each step.
When the pipeline reaches a terminal state (StateComplete or StateCanceled), the function returns. The function will also return if the provided context is canceled.
Returns an error if any processing step fails with an irrecoverable error. Returns nil if processing completes successfully, reaches a terminal state, or if either the parent or pipeline context is canceled.
func (*Pipeline) SetSealed ¶
func (p *Pipeline) SetSealed()
SetSealed marks the data as sealed, which enables transitioning from StateWaitingPersist to StatePersisting.
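One way to picture the role of the sealed flag is a gate that is consulted before leaving StateWaitingPersist. The sketch below assumes a hypothetical gate type backed by an atomic flag. The real pipeline may check further conditions before persisting, and its internal fields are unexported, so none of these names come from the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// State and its constants mirror the declarations documented for this package.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

// gate is a hypothetical holder of the sealed flag.
type gate struct {
	sealed atomic.Bool
}

// newGate seeds the flag, in the same spirit as the isSealed argument of NewPipeline.
func newGate(isSealed bool) *gate {
	g := &gate{}
	g.sealed.Store(isSealed)
	return g
}

// SetSealed marks the data as sealed. It is safe to call from any goroutine.
func (g *gate) SetSealed() { g.sealed.Store(true) }

// canPersist reports whether the move from waiting to persisting is allowed.
func (g *gate) canPersist(current State) bool {
	return current == StateWaitingPersist && g.sealed.Load()
}

func main() {
	g := newGate(false)
	fmt.Println(g.canPersist(StateWaitingPersist))
	g.SetSealed()
	fmt.Println(g.canPersist(StateWaitingPersist))
}
```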
func (*Pipeline) UpdateState ¶
func (p *Pipeline) UpdateState(update StateUpdate)
UpdateState updates the pipeline's state based on the provided state update.
type State ¶
type State int
State represents the state of the processing pipeline.
const (
	// StateReady is the initial state after instantiation and before downloading has begun
	StateReady State = iota
	// StateDownloading represents the state where data download is in progress
	StateDownloading
	// StateIndexing represents the state where data is being indexed
	StateIndexing
	// StateWaitingPersist represents the state where all data is indexed, but conditions to persist are not met
	StateWaitingPersist
	// StatePersisting represents the state where the indexed data is being persisted to storage
	StatePersisting
	// StateComplete represents the state where all data is persisted to storage
	StateComplete
	// StateCanceled represents the state where processing was aborted
	StateCanceled
)
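Because the constants are declared with iota, they take the values 0 through 6 in declaration order. For logging it is often convenient to map them to names. The String method below is a hypothetical sketch on a locally re-declared State type: this page does not list a String method for State, and the label strings are assumptions.

```go
package main

import "fmt"

// State and its constants mirror the declarations documented for this package.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

// String returns a human-readable label for s (labels are illustrative).
func (s State) String() string {
	switch s {
	case StateReady:
		return "ready"
	case StateDownloading:
		return "downloading"
	case StateIndexing:
		return "indexing"
	case StateWaitingPersist:
		return "waiting_persist"
	case StatePersisting:
		return "persisting"
	case StateComplete:
		return "complete"
	case StateCanceled:
		return "canceled"
	default:
		return fmt.Sprintf("State(%d)", int(s))
	}
}

func main() {
	for s := StateReady; s <= StateCanceled; s++ {
		fmt.Printf("%d %s\n", int(s), s)
	}
}
```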
type StateUpdate ¶
type StateUpdate struct {
	// DescendsFromLastPersistedSealed indicates if this pipeline descends from
	// the last persisted sealed result
	DescendsFromLastPersistedSealed bool
	// ParentState contains the state information from the parent pipeline
	ParentState State
}
StateUpdate contains state update information.
type StateUpdatePublisher ¶
type StateUpdatePublisher func(update StateUpdate)
StateUpdatePublisher is a function that publishes state updates.
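Since StateUpdatePublisher is a plain function type, a closure is enough to build one. The sketch below assumes a hypothetical fan-out in which one publisher forwards every update to a set of receivers. The receiver type and fanOut are illustrative names, and how the package actually wires publishers to child pipelines is not described on this page.

```go
package main

import "fmt"

// The following types mirror the declarations documented for this package.
type State int

const (
	StateReady State = iota
	StateDownloading
	StateIndexing
	StateWaitingPersist
	StatePersisting
	StateComplete
	StateCanceled
)

type StateUpdate struct {
	DescendsFromLastPersistedSealed bool
	ParentState                     State
}

type StateUpdatePublisher func(update StateUpdate)

// receiver is a hypothetical stand-in for anything exposing UpdateState.
type receiver struct {
	last     StateUpdate
	received int
}

// UpdateState records the most recent update and counts deliveries.
func (r *receiver) UpdateState(update StateUpdate) {
	r.last = update
	r.received++
}

// fanOut returns a publisher that forwards each update to every receiver.
func fanOut(receivers ...*receiver) StateUpdatePublisher {
	return func(update StateUpdate) {
		for _, r := range receivers {
			r.UpdateState(update)
		}
	}
}

func main() {
	a, b := &receiver{}, &receiver{}
	publish := fanOut(a, b)
	publish(StateUpdate{DescendsFromLastPersistedSealed: true, ParentState: StateIndexing})
	fmt.Println(a.received, b.last.ParentState == StateIndexing)
}
```

This single-threaded sketch has no locking. A publisher invoked from several goroutines would need synchronization in the receivers.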