pipeline

package
v0.3.2 (this package is not in the latest version of its module)
Published: Apr 3, 2024 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package pipeline provides logic for processing a pipeline of data elements using a coordinated concurrency model. A Pipeline is made up of one or more stages, each executing a finite (but resizable) set of concurrent goroutines that are coordinated using this module's waypoint package.

Constants

const (
	ErrCorrupted    = errstr("pipeline state is corrupted")
	ErrIsStarted    = errstr("pipeline is already started")
	ErrNameConflict = errstr("stage name conflict")
	ErrNameUnknown  = errstr("stage name not found")
	ErrNilReceiver  = errstr("nil receiver")
	ErrNoStages     = errstr("no pipeline stages registered")
)

Functions

func Recv

func Recv[T any](ctx context.Context, ch <-chan any) (T, bool, error)

func Send

func Send[T any](ctx context.Context, value T, ch chan<- any) error

Types

type Interface

type Interface interface {
	// The Feed method acts as the data source for a Pipeline by sending data
	// elements into the provided channel. The Pipeline will take care of closing
	// wchan as soon as this method returns.
	//
	// NOTE: The implementor should not close this channel; doing so will cause
	// a panic.
	Feed(ctx context.Context, wchan chan<- any) error

	// The Collect method acts as the data sink for the Pipeline by receiving
	// data elements from the provided channel. The Pipeline will close this
	// channel when no more data is forthcoming.
	Collect(ctx context.Context, rchan <-chan any) error
}

Interface defines methods that should be implemented by types written to provide the data source and sink for a given Pipeline.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func New

func New(impl Interface) *Pipeline

New creates and returns a new Pipeline using the provided Interface.

func (*Pipeline) Add

func (p *Pipeline) Add(name string, capacity int, pfunc StageFunc) error

Add registers a named Pipeline stage that will execute the provided StageFunc with the given initial waypoint capacity. The given name must be unique among all stages for this Pipeline. Add may be called multiple times to register multiple stages, and data will flow through the stages in the order in which they were registered. Note, however, that the receiver's Run method will fail if no stages have been registered.

Once the receiver has been started (by calling its Run method) no more stages may be registered. Add returns ErrIsStarted if it is called after Run. ErrNameConflict is returned if Add is called using a previously registered name. Otherwise, the new stage is registered and a nil error is returned.

The name parameter may be used with the Resize method in order to alter the capacity of this particular stage. For more details, see this module's waypoint package.

func (*Pipeline) GoContext added in v0.3.0

func (p *Pipeline) GoContext(cfunc errgroupx.ContextFunc)

GoContext adds cfunc to the list of ContextFuncs that will be executed (each in its own goroutine) alongside Pipeline-specific goroutines when the receiver's Run method is called. Note that, while this ContextFunc is separate from the goroutines executing the pipeline, an error returned from this function will cause the pipeline to fail. Therefore, only execute goroutines with this method if that is your intent.

func (*Pipeline) Resize

func (p *Pipeline) Resize(name string, newcap int) (int, error)

Resize updates the capacity of the pipeline stage with the given name to the provided newcap value and returns that stage's previous capacity value. If name is not a registered stage name, then zero and ErrNameUnknown are returned.

func (*Pipeline) Run

func (p *Pipeline) Run(ctx context.Context) error

Run executes the Pipeline defined for the receiver as at least three separate goroutines: one each for the Feed and Collect methods implemented by the Interface provided to the constructor plus one or more additional goroutines for the registered stages. Channels are interleaved between the Feed stage, each of the individually registered stages (in the order each was added), and the final Collect stage.

Run blocks until all of its goroutines have completed, either because every one of them finished successfully or because one of them returned a non-nil error. If the provided context is canceled, that cancellation is propagated to all running goroutines (note that an error returned by any goroutine also cancels the context provided to all of the others).

If the receiver has no stages registered, then ErrNoStages is returned. Otherwise, any error returned is one produced by one of the underlying goroutines.
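The layout described for Run can be approximated with the standard library alone. The sketch below is an illustration under stated assumptions, not this package's implementation: `runSketch`, `stageFunc`, and `demo` are hypothetical names, and each stage runs in a single goroutine, whereas the real package runs a resizable set of goroutines per stage coordinated by waypoint. It does show channels interleaved between feed, the stages in order, and collect, with the first error canceling the context shared by the others.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// stageFunc mirrors the documented StageFunc signature so the sketch compiles
// without importing the package.
type stageFunc func(ctx context.Context, input any) (any, error)

// runSketch is a hypothetical approximation: one goroutine for feed, one per
// stage, and one for collect, linked by unbuffered channels.
func runSketch(
	ctx context.Context,
	feed func(context.Context, chan<- any) error,
	collect func(context.Context, <-chan any) error,
	stages ...stageFunc,
) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	// spawn runs f in its own goroutine; the first error is kept and cancels
	// the context seen by every other goroutine.
	spawn := func(f func() error) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := f(); err != nil {
				once.Do(func() { first = err; cancel() })
			}
		}()
	}

	in := make(chan any)
	head := in
	spawn(func() error {
		defer close(head) // closed for feed once it returns
		return feed(ctx, head)
	})
	for _, fn := range stages {
		fn := fn // per-iteration copy for Go versions before 1.22
		src, dst := in, make(chan any)
		spawn(func() error {
			defer close(dst)
			for v := range src {
				out, err := fn(ctx, v)
				if err != nil {
					return err
				}
				select {
				case dst <- out:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
		in = dst
	}
	tail := in
	spawn(func() error { return collect(ctx, tail) })

	wg.Wait()
	return first
}

// demo feeds 1, 2, 3 through two stages: add one, then double.
func demo() ([]any, error) {
	var got []any
	feed := func(ctx context.Context, w chan<- any) error {
		for i := 1; i <= 3; i++ {
			select {
			case w <- i:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
	collect := func(_ context.Context, r <-chan any) error {
		for v := range r {
			got = append(got, v)
		}
		return nil
	}
	addOne := func(_ context.Context, in any) (any, error) { return in.(int) + 1, nil }
	double := func(_ context.Context, in any) (any, error) { return in.(int) * 2, nil }
	err := runSketch(context.Background(), feed, collect, addOne, double)
	return got, err
}

func main() {
	got, err := demo()
	fmt.Println(got, err) // [4 6 8] <nil>
}
```

Because this sketch uses one goroutine per stage, element order is preserved; nothing in the documentation above says whether the real package makes the same guarantee when a stage's capacity is greater than one.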

type StageFunc added in v0.2.0

type StageFunc func(ctx context.Context, input any) (any, error)

A StageFunc is the function called to process each piece of data for a stage registered using the (*Pipeline).Add method.
