pipeline

Version: v0.0.3
Published: Apr 7, 2026 License: MIT Imports: 5 Imported by: 0

README

Pipeline is a generic, thread-safe execution engine that processes data through a series of named steps, each executed in its own goroutine. It supports both single-shot execution and reactive listening. In a listening scenario, it guarantees that an interrupted, unfinished run of a step gets a chance to clean up before that step runs again.

Documentation

Types

type Option

type Option[T any] func(*Pipeline[T])

Option defines a functional configuration pattern for the Pipeline.
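
The functional option pattern can be illustrated with a small self-contained sketch. It does not use this package: `miniPipeline`, `option`, `withBefore`, `withAfter`, and `newMini` are hypothetical stand-ins, and the struct fields are assumptions, since the real Pipeline's fields are unexported.

```go
package main

import "fmt"

// miniPipeline is a hypothetical stand-in for Pipeline[T]. Its fields are
// assumptions made for this sketch only.
type miniPipeline[T any] struct {
	before []func(T) error
	after  []func(T) error
}

// option has the same shape as Option[T]: a function that configures the
// pipeline it is given.
type option[T any] func(*miniPipeline[T])

// withBefore returns an option that appends fn to the before hooks.
func withBefore[T any](fn func(T) error) option[T] {
	return func(p *miniPipeline[T]) { p.before = append(p.before, fn) }
}

// withAfter returns an option that appends fn to the after hooks.
func withAfter[T any](fn func(T) error) option[T] {
	return func(p *miniPipeline[T]) { p.after = append(p.after, fn) }
}

// newMini applies each option in order to a fresh value.
func newMini[T any](opts ...option[T]) *miniPipeline[T] {
	p := &miniPipeline[T]{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newMini[string](
		withBefore(func(s string) error { return nil }),
		withAfter(func(s string) error { return nil }),
	)
	fmt.Println(len(p.before), len(p.after)) // prints "1 1"
}
```

The constructor stays stable as configuration grows: a new setting is one more option function rather than one more constructor parameter.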

func WithAfter

func WithAfter[T any](fn func(data T) error) Option[T]

WithAfter adds a synchronous function that runs after the steps, and only if every step finished successfully and no external cancellation occurred.

func WithBefore

func WithBefore[T any](fn func(data T) error) Option[T]

WithBefore adds a synchronous function to be executed before any steps start. If it returns an error, the pipeline execution is aborted immediately.
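
The ordering described for the two hooks can be sketched without the package. `runWithHooks` and `errBoom` are hypothetical names, and the sketch leaves out external cancellation, goroutines, and timeouts; it only shows when each hook does or does not run.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a hypothetical error used to make a hook or step fail.
var errBoom = errors.New("boom")

// runWithHooks is a hypothetical helper, not part of the package. It runs
// before, then the steps in order, then after only if everything succeeded.
func runWithHooks[T any](data T, before func(T) error, steps []func(T) error, after func(T) error) error {
	if err := before(data); err != nil {
		return fmt.Errorf("before hook: %w", err) // abort: no step runs
	}
	for i, step := range steps {
		if err := step(data); err != nil {
			return fmt.Errorf("step %d: %w", i, err) // after hook is skipped
		}
	}
	return after(data)
}

func main() {
	var trace []string
	before := func(s string) error { trace = append(trace, "before"); return nil }
	after := func(s string) error { trace = append(trace, "after"); return nil }
	ok := func(s string) error { trace = append(trace, "step"); return nil }
	fail := func(s string) error { return errBoom }

	// All steps succeed, so both hooks run.
	err := runWithHooks("x", before, []func(string) error{ok}, after)
	fmt.Println(err, trace)

	// The first step fails, so the after hook never runs.
	trace = nil
	err = runWithHooks("x", before, []func(string) error{fail, ok}, after)
	fmt.Println(err, trace)
}
```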

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline is the execution engine itself: it is generic over the data type T and thread-safe. It runs each of its named steps in its own goroutine and supports both single-shot execution (Run) and reactive listening (Listen). When listening, it lets an interrupted, unfinished step clean up before that step runs again.

func New

func New[T any](options ...Option[T]) *Pipeline[T]

New initializes a new Pipeline with the provided options.

func (*Pipeline[T]) AddStep

func (p *Pipeline[T]) AddStep(name string, timeout time.Duration, fn StepFunc[T])

AddStep registers a new step in the pipeline under the given name. Steps are executed sequentially: each step runs in its own goroutine, but the pipeline waits for it to complete before moving to the next step. The timeout applies to that step only and reaches fn through its context.
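
One way to picture "own goroutine, but sequential" is the sketch below. It is not the package's implementation: `stepFunc`, `step`, `runStep`, `runAll`, and `demo` are hypothetical names, and stopping at the first failing step is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stepFunc mirrors the documented StepFunc signature.
type stepFunc[T any] func(ctx context.Context, data T) error

// step bundles what AddStep receives: a name, a timeout, and a function.
type step[T any] struct {
	name    string
	timeout time.Duration
	fn      stepFunc[T]
}

// runStep runs fn in its own goroutine under a per-step timeout and waits
// for it to return. A step that ignores its context would keep this blocked.
func runStep[T any](ctx context.Context, timeout time.Duration, data T, fn stepFunc[T]) error {
	stepCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	done := make(chan error, 1)
	go func() { done <- fn(stepCtx, data) }()
	return <-done // wait here, so two steps never overlap
}

// runAll runs the steps in order and reports the first failing step.
func runAll[T any](ctx context.Context, data T, steps []step[T]) (string, error) {
	for _, s := range steps {
		if err := runStep(ctx, s.timeout, data, s.fn); err != nil {
			return s.name, err
		}
	}
	return "", nil
}

// demo runs three steps; the second one exceeds its 20ms timeout.
func demo() (string, error) {
	steps := []step[int]{
		{"fast", time.Second, func(ctx context.Context, n int) error { return nil }},
		{"slow", 20 * time.Millisecond, func(ctx context.Context, n int) error {
			select {
			case <-time.After(time.Second): // stands in for long work
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		}},
		{"never", time.Second, func(ctx context.Context, n int) error { return nil }},
	}
	return runAll(context.Background(), 0, steps)
}

func main() {
	name, err := demo()
	fmt.Printf("%s: %v\n", name, err)
}
```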

func (*Pipeline[T]) Err

func (p *Pipeline[T]) Err() error

Err returns the errors from the most recent pipeline execution, taken from a copy of the pipeline's error map. It returns nil if no errors occurred.

func (*Pipeline[T]) Listen

func (p *Pipeline[T]) Listen(ctx context.Context, triggerChan <-chan T)

Listen starts a long-running reactive loop that waits for triggers on the provided channel. If a new trigger arrives while a pipeline is still running, the current run is canceled, the system waits for its cleanup, and then starts the new run.
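
The cancel, wait, then restart sequence can be sketched with the standard library alone. `listen` and `demo` below are hypothetical and do not reproduce the package's internals; in particular, returning when the trigger channel is closed is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// listen starts run for every trigger. Before starting a new run it cancels
// the run in flight and waits until that run has returned.
func listen[T any](ctx context.Context, triggers <-chan T, run func(context.Context, T)) {
	var cancel context.CancelFunc = func() {}
	done := make(chan struct{})
	close(done) // nothing is running yet

	for {
		select {
		case <-ctx.Done():
			cancel()
			<-done
			return
		case data, ok := <-triggers:
			if !ok {
				<-done // let the last run finish on its own
				cancel()
				return
			}
			cancel() // interrupt the run in flight, if any
			<-done   // wait until it has returned, cleanup included

			var runCtx context.Context
			runCtx, cancel = context.WithCancel(ctx)
			done = make(chan struct{})
			go func(finished chan struct{}) {
				defer close(finished)
				run(runCtx, data)
			}(done)
		}
	}
}

// demo sends two triggers back to back and records what each run did.
func demo() []string {
	var mu sync.Mutex
	var events []string
	record := func(s string) {
		mu.Lock()
		defer mu.Unlock()
		events = append(events, s)
	}

	run := func(ctx context.Context, n int) {
		record(fmt.Sprintf("start %d", n))
		select {
		case <-time.After(200 * time.Millisecond): // stands in for real work
			record(fmt.Sprintf("finish %d", n))
		case <-ctx.Done():
			record(fmt.Sprintf("cleanup %d", n))
		}
	}

	triggers := make(chan int)
	go func() {
		triggers <- 1
		triggers <- 2 // arrives while run 1 is still working
		close(triggers)
	}()
	listen(context.Background(), triggers, run)
	return events
}

func main() {
	for _, e := range demo() {
		fmt.Println(e)
	}
}
```

In this sketch the second trigger interrupts the first run, so the first run records its cleanup before the second run records its start.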

func (*Pipeline[T]) Run

func (p *Pipeline[T]) Run(ctx context.Context, data T) error

Run executes the pipeline synchronously for a single data input. It blocks until the execution and all associated cleanups are finished.

type StepFunc

type StepFunc[T any] func(ctx context.Context, data T) error

StepFunc defines the signature for a single unit of work within the pipeline. It receives a context (with a step-specific timeout) and the generic data.
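
Because the timeout reaches a step through its context, a step only stops early if it checks that context. The sketch below shows one way a step body might do so. The `StepFunc` type is copied locally so the example is self-contained; `processItems` and `timedOut` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// StepFunc is a local copy of the documented signature.
type StepFunc[T any] func(ctx context.Context, data T) error

// processItems handles one item at a time and checks the context between
// items, so a timeout or cancellation stops it promptly.
func processItems(ctx context.Context, items []string) error {
	for i, item := range items {
		select {
		case <-ctx.Done():
			return fmt.Errorf("stopped before item %d (%s): %w", i, item, ctx.Err())
		case <-time.After(10 * time.Millisecond): // stands in for real work
		}
	}
	return nil
}

// timedOut runs the step under a 25ms deadline, which is shorter than the
// 50ms the five items need, and reports whether the deadline ended it.
func timedOut() bool {
	var step StepFunc[[]string] = processItems
	ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
	defer cancel()

	err := step(ctx, []string{"a", "b", "c", "d", "e"})
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut()) // prints "true"
}
```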
