Documentation ¶
Overview ¶
Package pipeline provides logic for processing a pipeline of data elements using a coordinated concurrency model. A Pipeline is made up of one or more stages each executing a finite (but resizable) set of concurrent goroutines that are coordinated using this module's waypoint package.
Index ¶
Constants ¶
const ( ErrCorrupted = errstr("pipeline state is corrupted") ErrIsStarted = errstr("pipeline is already started") ErrNameConflict = errstr("stage name conflict") ErrNameUnknown = errstr("stage name not found") ErrNilReceiver = errstr("nil receiver") ErrNoStages = errstr("no pipeline stages registered") )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Interface ¶
type Interface interface { // The Feed methods acts as the data source for a Pipeline by sending data // elements into the provided channel. The Pipeine will take care of closing // wchan as soon as this method returns. // // NOTE: The implementor should not close this channel; doing so will cause // a panic. Feed(ctx context.Context, wchan chan<- any) error // A Collect method acts as the data sink for the Pipeline by receiving // data elements from the provided channel. The Pipeline will close this // channel when no more data is forthcoming. Collect(ctx context.Context, rchan <-chan any) error }
Interface defines methods that should be implemented by types written to provide the data source and sink for a given Pipeline.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func (*Pipeline) Add ¶
Add registers a named Pipeline stage that will execute the provided StageFunc using an initial waypoint capacity. The given name must be unique among all stages for this Pipeline. Add may be called multiple times, to register multiple stages, and data will flow through each stage of the Pipeline in the order they are registered. Note however that the receiver's Run method will fail if no stages have been registered.
Once the receiver has been started (by calling its Run method) no more stages may be registered. Add returns ErrIsStarted if it is called after Run. ErrNameConflict is returned if Add is called using a previously registered name. Otherwise, the new stage is registered and a nil error is returned.
The name parameter may be used with the Resize method in order to alter the capacity of this particular stage. For more details, see this module's waypoint package.
func (*Pipeline) GoContext ¶ added in v0.3.0
func (p *Pipeline) GoContext(cfunc errgroupx.ContextFunc)
GoContext adds cfunc to the list of ContextFuncs that will be executed (each in their own goroutine) alongside Pipeline-specific goroutines when the receiver's Run method is called. Note that, while this ContextFunc is separate from the goroutines executing the pipline, an error returned from this function will cause the pipeline to fail. Therefore, only execute goroutines with this method if that is your intent.
func (*Pipeline) Resize ¶
Resize updates the capacity of the pipeline stage with the given name to the provided newcap value and returns that stage's previous capacity value. If name is not a registered stage name then zero and ErrNameUnknown will be returned.
func (*Pipeline) Run ¶
Run executes the Pipeline defined for the receiver as at least three separate goroutines: one each for the Feed and Collect methods implemented by the Interface provided to the constructor plus one or more additional goroutines for the registered stages. Channels are interleaved between the Feed stage, each of the individually registered stages (in the order each was added), and the final Collect stage.
Run blocks until all of its goroutines have completed -- either successfully or until any one of them returns a non-nil error. If the provided context is canceled that cancelation will be propagated to all running goroutines (note that an err returned by a goroutine will cancel the context provided to all of the others).
If the receiver has no stages registered then ErrNoStages is returned. Otherwise, any error returned will be one returned from one of the underlying goroutines.