automa

Version: v0.2.1
Published: Apr 12, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

README

Automa

Automa is a Saga Workflow Engine. It is designed for sequential and transactional business processes. It implements the choreography Saga pattern. Unlike the traditional choreography pattern, it does not use a centralized message broker; instead, each step calls the next step on success or triggers the rollback of the previous step on error.

The name automa is derived from the word automate.

All steps in an Automa Workflow are executed sequentially. On success, the workflow moves forward sequentially, and on error it moves backward sequentially. Note that some steps cannot be rolled back in reality, for example if an email has been sent. In that case, some form of compensating behaviour should be implemented; for example, the step could send another email to void the notification that was sent before.
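The forward and backward movement described above can be illustrated with a minimal, self-contained sketch. It does not use the automa API: the chainStep type, its fields, and the step IDs are hypothetical names invented for illustration. Each step calls the next one on success, and a failed step starts a rollback chain that walks back through the previous steps.

```go
package main

import "fmt"

// chainStep is a hypothetical, simplified step for this sketch; it is not an automa type.
type chainStep struct {
	id   string
	fail bool       // when true, the run action fails
	next *chainStep // called on success
	prev *chainStep // called on error
}

// run performs the step's action, then either calls the next step or starts the rollback.
func (s *chainStep) run(trace []string) []string {
	trace = append(trace, "run:"+s.id)
	if s.fail {
		return s.rollback(trace) // a failed step triggers its own rollback first
	}
	if s.next != nil {
		return s.next.run(trace)
	}
	return trace
}

// rollback compensates this step, then asks the previous step to compensate.
func (s *chainStep) rollback(trace []string) []string {
	trace = append(trace, "rollback:"+s.id)
	if s.prev != nil {
		return s.prev.rollback(trace)
	}
	return trace
}

func main() {
	reserve := &chainStep{id: "reserve_stock"}
	notify := &chainStep{id: "send_email"} // its rollback would send a compensating email
	charge := &chainStep{id: "charge_card", fail: true}
	reserve.next, notify.prev = notify, reserve
	notify.next, charge.prev = charge, notify

	for _, entry := range reserve.run(nil) {
		fmt.Println(entry)
	}
}
```

In this sketch the trace records every run action in order, followed by the rollback actions in reverse order, starting with the step that failed.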

Apart from the Saga workflow pattern, Automa also supports generating an execution report for every step in the workflow. The report data model can be found in the file reports.go. Developers need to populate a Report object in every Run and Rollback method as shown in the example.

Usage

NOTE: This project is still in active development (v1.0.0 is not released yet), so the API may change between releases.

  1. Add the dependency using go get -u "github.com/automa-saga/automa".
  2. Implement workflow steps (types that satisfy the automa.AtomicStep interface) using the pattern below:
// MyStep1 is an example AtomicStep that does not implement AtomicStep interface directly and uses the default 
// implementation provided by automa.Step 
type MyStep1 struct {
    automa.Step
    params map[string]string // define any parameter data model as required
}

// run matches the SagaRun function signature to leverage the default Run control logic implemented in automa.Step.
// Note: if it is not registered, the Run action will be marked as SKIPPED. See the automa.Step implementation.
func (s *MyStep1) run(ctx context.Context) (skipped bool, err error) {
    // perform run action
    // use params or cache as needed

    // if an error happens, return it as below to trigger rollback:
    // return false, err

    // if this action needs to be skipped because of a condition, return:
    // return true, nil

    return false, nil
}

// rollback matches the SagaUndo function signature to leverage the default Rollback control logic implemented in automa.Step.
// Note: this is optional; if it is not registered, the Rollback action will be marked as SKIPPED.
func (s *MyStep1) rollback(ctx context.Context) (skipped bool, err error) {
    // perform rollback action
    // use params or cache as needed

    // if an error happens, return it as below:
    // return false, err

    // if this action needs to be skipped because of a condition, return:
    // return true, nil

    return false, nil
}

// MyStep2 is an example AtomicStep that implements the Run and Rollback method interface directly
type MyStep2 struct {
	automa.Step
	params map[string]string // define any parameter data model as required
}

// Run implements the automa.AtomicStep interface
func (s *MyStep2) Run(ctx context.Context, prevSuccess *automa.Success) (automa.WorkflowReport, error) {
	report := automa.NewStepReport(s.ID, automa.RunAction)

	// perform run action with custom logic
	// use params or cache as needed.
	// add extra control logic that is not already provided in the default automa.Step.Run

	// if an error happens, invoke rollback as below:
	// return s.Rollback(ctx, automa.NewFailedRun(ctx, prevSuccess, err, report))

	// if this action needs to be skipped because of a condition, invoke the next Run using:
	// return s.SkippedRun(ctx, prevSuccess, report)

	return s.RunNext(ctx, prevSuccess, report)
}

// Rollback implements the automa.AtomicStep interface
func (s *MyStep2) Rollback(ctx context.Context, prevFailure *automa.Failure) (automa.WorkflowReport, error) {
	report := automa.NewStepReport(s.ID, automa.RollbackAction)

	// perform rollback action.
	// use params or local cache as needed.
	// add extra control logic that is not already provided in the default automa.Step.Rollback

	// if an error happens, invoke the previous rollback as below:
	// return s.FailedRollback(ctx, prevFailure, err, report)

	// if this action needs to be skipped because of a condition, invoke:
	// return s.SkippedRollback(ctx, prevFailure, report)

	return s.RollbackPrev(ctx, prevFailure, report)
}
  3. Then create and run a workflow using automa.StepRegistry as below:

func buildWorkflow1(ctx context.Context, params map[string]string) (automa.AtomicWorkflow, error) {
    workflowID := "workflow-1"

    step1 := &MyStep1{
        Step: automa.Step{ID: "step_1"}, // underscore separated string is suitable for IDs

        // add parameters as needed
        params: params,
    }
    step1.RegisterSaga(step1.run, step1.rollback) // register so that the default Run and Rollback logic can be used

    step2 := &MyStep2{
        Step: automa.Step{ID: "step_2"},

        // add parameters as needed
        params: params,
    }
    // Note: No need to invoke step2.RegisterSaga() since MyStep2 implements Run and Rollback itself

    // pass custom zap logger if necessary
    registry := automa.NewStepRegistry(zap.NewNop()).RegisterSteps(map[string]automa.AtomicStep{
        step1.GetID(): step1,
        step2.GetID(): step2,
    })

    // prepare the sequence of steps
    // Note that you may create different workflows from the same registry if needed.
    // However, if the same registry is being reused, ensure each step clears its local cache (if it has any)
    // before executing its action as necessary.
    workflow1Steps := automa.StepIDs{
        step1.GetID(),
        step2.GetID(),
    }

    // BuildWorkflow returns an error as well, so pass both values to the caller
    return registry.BuildWorkflow(workflowID, workflow1Steps)
}

func runWorkflow(ctx context.Context) error {
    params := map[string]string{} // add params as necessary

    workflow, err := buildWorkflow1(ctx, params)
    if err != nil {
        return err
    }
    defer workflow.End(ctx)

    report, err := workflow.Start(ctx)
    if err != nil {
        return err
    }

    // do something with the report if necessary
    // 'report' can be exported as YAML or JSON. See examples directory.
    _ = report

    return nil
}

See an example in the examples directory.

Development

  • make test runs the tests.
  • To build the example, run cd examples && go build. The example can then be run using ./examples/example.

Contribution

Feedback, comments, and contributions are very welcome.

Developers are encouraged to adopt the usual open source development practices with a PR and sign-off as well as verified signed commits. Developers are also encouraged to use commitizen for commit messages.

Please note that the PR will be squash-merged into master with a commitizen-formatted PR title. So even if commitizen is not used for individual commits in the PR, the repository maintainers are requested to ensure that the PR title follows the commitizen format before squash-merging the PR.

For beginners use this guide as a start.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AtomicStep

type AtomicStep interface {
	// GetID returns the step ID
	GetID() string

	Forward
	Backward
	Choreographer
}

AtomicStep provides an interface for an atomic state. Note that an AtomicStep may skip rollback if that makes sense, in which case it is not atomic in nature.

type AtomicStepRegistry

type AtomicStepRegistry interface {
	// RegisterSteps registers a set of AtomicStep
	// steps argument is a map where key is the step identifier
	RegisterSteps(steps map[string]AtomicStep) AtomicStepRegistry

	// GetStep returns an AtomicStep from the registry given the id
	// If it doesn't exist, it returns nil. So the caller should handle the nil AtomicStep safely.
	GetStep(id string) AtomicStep

	// BuildWorkflow builds an AtomicWorkflow comprising the list of AtomicStep identified by ids
	BuildWorkflow(id string, stepIDs StepIDs) (AtomicWorkflow, error)
}

AtomicStepRegistry is a registry of rollbackable steps

type AtomicWorkflow

type AtomicWorkflow interface {
	// GetID returns the workflow ID
	GetID() string

	// Start starts the AtomicWorkflow execution
	Start(ctx context.Context) (WorkflowReport, error)

	// End performs cleanup after the AtomicWorkflow engine finishes its execution
	End(ctx context.Context)
}

AtomicWorkflow defines the interface for a Workflow

type Backward

type Backward interface {
	// Rollback defines the actions compensating the business logic executed in Run method
	// A step may skip rollback if that makes sense. In that case it would mean the AtomicStep is not Atomic in nature.
	Rollback(ctx context.Context, prevFailure *Failure) (WorkflowReport, error)
}

Backward defines the methods to be executed to move the workflow backward on error

type Choreographer

type Choreographer interface {
	SetNext(next Forward)
	SetPrev(prev Backward)
	GetNext() Forward
	GetPrev() Backward
}

Choreographer interface defines the methods to support a doubly linked list of states. This is needed to support Choreography execution of the Saga workflow.
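As an illustration of how such a doubly linked list might be wired, here is a minimal sketch. It is not the library's implementation: the forward, backward, and choreographer interfaces are simplified local stand-ins, and basicStep and chain are hypothetical names used only for this example.

```go
package main

import "fmt"

// forward and backward are simplified stand-ins for this sketch only;
// the real interfaces carry Run and Rollback methods instead.
type forward interface{ ID() string }
type backward interface{ ID() string }

// choreographer mirrors the shape of the four linking methods.
type choreographer interface {
	SetNext(next forward)
	SetPrev(prev backward)
	GetNext() forward
	GetPrev() backward
}

// basicStep is a hypothetical step that stores its neighbours.
type basicStep struct {
	id   string
	next forward
	prev backward
}

func (s *basicStep) ID() string         { return s.id }
func (s *basicStep) SetNext(n forward)  { s.next = n }
func (s *basicStep) SetPrev(p backward) { s.prev = p }
func (s *basicStep) GetNext() forward   { return s.next }
func (s *basicStep) GetPrev() backward  { return s.prev }

// compile-time check that basicStep satisfies the local choreographer interface
var _ choreographer = (*basicStep)(nil)

// chain links steps in order: each step's next is the following step,
// and each step's prev is the preceding step.
func chain(steps ...*basicStep) {
	for i := 0; i < len(steps)-1; i++ {
		steps[i].SetNext(steps[i+1])
		steps[i+1].SetPrev(steps[i])
	}
}

func main() {
	s1, s2, s3 := &basicStep{id: "step_1"}, &basicStep{id: "step_2"}, &basicStep{id: "step_3"}
	chain(s1, s2, s3)
	fmt.Println(s2.GetPrev().ID(), "<-", s2.ID(), "->", s2.GetNext().ID())
}
```

The first step has no previous step and the last step has no next step, so a traversal in either direction ends when the getter returns nil.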

type Failure

type Failure struct {
	// contains filtered or unexported fields
}

Failure defines a failure event for a step

func NewFailedRollback

func NewFailedRollback(ctx context.Context, prevFailure *Failure, err error, report *StepReport) *Failure

NewFailedRollback returns a Failure event when a step's rollback action has failed. It sets the step's RollbackAction status as StatusFailed.

func NewFailedRun

func NewFailedRun(ctx context.Context, prevSuccess *Success, err error, report *StepReport) *Failure

NewFailedRun returns a Failure event to be used for the first Rollback method. It is used by a step to trigger its own rollback action. It sets the step's RunAction status as StatusFailed.

func NewFailure

func NewFailure(prevFailure *Failure, report *StepReport) *Failure

NewFailure creates a Failure event for the rollback action. It is used by a step to trigger the rollback action of the previous step when its own rollback succeeds. It sets the step's RollbackAction status as StatusSuccess.

func NewSkippedRollback

func NewSkippedRollback(prevFailure *Failure, report *StepReport) *Failure

NewSkippedRollback creates a Failure event with StatusSkipped for RollbackAction. This is a helper method to be used in the rollback action when the rollback action is skipped.

type Forward

type Forward interface {
	// Run runs the business logic to be performed in the AtomicStep
	Run(ctx context.Context, prevSuccess *Success) (WorkflowReport, error)
}

Forward defines the methods to execute business logic of an AtomicStep and move the workflow forward

type SagaRun

type SagaRun func(ctx context.Context) (skipped bool, err error)

SagaRun is a func definition to contain the run logic

The skipped return value denotes whether the execution was skipped. The err return value denotes any error that occurred during execution.

type SagaUndo

type SagaUndo func(ctx context.Context) (skipped bool, err error)

SagaUndo is a func definition to contain the compensating logic

The skipped return value denotes whether the execution was skipped. The err return value denotes any error that occurred during execution.

type Status

type Status string

Status defines the execution status of the AtomicStep

const (
	StatusSuccess   Status = "SUCCESS"
	StatusFailed    Status = "FAILED"
	StatusSkipped   Status = "SKIPPED"
	StatusUndefined Status = "UNDEFINED"
)

type Step

type Step struct {
	ID   string
	Next Forward
	Prev Backward
	// contains filtered or unexported fields
}

Step is the kernel for AtomicStep implementations, containing the SagaRun and SagaUndo functions. Actual Step implementations are expected to embed it, following the inheritance-by-composition pattern. If the saga methods are not registered, Step will skip those operations during invocation of Run and Rollback. Note that the user may override the Run and Rollback methods in the actual implementation in order to change the control logic.

func (*Step) FailedRollback

func (s *Step) FailedRollback(ctx context.Context, prevFailure *Failure, err error, report *StepReport) (WorkflowReport, error)

FailedRollback is a helper method to report that the current step's rollback has failed and to trigger the previous step's rollback. It marks the current step's RollbackAction as StatusFailed.

func (*Step) GetID

func (s *Step) GetID() string

GetID returns the step ID

func (*Step) GetNext

func (s *Step) GetNext() Forward

GetNext returns the step to be used to move in the forward direction

func (*Step) GetPrev

func (s *Step) GetPrev() Backward

GetPrev returns the step to be used to move in the backward direction

func (*Step) RegisterSaga

func (s *Step) RegisterSaga(run SagaRun, undo SagaUndo) *Step

RegisterSaga registers the saga logic for run and undo in order to leverage the default controller logic for Run and Rollback. This is just a helper function for when the user would like to use the default Run and Rollback logic. Using this method is optional, and the user is free to implement the Run and Rollback methods of AtomicStep as they wish.

func (*Step) Rollback

func (s *Step) Rollback(ctx context.Context, prevFailure *Failure) (WorkflowReport, error)

Rollback implements the Rollback controller logic for the automa.AtomicStep interface. This is a wrapper function to help simplify AtomicStep implementations. Note that the user may implement the Rollback method in order to change the control logic as required.

func (*Step) RollbackPrev

func (s *Step) RollbackPrev(ctx context.Context, prevFailure *Failure, report *StepReport) (WorkflowReport, error)

RollbackPrev is a helper method to report that the current rollback step has been executed and to trigger the previous step's rollback. It marks the current step as StatusFailed.

func (*Step) Run

func (s *Step) Run(ctx context.Context, prevSuccess *Success) (WorkflowReport, error)

Run implements the Run controller logic for the automa.AtomicStep interface. This is a wrapper function to help simplify AtomicStep implementations. Note that the user may implement the Run method in order to change the control logic as required.

func (*Step) RunNext

func (s *Step) RunNext(ctx context.Context, prevSuccess *Success, report *StepReport) (WorkflowReport, error)

RunNext is a helper method to report that the current step has been successful and to trigger the next step's execution. It marks the current step as StatusSuccess.

func (*Step) SetNext

func (s *Step) SetNext(next Forward)

SetNext sets the next step of the Workflow to be able to move in the forward direction on success

func (*Step) SetPrev

func (s *Step) SetPrev(prev Backward)

SetPrev sets the previous step of the Workflow to be able to move in the backward direction on error

func (*Step) SkippedRollback

func (s *Step) SkippedRollback(ctx context.Context, prevFailure *Failure, report *StepReport) (WorkflowReport, error)

SkippedRollback is a helper method to report that the current step's rollback has been skipped and to trigger the previous step's rollback. It marks the current step as StatusSkipped.

func (*Step) SkippedRun

func (s *Step) SkippedRun(ctx context.Context, prevSuccess *Success, report *StepReport) (WorkflowReport, error)

SkippedRun is a helper method to report that the current step has been skipped and to trigger the next step's execution. It marks the current step as StatusSkipped.

type StepActionType

type StepActionType string

StepActionType defines the action taken by a step. It is used as the key for StepReport.Actions.

const (
	RunAction      StepActionType = "run"
	RollbackAction StepActionType = "rollback"
)

type StepIDs

type StepIDs []string

StepIDs is just a wrapper definition for a list of string

type StepRegistry

type StepRegistry struct {
	// contains filtered or unexported fields
}

StepRegistry is an implementation of AtomicStepRegistry interface

func NewStepRegistry

func NewStepRegistry(logger *zap.Logger) *StepRegistry

NewStepRegistry returns an instance of StepRegistry that implements AtomicStepRegistry. If logger is nil, it initializes itself with a NoOp logger.

func (*StepRegistry) BuildWorkflow

func (r *StepRegistry) BuildWorkflow(workflowID string, stepIDs StepIDs) (AtomicWorkflow, error)

BuildWorkflow is a helper method to build a Workflow from the given set of AtomicStep IDs

func (*StepRegistry) GetStep

func (r *StepRegistry) GetStep(id string) AtomicStep

GetStep returns an AtomicStep by the id. It returns nil if a step cannot be found by the given ID, so the caller should handle a nil AtomicStep safely.

func (*StepRegistry) RegisterSteps

func (r *StepRegistry) RegisterSteps(steps map[string]AtomicStep) AtomicStepRegistry

RegisterSteps is a helper method to register multiple AtomicSteps at a time

type StepReport

type StepReport struct {
	StepID        string              `yaml:"step_id" json:"stepID"`
	Action        StepActionType      `yaml:"action" json:"action"`
	StartTime     time.Time           `yaml:"start_time" json:"startTime"`
	EndTime       time.Time           `yaml:"end_time" json:"endTime"`
	Status        Status              `yaml:"status" json:"status"`
	FailureReason errors.EncodedError `yaml:"reason" json:"reason"`
	Metadata      map[string][]byte   `yaml:"metadata" json:"metadata"`
}

StepReport defines the report data model for each AtomicStep execution

func NewStepReport

func NewStepReport(id string, action StepActionType) *StepReport

NewStepReport returns a new report with a given stepID

type Success

type Success struct {
	// contains filtered or unexported fields
}

Success defines a success event for a step

func NewSkippedRun

func NewSkippedRun(prevSuccess *Success, report *StepReport) *Success

NewSkippedRun creates a Success event with StatusSkipped for RunAction. This is a helper method to be used in the run action when the run action is skipped.

func NewStartTrigger

func NewStartTrigger(reports WorkflowReport) *Success

NewStartTrigger returns a Success event to be used for the Run method. It is used by the Workflow to trigger the execution of the first step.

func NewSuccess

func NewSuccess(prevSuccess *Success, report *StepReport) *Success

NewSuccess creates a Success event for the run action. It is used by a step to trigger the run action of the next step when its own run succeeds. It sets the step's RunAction status as StatusSuccess.

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow implements the AtomicWorkflow interface. It implements a Saga workflow using the Choreography execution pattern.

In order to enable the Choreography pattern, it forms a doubly linked list of AtomicSteps to traverse 'Forward' on Success and 'Backward' on Failure.

func NewWorkflow

func NewWorkflow(id string, opts ...WorkflowOption) *Workflow

NewWorkflow returns an instance of Workflow that implements the AtomicWorkflow interface

func (*Workflow) End

func (wf *Workflow) End(ctx context.Context)

End performs any cleanup after the Workflow execution. This is a NOOP currently, but it is left as a placeholder for any future cleanup steps if required.

func (*Workflow) GetID

func (wf *Workflow) GetID() string

GetID returns the id of the Workflow

func (*Workflow) Start

func (wf *Workflow) Start(ctx context.Context) (WorkflowReport, error)

Start starts the workflow and returns the WorkflowReport

type WorkflowOption

type WorkflowOption func(wf *Workflow)

WorkflowOption exposes "constructor with option" pattern for Workflow

func WithLogger

func WithLogger(logger *zap.Logger) WorkflowOption

WithLogger allows Workflow to be initialized with a logger. By default, a Workflow is initialized with a NoOp logger.

func WithSteps

func WithSteps(steps ...AtomicStep) WorkflowOption

WithSteps allows Workflow to be initialized with the list of ordered steps
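The "constructor with option" pattern used by NewWorkflow, WithLogger, and WithSteps can be sketched in isolation as follows. This is a generic illustration under stated assumptions, not the library's code: the lowercase workflow, option, withLogger, withSteps, and newWorkflow names are hypothetical, steps are plain strings, and the logger is a plain function rather than a zap logger.

```go
package main

import "fmt"

// workflow is a hypothetical, reduced stand-in for a workflow type.
type workflow struct {
	id     string
	steps  []string
	logger func(msg string)
}

// option mutates a workflow during construction.
type option func(wf *workflow)

// withLogger replaces the default no-op logger.
func withLogger(logger func(msg string)) option {
	return func(wf *workflow) { wf.logger = logger }
}

// withSteps sets the ordered list of step IDs.
func withSteps(ids ...string) option {
	return func(wf *workflow) { wf.steps = append(wf.steps, ids...) }
}

// newWorkflow applies defaults first, then each option in the order given.
func newWorkflow(id string, opts ...option) *workflow {
	wf := &workflow{
		id:     id,
		logger: func(string) {}, // no-op logger by default
	}
	for _, opt := range opts {
		opt(wf)
	}
	return wf
}

func main() {
	wf := newWorkflow("workflow-1",
		withSteps("step_1", "step_2"),
		withLogger(func(msg string) { fmt.Println("log:", msg) }),
	)
	wf.logger(fmt.Sprintf("%s has %d steps", wf.id, len(wf.steps)))
}
```

The benefit of this pattern is that new optional settings can be added later without changing the constructor's signature.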

type WorkflowReport

type WorkflowReport struct {
	WorkflowID   string        `yaml:"workflow_id" json:"workflowID"`
	StartTime    time.Time     `yaml:"start_time" json:"startTime"`
	EndTime      time.Time     `yaml:"end_time" json:"endTime"`
	Status       Status        `yaml:"status" json:"status"`
	StepSequence StepIDs       `yaml:"step_sequence" json:"stepSequence"`
	StepReports  []*StepReport `yaml:"step_reports" json:"stepReports"`
}

WorkflowReport defines the report data model for a Workflow execution. It records the step sequence and the list of StepReports produced by the steps.

func NewWorkflowReport

func NewWorkflowReport(id string, steps StepIDs) *WorkflowReport

NewWorkflowReport returns an instance of WorkflowReport

func (*WorkflowReport) Append

func (wfr *WorkflowReport) Append(stepReport *StepReport, action StepActionType, status Status)

Append appends the current report to the previous report. It adds an end time and sets the status for the current report.

