async

package module
v0.8.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 8, 2021 License: GPL-3.0 Imports: 5 Imported by: 5

README

Gorchestrate your workflows!

async is a Go library to integrate workflow management into your application using your existing infrastructure.

I've spend 4 years to come up with this particular approach, and i'll apreciate if you'll try it out.

Here's how it looks

type MyWorkflowState struct {
    User string
    Approved bool
    Error string
}

func (s* MyWorkflowState) Definition() async.Section {
    return S(
        Step("first action", func() error {
            s.User = "You can execute arbitrary code right in the workflow definition"
            log.Printf("No need for fancy definition/action separation")
        }),
        If(s.User == "", 
            log.Printf("use if/else to branch your workflow")
            Return(nil)
        ),
        Wait("and then wait for some events",
            On("wait some time", Timeout{
                After: time.Minute*10,
            },
                Step("and then continue the workflow", func() error {
                    return nil
                }),
                s.Finish("timed out"),
            )
            On("or wait for your custom event", WaitForUserAction{
                Message: "plz approve",
                Approve: func() {
                    s.Approved = true
                    log.Printf("And handle it's results")
                },
                Deny: func(reason string) {
                    s.Approved = false
                    s.Error = reason
                },
            }),
        ),
        Go("thread2", S(
            Step("you can also run parallel threads in your workflow", func() error {
                s.User = "this allows you to orchestrate your workflow and run multiple asnychrounous actions in parallel")
            }),
        ), "g1"),
        s.Finish("finished successfully"),
    )
}

func (s *MyWorkflowState) Finish(output string) async.Stmt{
    Step("you can also build reusable workflows", func() error {
        return nil
    }).
    Step("for example if you want to execute same actions for multiple workflow steps", func() error {
        return nil
    }).
    Step("or make your workflow definion clean and readable", func() error {
        return nil
    }).
    Return()
}

Features

  • You can run this on serverless
  • You can define all your workflow actions right in the Go code
  • You can update workflow while it is running
  • Full control of how your workflows are handled

Architecture

Gorchestrate was created with being stateless in mind. This allows framework to be adaptable to any infrastructure, including Serverless applications.

Workflow is resumed(executed) by following events:

  1. Explicit call of OnCallback() method. It's called whenever you want it to, for example when custom event happens.
  2. Scheduled call of OnResume() method. We schedule such after OnCallback() method and after workflow creation.
  • OnResume() will continue workflow till point when it's waiting for events.
  • Then we execute Setup() for all events we are waiting for
  • Then we handle event via OnCallback() method
  • Then we execute Teardown() for all eents we were waiting for previously
  • Then we continue the workflow... (back to beginning)

OnCallback() will execute callback handler for the workflow and schedule OnResume() call to continue the workflow using flow above.

By using external Scheduler(for ex. Google Cloud Run) it's possible to have fully serverless workflows. This avoids infrastructure burden of monitoring and setting up servers and at the same time solves common issues related to serverless applications.

  • No need to monitor servers & scale your cluster.
  • No need for singleton patterns to avoid concurrency issues.
  • No need for cron or similar schedulers. Just create workflow with a loop.

Is it production ready?

Not yet.

API may change and there are definitely some bugs that has to be fixed.

I'd be glad if some really smart guy will help me out to make this library better.

Feedback is welcome!

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Walk added in v0.5.0

func Walk(s Stmt, f func(s Stmt) bool) bool

Types

type ActionFunc added in v0.5.0

type ActionFunc func() error

type ActionResult added in v0.5.0

type ActionResult struct {
	Success       bool
	Error         string
	Retries       int
	RetryInterval time.Duration
}

type BreakStmt added in v0.5.0

type BreakStmt struct {
}

func Break added in v0.5.0

func Break() BreakStmt

Break for loop

func (BreakStmt) Resume added in v0.5.0

func (s BreakStmt) Resume(ctx *ResumeContext) (*Stop, error)

type CallbackRequest added in v0.5.0

type CallbackRequest struct {
	Type       string
	WorkflowID string
	ThreadID   string
	Name       string
	PC         int
	Handler    interface{} `json:"-"`
}

type Checkpoint added in v0.7.0

type Checkpoint func(scheduleResume bool) error

type ForStmt added in v0.5.0

type ForStmt struct {
	CondLabel string
	Cond      bool // nil cond for infinite loop
	Stmt      Stmt
}

func (ForStmt) Resume added in v0.5.0

func (f ForStmt) Resume(ctx *ResumeContext) (*Stop, error)

type GoStmt added in v0.5.0

type GoStmt struct {
	ID   func() string
	Name string // name of goroutine
	Stmt Stmt
}

func Go added in v0.5.0

func Go(name string, body Stmt, id func() string) GoStmt

Run statements in a separate Thread

func (GoStmt) Resume added in v0.5.0

func (s GoStmt) Resume(ctx *ResumeContext) (*Stop, error)

When we meet Go stmt - we simply create threads and continue execution.

type Handler added in v0.5.0

type Handler interface {
	Type() string
	Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)
	Setup(ctx context.Context, req CallbackRequest) error
	Teardown(ctx context.Context, req CallbackRequest) error
}

func FindHandler added in v0.8.6

func FindHandler(req CallbackRequest, sec Stmt) Handler

type ResumeContext added in v0.5.0

type ResumeContext struct {

	// Running means process is already resumed and we are executing statements.
	// process is not running - we are searching for the step we should resume from.
	Running bool

	// In case workflow is resumed by a callback
	Callback       CallbackRequest
	CallbackInput  interface{}
	CallbackOutput interface{}

	Return bool
	Break  bool
	// contains filtered or unexported fields
}

ResumeContext is used during workflow execution It contains resume input as well as current state of the execution.

type ReturnStmt added in v0.5.0

type ReturnStmt struct {
}

func Return added in v0.5.0

func Return() ReturnStmt

Finish workflow and return result

func (ReturnStmt) Resume added in v0.5.0

func (s ReturnStmt) Resume(ctx *ResumeContext) (*Stop, error)

type Runner added in v0.5.0

type Runner struct {
}

func (*Runner) OnCallback added in v0.6.0

func (r *Runner) OnCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)

func (*Runner) OnResume added in v0.7.0

func (r *Runner) OnResume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error

func (*Runner) ResumeState added in v0.5.0

func (r *Runner) ResumeState(ctx *ResumeContext, state WorkflowState) error

Resume state with specified thread. Thread can be resumed in following cases: 1. Thread just started 2. Thread timed out - unblocked on time select 3. Thread unblocked on select - (i.e. handler/event was triggered) 4. Step has finished execution and we are resuming thread from that spot.

type Section added in v0.5.0

type Section []Stmt

Section is similar to code block {} with a list of statements.

func S added in v0.5.0

func S(ss ...Stmt) Section

S is a syntax sugar to properly indent statement sections when using gofmt

func (Section) Resume added in v0.5.0

func (s Section) Resume(ctx *ResumeContext) (*Stop, error)

for block of code - simply try to resume/exec all stmts until we get blocked somewhere

type SelectStmt added in v0.5.0

type SelectStmt struct {
	Name  string
	Cases []WaitCond
}

func Wait added in v0.5.0

func Wait(name string, ss ...WaitCond) SelectStmt

wait for multiple conditions and execute only one

func (SelectStmt) Resume added in v0.5.0

func (s SelectStmt) Resume(ctx *ResumeContext) (*Stop, error)

type State added in v0.5.0

type State struct {
	ID       string         // id of workflow instance
	Workflow string         // name of workflow definition. Used to choose proper state type to unmarshal & resume on
	Status   WorkflowStatus // current status
	Threads  Threads
	PC       int
}

func NewState added in v0.7.0

func NewState(id, name string) State

type Stmt added in v0.5.0

type Stmt interface {
	// Resume continues execution of the process, based on ResumeContext
	// It walks the tree searching for CurStep and then continues the process
	// stopping at some point or exiting at the end of it.
	// If callback not found *Stop will be nil and ctx.Running will be false
	// If callback is found, but process has finished - *Stop will be nil and ctx.Running will be true
	// Otherwise Resume should always return *Stop or err != nil
	Resume(ctx *ResumeContext) (*Stop, error)
}

Stmt is async statement definition that should support workflow resuming & search.

func FindStep added in v0.5.0

func FindStep(name string, sec Stmt) Stmt

func For added in v0.5.0

func For(cond bool, condLabel string, sec Stmt) Stmt

execute statements in the loop while condition is met

type StmtStep added in v0.5.0

type StmtStep struct {
	Name   string
	Action ActionFunc
}

func Step added in v0.5.0

func Step(name string, action ActionFunc) StmtStep

Execute step and retry it on failure.

func (StmtStep) Resume added in v0.5.0

func (s StmtStep) Resume(ctx *ResumeContext) (*Stop, error)

type Stop added in v0.5.0

type Stop struct {
	Step   string      // waiting for step execution to complete
	Select *SelectStmt // waiting for event
	Return bool        // returning from process
}

Stop tells us that syncronous part of the workflow has finished. It means we either:

type SwitchCase added in v0.5.0

type SwitchCase struct {
	CondLabel string
	Cond      bool
	Stmt      Stmt
}

func Case

func Case(cond bool, sec Stmt) SwitchCase

execute statements if ...

func Default added in v0.5.0

func Default(sec Stmt) SwitchCase

execute statements if none of previous statements matched

type SwitchStmt added in v0.5.0

type SwitchStmt []SwitchCase

func If added in v0.5.0

func If(cond bool, sec Stmt) SwitchStmt

execute statements if ...

func Switch added in v0.5.0

func Switch(ss ...SwitchCase) SwitchStmt

execute statements based on condition

func (SwitchStmt) Resume added in v0.5.0

func (s SwitchStmt) Resume(ctx *ResumeContext) (*Stop, error)

type Thread

type Thread struct {
	ID          string
	Name        string
	Status      ThreadStatus // current status
	CurStep     string       // current step of the workflow
	ExecRetries int          // TODO: better retries approach
	ExecError   string       // TODO: better retries approach
	ExecBackoff time.Time    // TODO: better retries approach
	WaitEvents  []WaitEvent  // events workflow is waiting for. Valid only if Status = Waiting, otherwise should be empty.
	PC          int
}

func (*Thread) WaitingForCallback added in v0.6.0

func (t *Thread) WaitingForCallback(cb CallbackRequest) bool

type ThreadStatus added in v0.5.0

type ThreadStatus string
const (
	ThreadExecuting ThreadStatus = "Executing"
	ThreadResuming  ThreadStatus = "Resuming"
	ThreadWaiting   ThreadStatus = "Waiting"
	ThreadPaused    ThreadStatus = "Paused"
)

type Threads added in v0.5.0

type Threads []*Thread

func (*Threads) Add added in v0.5.0

func (tt *Threads) Add(t *Thread)

func (*Threads) Find added in v0.5.0

func (tt *Threads) Find(id string) (*Thread, bool)

func (*Threads) Remove added in v0.5.0

func (tt *Threads) Remove(id string)

type WaitCond added in v0.5.0

type WaitCond struct {
	Callback CallbackRequest
	Handler  Handler

	Stmt Stmt
}

func On added in v0.5.0

func On(event string, handler Handler, stmts ...Stmt) WaitCond

type WaitEvent added in v0.5.0

type WaitEvent struct {
	Req    CallbackRequest
	Status string
	Error  string
}

type WaitEventStatus added in v0.6.0

type WaitEventStatus string
const (
	EventPending       WaitEventStatus = "Pending"       // event is just created
	EventSetup         WaitEventStatus = "Setup"         // event was successfully setup
	EventSetupError    WaitEventStatus = "SetupError"    // there was an error during setup
	EventTeardownError WaitEventStatus = "TeardownError" // there was an error during teardown
)

type WorkflowState added in v0.5.0

type WorkflowState interface {
	// Definition func may be called multiple times so it should be idempotent.
	// All actions should done in callbacks or steps.
	Definition() Section
}

WorkflowState should be a Go struct supporting JSON unmarshalling into it. When process is resumed - current state is unmarshalled into it and then Definition() is called. This is needed to eliminate lasy parameters i.e. instead of 'If( func() bool { return s.IsAvailable})' we can write 'If(s.IsAvailable)'.

type WorkflowStatus added in v0.5.0

type WorkflowStatus string
const (
	WorkflowRunning  WorkflowStatus = "Running"
	WorkflowWaiting  WorkflowStatus = "Waiting"
	WorkflowFinished WorkflowStatus = "Finished"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL