async

v0.13.5 (this package is not in the latest version of its module)
Published: Mar 10, 2022 License: GPL-3.0 Imports: 6 Imported by: 5

README

gorchestrate your workflows

link to docs

Gorchestrate makes stateful workflow management in Go incredibly easy. I've spent 4 years arriving at this particular approach to handling workflows, and I'd appreciate it if you tried it out.

Example App

https://github.com/gorchestrate/pizzaapp This is an example of how you can use Google Cloud to build fully serverless stateful workflows that can be scaled horizontally without much effort.

Example Code

// MyWorkflow holds the workflow state. It is saved between executions.
type MyWorkflow struct {
	User     string
	Approved bool
	Error    string
}

func (s *MyWorkflow) Definition() Section {
	return S(
		Step("first action", func(ctx context.Context) error {
			s.User = "You can execute arbitrary code right in the workflow definition"
			log.Printf("No need for fancy definition/action separation")
			return nil
		}),
		If(s.User == "", "user is empty",
			Step("second action", func(ctx context.Context) error {
				log.Printf("use if/else to branch your workflow")
				return nil
			}),
			Return(),
		),
		Wait("and then wait for some events",
			// MyEvent is a user-defined type that implements Handler.
			On("myEvent", MyEvent{
				F: func() {
					log.Printf("this is executed synchronously when HandleCallback() is called")
				},
			},
				Step("and then continue the workflow", func(ctx context.Context) error {
					return nil
				}),
				s.Finish("timed out"),
			),
		),
		Go("thread2", S(
			Step("you can also run parallel threads in your workflow", func(ctx context.Context) error {
				s.User = "this allows you to orchestrate your workflow and run multiple asynchronous actions in parallel"
				return nil
			}),
		)),
		s.Finish("finished successfully"),
	)
}

// Finish returns a reusable group of steps that ends the current thread.
func (s *MyWorkflow) Finish(output string) Stmt {
	return S(
		Step("you can also build sub steps for your workflows", func(ctx context.Context) error {
			return nil
		}),
		Step("for example if you want to execute same actions for multiple workflow steps", func(ctx context.Context) error {
			return nil
		}),
		Step("or make your workflow definition clean and readable", func(ctx context.Context) error {
			return nil
		}),
		Return(),
	)
}

Architecture

Gorchestrate has a stateless API and does not come with a database or scheduler. It's intended to be integrated into applications, which determine how and when workflows are executed.

Think of it as a stateful script: you want to execute some steps, save the state of the script in persistent storage, and then resume the script when some event comes in.

There are 2 ways to execute a workflow:

  • A scheduled call of Resume(). This is called when the workflow starts, or when an event was handled and execution needs to continue.
  • An explicit call of HandleCallback(). This is called whenever some event happens.

Brief description of how workflows are executed:

  0. The workflow is created and a Resume() call is scheduled.
  1. The scheduled Resume() executes the workflow up to the point where it is blocked and waiting for events.
  2. It then executes Setup() for every event the workflow is waiting for. This allows you to register your events with external services.
  3. When an event arrives, you pass it to HandleCallback(). This handles the event and schedules a Resume() call.
  4. Resume() executes Teardown() for every event the workflow was waiting for. This allows you to deregister your events from external services.
  5. Go to point 1 if the main thread has not yet exited.
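The cycle above can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `toyWorkflow`, `stmt`, `resume` and `handleEvent` are hypothetical names invented for the illustration, the state lives in memory instead of a database, and nothing here calls the library itself.

```go
package main

import "fmt"

// toyWorkflow is a hypothetical stand-in for a persisted workflow.
// It is not the library's State type.
type toyWorkflow struct {
	PC       int      // index of the next statement to run
	Waiting  string   // name of the event we are blocked on, "" if none
	Handled  bool     // set by handleEvent when the awaited event arrived
	Finished bool     // true once every statement has run
	Log      []string // trace of what happened, for illustration
}

// stmt is either a plain step or a wait for a named event.
type stmt struct {
	name   string
	isWait bool
}

var definition = []stmt{
	{name: "create order"},
	{name: "payment", isWait: true},
	{name: "ship order"},
}

// resume runs statements until the workflow blocks on an event or finishes.
func resume(w *toyWorkflow) {
	if w.Waiting != "" {
		if !w.Handled {
			return // still blocked, nothing to do
		}
		// The awaited event was handled: tear it down and move on.
		w.Log = append(w.Log, "teardown "+w.Waiting)
		w.Waiting, w.Handled = "", false
		w.PC++
	}
	for w.PC < len(definition) {
		s := definition[w.PC]
		if s.isWait {
			// Blocked: set the event up and stop until it arrives.
			w.Log = append(w.Log, "setup "+s.name)
			w.Waiting = s.name
			return
		}
		w.Log = append(w.Log, "step "+s.name)
		w.PC++
	}
	w.Finished = true
}

// handleEvent records an incoming event; the caller then schedules resume.
func handleEvent(w *toyWorkflow, event string) error {
	if w.Waiting != event {
		return fmt.Errorf("workflow is not waiting for %q", event)
	}
	w.Handled = true
	w.Log = append(w.Log, "handle "+event)
	return nil
}

func main() {
	w := &toyWorkflow{}
	resume(w) // runs "create order", then blocks on "payment"
	if err := handleEvent(w, "payment"); err != nil {
		fmt.Println("unexpected:", err)
		return
	}
	resume(w) // tears down "payment", then runs "ship order"
	for _, line := range w.Log {
		fmt.Println(line)
	}
	fmt.Println("finished:", w.Finished)
}
```

In a real integration the two `resume` calls would typically be separate scheduled invocations, with the state loaded from and saved to storage in between.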

How does the library know where to resume the workflow from?

  • All threads and their current steps are stored in the workflow's State.
  • When Resume() is called, we try to continue all steps in all threads, starting from the CurStep recorded in each thread. So adding a new step to the workflow will not break the execution flow. Removing a step is harder, since you need to make sure that the step is not currently being executed and that its removal won't break the logic of your workflow.
  • When HandleCallback() is called, we find the step waiting for this callback and continue execution. Calling HandleCallback() with an event that the workflow is not waiting for returns an error.
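To see why adding a step is safe while removing one is not, here is a minimal sketch that looks up the current step by name. It assumes a flat list of step names and a hypothetical helper `remainingSteps`; the library's own lookup walks a statement tree and is not reproduced here.

```go
package main

import "fmt"

// remainingSteps returns the steps that still have to run when a thread is
// resumed at curStep. The step is looked up by name, not by position.
func remainingSteps(definition []string, curStep string) ([]string, error) {
	for i, name := range definition {
		if name == curStep {
			return definition[i:], nil
		}
	}
	return nil, fmt.Errorf("step %q is not part of the definition", curStep)
}

func main() {
	// A thread was persisted while it was about to run "charge".
	const curStep = "charge"

	// A new step "notify" was added after the thread was saved: still fine.
	withNewStep := []string{"reserve", "charge", "notify", "ship"}
	fmt.Println(remainingSteps(withNewStep, curStep))

	// The step the thread points at was removed: resuming fails.
	withoutStep := []string{"reserve", "ship"}
	fmt.Println(remainingSteps(withoutStep, curStep))
}
```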

What about performance?

Performance is mainly limited by DB operations rather than by the library itself. To build stateful workflows you have to lock, save and unlock the workflow for each execution step, which may be slow and resource-hungry.

Here's a benchmark for a complete workflow execution consisting of 19 execution steps, including loops, steps & event handlers.

go test -bench=. -benchtime=1s
cpu: AMD Ryzen 7 4700U with Radeon Graphics         
BenchmarkXxx-8   	   33121	     35601 ns/op

Is it production ready?

It works reliably for straightforward workflows that just perform simple steps. Test coverage is 80%, and the tests cover the basic flows.

For advanced usage, such as workflows with concurrency or custom event handlers, you should write tests and make sure the library does what you want (see async_test.go for how to test your workflows).

I'm keeping this library at a 0.x version, since the API may change a little in the future (for example, we may want to pass a context to callbacks), but such changes should be easy to adapt to.

I'd be glad if someone really smart helped me make this library better or more popular.

Feedback is welcome!

Documentation

Constants

const HandlerTypeReflect = "reflect"
const MainThread = "_main_"

Functions

func HandleCallback added in v0.9.0

func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, input interface{}) (interface{}, error)

HandleCallback handles an incoming event. This func only executes the callback handler and updates the state. After it completes successfully, the workflow should be resumed using Resume().

func Resume added in v0.9.0

func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error

Resume continues the workflow, executing its steps along the way. Resume may fail in the middle of processing. To avoid data loss and out-of-order, duplicated step execution, save() is called to checkpoint the current state of the workflow.

func Validate added in v0.9.6

func Validate(s Section) error

TODO: more robust validations

func Walk added in v0.5.0

func Walk(s Stmt, f func(s Stmt) bool) (bool, error)

Types

type BreakStmt added in v0.5.0

type BreakStmt struct {
}

func Break added in v0.5.0

func Break() BreakStmt

Break for loop

func (BreakStmt) MarshalJSON added in v0.9.9

func (s BreakStmt) MarshalJSON() ([]byte, error)

func (BreakStmt) Resume added in v0.5.0

func (s BreakStmt) Resume(ctx *resumeContext) (*Stop, error)

type CallbackRequest added in v0.5.0

type CallbackRequest struct {
	WorkflowID string
	ThreadID   string
	Name       string
	PC         int
	SetupData  string
}

type Checkpoint added in v0.7.0

type Checkpoint func(ctx context.Context, t CheckpointType) error

Checkpoint is used to save the workflow state while it's being processed. You may want to checkpoint/save your workflow only for specific checkpoint types to increase performance and avoid unnecessary saves.

type CheckpointType added in v0.12.0

type CheckpointType string

Type of checkpoint event

const (
	// Step was executed successfully. You probably want to save your workflow after this event
	CheckpointAfterStep      CheckpointType = "afterStep"
	CheckpointAfterSetup     CheckpointType = "afterSetup"
	CheckpointAfterTeardown  CheckpointType = "afterTeardown"
	CheckpointAfterResume    CheckpointType = "afterResume"
	CheckpointAfterCondition CheckpointType = "afterCondition"
)
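As an illustration of saving only on selected checkpoint types, the sketch below wraps a save function in a filter. The `Checkpoint` and `CheckpointType` declarations are local copies of the ones documented above so that the example compiles on its own; `saveOn` and `countSaves` are hypothetical helpers, not part of the package. Which types can be skipped safely depends on your workflow and storage, so treat the chosen set as an assumption to verify.

```go
package main

import (
	"context"
	"fmt"
)

// Local copies of the documented types, so the sketch is self-contained.
type CheckpointType string

const (
	CheckpointAfterStep      CheckpointType = "afterStep"
	CheckpointAfterSetup     CheckpointType = "afterSetup"
	CheckpointAfterTeardown  CheckpointType = "afterTeardown"
	CheckpointAfterResume    CheckpointType = "afterResume"
	CheckpointAfterCondition CheckpointType = "afterCondition"
)

type Checkpoint func(ctx context.Context, t CheckpointType) error

// saveOn (hypothetical) returns a Checkpoint that calls save only for the
// listed checkpoint types and silently skips all others.
func saveOn(save func(ctx context.Context) error, types ...CheckpointType) Checkpoint {
	allowed := make(map[CheckpointType]bool, len(types))
	for _, t := range types {
		allowed[t] = true
	}
	return func(ctx context.Context, t CheckpointType) error {
		if !allowed[t] {
			return nil // skip: not a type we persist on
		}
		return save(ctx)
	}
}

// countSaves (hypothetical) feeds events through the filter and reports how
// many of them reached the save function. It returns -1 if a save failed.
func countSaves(events []CheckpointType, allowed ...CheckpointType) int {
	saves := 0
	cp := saveOn(func(ctx context.Context) error {
		saves++ // a real implementation would write the state to storage here
		return nil
	}, allowed...)
	for _, e := range events {
		if err := cp(context.Background(), e); err != nil {
			return -1
		}
	}
	return saves
}

func main() {
	events := []CheckpointType{
		CheckpointAfterStep,
		CheckpointAfterCondition,
		CheckpointAfterSetup,
		CheckpointAfterResume,
	}
	fmt.Println("saves:", countSaves(events, CheckpointAfterStep, CheckpointAfterResume))
}
```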

type ContinueStmt added in v0.10.0

type ContinueStmt struct {
}

func Continue added in v0.10.0

func Continue() ContinueStmt

Continue for loop

func (ContinueStmt) MarshalJSON added in v0.10.0

func (s ContinueStmt) MarshalJSON() ([]byte, error)

func (ContinueStmt) Resume added in v0.10.0

func (s ContinueStmt) Resume(ctx *resumeContext) (*Stop, error)

type Empty

type Empty struct {
}

type ErrExec added in v0.11.0

type ErrExec struct {
	Step string
	// contains filtered or unexported fields
}

func (ErrExec) Error added in v0.11.0

func (e ErrExec) Error() string

func (ErrExec) Unwrap added in v0.11.0

func (e ErrExec) Unwrap() error

type Event added in v0.9.9

type Event struct {
	Callback CallbackRequest
	Handler  Handler
	Stmt     Stmt
}

func On added in v0.5.0

func On(event string, handler Handler, stmts ...Stmt) Event

func OnEvent added in v0.12.0

func OnEvent(name string, h interface{}, stmts ...Stmt) Event

func (Event) MarshalJSON added in v0.9.9

func (s Event) MarshalJSON() ([]byte, error)

type ForStmt added in v0.5.0

type ForStmt struct {
	Name    string
	Cond    bool // nil cond for infinite loop
	Section Section
}

func (ForStmt) MarshalJSON added in v0.9.9

func (s ForStmt) MarshalJSON() ([]byte, error)

func (ForStmt) Resume added in v0.5.0

func (f ForStmt) Resume(ctx *resumeContext) (*Stop, error)

type GoStmt added in v0.5.0

type GoStmt struct {
	// ID is needed to identify threads when there are many threads running with the same name.
	// (for example they were created in a loop)
	ID   func() string `json:"-"`
	Name string
	Stmt Stmt
}

func Go added in v0.5.0

func Go(name string, body Stmt) *GoStmt

func (GoStmt) MarshalJSON added in v0.9.9

func (s GoStmt) MarshalJSON() ([]byte, error)

func (*GoStmt) Resume added in v0.5.0

func (s *GoStmt) Resume(ctx *resumeContext) (*Stop, error)

func (*GoStmt) WithID added in v0.9.0

func (s *GoStmt) WithID(id func() string) *GoStmt

type Handler added in v0.5.0

type Handler interface {
	Type() string

	Setup(ctx context.Context, req CallbackRequest) (string, error)

	Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)

	Teardown(ctx context.Context, req CallbackRequest, handled bool) error
}
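A custom event handler has to satisfy the four methods above. The following sketch shows the general shape under loud assumptions: `CallbackRequest` and `Handler` are local copies of the documented declarations so that the example compiles standalone, while `approvalHandler` and `decide` are hypothetical. The real library may expect more from a handler (for example JSON marshalling), which is not covered here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of the documented types, so the sketch is self-contained.
type CallbackRequest struct {
	WorkflowID string
	ThreadID   string
	Name       string
	PC         int
	SetupData  string
}

type Handler interface {
	Type() string
	Setup(ctx context.Context, req CallbackRequest) (string, error)
	Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)
	Teardown(ctx context.Context, req CallbackRequest, handled bool) error
}

// approvalHandler is a hypothetical handler that records an approval decision
// in a field of the workflow state.
type approvalHandler struct {
	Approved *bool
}

func (h approvalHandler) Type() string { return "approval" }

// Setup has nothing to register with external services in this sketch.
func (h approvalHandler) Setup(ctx context.Context, req CallbackRequest) (string, error) {
	return "", nil
}

// Handle validates the input and updates the workflow state.
func (h approvalHandler) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error) {
	v, ok := input.(bool)
	if !ok {
		return nil, errors.New("approval event expects a bool input")
	}
	*h.Approved = v
	return "recorded", nil
}

// Teardown has nothing to deregister in this sketch.
func (h approvalHandler) Teardown(ctx context.Context, req CallbackRequest, handled bool) error {
	return nil
}

// Compile-time check that approvalHandler satisfies Handler.
var _ Handler = approvalHandler{}

// decide (hypothetical) runs the handler once and reports the resulting flag.
func decide(input interface{}) (bool, error) {
	approved := false
	h := approvalHandler{Approved: &approved}
	_, err := h.Handle(context.Background(), CallbackRequest{Name: "approve"}, input)
	return approved, err
}

func main() {
	fmt.Println(decide(true))
	fmt.Println(decide("yes"))
}
```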

func FindHandler added in v0.8.6

func FindHandler(req CallbackRequest, sec Stmt) (Handler, error)

type ReflectHandler added in v0.13.0

type ReflectHandler struct {
	Handler interface{} `json:"-"`
}

This is an example of how to create your custom events

func (*ReflectHandler) Handle added in v0.13.0

func (h *ReflectHandler) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)

Handle contains the code that is executed when the event is received.

func (ReflectHandler) MarshalJSON added in v0.13.0

func (h ReflectHandler) MarshalJSON() ([]byte, error)

func (ReflectHandler) Schemas added in v0.13.0

func (h ReflectHandler) Schemas() (in *jsonschema.Schema, out *jsonschema.Schema, err error)

func (*ReflectHandler) Setup added in v0.13.0

func (t *ReflectHandler) Setup(ctx context.Context, req CallbackRequest) (string, error)

Setup is called when we start listening for this event, so that the event can be set up on external services.

func (*ReflectHandler) Teardown added in v0.13.0

func (t *ReflectHandler) Teardown(ctx context.Context, req CallbackRequest, handled bool) error

Teardown is called when we stop listening for this event, so that the event can be removed from external services.

func (*ReflectHandler) Type added in v0.13.0

func (h *ReflectHandler) Type() string

type ReturnStmt added in v0.5.0

type ReturnStmt struct {
}

func Return added in v0.5.0

func Return() ReturnStmt

Return stops this thread.

func (ReturnStmt) MarshalJSON added in v0.9.9

func (s ReturnStmt) MarshalJSON() ([]byte, error)

func (ReturnStmt) Resume added in v0.5.0

func (s ReturnStmt) Resume(ctx *resumeContext) (*Stop, error)

type Section added in v0.5.0

type Section []Stmt

Section is similar to code block {} with a list of statements.

func S added in v0.5.0

func S(ss ...Stmt) Section

S is syntactic sugar that keeps statement sections properly indented when using gofmt.

func (Section) Resume added in v0.5.0

func (s Section) Resume(ctx *resumeContext) (*Stop, error)

type State added in v0.5.0

type State struct {
	ID       string         // id of workflow instance
	Workflow string         // name of workflow definition. Used to choose proper state type to unmarshal & resume on
	Status   WorkflowStatus // current status
	Threads  Threads
	PC       int
}

func NewState added in v0.7.0

func NewState(id, name string) State

func (*State) UpdateWorkflowsStatus added in v0.13.1

func (s *State) UpdateWorkflowsStatus()

type Stmt added in v0.5.0

type Stmt interface {
	// Resume continues execution of the process, based on resumeContext
	// If ctx.Running == true Resume should execute the statement or continue execution after.
	// If ctx.Running = false Resume should not execute the statement, but recursively search for the statement that needs to be resumed.
	// If it needs to be resumed - don't execute it, but continue execution from this statement.
	//
	// If stmt not found *Stop will be nil and ctx.Running will be false
	// If stmt is found, but process has finished - *Stop will be nil and ctx.Running will be true
	// Otherwise Resume should always return *Stop or err != nil
	Resume(ctx *resumeContext) (*Stop, error)
}

Stmt is async statement definition that should support workflow resuming & search.
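The search-then-execute contract described in the comments can be sketched with a much smaller model. Everything below is hypothetical (`resumeCtx`, `node`, `step`, `section`, `resumeFrom`), and it leaves out *Stop, errors and threads entirely; it only shows how a Running flag lets a recursive walk skip statements until the target is found and then execute from that point on.

```go
package main

import "fmt"

// resumeCtx is a hypothetical, simplified resume context.
type resumeCtx struct {
	Target  string   // name of the step to resume from
	Running bool     // false while searching, true once Target was found
	Ran     []string // names of executed steps, for illustration
}

// node is a statement that can be searched and resumed.
type node interface {
	resume(ctx *resumeCtx)
}

// step is a leaf statement identified by its name.
type step string

func (s step) resume(ctx *resumeCtx) {
	if !ctx.Running {
		if string(s) != ctx.Target {
			return // still searching, and this is not the target
		}
		ctx.Running = true // found it: execution continues from here
	}
	ctx.Ran = append(ctx.Ran, string(s))
}

// section is a block of statements, possibly nested.
type section []node

func (sec section) resume(ctx *resumeCtx) {
	for _, n := range sec {
		n.resume(ctx)
	}
}

// resumeFrom searches tree for target and runs everything from there on.
// The bool result is false when the target was never found.
func resumeFrom(tree node, target string) ([]string, bool) {
	ctx := &resumeCtx{Target: target}
	tree.resume(ctx)
	return ctx.Ran, ctx.Running
}

func main() {
	tree := section{step("a"), section{step("b"), step("c")}, step("d")}
	fmt.Println(resumeFrom(tree, "c"))
	fmt.Println(resumeFrom(tree, "missing"))
}
```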

func For added in v0.5.0

func For(name string, cond bool, ss ...Stmt) Stmt

type StmtStep added in v0.5.0

type StmtStep struct {
	Name   string
	Action func(ctx context.Context) error `json:"-"`
}

func FindStep added in v0.5.0

func FindStep(name string, sec Stmt) (*StmtStep, error)

func Step added in v0.5.0

func Step(name string, action func(ctx context.Context) error) StmtStep

If the action func returns an error, the step will be retried automatically.

func (StmtStep) MarshalJSON added in v0.9.9

func (s StmtStep) MarshalJSON() ([]byte, error)

func (StmtStep) Resume added in v0.5.0

func (s StmtStep) Resume(ctx *resumeContext) (*Stop, error)

type Stop added in v0.5.0

type Stop struct {
	Step       string          // execute step
	WaitEvents *WaitEventsStmt // wait for Events
	Return     bool            // stop this thread
	Cond       string          // wait for cond
}

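Stop tells us that the synchronous part of the workflow has finished. It means we either need to execute a step, wait for events, wait for a condition, or stop this thread (see the fields of Stop).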

type SwitchCase added in v0.5.0

type SwitchCase struct {
	Name    string
	Cond    bool
	Stmt    Stmt
	Default bool
}

func Case

func Case(cond bool, name string, sec Stmt) SwitchCase

func Default added in v0.5.0

func Default(name string, sec Stmt) SwitchCase

func (SwitchCase) MarshalJSON added in v0.9.9

func (s SwitchCase) MarshalJSON() ([]byte, error)

type SwitchStmt added in v0.5.0

type SwitchStmt struct {
	Cases []SwitchCase
}

func If added in v0.5.0

func If(cond bool, name string, sec ...Stmt) *SwitchStmt

func Switch added in v0.5.0

func Switch(ss ...SwitchCase) *SwitchStmt

execute statements based on condition

func (*SwitchStmt) Else added in v0.9.5

func (s *SwitchStmt) Else(name string, sec ...Stmt) *SwitchStmt

func (*SwitchStmt) ElseIf added in v0.9.5

func (s *SwitchStmt) ElseIf(cond bool, name string, sec ...Stmt) *SwitchStmt

func (SwitchStmt) MarshalJSON added in v0.9.9

func (s SwitchStmt) MarshalJSON() ([]byte, error)

func (*SwitchStmt) Resume added in v0.5.0

func (s *SwitchStmt) Resume(ctx *resumeContext) (*Stop, error)

type Thread

type Thread struct {
	ID          string
	Name        string
	Status      ThreadStatus // current status
	CurStep     string       // current step
	CurCallback string
	WaitEvents  []WaitEvent // waiting for Select() stmt conditions
	Break       bool
	Continue    bool
	PC          int
}

type ThreadStatus added in v0.5.0

type ThreadStatus string

const (
	ThreadExecuting        ThreadStatus = "Executing"        // next step for this thread is to execute "CurStep" step
	ThreadResuming         ThreadStatus = "Resuming"         // next step for this thread is to continue from "CurStep" step
	ThreadWaitingEvent     ThreadStatus = "WaitingEvent"     // thread is waiting for "CurStep" wait condition and will be resumed via HandleCallback()
	ThreadWaitingCondition ThreadStatus = "WaitingCondition" // thread is waiting for a condition to happen, i.e. it's waiting for another thread to update some data
)

type Threads added in v0.5.0

type Threads []*Thread

func (*Threads) Add added in v0.5.0

func (tt *Threads) Add(t *Thread) error

func (*Threads) Find added in v0.5.0

func (tt *Threads) Find(id string) (*Thread, bool)

func (*Threads) Remove added in v0.5.0

func (tt *Threads) Remove(id string)

type WG added in v0.9.6

type WG struct {
	I int
}

func (*WG) Add added in v0.9.6

func (wg *WG) Add(i int)

func (*WG) Done added in v0.9.6

func (wg *WG) Done()

func (*WG) Wait added in v0.9.6

func (wg *WG) Wait(label string) Stmt

type WaitCondStmt added in v0.9.9

type WaitCondStmt struct {
	Name    string
	Cond    bool
	Handler func(ctx context.Context) `json:"-"` // executed when cond is true.
}

func FindWaitingStep added in v0.9.6

func FindWaitingStep(name string, sec Stmt) (WaitCondStmt, error)

func WaitFor added in v0.5.0

func WaitFor(label string, cond bool, handler func(ctx context.Context)) WaitCondStmt

WaitFor waits for the condition to become true.

func (WaitCondStmt) MarshalJSON added in v0.9.9

func (s WaitCondStmt) MarshalJSON() ([]byte, error)

func (WaitCondStmt) Resume added in v0.9.9

func (f WaitCondStmt) Resume(ctx *resumeContext) (*Stop, error)

type WaitEvent added in v0.5.0

type WaitEvent struct {
	Type    string
	Req     CallbackRequest
	Status  WaitEventStatus
	Handled bool
	Error   string
}

type WaitEventStatus added in v0.6.0

type WaitEventStatus string

Event statuses are needed to make sure that setting up, tearing down and error handling do not interfere with each other. So if setup or teardown of one event fails, we don't retry the other events.

const (
	EventPendingSetup    WaitEventStatus = "PendingSetup"    // event is just created
	EventSetup           WaitEventStatus = "Setup"           // event was successfully setup
	EventPendingTeardown WaitEventStatus = "PendingTeardown" // event was successfully setup
	EventSetupError      WaitEventStatus = "SetupError"      // there was an error during setup
	EventTeardownError   WaitEventStatus = "TeardownError"   // there was an error during teardown
)

type WaitEventsStmt added in v0.9.9

type WaitEventsStmt struct {
	Name  string
	Cases []Event
}

func Wait added in v0.5.0

func Wait(name string, ss ...Event) WaitEventsStmt

Wait for multiple events exclusively

func (WaitEventsStmt) MarshalJSON added in v0.9.9

func (s WaitEventsStmt) MarshalJSON() ([]byte, error)

func (WaitEventsStmt) Resume added in v0.9.9

func (s WaitEventsStmt) Resume(ctx *resumeContext) (*Stop, error)

type WorkflowState added in v0.5.0

type WorkflowState interface {
	// Definition func may be called multiple times, so it should be idempotent.
	// All actions should be done in callbacks or steps.
	Definition() Section
}

WorkflowState should be a Go struct that supports JSON unmarshalling. When a process is resumed, the current state is unmarshalled into it and then Definition() is called. This eliminates the need for lazy parameters and conditions, i.e. instead of 'If( func() bool { return s.IsAvailable})' we can write 'If(s.IsAvailable)'.
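The order of operations (unmarshal first, then build the definition) is what allows plain values in conditions. A minimal sketch, assuming a hypothetical `orderState` type and a simplified `branch` struct standing in for a conditional statement:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderState is a hypothetical workflow state.
type orderState struct {
	IsAvailable bool
}

// branch is a hypothetical stand-in for a conditional statement. Cond is a
// plain bool that is evaluated at the moment the definition is built.
type branch struct {
	Name string
	Cond bool
}

// definition is rebuilt on every resume, after the state has been loaded,
// so s.IsAvailable already holds the persisted value.
func (s *orderState) definition() []branch {
	return []branch{{Name: "item is available", Cond: s.IsAvailable}}
}

// load unmarshals the persisted state and then builds the definition from it.
func load(data []byte) ([]branch, error) {
	s := &orderState{}
	if err := json.Unmarshal(data, s); err != nil {
		return nil, err
	}
	return s.definition(), nil
}

func main() {
	def, err := load([]byte(`{"IsAvailable": true}`))
	if err != nil {
		fmt.Println("load failed:", err)
		return
	}
	fmt.Println(def[0].Name, def[0].Cond)
}
```

Because the definition is rebuilt from freshly loaded state each time, it should not perform side effects of its own; those belong in steps and callbacks.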

type WorkflowStatus added in v0.5.0

type WorkflowStatus string

const (
	WorkflowResuming WorkflowStatus = "Resuming" // at least 1 thread is not waiting
	WorkflowWaiting  WorkflowStatus = "Waiting"  // all threads are waiting
	WorkflowFinished WorkflowStatus = "Finished"
)
