Documentation
¶
Index ¶
- Variables
- type ActionFunc
- type Listener
- type Option
- type ParallelResultHandler
- type Pipeline
- func (p *Pipeline) AddBeforeHook(listener Listener) *Pipeline
- func (p *Pipeline) AddStep(step Step) *Pipeline
- func (p *Pipeline) AsNestedStep(name string) Step
- func (p *Pipeline) Run() Result
- func (p *Pipeline) RunWithContext(ctx context.Context) Result
- func (p *Pipeline) WithBeforeHooks(listeners []Listener) *Pipeline
- func (p *Pipeline) WithFinalizer(handler ResultHandler) *Pipeline
- func (p *Pipeline) WithNestedSteps(name string, steps ...Step) Step
- func (p *Pipeline) WithOptions(options ...Option) *Pipeline
- func (p *Pipeline) WithSteps(steps ...Step) *Pipeline
- type Predicate
- type Result
- type ResultHandler
- type Step
- func If(predicate Predicate, originalStep Step) Step
- func NewFanOutStep(name string, pipelineSupplier Supplier, handler ParallelResultHandler) Step
- func NewStep(name string, action ActionFunc) Step
- func NewStepFromFunc(name string, fn func(ctx context.Context) error) Step
- func NewWorkerPoolStep(name string, size int, pipelineSupplier Supplier, ...) Step
- func ToNestedStep(name string, predicate Predicate, p *Pipeline) Step
- func ToStep(name string, action func(ctx context.Context) error, predicate Predicate) Step
- type Supplier
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrAbort = errors.New("abort")
ErrAbort indicates that the pipeline should be terminated immediately without being marked as failed (returning an error).
Functions ¶
This section is empty.
Types ¶
type ActionFunc ¶
ActionFunc is the func that contains your business logic.
type Listener ¶ added in v0.7.0
type Listener func(step Step)
Listener is a simple func that listens to Pipeline events.
type Option ¶ added in v0.12.0
type Option func(pipeline *Pipeline)
Option configures the given Pipeline with a behaviour-altering setting.
type ParallelResultHandler ¶ added in v0.13.0
ParallelResultHandler is a callback that provides a Result map and expect a single, combined Result object. The map key is a zero-based index of n-th Pipeline spawned, e.g. pipeline number 3 will have index 2. Return an empty Result if you want to ignore errors, or reduce multiple errors into a single one to make the parent Pipeline fail.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline holds and runs intermediate actions, called "steps".
func (*Pipeline) AddBeforeHook ¶ added in v0.7.0
AddBeforeHook adds the given listener to the list of hooks. See WithBeforeHooks.
func (*Pipeline) AddStep ¶
AddStep appends the given step to the Pipeline at the end and returns itself.
func (*Pipeline) AsNestedStep ¶
AsNestedStep converts the Pipeline instance into a Step that can be used in other pipelines. The properties are passed to the nested pipeline.
func (*Pipeline) Run ¶
Run executes the pipeline with context.Background and returns the result. Steps are executed sequentially as they were added to the Pipeline. If a Step returns a Result with a non-nil error, the Pipeline is aborted and its Result contains the affected step's error. However, if Result.Err is wrapped in ErrAbort, then the pipeline is aborted, but the final Result.Err will be nil.
func (*Pipeline) RunWithContext ¶ added in v0.13.0
RunWithContext is like Run but with a given context.Context. Upon cancellation of the context, the pipeline does not terminate a currently running step, instead it skips the remaining steps in the execution order. The context is passed to each Step.F and each Step may need to listen to the context cancellation event to truly cancel a long-running step. If the pipeline gets canceled, Result.IsCanceled returns true and Result.Err contains the context's error.
Example ¶
// prepare pipeline
p := NewPipeline().WithSteps(
NewStepFromFunc("short step", func(ctx context.Context) error {
fmt.Println("short step")
return nil
}),
NewStepFromFunc("long running step", func(ctx context.Context) error {
time.Sleep(100 * time.Millisecond)
return nil
}),
NewStepFromFunc("canceled step", func(ctx context.Context) error {
return errors.New("shouldn't execute")
}),
)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
result := p.RunWithContext(ctx)
// inspect the result
fmt.Println(result.IsCanceled())
fmt.Println(result.Err())
Output: short step true step "canceled step" failed: context deadline exceeded
func (*Pipeline) WithBeforeHooks ¶ added in v0.7.0
WithBeforeHooks takes a list of listeners. Each Listener.Accept is called once in the given order just before the ActionFunc is invoked. The listeners should return as fast as possible, as they are not intended to do actual business logic.
func (*Pipeline) WithFinalizer ¶ added in v0.8.0
func (p *Pipeline) WithFinalizer(handler ResultHandler) *Pipeline
WithFinalizer returns itself while setting the finalizer for the pipeline. The finalizer is a handler that gets called after the last step is in the pipeline is completed. If a pipeline aborts early or gets canceled then it is also called.
func (*Pipeline) WithNestedSteps ¶ added in v0.4.0
WithNestedSteps is similar to AsNestedStep, but it accepts the steps given directly as parameters.
func (*Pipeline) WithOptions ¶ added in v0.12.0
WithOptions configures the Pipeline with settings. The options are applied immediately. Options are applied to nested pipelines provided they are set before building the nested pipeline. Nested pipelines can be configured with their own options.
type Predicate ¶
Predicate is a function that expects 'true' if a ActionFunc should run. It is evaluated lazily resp. only when needed.
func And ¶
And returns a Predicate that does logical AND of the given predicates. p2 is not evaluated if p1 evaluates already to false.
func Bool ¶
Bool returns a Predicate that simply returns v when evaluated. Use BoolPtr() over Bool() if the value can change between setting up the pipeline and evaluating the predicate.
func BoolPtr ¶ added in v0.13.0
BoolPtr returns a Predicate that returns *v when evaluated. Use BoolPtr() over Bool() if the value can change between setting up the pipeline and evaluating the predicate.
type Result ¶
type Result interface {
// Err contains the step's returned error, nil otherwise.
// In an aborted pipeline with ErrAbort it will still be nil.
Err() error
// Name retrieves the name of the (last) step that has been executed.
Name() string
// IsAborted returns true if the pipeline didn't stop with an error, but just aborted early with ErrAbort.
IsAborted() bool
// IsCanceled returns true if the pipeline's context has been canceled.
IsCanceled() bool
// IsSuccessful returns true if the contained error is nil and not aborted.
IsSuccessful() bool
// IsCompleted returns true if the contained error is nil.
// Aborted pipelines (with ErrAbort) are still reported as completed.
// To query if a pipeline is aborted early, use IsAborted.
IsCompleted() bool
// IsFailed returns true if the contained error is non-nil.
IsFailed() bool
}
Result is the object that is returned after each step and after running a pipeline.
type ResultHandler ¶ added in v0.3.0
ResultHandler is a func that gets called when a step's ActionFunc has finished with any Result.
type Step ¶
type Step struct {
// Name describes the step's human-readable name.
// It has no other uses other than easily identifying a step for debugging or logging.
Name string
// F is the ActionFunc assigned to a pipeline Step.
// This is required.
F ActionFunc
// H is the ParallelResultHandler assigned to a pipeline Step.
// This is optional, and it will be called in any case if it is set after F completed.
// Use cases could be logging, updating a GUI or handle errors while continuing the pipeline.
// The function may return nil even if the Result contains an error, in which case the pipeline will continue.
// This function is called before the next step's F is invoked.
H ResultHandler
}
Step is an intermediary action and part of a Pipeline.
func If ¶ added in v0.13.0
If returns a new step that wraps the given step and executes its action only if the given Predicate evaluates true. The context.Context from the pipeline is passed through the given action.
func NewFanOutStep ¶ added in v0.13.0
func NewFanOutStep(name string, pipelineSupplier Supplier, handler ParallelResultHandler) Step
NewFanOutStep creates a pipeline step that runs nested pipelines in their own Go routines. The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever. The step waits until all pipelines are finished. If the given ParallelResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
If the context is canceled, no new pipelines will be retrieved from the channel and the Supplier is expected to stop supplying new instances. Also, once canceled, the step waits for the remaining children pipelines and collects their result via given ParallelResultHandler. However, the error returned from ParallelResultHandler is wrapped in context.Canceled.
Example ¶
p := NewPipeline()
fanout := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *Pipeline) {
defer close(pipelines)
// create some pipelines
for i := 0; i < 3; i++ {
n := i
select {
case <-ctx.Done():
return // parent pipeline has been canceled, let's not create more pipelines.
default:
pipelines <- NewPipeline().AddStep(NewStepFromFunc(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
time.Sleep(time.Duration(n * 10000000)) // fake some load
fmt.Println(fmt.Sprintf("I am worker %d", n))
return nil
}))
}
}
}, func(ctx context.Context, results map[uint64]Result) error {
for worker, result := range results {
if result.IsFailed() {
fmt.Println(fmt.Sprintf("Worker %d failed: %v", worker, result.Err()))
}
}
return nil
})
p.AddStep(fanout)
p.Run()
Output: I am worker 0 I am worker 1 I am worker 2
func NewStep ¶
func NewStep(name string, action ActionFunc) Step
NewStep returns a new Step with given name and action.
func NewStepFromFunc ¶ added in v0.5.0
NewStepFromFunc returns a new Step with given name using a function that expects an error.
func NewWorkerPoolStep ¶ added in v0.13.0
func NewWorkerPoolStep(name string, size int, pipelineSupplier Supplier, handler ParallelResultHandler) Step
NewWorkerPoolStep creates a pipeline step that runs nested pipelines in a thread pool. The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever. The step waits until all pipelines are finished.
- If the given ParallelResultHandler is non-nil it will be called after all pipelines were run, otherwise the step is considered successful.
- The pipelines are executed in a pool of a number of Go routines indicated by size.
- If size is 1, the pipelines are effectively run in sequence.
- If size is 0 or less, the function panics.
Example ¶
p := NewPipeline()
pool := NewWorkerPoolStep("pool", 2, func(ctx context.Context, pipelines chan *Pipeline) {
defer close(pipelines)
// create some pipelines
for i := 0; i < 3; i++ {
n := i
select {
case <-ctx.Done():
return // parent pipeline has been canceled, let's not create more pipelines.
default:
pipelines <- NewPipeline().AddStep(NewStepFromFunc(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
time.Sleep(time.Duration(n * 100000000)) // fake some load
fmt.Println(fmt.Sprintf("This is job item %d", n))
return nil
}))
}
}
}, func(ctx context.Context, results map[uint64]Result) error {
for jobIndex, result := range results {
if result.IsFailed() {
fmt.Println(fmt.Sprintf("Job %d failed: %v", jobIndex, result.Err()))
}
}
return nil
})
p.AddStep(pool)
p.Run()
Output: This is job item 0 This is job item 1 This is job item 2
func ToNestedStep ¶ added in v0.13.0
ToNestedStep wraps the given pipeline in its own step. When the step's function is called, the given Predicate will evaluate whether the nested Pipeline should actually run. It returns the pipeline's Result, otherwise an empty (successful) Result struct. The given pipeline has to define its own context.Context, it's not passed "down".
func ToStep ¶ added in v0.13.0
ToStep wraps the given action func in its own step. When the step's function is called, the given Predicate will evaluate whether the action should actually run. It returns the action's Result, otherwise an empty (successful) Result. The context.Context from the pipeline is passed through the given action.
func (Step) WithErrorHandler ¶ added in v0.10.0
WithErrorHandler wraps given errorHandler and sets the ResultHandler of this specific step and returns the step itself. The difference to WithResultHandler is that errorHandler only gets called if Result.Err is non-nil.
func (Step) WithResultHandler ¶ added in v0.3.0
func (s Step) WithResultHandler(handler ResultHandler) Step
WithResultHandler sets the ResultHandler of this specific step and returns the step itself.
type Supplier ¶ added in v0.13.0
Supplier is a function that spawns Pipeline for consumption. Supply new pipelines by putting new Pipeline instances into the given channel. The function must close the channel once all pipelines are spawned (`defer close()` recommended).
The parent pipeline may get canceled, thus the given context is provided to stop putting more Pipeline instances into the channel. Use
select { case <-ctx.Done(): return, default: pipelinesChan <- ... }
to cancel the supply, otherwise you may leak an orphaned goroutine.
func SupplierFromSlice ¶ added in v0.13.0
SupplierFromSlice returns a Supplier that accepts the given slice of Pipeline and iterates over it to feed the channel.
Context cancellation is only effective if the channel is limited in size. All pipelines may get executed even if the parent pipeline has been canceled, unless each child Pipeline listens for context.Done() in their steps.