saga

package
v0.4.6 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 28, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

README

SAGAs / Process Managers

The saga package implements a SAGA coordinator / process manager for more complex multi-step transactions. This package integrates with the event, aggregate and command system to provide convenient access to the different components within the defined SAGA actions.

This SAGA implementation will very likely be rewritten. Unlike the command bus or the projection service, it is not distributed: a SAGA is executed and coordinated from within a single process, so there is no recovery strategy if the process that executes the SAGA fails. A rewrite is needed to make SAGAs event-driven, which should provide the persistence layer and state required to recover SAGAs.

Features

  • Action-based SAGA setups
  • Compensators
  • Flexible action invoking
  • Integrates with goes' components

Setups

The main type of the SAGA implementation is the saga.Setup interface, which provides the executor with runnable actions and some configuration:

package saga

type Setup interface {
  // Sequence returns the names of the actions that should be run sequentially.
  Sequence() []string

  // Compensator finds and returns the name of the compensating action for the
  // Action with the given name. If Compensator returns an empty string, there
  // is no compensator for the given action configured.
  Compensator(string) string

  // Action returns the action with the given name. Action returns nil if no
  // Action with that name was configured.
  Action(string) action.Action
}

Actions

A saga.Setup implementation can be instantiated using saga.New(). Pass saga.Action() options to define the actions within the SAGA.

package example

import (
  "github.com/modernice/goes/saga"
  "github.com/modernice/goes/saga/action"
)

func example() {
  setup := saga.New(
    saga.Action("foo", func(ctx action.Context) error {
       return nil
    }),
    saga.Action("bar", func(ctx action.Context) error {
       return nil
    }),
    saga.Action("baz", func(ctx action.Context) error {
       return nil
    }),
  )
}

Sequence / Starting Action

The saga.Sequence() option defines the order in which the actions of the SAGA are run. When an action returns a non-nil error, the remaining actions are not run.

package example

import (
  "github.com/modernice/goes/saga"
  "github.com/modernice/goes/saga/action"
)

func example() {
  setup := saga.New(
    saga.Action("foo", func(ctx action.Context) error {
       return nil
    }),
    saga.Action("bar", func(ctx action.Context) error {
       return nil
    }),
    saga.Action("baz", func(ctx action.Context) error {
       return nil
    }),

    // Run "bar", then "baz", finally "foo".
    saga.Sequence("bar", "baz", "foo"),
  )
}

Alternatively, the saga.StartWith() option can be used to specify a single action that should be run when executing this setup. Using the action's context, other actions can be called from within a running action by name.

package example

import (
  "github.com/modernice/goes/saga"
  "github.com/modernice/goes/saga/action"
)

func example() {
  setup := saga.New(
    saga.Action("foo", func(ctx action.Context) error {
       return nil
    }),
    saga.Action("bar", func(ctx action.Context) error {
      return ctx.Run(ctx, "baz")
    }),
    saga.Action("baz", func(ctx action.Context) error {
      return ctx.Run(ctx, "foo")
    }),

    // Same as saga.Sequence("bar")
    saga.StartWith("bar"),
  )
}
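
To illustrate how actions can call each other by name, here is a minimal standard-library sketch. The runner type, its Run method and errActionNotFound are hypothetical stand-ins, not the package's action.Context; the sketch only shows the lookup-and-invoke idea.

```go
package main

import (
	"errors"
	"fmt"
)

// errActionNotFound is a local stand-in, not the package's ErrActionNotFound.
var errActionNotFound = errors.New("action not found")

// runner is a hypothetical registry of named actions. It records the order
// in which actions are invoked.
type runner struct {
	actions map[string]func(*runner) error
	trace   []string
}

// Run looks up the action by name and invokes it, passing the runner along
// so that the action can call further actions.
func (r *runner) Run(name string) error {
	act, ok := r.actions[name]
	if !ok {
		return fmt.Errorf("%w: %q", errActionNotFound, name)
	}
	r.trace = append(r.trace, name)
	return act(r)
}

// demo starts with "bar", which calls "baz", which calls "foo".
func demo() ([]string, error) {
	r := &runner{actions: map[string]func(*runner) error{
		"foo": func(*runner) error { return nil },
		"bar": func(r *runner) error { return r.Run("baz") },
		"baz": func(r *runner) error { return r.Run("foo") },
	}}
	err := r.Run("bar")
	return r.trace, err
}

func main() {
	trace, err := demo()
	fmt.Println(trace, err) // prints "[bar baz foo] <nil>"
}
```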

If neither the saga.Sequence() nor the saga.StartWith() option is passed, the first defined action is automatically used as the starting action of the SAGA (effectively an automatic saga.StartWith() option using the first defined action).
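
The precedence between these options can be summarized in a small sketch. resolveSequence is a hypothetical helper written for this illustration, not a function of the saga package; it assumes that an explicit sequence takes priority over a starting action, which in turn takes priority over the first defined action.

```go
package main

import "fmt"

// resolveSequence is a hypothetical helper (not part of the saga package).
// It returns the names of the actions to run: an explicit sequence wins,
// then an explicit starting action, then the first defined action.
func resolveSequence(defined, sequence []string, startWith string) []string {
	if len(sequence) > 0 {
		return sequence
	}
	if startWith != "" {
		return []string{startWith}
	}
	if len(defined) > 0 {
		return defined[:1]
	}
	return nil
}

func main() {
	defined := []string{"foo", "bar", "baz"}
	fmt.Println(resolveSequence(defined, nil, ""))                              // prints "[foo]"
	fmt.Println(resolveSequence(defined, nil, "bar"))                           // prints "[bar]"
	fmt.Println(resolveSequence(defined, []string{"bar", "baz", "foo"}, "foo")) // prints "[bar baz foo]"
}
```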

Compensating Actions

Compensating actions are run to gracefully recover or fix application state when the execution of a setup fails. When an action of a SAGA fails, all previously run actions that returned no error are compensated by running their configured compensating action (if any), in reverse order. Any action can be configured as the compensating action for another action.

package example

import (
  "errors"

  "github.com/modernice/goes/saga"
  "github.com/modernice/goes/saga/action"
)

func example() {
  setup := saga.New(
    saga.Action("foo", func(ctx action.Context) error {
       return nil
    }),
    saga.Action("bar", func(ctx action.Context) error {
      return nil
    }),
    saga.Action("baz", func(ctx action.Context) error {
      return errors.New("oops")
    }),

    saga.Action("fix-foo", func(ctx action.Context) error {
      return nil
    }),

    saga.Action("fix-bar", func(ctx action.Context) error {
      return nil
    }),

    saga.Sequence("foo", "bar", "baz"),
    saga.Compensate("foo", "fix-foo"),
    saga.Compensate("bar", "fix-bar"),
  )
}

The example above would result in the following actions being run in the given order:

  • "foo"
  • "bar"
  • "baz"
  • "fix-bar"
  • "fix-foo"
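
The ordering above can be reproduced with a minimal standard-library sketch. The step type and runWithCompensation function are hypothetical and only model the behavior described in this section; they are not the executor's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical stand-in for a named SAGA action.
type step struct {
	name string
	run  func() error
}

// runWithCompensation runs steps in order. When a step fails, it runs the
// compensators of the previously successful steps in reverse order. It
// returns the names of everything that ran and the resulting error.
func runWithCompensation(steps []step, compensators map[string]step) ([]string, error) {
	var ran, succeeded []string
	for _, s := range steps {
		ran = append(ran, s.name)
		if err := s.run(); err != nil {
			for i := len(succeeded) - 1; i >= 0; i-- {
				c, ok := compensators[succeeded[i]]
				if !ok {
					continue // no compensator configured for this step
				}
				ran = append(ran, c.name)
				if cerr := c.run(); cerr != nil {
					return ran, fmt.Errorf("compensate %q: %w (action error: %v)", succeeded[i], cerr, err)
				}
			}
			return ran, err
		}
		succeeded = append(succeeded, s.name)
	}
	return ran, nil
}

// demo mirrors the example above: "baz" fails after "foo" and "bar" succeeded.
func demo() ([]string, error) {
	ok := func() error { return nil }
	steps := []step{
		{"foo", ok},
		{"bar", ok},
		{"baz", func() error { return errors.New("oops") }},
	}
	compensators := map[string]step{
		"foo": {"fix-foo", ok},
		"bar": {"fix-bar", ok},
	}
	return runWithCompensation(steps, compensators)
}

func main() {
	ran, err := demo()
	fmt.Println(ran, err) // prints "[foo bar baz fix-bar fix-foo] oops"
}
```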

Action Context

The action.Context that is passed to action functions provides functions to fetch aggregates, publish events and dispatch commands. For this to work, the saga.Executor that executes the setup needs to have the aggregate repository, event bus and/or command bus explicitly configured (see Executor Options); otherwise, the following functions return action.ErrMissingRepository errors.

package example

func example() {
  setup := saga.New(
    saga.Action("foo", func(ctx action.Context) error {
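      // Each call returns an error that should be checked.
      if err := ctx.Fetch(ctx, ...); err != nil {
        return err
      }
      if err := ctx.Publish(ctx, event.New(...)); err != nil {
        return err
      }
      return ctx.Dispatch(ctx, command.New(...))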
    }),
  )
}

Executor

A SAGA setup can be executed using a saga.Executor.

package example

func example(setup saga.Setup) {
  exec := saga.NewExecutor()
  err := exec.Execute(context.TODO(), setup)
}

Executor Options

When creating an executor, use the

  • saga.Repository(),
  • saga.EventBus() and
  • saga.CommandBus()

options to provide the executor with the aggregate repository, event bus and command bus that will be available from within the SAGA's action.Contexts.

Documentation

Constants

This section is empty.

Variables

var (
	// ErrActionNotFound is returned when trying to run an Action which is not
	// configured.
	ErrActionNotFound = errors.New("action not found")

	// ErrEmptyName is returned when an Action is configured with an empty name.
	ErrEmptyName = errors.New("empty action name")

	// ErrCompensateTimeout is returned when the compensation of a SAGA fails
	// due to the CompensateTimeout being exceeded.
	ErrCompensateTimeout = errors.New("compensation timed out")
)
var (
	// DefaultCompensateTimeout is the default timeout for compensating Actions
	// of a failed SAGA.
	DefaultCompensateTimeout = 10 * time.Second
)

Functions

func Execute

func Execute(ctx context.Context, s Setup, opts ...ExecutorOption) error

Execute executes the given Setup.

Execution error

Execution can fail for multiple reasons. Execute returns an error in the following cases:

  • Setup validation fails (see Validate)
  • An Action fails and compensation is disabled
  • An Action and any of the subsequent compensations fail

Compensation is disabled when no compensators are configured.

Skip validation

Validation of the Setup can be skipped by providing the SkipValidation ExecutorOption, but should only be used when the Setup s is ensured to be valid (e.g. by calling Validate manually beforehand).

Reporting

When a Reporter r is provided using the Report ExecutorOption, Execute will call r.Report with a report.Report which contains detailed information about the execution of the SAGA (a *report.Report is also a Reporter):

s := saga.New(...)
var r report.Report
err := saga.Execute(context.TODO(), s, saga.Report(&r))
// err == r.Error()
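
As a rough illustration of how a report value can double as its own Reporter, consider the following standard-library sketch. The report struct, its fields and the toy execute function are assumptions made for this example and do not reflect the real report.Report type.

```go
package main

import (
	"errors"
	"fmt"
)

// report is a simplified, hypothetical stand-in for report.Report.
type report struct {
	Actions []string // names of the actions that ran
	Err     error    // error of the execution, if any
}

// reporter mirrors the shape of the Reporter interface.
type reporter interface {
	Report(report)
}

// Report lets a *report act as its own reporter by copying the result.
func (r *report) Report(rep report) { *r = rep }

// execute is a toy executor: it "runs" the named actions until the action
// named fail is reached, then hands the result to the reporter.
func execute(actions []string, fail string, rep reporter) error {
	var result report
	for _, name := range actions {
		result.Actions = append(result.Actions, name)
		if name == fail {
			result.Err = errors.New("action " + name + " failed")
			break
		}
	}
	if rep != nil {
		rep.Report(result)
	}
	return result.Err
}

// demo executes three actions where "bar" fails and returns the filled report.
func demo() (report, error) {
	var r report
	err := execute([]string{"foo", "bar", "baz"}, "bar", &r)
	return r, err
}

func main() {
	r, err := demo()
	fmt.Println(r.Actions, r.Err == err) // prints "[foo bar] true"
}
```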

func Validate

func Validate(s Setup) error

Validate validates that a Setup is configured safely and returns an error in the following cases:

  • `ErrEmptyName` if an Action has an empty name (or just whitespace)
  • `ErrActionNotFound` if the sequence contains an unconfigured Action
  • `ErrActionNotFound` if an unknown Action is configured as a compensating Action
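
The three rules can be sketched with the standard library alone. The setup struct and the error variables below are local stand-ins invented for this example; the real Validate works on the Setup interface and may check things differently.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local stand-ins for illustration; not the saga package's own definitions.
var (
	errEmptyName      = errors.New("empty action name")
	errActionNotFound = errors.New("action not found")
)

// setup is a hypothetical, flattened view of a SAGA setup.
type setup struct {
	actions      []string          // configured action names
	sequence     []string          // names to run in order
	compensators map[string]string // action name -> compensating action name
}

// validate applies the three rules listed above.
func validate(s setup) error {
	known := make(map[string]bool, len(s.actions))
	for _, name := range s.actions {
		if strings.TrimSpace(name) == "" {
			return errEmptyName
		}
		known[name] = true
	}
	for _, name := range s.sequence {
		if !known[name] {
			return fmt.Errorf("sequence: %w: %q", errActionNotFound, name)
		}
	}
	for failed, comp := range s.compensators {
		if !known[comp] {
			return fmt.Errorf("compensator for %q: %w: %q", failed, errActionNotFound, comp)
		}
	}
	return nil
}

func main() {
	err := validate(setup{
		actions:  []string{"foo", "bar"},
		sequence: []string{"foo", "baz"}, // "baz" is not configured
	})
	fmt.Println(errors.Is(err, errActionNotFound)) // prints "true"
}
```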

Types

type CompensateErr

type CompensateErr struct {
	Err         error
	ActionError error
}

CompensateErr is returned when the compensation of a failed SAGA fails.

func CompensateError

func CompensateError(err error) (*CompensateErr, bool)

CompensateError unwraps the *CompensateErr from the given error.

func (*CompensateErr) Error

func (err *CompensateErr) Error() string

CompensateErr is returned when the compensation of a failed SAGA fails. The error message contains the original error that triggered the compensation, and the error that occurred while compensating. Use CompensateError to unwrap this error.

func (*CompensateErr) Unwrap

func (err *CompensateErr) Unwrap() error

Unwrap returns the wrapped error. This method is implemented to allow errors returned by the Executor to be unwrapped into their underlying errors.
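
A helper like CompensateError could plausibly be built on errors.As. The sketch below uses a local compensateErr type shaped like the struct above; which field Unwrap returns and the exact error message are assumptions, not confirmed details of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// compensateErr is a local stand-in shaped like the CompensateErr type.
type compensateErr struct {
	Err         error // assumed: error that occurred while compensating
	ActionError error // assumed: error that triggered the compensation
}

func (e *compensateErr) Error() string {
	return fmt.Sprintf("compensate: %v (action error: %v)", e.Err, e.ActionError)
}

// Unwrap returns the compensation error (an assumption for this sketch).
func (e *compensateErr) Unwrap() error { return e.Err }

// asCompensateErr extracts a *compensateErr from anywhere in err's chain.
func asCompensateErr(err error) (*compensateErr, bool) {
	var cerr *compensateErr
	ok := errors.As(err, &cerr)
	return cerr, ok
}

// classify distinguishes a failed action from a failed compensation.
func classify(err error) string {
	if err == nil {
		return "ok"
	}
	if _, ok := asCompensateErr(err); ok {
		return "compensation failed"
	}
	return "action failed"
}

// demo classifies a nil error, a plain action error and a wrapped
// compensation error.
func demo() []string {
	actionErr := errors.New("oops")
	wrapped := fmt.Errorf("execute: %w", &compensateErr{
		Err:         errors.New("rollback failed"),
		ActionError: actionErr,
	})
	return []string{classify(nil), classify(actionErr), classify(wrapped)}
}

func main() {
	fmt.Println(demo()) // prints "[ok action failed compensation failed]"
}
```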

type Executor

type Executor struct {
	Setup
	// contains filtered or unexported fields
}

An Executor executes SAGAs. Use NewExecutor to create an Executor.

func NewExecutor

func NewExecutor(opts ...ExecutorOption) *Executor

NewExecutor returns a SAGA executor.

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context, s Setup) error

Execute executes the given SAGA.

type ExecutorOption

type ExecutorOption func(*Executor)

ExecutorOption is an option for the Execute function.
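
ExecutorOption follows Go's functional options pattern. The sketch below shows the pattern with a hypothetical executor struct; the field names and the with-prefixed option constructors are invented for illustration, while the 10 second default mirrors DefaultCompensateTimeout.

```go
package main

import (
	"fmt"
	"time"
)

// executor is a hypothetical stand-in with two configurable fields.
type executor struct {
	compensateTimeout time.Duration
	skipValidation    bool
}

// executorOption mutates an executor during construction.
type executorOption func(*executor)

// withCompensateTimeout overrides the default compensation timeout.
func withCompensateTimeout(d time.Duration) executorOption {
	return func(e *executor) { e.compensateTimeout = d }
}

// withSkipValidation disables validation before execution.
func withSkipValidation() executorOption {
	return func(e *executor) { e.skipValidation = true }
}

// newExecutor applies defaults first, then every option in order.
func newExecutor(opts ...executorOption) *executor {
	e := &executor{compensateTimeout: 10 * time.Second}
	for _, opt := range opts {
		opt(e)
	}
	return e
}

func main() {
	def := newExecutor()
	custom := newExecutor(withCompensateTimeout(time.Second), withSkipValidation())
	fmt.Println(def.compensateTimeout, def.skipValidation)       // prints "10s false"
	fmt.Println(custom.compensateTimeout, custom.skipValidation) // prints "1s true"
}
```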

func CommandBus

func CommandBus(bus command.Bus) ExecutorOption

CommandBus returns an ExecutorOption that provides a SAGA with a command.Bus. Actions within that SAGA that receive an action.Context may dispatch Commands through that Context over the provided Bus. Dispatches over the Command Bus are automatically made synchronous.

Example:

s := saga.New(saga.Action("foo", func(ctx action.Context) error {
	cmd := command.New("foo", fooPayload{})
	// Return the error so that a failed dispatch fails the Action.
	return ctx.Dispatch(ctx, cmd)
}))

var bus command.Bus
err := saga.Execute(context.TODO(), s, saga.CommandBus(bus))
// handle err

func CompensateTimeout

func CompensateTimeout(d time.Duration) ExecutorOption

CompensateTimeout returns an ExecutorOption that sets the timeout for compensating a failed SAGA.
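
One way such a timeout could be enforced is with context.WithTimeout. The following sketch is an assumption about the general technique, not the executor's actual code; errCompensateTimeout and compensateWithTimeout are local names made up for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errCompensateTimeout is a local stand-in for ErrCompensateTimeout.
var errCompensateTimeout = errors.New("compensation timed out")

// compensateWithTimeout runs compensate with a context that is cancelled
// after d. If the deadline is exceeded, it returns errCompensateTimeout.
func compensateWithTimeout(parent context.Context, d time.Duration, compensate func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()

	// Buffered so the goroutine can finish even if nobody receives.
	done := make(chan error, 1)
	go func() { done <- compensate(ctx) }()

	var err error
	select {
	case err = <-done:
	case <-ctx.Done():
		err = ctx.Err()
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return errCompensateTimeout
	}
	return err
}

// demo runs one compensator that finishes in time and one that does not.
func demo() (fast, slow error) {
	fast = compensateWithTimeout(context.Background(), time.Second, func(context.Context) error {
		return nil
	})
	slow = compensateWithTimeout(context.Background(), 10*time.Millisecond, func(ctx context.Context) error {
		<-ctx.Done() // simulate a compensator that outlives the timeout
		return ctx.Err()
	})
	return fast, slow
}

func main() {
	fast, slow := demo()
	fmt.Println(fast) // prints "<nil>"
	fmt.Println(slow) // prints "compensation timed out"
}
```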

func EventBus

func EventBus(bus event.Bus) ExecutorOption

EventBus returns an ExecutorOption that provides a SAGA with an event.Bus. Actions within that SAGA that receive an action.Context may publish events through that Context over the provided Bus.

Example:

s := saga.New(saga.Action("foo", func(ctx action.Context) error {
	evt := event.New("foo", fooData{})
	// Return the error so that a failed publish fails the Action.
	return ctx.Publish(ctx, evt)
}))

var bus event.Bus
err := saga.Execute(context.TODO(), s, saga.EventBus(bus))
// handle err

func Report

func Report(r Reporter) ExecutorOption

Report returns an ExecutorOption that configures the Reporter r to be used by the SAGA to report the execution result of the SAGA.

func Repository

func Repository(r aggregate.Repository) ExecutorOption

Repository returns an ExecutorOption that provides a SAGA with an aggregate.Repository. Actions within that SAGA that receive an action.Context may fetch aggregates through that Context from the provided Repository.

Example:

s := saga.New(saga.Action("foo", func(ctx action.Context) error {
	foo := newFooAggregate()
	// Return the error so that a failed fetch fails the Action.
	return ctx.Fetch(ctx, foo)
}))

var repo aggregate.Repository
err := saga.Execute(context.TODO(), s, saga.Repository(repo))
// handle err

func SkipValidation

func SkipValidation() ExecutorOption

SkipValidation returns an ExecutorOption that disables validation of a Setup before it is executed.

type Option

type Option func(*setup)

Option is a Setup option.

func Action

func Action(name string, run func(action.Context) error) Option

Action returns an Option that adds an Action to a SAGA. The first configured Action of a SAGA is also its starting Action, unless the StartWith Option is used.

func Add

func Add(acts ...action.Action) Option

Add adds Actions to the SAGA.

func Compensate

func Compensate(failed, compensateWith string) Option

Compensate returns an Option that configures the compensating Action for a failed Action.

func Sequence

func Sequence(actions ...string) Option

Sequence returns an Option that defines the sequence of Actions that should be run when the SAGA is executed.

When the Sequence Option is used, the StartWith Option has no effect and the first Action of the sequence is used as the starting Action.

Example:

s := saga.New(
	saga.Action("foo", func(action.Context) error { return nil }),
	saga.Action("bar", func(action.Context) error { return nil }),
	saga.Action("baz", func(action.Context) error { return nil }),
	saga.Sequence("bar", "foo", "baz"),
)
err := saga.Execute(context.TODO(), s)
// would run "bar", "foo" & "baz" sequentially

func StartWith

func StartWith(action string) Option

StartWith returns an Option that configures the starting Action of a SAGA.

type Reporter

type Reporter interface {
	Report(report.Report)
}

A Reporter reports the result of a SAGA.

type Setup

type Setup interface {
	// Sequence returns the names of the actions that should be run sequentially.
	Sequence() []string

	// Compensator finds and returns the name of the compensating action for the
	// Action with the given name. If Compensator returns an empty string, there
	// is no compensator for the given action configured.
	Compensator(string) string

	// Action returns the action with the given name. Action returns nil if no
	// Action with that name was configured.
	Action(string) action.Action
}

Setup is the setup for a SAGA.

func New

func New(opts ...Option) Setup

New returns a reusable Setup that can be safely executed concurrently.

Define Actions

The core of a SAGA is its Actions. Actions are basically just named functions that can be composed to orchestrate the execution flow of a SAGA. Actions can be configured with the Action Option:

s := saga.New(
	saga.Action("foo", func(action.Context) error { return nil }),
	saga.Action("bar", func(action.Context) error { return nil }),
)

By default, the first configured Action is the starting point for the SAGA when it's executed. The starting Action can be overridden with the StartWith Option:

s := saga.New(
	saga.Action("foo", func(action.Context) error { return nil }),
	saga.Action("bar", func(action.Context) error { return nil }),
	saga.StartWith("bar"),
)

Alternatively define a sequence of Actions that should be run when the SAGA is executed. The first Action of the sequence will be used as the starting Action for the SAGA:

s := saga.New(
	saga.Action("foo", func(action.Context) error { return nil }),
	saga.Action("bar", func(action.Context) error { return nil }),
	saga.Action("baz", func(action.Context) error { return nil }),
	saga.Sequence("bar", "foo", "baz"),
)
// would run "bar", "foo" & "baz" sequentially

Compensate Actions

Every Action a can be assigned a compensating Action c that is called when the SAGA fails. Actions are compensated in reverse order, and only Actions that completed successfully before the failure are compensated; the failed Action itself is not.

Example:

s := saga.New(
	saga.Action("foo", func(action.Context) error { return nil }),
	saga.Action("bar", func(action.Context) error { return nil }),
	saga.Action("baz", func(action.Context) error { return errors.New("whoops") }),
	saga.Action("compensate-foo", func(action.Context) error { return nil }),
	saga.Action("compensate-bar", func(action.Context) error { return nil }),
	saga.Action("compensate-baz", func(action.Context) error { return nil }),
	saga.Compensate("foo", "compensate-foo"),
	saga.Compensate("bar", "compensate-bar"),
	saga.Compensate("baz", "compensate-baz"),
	saga.Sequence("foo", "bar", "baz"),
)

The above Setup would run the following Actions in order:

`foo`, `bar`, `baz`, `compensate-bar`, `compensate-foo`

A SAGA that successfully compensated every Action still returns the error that triggered the compensation. To check whether the compensation itself failed, unwrap the error into a *CompensateErr:

var s saga.Setup
if err := saga.Execute(context.TODO(), s); err != nil {
	if compError, ok := saga.CompensateError(err); ok {
		log.Printf("Compensation failed: %s", compError)
	} else {
		log.Printf("SAGA failed: %s", err)
	}
}

Action Context

An Action has access to an action.Context that allows the Action to run other Actions in the same SAGA and, depending on the passed ExecutorOptions, publish events and dispatch Commands:

s := saga.New(
	saga.Action("foo", func(ctx action.Context) error {
		if err := ctx.Run(ctx, "bar"); err != nil {
			return fmt.Errorf("run %q: %w", "bar", err)
		}
		if err := ctx.Publish(ctx, event.New(...)); err != nil {
			return fmt.Errorf("publish event: %w", err)
		}
		if err := ctx.Dispatch(ctx, command.New(...)); err != nil {
			return fmt.Errorf("dispatch command: %w", err)
		}
		return nil
	}),
)
