pipeline

v0.13.0
Published: Mar 22, 2022 | License: Apache-2.0 | Imports: 5 | Imported by: 34

README

go-command-pipeline

Small Go utility that executes business actions in a pipeline.

Usage

import (
	"context"
	"fmt"
	"log"

	pipeline "github.com/ccremer/go-command-pipeline"
)

type Data struct {
	Number int
}

func main() {
	data := &Data{} // define arbitrary data to pass around in the steps.
	p := pipeline.NewPipeline()
	p.WithSteps(
		pipeline.NewStepFromFunc("define random number", defineNumber),
		pipeline.NewStepFromFunc("print number", printNumber),
	)
	result := p.RunWithContext(context.WithValue(context.Background(), "data", data))
	if !result.IsSuccessful() {
		log.Fatal(result.Err())
	}
}

func defineNumber(ctx context.Context) error {
	ctx.Value("data").(*Data).Number = 10
	return nil
}

// Let's assume this is a business function that can fail.
// You can enable "automatic" fail-on-first-error pipelines by having more small functions that return errors.
func printNumber(ctx context.Context) error {
	number := ctx.Value("data").(*Data).Number
	fmt.Println(number)
	return nil
}

See more usage in the examples dir

Who is it for

This utility is interesting for you if you have many business functions that are executed sequentially, each with its own error handling. Do you grow tired of the tedious error handling in Go when, in over 90% of the cases, all you do is pass the error "up" the stack, only to log it at the root? This utility helps you focus on the business logic by dividing each failure-prone action into small steps, since the pipeline aborts on the first error.

Consider the following prose example:

func Persist(data Data) error {
    err := database.prepareTransaction()
    if err != nil {
        return err
    }
    err = database.executeQuery("SOME QUERY", data)
    if err != nil {
        return err
    }
    err = database.commit()
    return err
}

We have tons of if err != nil checks that bloat the function with more error handling than actual interesting business logic.

It could be simplified to something like this:

func Persist(data *Data) error {
	p := pipeline.NewPipeline().WithSteps(
		pipeline.NewStepFromFunc("prepareTransaction", prepareTransaction()),
		pipeline.NewStepFromFunc("executeQuery", executeQuery()),
		pipeline.NewStepFromFunc("commitTransaction", commit()),
	)
	return p.RunWithContext(context.WithValue(context.Background(), myKey, data)).Err()
}

// executeQuery returns the step function; the data is read from the context.
func executeQuery() func(ctx context.Context) error {
	return func(ctx context.Context) error {
		data := ctx.Value(myKey).(*Data)
		err := database.executeQuery("SOME QUERY", data)
		return err
	}
}
...

While setting up a pipeline seems to add more lines, it makes it easy to understand what Persist() does without all the error handling.

Documentation

Constants

This section is empty.

Variables

var ErrAbort = errors.New("abort")

ErrAbort indicates that the pipeline should be terminated immediately without being marked as failed (returning an error).

Functions

This section is empty.

Types

type ActionFunc

type ActionFunc func(ctx context.Context) Result

ActionFunc is the func that contains your business logic.

type Listener added in v0.7.0

type Listener func(step Step)

Listener is a simple func that listens to Pipeline events.

type Option added in v0.12.0

type Option func(pipeline *Pipeline)

Option configures the given Pipeline with a behaviour-altering setting.

var DisableErrorWrapping Option = func(pipeline *Pipeline) {
	pipeline.options.disableErrorWrapping = true
}

DisableErrorWrapping disables the wrapping of errors that are emitted from pipeline steps. This effectively causes Result.Err to be exactly the error as returned from a step.

type ParallelResultHandler added in v0.13.0

type ParallelResultHandler func(ctx context.Context, results map[uint64]Result) error

ParallelResultHandler is a callback that receives a map of Results and returns a single, combined error. The map key is the zero-based index of the n-th Pipeline spawned, e.g. pipeline number 3 has index 2. Return nil if you want to ignore errors, or reduce multiple errors into a single one to make the parent Pipeline fail.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline holds and runs intermediate actions, called "steps".

func NewPipeline

func NewPipeline() *Pipeline

NewPipeline returns a new Pipeline instance.

func (*Pipeline) AddBeforeHook added in v0.7.0

func (p *Pipeline) AddBeforeHook(listener Listener) *Pipeline

AddBeforeHook adds the given listener to the list of hooks. See WithBeforeHooks.

func (*Pipeline) AddStep

func (p *Pipeline) AddStep(step Step) *Pipeline

AddStep appends the given step to the Pipeline at the end and returns itself.

func (*Pipeline) AsNestedStep

func (p *Pipeline) AsNestedStep(name string) Step

AsNestedStep converts the Pipeline instance into a Step that can be used in other pipelines. The properties are passed to the nested pipeline.

func (*Pipeline) Run

func (p *Pipeline) Run() Result

Run executes the pipeline with context.Background and returns the result. Steps are executed sequentially in the order they were added to the Pipeline. If a Step returns a Result with a non-nil error, the Pipeline is aborted and its Result contains the affected step's error. However, if the step's error wraps ErrAbort, the pipeline is still aborted, but the final Result.Err() is nil.

func (*Pipeline) RunWithContext added in v0.13.0

func (p *Pipeline) RunWithContext(ctx context.Context) Result

RunWithContext is like Run but with a given context.Context. Upon cancellation of the context, the pipeline does not terminate a currently running step, instead it skips the remaining steps in the execution order. The context is passed to each Step.F and each Step may need to listen to the context cancellation event to truly cancel a long-running step. If the pipeline gets canceled, Result.IsCanceled returns true and Result.Err contains the context's error.

Example
// prepare pipeline
p := NewPipeline().WithSteps(
	NewStepFromFunc("short step", func(ctx context.Context) error {
		fmt.Println("short step")
		return nil
	}),
	NewStepFromFunc("long running step", func(ctx context.Context) error {
		time.Sleep(100 * time.Millisecond)
		return nil
	}),
	NewStepFromFunc("canceled step", func(ctx context.Context) error {
		return errors.New("shouldn't execute")
	}),
)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
result := p.RunWithContext(ctx)
// inspect the result
fmt.Println(result.IsCanceled())
fmt.Println(result.Err())
Output:

short step
true
step "canceled step" failed: context deadline exceeded

func (*Pipeline) WithBeforeHooks added in v0.7.0

func (p *Pipeline) WithBeforeHooks(listeners []Listener) *Pipeline

WithBeforeHooks takes a list of listeners. Each Listener is called once in the given order just before the ActionFunc is invoked. The listeners should return as fast as possible, as they are not intended to do actual business logic.

func (*Pipeline) WithFinalizer added in v0.8.0

func (p *Pipeline) WithFinalizer(handler ResultHandler) *Pipeline

WithFinalizer returns itself while setting the finalizer for the pipeline. The finalizer is a handler that gets called after the last step in the pipeline is completed. It is also called if a pipeline aborts early or gets canceled.

func (*Pipeline) WithNestedSteps added in v0.4.0

func (p *Pipeline) WithNestedSteps(name string, steps ...Step) Step

WithNestedSteps is similar to AsNestedStep, but it accepts the steps given directly as parameters.

func (*Pipeline) WithOptions added in v0.12.0

func (p *Pipeline) WithOptions(options ...Option) *Pipeline

WithOptions configures the Pipeline with settings. The options are applied immediately. Options are applied to nested pipelines provided they are set before building the nested pipeline. Nested pipelines can be configured with their own options.

func (*Pipeline) WithSteps

func (p *Pipeline) WithSteps(steps ...Step) *Pipeline

WithSteps appends the given array of steps to the Pipeline at the end and returns itself.

type Predicate

type Predicate func(ctx context.Context) bool

Predicate is a function that returns true if an ActionFunc should run. It is evaluated lazily, that is, only when needed.

func And

func And(p1, p2 Predicate) Predicate

And returns a Predicate that does logical AND of the given predicates. p2 is not evaluated if p1 already evaluates to false.

func Bool

func Bool(v bool) Predicate

Bool returns a Predicate that simply returns v when evaluated. Use BoolPtr() over Bool() if the value can change between setting up the pipeline and evaluating the predicate.

func BoolPtr added in v0.13.0

func BoolPtr(v *bool) Predicate

BoolPtr returns a Predicate that returns *v when evaluated. Use BoolPtr() over Bool() if the value can change between setting up the pipeline and evaluating the predicate.

func Not

func Not(predicate Predicate) Predicate

Not returns a Predicate that evaluates, but then negates the given Predicate.

func Or added in v0.13.0

func Or(p1, p2 Predicate) Predicate

Or returns a Predicate that does logical OR of the given predicates. p2 is not evaluated if p1 already evaluates to true.

type Result

type Result interface {
	// Err contains the step's returned error, nil otherwise.
	// In an aborted pipeline with ErrAbort it will still be nil.
	Err() error
	// Name retrieves the name of the (last) step that has been executed.
	Name() string
	// IsAborted returns true if the pipeline didn't stop with an error, but just aborted early with ErrAbort.
	IsAborted() bool
	// IsCanceled returns true if the pipeline's context has been canceled.
	IsCanceled() bool
	// IsSuccessful returns true if the contained error is nil and not aborted.
	IsSuccessful() bool
	// IsCompleted returns true if the contained error is nil.
	// Aborted pipelines (with ErrAbort) are still reported as completed.
	// To query if a pipeline is aborted early, use IsAborted.
	IsCompleted() bool
	// IsFailed returns true if the contained error is non-nil.
	IsFailed() bool
}

Result is the object that is returned after each step and after running a pipeline.

type ResultHandler added in v0.3.0

type ResultHandler func(ctx context.Context, result Result) error

ResultHandler is a func that gets called when a step's ActionFunc has finished with any Result.

type Step

type Step struct {
	// Name describes the step's human-readable name.
	// It has no use other than easily identifying a step for debugging or logging.
	Name string
	// F is the ActionFunc assigned to a pipeline Step.
	// This is required.
	F ActionFunc
	// H is the ResultHandler assigned to a pipeline Step.
	// This is optional, and it will be called in any case if it is set after F completed.
	// Use cases could be logging, updating a GUI or handle errors while continuing the pipeline.
	// The function may return nil even if the Result contains an error, in which case the pipeline will continue.
	// This function is called before the next step's F is invoked.
	H ResultHandler
}

Step is an intermediary action and part of a Pipeline.

func If added in v0.13.0

func If(predicate Predicate, originalStep Step) Step

If returns a new step that wraps the given step and executes its action only if the given Predicate evaluates true. The context.Context from the pipeline is passed through the given action.

func NewFanOutStep added in v0.13.0

func NewFanOutStep(name string, pipelineSupplier Supplier, handler ParallelResultHandler) Step

NewFanOutStep creates a pipeline step that runs nested pipelines in their own goroutines. The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever. The step waits until all pipelines are finished. If the given ParallelResultHandler is non-nil, it is called after all pipelines have run; otherwise the step is considered successful.

If the context is canceled, no new pipelines are retrieved from the channel, and the Supplier is expected to stop supplying new instances. Once canceled, the step still waits for the remaining child pipelines and collects their results via the given ParallelResultHandler. However, the error returned from ParallelResultHandler is wrapped in context.Canceled.

Example
p := NewPipeline()
fanout := NewFanOutStep("fanout", func(ctx context.Context, pipelines chan *Pipeline) {
	defer close(pipelines)
	// create some pipelines
	for i := 0; i < 3; i++ {
		n := i
		select {
		case <-ctx.Done():
			return // parent pipeline has been canceled, let's not create more pipelines.
		default:
			pipelines <- NewPipeline().AddStep(NewStepFromFunc(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
				time.Sleep(time.Duration(n * 10000000)) // fake some load
				fmt.Println(fmt.Sprintf("I am worker %d", n))
				return nil
			}))
		}
	}
}, func(ctx context.Context, results map[uint64]Result) error {
	for worker, result := range results {
		if result.IsFailed() {
			fmt.Println(fmt.Sprintf("Worker %d failed: %v", worker, result.Err()))
		}
	}
	return nil
})
p.AddStep(fanout)
p.Run()
Output:

I am worker 0
I am worker 1
I am worker 2

func NewStep

func NewStep(name string, action ActionFunc) Step

NewStep returns a new Step with given name and action.

func NewStepFromFunc added in v0.5.0

func NewStepFromFunc(name string, fn func(ctx context.Context) error) Step

NewStepFromFunc returns a new Step with the given name, using a function that returns an error.

func NewWorkerPoolStep added in v0.13.0

func NewWorkerPoolStep(name string, size int, pipelineSupplier Supplier, handler ParallelResultHandler) Step

NewWorkerPoolStep creates a pipeline step that runs nested pipelines in a pool of goroutines. The function provided as Supplier is expected to close the given channel when no more pipelines should be executed, otherwise this step blocks forever. The step waits until all pipelines are finished.

  • If the given ParallelResultHandler is non-nil, it is called after all pipelines have run; otherwise the step is considered successful.
  • The pipelines are executed in a pool of goroutines whose number is given by size.
  • If size is 1, the pipelines are effectively run in sequence.
  • If size is 0 or less, the function panics.
Example
p := NewPipeline()
pool := NewWorkerPoolStep("pool", 2, func(ctx context.Context, pipelines chan *Pipeline) {
	defer close(pipelines)
	// create some pipelines
	for i := 0; i < 3; i++ {
		n := i
		select {
		case <-ctx.Done():
			return // parent pipeline has been canceled, let's not create more pipelines.
		default:
			pipelines <- NewPipeline().AddStep(NewStepFromFunc(fmt.Sprintf("i = %d", n), func(_ context.Context) error {
				time.Sleep(time.Duration(n * 100000000)) // fake some load
				fmt.Println(fmt.Sprintf("This is job item %d", n))
				return nil
			}))
		}
	}
}, func(ctx context.Context, results map[uint64]Result) error {
	for jobIndex, result := range results {
		if result.IsFailed() {
			fmt.Println(fmt.Sprintf("Job %d failed: %v", jobIndex, result.Err()))
		}
	}
	return nil
})
p.AddStep(pool)
p.Run()
Output:

This is job item 0
This is job item 1
This is job item 2

func ToNestedStep added in v0.13.0

func ToNestedStep(name string, predicate Predicate, p *Pipeline) Step

ToNestedStep wraps the given pipeline in its own step. When the step's function is called, the given Predicate will evaluate whether the nested Pipeline should actually run. It returns the pipeline's Result, otherwise an empty (successful) Result struct. The given pipeline has to define its own context.Context, it's not passed "down".

func ToStep added in v0.13.0

func ToStep(name string, action func(ctx context.Context) error, predicate Predicate) Step

ToStep wraps the given action func in its own step. When the step's function is called, the given Predicate will evaluate whether the action should actually run. It returns the action's Result, otherwise an empty (successful) Result. The context.Context from the pipeline is passed through the given action.

func (Step) WithErrorHandler added in v0.10.0

func (s Step) WithErrorHandler(errorHandler func(ctx context.Context, err error) error) Step

WithErrorHandler wraps given errorHandler and sets the ResultHandler of this specific step and returns the step itself. The difference to WithResultHandler is that errorHandler only gets called if Result.Err is non-nil.

func (Step) WithResultHandler added in v0.3.0

func (s Step) WithResultHandler(handler ResultHandler) Step

WithResultHandler sets the ResultHandler of this specific step and returns the step itself.

type Supplier added in v0.13.0

type Supplier func(ctx context.Context, pipelinesChan chan *Pipeline)

Supplier is a function that spawns Pipeline for consumption. Supply new pipelines by putting new Pipeline instances into the given channel. The function must close the channel once all pipelines are spawned (`defer close()` recommended).

The parent pipeline may get canceled, thus the given context is provided to stop putting more Pipeline instances into the channel. Use

select {
case <-ctx.Done():
	return
default:
	pipelinesChan <- ...
}

to cancel the supply, otherwise you may leak an orphaned goroutine.

func SupplierFromSlice added in v0.13.0

func SupplierFromSlice(pipelines []*Pipeline) Supplier

SupplierFromSlice returns a Supplier that accepts the given slice of Pipeline and iterates over it to feed the channel.

Context cancellation is only effective if the channel is limited in size. All pipelines may get executed even if the parent pipeline has been canceled, unless each child Pipeline listens for context.Done() in their steps.
