async

v0.9.6
Published: Jul 15, 2021 License: GPL-3.0 Imports: 4 Imported by: 5

README

async is a library, not a framework

It makes workflow management in Go incredibly easy, but requires you to specify how you lock, store, and schedule your workflows.

I've spent 4 years coming up with this particular approach to handling workflows, and I'd appreciate it if you tried it out.

Here's how it looks:

type MyWorkflowState struct {
	User     string
	Approved bool
	Error    string
}

func (s *MyWorkflowState) Definition() Section {
	return S(
		Step("first action", func() error {
			s.User = "You can execute arbitrary code right in the workflow definition"
			log.Printf("No need for fancy definition/action separation")
			return nil
		}),
		If(s.User == "", Step("second action", func() error {
			log.Printf("use if/else to branch your workflow")
			return nil
		}),
			Return(),
		),
		Select("and then wait for some events",
			On("wait some time", Timeout{
				After: time.Minute * 10,
			},
				Step("and then continue the workflow", func() error {
					return nil
				}),
				s.Finish("timed out"),
			),
			On("or wait for your custom event", WaitForUserAction{
				Message: "plz approve",
				Approve: func() {
					s.Approved = true
					log.Printf("And handle its results")
				},
				Deny: func(reason string) {
					s.Approved = false
					s.Error = reason
				},
			}),
		),
		Go("thread2", S(
			Step("you can also run parallel threads in your workflow", func() error {
				s.User = "this allows you to orchestrate your workflow and run multiple asynchronous actions in parallel"
				return nil
			}),
		)),
		s.Finish("finished successfully"),
	)
}

func (s *MyWorkflowState) Finish(output string) Stmt {
	return S(
		Step("you can also build reusable workflows", func() error {
			return nil
		}),
		Step("for example if you want to execute same actions for multiple workflow steps", func() error {
			return nil
		}),
		Step("or make your workflow definition clean and readable", func() error {
			return nil
		}),
		Return(),
	)
}

Architecture

Gorchestrate has a stateless API. This lets the library adapt to any infrastructure, including serverless applications.

A workflow is resumed (executed) by the following events:

  • An explicit call to HandleEvent(). Call it whenever an event happens.
  • A scheduled call to Resume(). One is scheduled after workflow creation and after each HandleEvent() call.

The execution cycle is:

  1. Resume() executes the workflow until it blocks waiting for events.
  2. It then executes Setup() for every event the workflow is waiting for. This lets you register your events with external services.
  3. When an event arrives, you pass it to HandleEvent(). This handles the event and schedules a Resume() call.
  4. When Resume() is called, it executes Teardown() for every event the workflow was waiting for. This lets you deregister your events from external services.
  5. If the workflow is not finished, go back to point 1.
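
The cycle above can be illustrated with a small, self-contained toy. This is only a sketch and does not use the library: toyWorkflow, script, resume and handleEvent are hypothetical names, and the real Resume() and HandleEvent() take more arguments and persist state through a Checkpoint callback.

```go
package main

import (
	"fmt"
	"strings"
)

// toyWorkflow is a hypothetical persisted state; it is not the library's State type.
type toyWorkflow struct {
	PC       int      // index of the next script entry
	Waiting  string   // event that has been set up, "" if none
	Handled  bool     // true once handleEvent accepted that event
	Finished bool
	Trace    []string // what happened, in order
}

// script is a made-up workflow: "wait:<name>" entries block on an event.
var script = []string{"charge card", "wait:approved", "ship order"}

// resume runs until the workflow blocks or finishes.
// Calling it when there is nothing to do is harmless.
func resume(w *toyWorkflow) {
	if w.Finished {
		return
	}
	if w.Waiting != "" {
		if !w.Handled {
			return // still blocked on the event
		}
		// The event was handled: tear it down and move past the wait.
		w.Trace = append(w.Trace, "teardown "+w.Waiting)
		w.Waiting, w.Handled = "", false
		w.PC++
	}
	for w.PC < len(script) {
		entry := script[w.PC]
		if strings.HasPrefix(entry, "wait:") {
			// Blocked: set up the event and stop until it arrives.
			w.Waiting = strings.TrimPrefix(entry, "wait:")
			w.Trace = append(w.Trace, "setup "+w.Waiting)
			return
		}
		w.Trace = append(w.Trace, "step "+entry)
		w.PC++
	}
	w.Finished = true
}

// handleEvent accepts an event only if the workflow is waiting for it.
// The caller is expected to schedule resume afterwards.
func handleEvent(w *toyWorkflow, name string) error {
	if w.Waiting == "" || w.Waiting != name || w.Handled {
		return fmt.Errorf("workflow is not waiting for %q", name)
	}
	w.Handled = true
	w.Trace = append(w.Trace, "handle "+name)
	return nil
}

// runToCompletion drives one full cycle and returns the trace.
func runToCompletion() ([]string, error) {
	w := &toyWorkflow{}
	resume(w) // runs "charge card", then sets up "approved"
	resume(w) // no-op: still blocked
	if err := handleEvent(w, "approved"); err != nil {
		return nil, err
	}
	resume(w) // tears down "approved", then runs "ship order"
	return w.Trace, nil
}

func main() {
	trace, err := runToCompletion()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, line := range trace {
		fmt.Println(line)
	}
}
```

Because every call works only on the state it is given, each call could run in a different process as long as the state is loaded before it and saved after it.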

Features

  • Define all your workflow actions right in the Go code. Logic/action separation is done by using labels and closures.
  • You can update your workflow definition while it is running. Workflow state & definition are stored separately.
  • You can have multiple threads in a single workflow and orchestrate them.
  • You can test your workflows with unit tests.
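
To illustrate why storing state and definition separately allows a definition to change mid-run, here is a minimal sketch that does not use the library. orderState, planV1, planV2 and load are hypothetical names: only data is persisted (as JSON), and the logic is ordinary Go code that is rebuilt from that data each time.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderState is a hypothetical workflow state: only these fields are stored.
type orderState struct {
	User     string
	Approved bool
}

// planV1 is the logic that was deployed when the workflow started.
func planV1(s *orderState) []string {
	return []string{"notify " + s.User}
}

// planV2 is a later version of the logic; it reads the same stored fields.
func planV2(s *orderState) []string {
	steps := []string{"notify " + s.User}
	if s.Approved {
		steps = append(steps, "ship order")
	}
	return steps
}

// load restores the state from its persisted JSON form.
func load(stored []byte) (*orderState, error) {
	var s orderState
	if err := json.Unmarshal(stored, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

func main() {
	// State saved while planV1 was deployed.
	stored := []byte(`{"User":"alice","Approved":true}`)
	s, err := load(stored)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("v1:", planV1(s))
	fmt.Println("v2:", planV2(s)) // the new logic runs against the old state
}
```

The same property helps with unit tests: a test can construct a state value directly and check what the definition does with it.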

Google Cloud Run & Google Cloud Tasks

By using Google Cloud Run, Google Cloud Tasks, and Google Datastore, it's possible to have a fully serverless workflow setup. This provides all the benefits of serverless applications while also solving the issues related to concurrent execution. Example setup using this stack: https://github.com/gorchestrate/pizzaapp
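
How the example app guards against concurrent execution is not described here. One common approach, shown below as an assumption rather than as what pizzaapp does, is optimistic locking: each stored workflow carries a version, and a save succeeds only if the version is unchanged since it was loaded. memStore, record and errConflict are hypothetical names, and the in-memory map stands in for a real database.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("version conflict: state was changed by someone else")

// record is a hypothetical stored workflow: opaque data plus a version counter.
type record struct {
	Version int
	Data    string
}

// memStore is an in-memory stand-in for a database.
type memStore struct {
	mu   sync.Mutex
	recs map[string]record
}

func newMemStore() *memStore {
	return &memStore{recs: make(map[string]record)}
}

// Load returns the current record; a missing ID yields the zero record (version 0).
func (m *memStore) Load(id string) record {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.recs[id]
}

// Save writes data only if nobody has saved since the caller loaded version seen.
func (m *memStore) Save(id string, seen int, data string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.recs[id].Version != seen {
		return errConflict
	}
	m.recs[id] = record{Version: seen + 1, Data: data}
	return nil
}

func main() {
	store := newMemStore()
	a := store.Load("wf-1") // two handlers load the same version
	b := store.Load("wf-1")
	fmt.Println("first save: ", store.Save("wf-1", a.Version, "from handler A"))
	fmt.Println("second save:", store.Save("wf-1", b.Version, "from handler B"))
	fmt.Println("stored:", store.Load("wf-1").Data)
}
```

The handler that loses the race gets an error and can be retried by the task queue against the fresh state.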

Is it production ready?

It works reliably for straightforward workflows that just run simple steps. Test coverage is 80%, and the tests cover the basic flows.

For advanced usage, such as workflows with concurrency or custom event handlers, you should write tests and make sure the library does what you want (see async_test.go for how to test your workflows).

I'm keeping this library at version 0.9.x, since the API may still change a little (for example, we may want to pass a context to callbacks in the future), but such changes should be easy to adapt to.

I'd be glad if someone really smart would help me make this library better or more popular :)

Feedback is welcome!

Documentation

Overview

Example
package main

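import (
	"context"
	"encoding/json"
	"log"

	// Dot import so the unqualified names below (S, Step, Select, ...) resolve.
	// The module path is assumed to be github.com/gorchestrate/async.
	. "github.com/gorchestrate/async"
)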

type MyWorkflow struct {
	State    State
	User     string
	Approved bool
	Error    string
}

func (s *MyWorkflow) Definition() Section {
	return S(
		Step("first action", func() error {
			s.User = "You can execute arbitrary code right in the workflow definition"
			log.Printf("No need for fancy definition/action separation")
			return nil
		}),
		If(s.User == "", Step("second action", func() error {
			log.Printf("use if/else to branch your workflow")
			return nil
		}),
			Return(),
		),
		Select("and then wait for some events",
			On("myEvent", MyEvent{
				F: func() {
					log.Printf("this is executed synchronously when HandleEvent() is called")
				},
			},
				Step("and then continue the workflow", func() error {
					return nil
				}),
				s.Finish("timed out"),
			),
		),
		Go("thread2", S(
			Step("you can also run parallel threads in your workflow", func() error {
				s.User = "this allows you to orchestrate your workflow and run multiple asynchronous actions in parallel"
				return nil
			}),
		)),
		s.Finish("finished successfully"),
	)
}

func (s *MyWorkflow) Finish(output string) Stmt {
	return S(
		Step("you can also build reusable workflows", func() error {
			return nil
		}),
		Step("for example if you want to execute same actions for multiple workflow steps", func() error {
			return nil
		}),
		Step("or make your workflow definition clean and readable", func() error {
			return nil
		}),
		Return(),
	)
}

type MyEvent struct {
	F func()
}

func (t MyEvent) Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error) {
	return nil, nil
}

func (t MyEvent) Teardown(ctx context.Context, req CallbackRequest) error {
	return nil
}

func (t MyEvent) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error) {
	t.F()
	return input, nil
}

func main() {
	wf := MyWorkflow{
		State: NewState("1", "empty"),
	}
	err := Resume(context.Background(), &wf, &wf.State, func(scheduleResume bool) error {
		// this callback saves the updated wf to persistent storage
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
	_, err = HandleEvent(context.Background(), "myEvent", &wf, &wf.State, nil, func(scheduleResume bool) error {
		// this callback schedules another Resume() call and saves the updated wf to persistent storage
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}

Constants

const MainThread = "_main_"

Variables

This section is empty.

Functions

func HandleCallback added in v0.9.0

func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)

func HandleEvent added in v0.9.0

func HandleEvent(ctx context.Context, name string, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)

HandleEvent is a shortcut for calling HandleCallback() by event name. Be careful: if you're using Go() threads, multiple events with the same name may be waiting at once. Also, if you're waiting for an event in a loop, events from previous iterations could arrive late and be delivered to later iterations. For better control over which event is called back, use HandleCallback() instead and specify the PC and thread explicitly.
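
The ambiguity can be shown without the library. In the sketch below, callbackRequest copies the identifying fields of the documented CallbackRequest struct; waitingEvents, matchByName and matchExact are hypothetical helpers, not library functions.

```go
package main

import "fmt"

// callbackRequest mirrors the identifying fields of the documented
// CallbackRequest struct (SetupData is left out).
type callbackRequest struct {
	WorkflowID string
	ThreadID   string
	Name       string
	PC         int
}

// waitingEvents returns a made-up set of events one workflow is waiting for:
// two threads wait for an event with the same name.
func waitingEvents() []callbackRequest {
	return []callbackRequest{
		{WorkflowID: "1", ThreadID: "courier-a", Name: "delivered", PC: 7},
		{WorkflowID: "1", ThreadID: "courier-b", Name: "delivered", PC: 7},
	}
}

// matchByName returns every waiting event with the given name.
func matchByName(waiting []callbackRequest, name string) []callbackRequest {
	var out []callbackRequest
	for _, w := range waiting {
		if w.Name == name {
			out = append(out, w)
		}
	}
	return out
}

// matchExact reports whether req identifies one of the waiting events exactly.
func matchExact(waiting []callbackRequest, req callbackRequest) bool {
	for _, w := range waiting {
		if w == req {
			return true
		}
	}
	return false
}

func main() {
	waiting := waitingEvents()
	fmt.Println("matches by name:", len(matchByName(waiting, "delivered")))

	req := callbackRequest{WorkflowID: "1", ThreadID: "courier-b", Name: "delivered", PC: 7}
	fmt.Println("exact request matches:", matchExact(waiting, req))

	// A late event from an earlier point in the workflow carries an old PC.
	stale := callbackRequest{WorkflowID: "1", ThreadID: "courier-b", Name: "delivered", PC: 3}
	fmt.Println("stale request matches:", matchExact(waiting, stale))
}
```

A name alone selects two candidates here, while the full request selects exactly one and rejects the stale one.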

func Resume added in v0.9.0

func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error

Resume resumes the workflow after:

  • workflow creation
  • successful callback handling
  • a previously failed Resume() call

This method can be called multiple times. If there is nothing to resume, it returns nil.

func Validate added in v0.9.6

func Validate(s Section) error

func Walk added in v0.5.0

func Walk(s Stmt, f func(s Stmt) bool) (bool, error)

Types

type BreakStmt added in v0.5.0

type BreakStmt struct {
}

func Break added in v0.5.0

func Break() BreakStmt

Break exits the enclosing For loop.

func (BreakStmt) Resume added in v0.5.0

func (s BreakStmt) Resume(ctx *ResumeContext) (*Stop, error)

type CallbackRequest added in v0.5.0

type CallbackRequest struct {
	WorkflowID string
	ThreadID   string
	Name       string
	PC         int
	SetupData  json.RawMessage
}

type Checkpoint added in v0.7.0

type Checkpoint func(scheduleResume bool) error
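
Judging by the comments in the example above, a Checkpoint is where the caller saves the workflow and, when scheduleResume is true, schedules another Resume() call. A minimal sketch of such a callback follows. It does not import the library: checkpoint copies the documented signature, while myWorkflow, fakeInfra and checkpointFor are hypothetical, with a map and a slice standing in for a database and a task queue.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkpoint copies the documented Checkpoint signature.
type checkpoint func(scheduleResume bool) error

// myWorkflow is a hypothetical workflow state.
type myWorkflow struct {
	User     string
	Approved bool
}

// fakeInfra stands in for persistent storage and a task queue.
type fakeInfra struct {
	saved     map[string][]byte // workflow ID -> JSON state
	scheduled []string          // workflow IDs with a pending resume
}

// checkpointFor builds a checkpoint bound to one workflow instance.
func (f *fakeInfra) checkpointFor(id string, wf *myWorkflow) checkpoint {
	return func(scheduleResume bool) error {
		data, err := json.Marshal(wf)
		if err != nil {
			return fmt.Errorf("marshal workflow %s: %w", id, err)
		}
		f.saved[id] = data
		if scheduleResume {
			f.scheduled = append(f.scheduled, id)
		}
		return nil
	}
}

func main() {
	infra := &fakeInfra{saved: map[string][]byte{}}
	wf := &myWorkflow{User: "bob"}
	save := infra.checkpointFor("1", wf)

	wf.Approved = true // the workflow mutated its state
	if err := save(true); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(infra.saved["1"]), infra.scheduled)
}
```

The closure holds a pointer to the workflow, so it always serializes the state as it is at the moment of the call.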

type ForStmt added in v0.5.0

type ForStmt struct {
	CondLabel string
	Cond      bool // nil cond for infinite loop
	Stmt      Stmt
}

func (ForStmt) Resume added in v0.5.0

func (f ForStmt) Resume(ctx *ResumeContext) (*Stop, error)

type GoStmt added in v0.5.0

type GoStmt struct {
	// ID is needed to identify threads when there are many threads running with the same name.
	// (for example they were created in a loop)
	ID   func() string
	Name string
	Stmt Stmt
}

func Go added in v0.5.0

func Go(name string, body Stmt) *GoStmt

func (*GoStmt) Resume added in v0.5.0

func (s *GoStmt) Resume(ctx *ResumeContext) (*Stop, error)

func (*GoStmt) WithID added in v0.9.0

func (s *GoStmt) WithID(id func() string) *GoStmt
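
The comment on GoStmt.ID mentions threads created in a loop. The toy below sketches how an ID function can tell such threads apart. threadSpec only imitates the shape of GoStmt (a Name plus an ID func() string and a chaining WithID method); specsFor is hypothetical, and how the library combines Name and ID internally is not covered here.

```go
package main

import "fmt"

// threadSpec is a toy analogue of the documented GoStmt: a shared Name plus
// an ID function that tells same-named threads apart.
type threadSpec struct {
	Name string
	ID   func() string
}

// WithID sets the ID function and returns the receiver so calls can be chained.
func (t *threadSpec) WithID(id func() string) *threadSpec {
	t.ID = id
	return t
}

// specsFor creates one thread spec per order, all with the same name.
func specsFor(orders []string) []*threadSpec {
	specs := make([]*threadSpec, 0, len(orders))
	for _, order := range orders {
		order := order // per-iteration copy; required before Go 1.22
		spec := (&threadSpec{Name: "process order"}).WithID(func() string {
			return "order-" + order
		})
		specs = append(specs, spec)
	}
	return specs
}

func main() {
	for _, s := range specsFor([]string{"17", "42"}) {
		fmt.Println(s.Name, s.ID())
	}
}
```

Each closure captures its own copy of the loop variable, so the IDs stay distinct even though the names are identical.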

type Handler added in v0.5.0

type Handler interface {
	Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error)

	Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)

	Teardown(ctx context.Context, req CallbackRequest) error
}

func FindHandler added in v0.8.6

func FindHandler(req CallbackRequest, sec Stmt) (Handler, error)

type ResumeContext added in v0.5.0

type ResumeContext struct {

	// Running means the process is already resumed and we are executing statements.
	// If the process is not running, we are searching for the step we should resume from.
	Running bool

	// In case workflow is resumed by a callback
	Callback       CallbackRequest
	CallbackInput  interface{}
	CallbackOutput interface{}

	Return bool
	// contains filtered or unexported fields
}

ResumeContext is used during workflow execution. It contains the resume input as well as the current state of the execution.

type ReturnStmt added in v0.5.0

type ReturnStmt struct {
}

func Return added in v0.5.0

func Return() ReturnStmt

Return stops this thread.

func (ReturnStmt) Resume added in v0.5.0

func (s ReturnStmt) Resume(ctx *ResumeContext) (*Stop, error)

type Section added in v0.5.0

type Section []Stmt

Section is similar to code block {} with a list of statements.

func S added in v0.5.0

func S(ss ...Stmt) Section

S is syntactic sugar that keeps statement sections properly indented under gofmt.

func (Section) Resume added in v0.5.0

func (s Section) Resume(ctx *ResumeContext) (*Stop, error)

type SelectStmt added in v0.5.0

type SelectStmt struct {
	Name  string
	Cases []WaitCond
}

func Select

func Select(name string, ss ...WaitCond) SelectStmt

Select waits for multiple conditions and executes only one of them.

func (SelectStmt) Resume added in v0.5.0

func (s SelectStmt) Resume(ctx *ResumeContext) (*Stop, error)

type State added in v0.5.0

type State struct {
	ID       string         // id of workflow instance
	Workflow string         // name of workflow definition. Used to choose proper state type to unmarshal & resume on
	Status   WorkflowStatus // current status
	Threads  Threads
	PC       int
}

func NewState added in v0.7.0

func NewState(id, name string) State

type Stmt added in v0.5.0

type Stmt interface {
	// Resume continues execution of the process, based on ResumeContext
	// If ctx.Running == true Resume should execute the statement or continue execution after.
	// If ctx.Running = false Resume should not execute the statement, but recursively search for the statement that needs to be resumed.
	// If it needs to be resumed - don't execute it, but continue execution from this statement.
	//
	// If stmt not found *Stop will be nil and ctx.Running will be false
	// If stmt is found, but process has finished - *Stop will be nil and ctx.Running will be true
	// Otherwise Resume should always return *Stop or err != nil
	Resume(ctx *ResumeContext) (*Stop, error)
}

Stmt is an async statement definition that must support workflow resuming and search.
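
The search-then-run behaviour described in the interface comments can be sketched with a toy statement tree. This is a simplified illustration, not the library's implementation: resumeCtx, stmt, step, section and runFrom are hypothetical, there is no *Stop or error value, and the step that is found is executed straight away.

```go
package main

import "fmt"

// resumeCtx is a toy analogue of the documented ResumeContext.
type resumeCtx struct {
	Running bool     // false while still searching for Target
	Target  string   // name of the step to resume from
	Ran     []string // steps executed so far
}

// stmt is a toy analogue of Stmt.
type stmt interface {
	resume(ctx *resumeCtx)
}

// step is a named action; here "executing" it just records its name.
type step string

func (s step) resume(ctx *resumeCtx) {
	if !ctx.Running {
		if string(s) != ctx.Target {
			return // not the step we are looking for: keep searching
		}
		ctx.Running = true // found it: switch from searching to executing
	}
	ctx.Ran = append(ctx.Ran, string(s))
}

// section is a list of statements, like a {} block.
type section []stmt

func (sec section) resume(ctx *resumeCtx) {
	// The same traversal serves both modes: children before the target are
	// skipped, children from the target onwards are executed.
	for _, s := range sec {
		s.resume(ctx)
	}
}

// runFrom walks the definition and executes everything from target onwards.
func runFrom(def stmt, target string) []string {
	ctx := &resumeCtx{Target: target}
	def.resume(ctx)
	return ctx.Ran
}

func main() {
	def := section{
		step("a"),
		section{step("b"), step("c")},
		step("d"),
	}
	fmt.Println(runFrom(def, "c"))
}
```

Because the position is found by walking the definition again, nothing about the definition itself has to be stored between runs; a step name (or a similar marker) is enough.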

func For added in v0.5.0

func For(cond bool, ss ...Stmt) Stmt

type StmtStep added in v0.5.0

type StmtStep struct {
	Name   string
	Action func() error
}

func FindStep added in v0.5.0

func FindStep(name string, sec Stmt) (*StmtStep, error)

func Step added in v0.5.0

func Step(name string, action func() error) StmtStep

func (StmtStep) Resume added in v0.5.0

func (s StmtStep) Resume(ctx *ResumeContext) (*Stop, error)

type Stop added in v0.5.0

type Stop struct {
	Step   string      // execute step
	Select *SelectStmt // wait for Events
	Return bool        // stop this thread
	Cond   string      // wait for cond
}

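Stop tells us that the synchronous part of the workflow has finished. It means we either need to execute a step, wait for events, stop this thread, or wait for a condition, as indicated by the fields above.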

type SwitchCase added in v0.5.0

type SwitchCase struct {
	CondLabel string
	Cond      bool
	Stmt      Stmt
}

func Case

func Case(cond bool, sec Stmt) SwitchCase

func Default added in v0.5.0

func Default(sec Stmt) SwitchCase

type SwitchStmt added in v0.5.0

type SwitchStmt struct {
	Cases []SwitchCase
}

func If added in v0.5.0

func If(cond bool, sec ...Stmt) *SwitchStmt

func Switch added in v0.5.0

func Switch(ss ...SwitchCase) *SwitchStmt

Switch executes statements based on conditions.

func (*SwitchStmt) Else added in v0.9.5

func (s *SwitchStmt) Else(sec ...Stmt) *SwitchStmt

func (*SwitchStmt) ElseIf added in v0.9.5

func (s *SwitchStmt) ElseIf(cond bool, sec ...Stmt) *SwitchStmt

func (*SwitchStmt) Resume added in v0.5.0

func (s *SwitchStmt) Resume(ctx *ResumeContext) (*Stop, error)

type Thread

type Thread struct {
	ID         string
	Name       string
	Status     ThreadStatus // current status
	CurStep    string       // current step
	WaitEvents []WaitEvent  // waiting for Select() stmt conditions
	Break      bool
	PC         int
}

func (*Thread) WaitingForCallback added in v0.6.0

func (t *Thread) WaitingForCallback(cb CallbackRequest) error

type ThreadStatus added in v0.5.0

type ThreadStatus string
const (
	ThreadExecuting        ThreadStatus = "Executing"        // next step for this thread is to execute "CurStep" step
	ThreadResuming         ThreadStatus = "Resuming"         // next step for this thread is to continue from "CurStep" step
	ThreadWaitingEvent     ThreadStatus = "WaitingEvent"     // thread is waiting for "CurStep" wait condition and will be resumed via HandleCallback()
	ThreadWaitingCondition ThreadStatus = "WaitingCondition" // thread is waiting for condition to happen. i.e. it's waiting for other thread to update some data

)

type Threads added in v0.5.0

type Threads []*Thread

func (*Threads) Add added in v0.5.0

func (tt *Threads) Add(t *Thread) error

func (*Threads) Find added in v0.5.0

func (tt *Threads) Find(id string) (*Thread, bool)

func (*Threads) Remove added in v0.5.0

func (tt *Threads) Remove(id string)

type WG added in v0.9.6

type WG struct {
	I int
}

func (*WG) Add added in v0.9.6

func (wg *WG) Add(i int)

func (*WG) Done added in v0.9.6

func (wg *WG) Done()

func (*WG) Wait added in v0.9.6

func (wg *WG) Wait(label string) Stmt

type WaitCond added in v0.5.0

type WaitCond struct {
	Callback CallbackRequest
	Handler  Handler
	Stmt     Stmt
}

func On added in v0.5.0

func On(event string, handler Handler, stmts ...Stmt) WaitCond

type WaitEvent added in v0.5.0

type WaitEvent struct {
	Req    CallbackRequest
	Status WaitEventStatus
	Error  string
}

type WaitEventStatus added in v0.6.0

type WaitEventStatus string
const (
	EventPendingSetup    WaitEventStatus = "PendingSetup"    // event is just created
	EventSetup           WaitEventStatus = "Setup"           // event was successfully setup
	EventPendingTeardown WaitEventStatus = "PendingTeardown" // event was successfully setup
	EventSetupError      WaitEventStatus = "SetupError"      // there was an error during setup
	EventTeardownError   WaitEventStatus = "TeardownError"   // there was an error during teardown
)

type WaitStmt added in v0.9.6

type WaitStmt struct {
	CondLabel string
	Cond      bool
	Handler   func() // executed when cond is true.
}

func FindWaitingStep added in v0.9.6

func FindWaitingStep(name string, sec Stmt) (WaitStmt, error)

func Wait added in v0.5.0

func Wait(cond bool, label string, handler func()) WaitStmt

Wait waits for the condition to become true.

func (WaitStmt) Resume added in v0.9.6

func (f WaitStmt) Resume(ctx *ResumeContext) (*Stop, error)

type WorkflowState added in v0.5.0

type WorkflowState interface {
	// Definition may be called multiple times, so it should be idempotent.
	// All actions should be done in callbacks or steps.
	Definition() Section
}

WorkflowState should be a Go struct that supports JSON unmarshalling. When a process is resumed, the current state is unmarshalled into it and then Definition() is called. This eliminates the need for lazy parameters and conditions: instead of 'If(func() bool { return s.IsAvailable })' we can write 'If(s.IsAvailable)'.
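
The following sketch shows why a plain bool is enough when the definition is rebuilt after unmarshalling. It does not use the library: ifStmt, rentalState and condOnResume are hypothetical, and ifStmt only imitates a conditional statement that takes its condition by value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ifStmt is a toy conditional whose condition is a plain bool, fixed at the
// moment the definition is built.
type ifStmt struct {
	Cond bool
	Then string
}

// rentalState is a hypothetical workflow state.
type rentalState struct {
	IsAvailable bool
}

// Definition builds the statement list from the current field values.
func (s *rentalState) Definition() []ifStmt {
	return []ifStmt{{Cond: s.IsAvailable, Then: "reserve item"}}
}

// condOnResume imitates a resume: unmarshal the stored state first, then
// build the definition and read the condition it captured.
func condOnResume(stored string) (bool, error) {
	var s rentalState
	if err := json.Unmarshal([]byte(stored), &s); err != nil {
		return false, err
	}
	return s.Definition()[0].Cond, nil
}

func main() {
	for _, stored := range []string{`{"IsAvailable":false}`, `{"IsAvailable":true}`} {
		cond, err := condOnResume(stored)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(stored, "->", cond)
	}
}
```

The condition is evaluated once per call to Definition(), which is also why Definition() itself should have no side effects.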

type WorkflowStatus added in v0.5.0

type WorkflowStatus string
const (
	WorkflowRunning  WorkflowStatus = "Running"
	WorkflowFinished WorkflowStatus = "Finished"
)
