async

package module
v0.9.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2021 License: GPL-3.0 Imports: 4 Imported by: 5

README

gorchestrate your workflows

Gorchestrate makes stateful workflow management in Go incredibly easy.

I've spend 4 years to come up with this particular approach of handling workflows, and i'll apreciate if you'll try it out.

Example App

https://github.com/gorchestrate/pizzaapp This is an example of how you can use Google Cloud to build fully serverless stateful workflows, that could be scaled horizontally without too much efforts.

Example Code

type MyWorkflowState struct {
	User     string
	Approved bool
	Error    string
}

func (s *MyWorkflow) Definition() Section {
	return S(
		Step("first action", func() error {
			s.User = "You can execute arbitrary code right in the workflow definition"
			log.Printf("No need for fancy definition/action separation")
			return nil
		}),
		If(s.User == "",
			Step("second action", func() error {
				log.Printf("use if/else to branch your workflow")
				return nil
			}),
			Return(),
		),
		Select("and then wait for some events",
			On("myEvent", MyEvent{
				F: func() {
					log.Printf("this is executed synchronously when HandleEvent() is Called")
				},
			},
				Step("and then continue the workflow", func() error {
					return nil
				}),
				s.Finish("timed out"),
			),
		),
		Go("thread2", S(
			Step("you can also run parallel threads in your workflow", func() error {
				s.User = "this allows you to orchestrate your workflow and run multiple asnychrounous actions in parallel"
				return nil
			}),
		)),
		s.Finish("finished successfully"),
	)
}

func (s *MyWorkflow) Finish(output string) Stmt {
	return S(
		Step("you can also build sub steps for your workflows", func() error {
			return nil
		}),
		Step("for example if you want to execute same actions for multiple workflow steps", func() error {
			return nil
		}),
		Step("or make your workflow definion clean and readable", func() error {
			return nil
		}),
		Return(),
	)
}

Architecture

Gorchestrate has stateless API and does not come with a database or scheduler. It's intended to be integrated into applications, which will determine how & when workflows will be executed.

Think this is a stateful script - you want to execute some steps and want to save the state of the script in persistent storage and then resume the script when some event comes in.

There are 2 ways to execute workflow:

  • Scheduled call of Resume() method. This is called when workflow starts or event was handled and we need to continue execution.
  • Explicit call of HandleEvent() method. This is called whenever some event happens.

Brief description of how workflows are executed: 0. Workflow is created and Resume() call is scheduled.

  1. Scheduled Resume() will execute workflow till point when workflow is blocked and waiting for events.
  2. Then it will execute Setup() for all events we are waiting for. This allows you to register your events on external services.
  3. When event arrives - you pass it to HandleEvent() method. This handles event and schedules Resume() call
  4. Resume() will execute Teardown() for all events we were waiting for. This allows you to deregister your events on external services.
  5. Go to point 1 if main thread is not yet exited.

How library knows where to resume the workflow from?

  • All threads and their current steps are located in workflows State.
  • When Resume() is called - we try to continue all steps in all threads, from CurStep specified in thread. So if you add a new step to the workflow - it will not break the execution flow. Removing step is harder, since you need to make sure that the step is not currently executed and it's removal won't break the logic of your workflow.
  • When HandleEvent() is called - we find step waiting for this callback and continue execution. Calling HandleEvent() with event that workflow is not waiting for will return an error.

What about performance?

Performance is mainly limited by DB operations, rather than library itself. To build stateful workflows you have to lock, save & unlock the workflow for each execution step, which may be slow and resource-hungry.

Here's a benchmark for a complete workflow execution consisting of 19 execution steps, including loops, steps & event handlers.

go test -bench=. -benchtime=1s
cpu: AMD Ryzen 7 4700U with Radeon Graphics         
BenchmarkXxx-8   	   33121	     35601 ns/op

Is it production ready?

It will reliably work for straightforward workflows that are just doing simple steps. There's 80% coverage and tests are covering basic flows.

For advanced usage, such as workflows with concurrency or custom event handlers - you should write tests and make sure library does what you want. (see async_test.go on how to test your workflows)

I'm keeping this library as 0.9.x version, since API may change a little bit in the future (for example we may want to pass context to callbacks in future), but it should be easy to fix.

I'd be glad if some really smart guy will help me out to make this library better or more popular)))

Feedback is welcome!

Documentation

Overview

Example
package main

import (
	"context"
	"encoding/json"
	"log"
)

type MyWorkflow struct {
	State    State
	User     string
	Approved bool
	Error    string
}

func (s *MyWorkflow) Definition() Section {
	return S(
		Step("first action", func() error {
			s.User = "You can execute arbitrary code right in the workflow definition"
			log.Printf("No need for fancy definition/action separation")
			return nil
		}),
		If(s.User == "", "check1",
			Step("second action", func() error {
				log.Printf("use if/else to branch your workflow")
				return nil
			}),
			Return(),
		),
		Wait("and then wait for some events",
			On("myEvent", MyEvent{
				F: func() {
					log.Printf("this is executed synchronously when HandleEvent() is Called")
				},
			},
				Step("and then continue the workflow", func() error {
					return nil
				}),
				s.Finish("timed out"),
			),
		),
		Go("thread2", S(
			Step("you can also run parallel threads in your workflow", func() error {
				s.User = "this allows you to orchestrate your workflow and run multiple asnychrounous actions in parallel"
				return nil
			}),
		)),
		s.Finish("finished successfully"),
	)
}

func (s *MyWorkflow) Finish(output string) Stmt {
	return S(
		Step("you can also build reusable workflows", func() error {
			return nil
		}),
		Step("for example if you want to execute same actions for multiple workflow steps", func() error {
			return nil
		}),
		Step("or make your workflow definion clean and readable", func() error {
			return nil
		}),
		Return(),
	)
}

type MyEvent struct {
	F func()
}

func (t MyEvent) Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error) {
	return nil, nil
}

func (t MyEvent) Teardown(ctx context.Context, req CallbackRequest, handled bool) error {
	return nil
}

func (t MyEvent) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error) {
	t.F()
	return input, nil
}

func main() {
	wf := MyWorkflow{
		State: NewState("1", "empty"),
	}
	err := Resume(context.Background(), &wf, &wf.State, func(scheduleResume bool) error {
		// this is callback to save updated &wf to persistent storage
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	_, err = HandleEvent(context.Background(), "myEvent", &wf, &wf.State, nil, func(scheduleResume bool) error {
		// this is callback to schedule another Resume() call and save updated &wf to persistent storage
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}

Index

Examples

Constants

View Source
const MainThread = "_main_"

Variables

This section is empty.

Functions

func HandleCallback added in v0.9.0

func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)

func HandleEvent added in v0.9.0

func HandleEvent(ctx context.Context, name string, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)

HandleEvent is shortcut for calling HandleCallback() by event name. Be careful - if you're using goroutines - there may be multiple events waiting with the same name. Also, if you're waiting for event in a loop - event handlers from previous iterations could arrive late and trigger events for future iterations. For better control over which event will be called back - you should use OnCallback() instead and specify PC & Thread explicitly.

func Resume added in v0.9.0

func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error

Resume the workflow after - workflow creation - successful callback handling - previously failed Resume() call

This method can be called multiple times. If there's nothing to resume - it will return 'nil'

func Validate added in v0.9.6

func Validate(s Section) error

TODO: more robust validations

func Walk added in v0.5.0

func Walk(s Stmt, f func(s Stmt) bool) (bool, error)

Types

type BreakStmt added in v0.5.0

type BreakStmt struct {
}

func Break added in v0.5.0

func Break() BreakStmt

Break for loop

func (BreakStmt) MarshalJSON added in v0.9.9

func (s BreakStmt) MarshalJSON() ([]byte, error)

func (BreakStmt) Resume added in v0.5.0

func (s BreakStmt) Resume(ctx *resumeContext) (*Stop, error)

type CallbackRequest added in v0.5.0

type CallbackRequest struct {
	WorkflowID string
	ThreadID   string
	Name       string
	PC         int
	SetupData  json.RawMessage
}

type Checkpoint added in v0.7.0

type Checkpoint func(scheduleResume bool) error

type Event added in v0.9.9

type Event struct {
	Callback CallbackRequest
	Handler  Handler
	Stmt     Stmt
}

func On added in v0.5.0

func On(event string, handler Handler, stmts ...Stmt) Event

func (Event) MarshalJSON added in v0.9.9

func (s Event) MarshalJSON() ([]byte, error)

type ForStmt added in v0.5.0

type ForStmt struct {
	Name string
	Cond bool // nil cond for infinite loop
	Stmt Stmt
}

func (ForStmt) MarshalJSON added in v0.9.9

func (s ForStmt) MarshalJSON() ([]byte, error)

func (ForStmt) Resume added in v0.5.0

func (f ForStmt) Resume(ctx *resumeContext) (*Stop, error)

type GoStmt added in v0.5.0

type GoStmt struct {
	// ID is needed to identify threads when there are many threads running with the same name.
	// (for example they were created in a loop)
	ID   func() string `json:"-"`
	Name string
	Stmt Stmt
}

func Go added in v0.5.0

func Go(name string, body Stmt) *GoStmt

func (GoStmt) MarshalJSON added in v0.9.9

func (s GoStmt) MarshalJSON() ([]byte, error)

func (*GoStmt) Resume added in v0.5.0

func (s *GoStmt) Resume(ctx *resumeContext) (*Stop, error)

func (*GoStmt) WithID added in v0.9.0

func (s *GoStmt) WithID(id func() string) *GoStmt

type Handler added in v0.5.0

type Handler interface {
	Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error)

	Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)

	Teardown(ctx context.Context, req CallbackRequest, handled bool) error
}

func FindHandler added in v0.8.6

func FindHandler(req CallbackRequest, sec Stmt) (Handler, error)

type ReturnStmt added in v0.5.0

type ReturnStmt struct {
}

func Return added in v0.5.0

func Return() ReturnStmt

Return stops this trhead

func (ReturnStmt) MarshalJSON added in v0.9.9

func (s ReturnStmt) MarshalJSON() ([]byte, error)

func (ReturnStmt) Resume added in v0.5.0

func (s ReturnStmt) Resume(ctx *resumeContext) (*Stop, error)

type Section added in v0.5.0

type Section []Stmt

Section is similar to code block {} with a list of statements.

func S added in v0.5.0

func S(ss ...Stmt) Section

S is a syntax sugar to properly indent statement sections when using gofmt

func (Section) MarshalJSON added in v0.9.9

func (s Section) MarshalJSON() ([]byte, error)

func (Section) Resume added in v0.5.0

func (s Section) Resume(ctx *resumeContext) (*Stop, error)

type State added in v0.5.0

type State struct {
	ID       string         // id of workflow instance
	Workflow string         // name of workflow definition. Used to choose proper state type to unmarshal & resume on
	Status   WorkflowStatus // current status
	Threads  Threads
	PC       int
}

func NewState added in v0.7.0

func NewState(id, name string) State

type Stmt added in v0.5.0

type Stmt interface {
	// Resume continues execution of the process, based on resumeContext
	// If ctx.Running == true Resume should execute the statement or continue execution after.
	// If ctx.Running = false Resume should not execute the statement, but recursively search for the statement that needs to be resumed.
	// If it needs to be resumed - don't execute it, but continue execution from this statement.
	//
	// If stmt not found *Stop will be nil and ctx.Running will be false
	// If stmt is found, but process has finished - *Stop will be nil and ctx.Running will be true
	// Otherwise Resume should always return *Stop or err != nil
	Resume(ctx *resumeContext) (*Stop, error)
}

Stmt is async statement definition that should support workflow resuming & search.

func For added in v0.5.0

func For(cond bool, ss ...Stmt) Stmt

type StmtStep added in v0.5.0

type StmtStep struct {
	Name   string
	Action func() error `json:"-"`
}

func FindStep added in v0.5.0

func FindStep(name string, sec Stmt) (*StmtStep, error)

func Step added in v0.5.0

func Step(name string, action func() error) StmtStep

func (StmtStep) MarshalJSON added in v0.9.9

func (s StmtStep) MarshalJSON() ([]byte, error)

func (StmtStep) Resume added in v0.5.0

func (s StmtStep) Resume(ctx *resumeContext) (*Stop, error)

type Stop added in v0.5.0

type Stop struct {
	Step       string          // execute step
	WaitEvents *WaitEventsStmt // wait for Events
	Return     bool            // stop this thread
	Cond       string          // wait for cond
}

Stop tells us that syncronous part of the workflow has finished. It means we either:

type SwitchCase added in v0.5.0

type SwitchCase struct {
	Name string
	Cond bool
	Stmt Stmt
}

func Case

func Case(cond bool, name string, sec Stmt) SwitchCase

func Default added in v0.5.0

func Default(sec Stmt) SwitchCase

func (SwitchCase) MarshalJSON added in v0.9.9

func (s SwitchCase) MarshalJSON() ([]byte, error)

type SwitchStmt added in v0.5.0

type SwitchStmt struct {
	Cases []SwitchCase
}

func If added in v0.5.0

func If(cond bool, name string, sec ...Stmt) *SwitchStmt

func Switch added in v0.5.0

func Switch(ss ...SwitchCase) *SwitchStmt

execute statements based on condition

func (*SwitchStmt) Else added in v0.9.5

func (s *SwitchStmt) Else(sec ...Stmt) *SwitchStmt

func (*SwitchStmt) ElseIf added in v0.9.5

func (s *SwitchStmt) ElseIf(cond bool, name string, sec ...Stmt) *SwitchStmt

func (SwitchStmt) MarshalJSON added in v0.9.9

func (s SwitchStmt) MarshalJSON() ([]byte, error)

func (*SwitchStmt) Resume added in v0.5.0

func (s *SwitchStmt) Resume(ctx *resumeContext) (*Stop, error)

type Thread

type Thread struct {
	ID         string
	Name       string
	Status     ThreadStatus // current status
	CurStep    string       // current step
	WaitEvents []WaitEvent  // waiting for Select() stmt conditions
	Break      bool
	PC         int
}

type ThreadStatus added in v0.5.0

type ThreadStatus string
const (
	ThreadExecuting        ThreadStatus = "Executing"        // next step for this thread is to execute "CurStep" step
	ThreadResuming         ThreadStatus = "Resuming"         // next step for this thread is to continue from "CurStep" step
	ThreadWaitingEvent     ThreadStatus = "WaitingEvent"     // thread is waiting for "CurStep" wait condition and will be resumed via OnCallback()
	ThreadWaitingCondition ThreadStatus = "WaitingCondition" // thread is waiting for condition to happen. i.e. it's waiting for other thread to update some data

)

type Threads added in v0.5.0

type Threads []*Thread

func (*Threads) Add added in v0.5.0

func (tt *Threads) Add(t *Thread) error

func (*Threads) Find added in v0.5.0

func (tt *Threads) Find(id string) (*Thread, bool)

func (*Threads) Remove added in v0.5.0

func (tt *Threads) Remove(id string)

type WG added in v0.9.6

type WG struct {
	I int
}

func (*WG) Add added in v0.9.6

func (wg *WG) Add(i int)

func (*WG) Done added in v0.9.6

func (wg *WG) Done()

func (*WG) Wait added in v0.9.6

func (wg *WG) Wait(label string) Stmt

type WaitCondStmt added in v0.9.9

type WaitCondStmt struct {
	Name    string
	Cond    bool
	Handler func() `json:"-"` // executed when cond is true.
}

func FindWaitingStep added in v0.9.6

func FindWaitingStep(name string, sec Stmt) (WaitCondStmt, error)

func WaitCond added in v0.5.0

func WaitCond(cond bool, label string, handler func()) WaitCondStmt

Wait statement wait for condition to be true.

func (WaitCondStmt) MarshalJSON added in v0.9.9

func (s WaitCondStmt) MarshalJSON() ([]byte, error)

func (WaitCondStmt) Resume added in v0.9.9

func (f WaitCondStmt) Resume(ctx *resumeContext) (*Stop, error)

type WaitEvent added in v0.5.0

type WaitEvent struct {
	Req     CallbackRequest
	Status  WaitEventStatus
	Handled bool
	Error   string
}

type WaitEventStatus added in v0.6.0

type WaitEventStatus string
const (
	EventPendingSetup    WaitEventStatus = "PendingSetup"    // event is just created
	EventSetup           WaitEventStatus = "Setup"           // event was successfully setup
	EventPendingTeardown WaitEventStatus = "PendingTeardown" // event was successfully setup
	EventSetupError      WaitEventStatus = "SetupError"      // there was an error during setup
	EventTeardownError   WaitEventStatus = "TeardownError"   // there was an error during teardown
)

type WaitEventsStmt added in v0.9.9

type WaitEventsStmt struct {
	Name  string
	Cases []Event
}

func Wait added in v0.5.0

func Wait(name string, ss ...Event) WaitEventsStmt

Wait for multiple events exclusively

func (WaitEventsStmt) MarshalJSON added in v0.9.9

func (s WaitEventsStmt) MarshalJSON() ([]byte, error)

func (WaitEventsStmt) Resume added in v0.9.9

func (s WaitEventsStmt) Resume(ctx *resumeContext) (*Stop, error)

type WorkflowState added in v0.5.0

type WorkflowState interface {
	// Definition func may be called multiple times so it should be idempotent.
	// All actions should done in callbacks or steps.
	Definition() Section
}

WorkflowState should be a Go struct supporting JSON unmarshalling into it. When process is resumed - current state is unmarshalled into it and then Definition() is called. This is needed to eliminate lasy parameters and conditions i.e. instead of 'If( func() bool { return s.IsAvailable})' we can write 'If(s.IsAvailable)'.

type WorkflowStatus added in v0.5.0

type WorkflowStatus string
const (
	WorkflowRunning  WorkflowStatus = "Running"
	WorkflowFinished WorkflowStatus = "Finished"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL