async

v0.9.3
Published: Jul 9, 2021 License: GPL-3.0 Imports: 4 Imported by: 5

README

Gorchestrate your workflows!

async is a Go library to integrate workflow management into your application using your existing infrastructure.

I've spent 4 years coming up with this particular approach, and I'd appreciate it if you tried it out.

Here's how it looks

// S, Step, If, Wait, On, Go and Return are this package's constructors, written
// unqualified for brevity. Timeout and WaitForUserAction are custom Handler
// implementations that you provide yourself; they are not part of this package.
type MyWorkflowState struct {
    User     string
    Approved bool
    Error    string
}

func (s *MyWorkflowState) Definition() async.Section {
    return S(
        Step("first action", func() error {
            s.User = "You can execute arbitrary code right in the workflow definition"
            log.Printf("No need for fancy definition/action separation")
            return nil
        }),
        If(s.User == "", S(
            Step("use if/else to branch your workflow", func() error {
                log.Printf("use if/else to branch your workflow")
                return nil
            }),
            Return(),
        )),
        Wait("and then wait for some events",
            On("wait some time", Timeout{
                After: time.Minute * 10,
            },
                Step("and then continue the workflow", func() error {
                    return nil
                }),
                s.Finish("timed out"),
            ),
            On("or wait for your custom event", WaitForUserAction{
                Message: "plz approve",
                Approve: func() {
                    s.Approved = true
                    log.Printf("And handle its results")
                },
                Deny: func(reason string) {
                    s.Approved = false
                    s.Error = reason
                },
            }),
        ),
        Go("thread2", S(
            Step("you can also run parallel threads in your workflow", func() error {
                s.User = "this allows you to orchestrate your workflow and run multiple asynchronous actions in parallel"
                return nil
            }),
        )),
        s.Finish("finished successfully"),
    )
}

func (s *MyWorkflowState) Finish(output string) async.Stmt {
    return S(
        Step("you can also build reusable workflows", func() error {
            return nil
        }),
        Step("for example if you want to execute same actions for multiple workflow steps", func() error {
            return nil
        }),
        Step("or make your workflow definition clean and readable", func() error {
            return nil
        }),
        Return(),
    )
}

Features

  • You can run this on serverless
  • You can define all your workflow actions right in the Go code
  • You can update workflow while it is running
  • Full control of how your workflows are handled

Architecture

Gorchestrate was designed to be stateless. This makes the framework adaptable to any infrastructure, including serverless applications.

A workflow is resumed (executed) by the following events:

  1. An explicit call of the OnCallback() method. You call it whenever you want, for example when a custom event happens.
  2. A scheduled call of the OnResume() method. Such a call is scheduled after each OnCallback() call and after workflow creation.

Each cycle then proceeds as follows:

  • OnResume() continues the workflow until the point where it waits for events.
  • Then Setup() is executed for all events the workflow is waiting for.
  • Then the event is handled via the OnCallback() method.
  • Then Teardown() is executed for all events the workflow was waiting for previously.
  • Then the workflow continues (back to the beginning).

OnCallback() executes the callback handler for the workflow and schedules an OnResume() call to continue the workflow using the flow above.
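The cycle described above can be sketched without the library. This is a minimal illustration under stated assumptions, not the package's implementation: the `workflow` type, the `steps` slice and the `resume` and `callback` functions are hypothetical names invented for this sketch, and the inline `resume` call stands in for what the README describes as a scheduled call.

```go
package main

import "fmt"

// workflow is a hypothetical stand-in for persisted workflow state.
type workflow struct {
	pc      int      // index of the next step to run
	waiting bool     // true while the workflow waits for an event
	log     []string // records the order of operations
}

// steps is the workflow body; "wait" marks a point where the workflow
// blocks until an event arrives.
var steps = []string{"step1", "wait", "step2"}

// resume runs steps until the workflow reaches a wait point or finishes.
// Calling it while the workflow is waiting or finished does nothing.
func resume(w *workflow) {
	for !w.waiting && w.pc < len(steps) {
		if steps[w.pc] == "wait" {
			w.waiting = true
			w.log = append(w.log, "setup")
			return
		}
		w.log = append(w.log, steps[w.pc])
		w.pc++
	}
}

// callback handles the awaited event, tears it down and continues the
// workflow. It returns an error if the workflow is not waiting.
func callback(w *workflow) error {
	if !w.waiting {
		return fmt.Errorf("workflow is not waiting for an event")
	}
	w.log = append(w.log, "handle", "teardown")
	w.waiting = false
	w.pc++    // move past the wait point
	resume(w) // called inline here; the README describes this call as scheduled
	return nil
}

func main() {
	w := &workflow{}
	resume(w)
	if err := callback(w); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(w.log) // prints "[step1 setup handle teardown step2]"
}
```

Because `resume` is a no-op when nothing can run, a scheduler may retry it safely, which is the property that makes the stateless design workable.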

By using an external scheduler (for example Google Cloud Run), it's possible to have fully serverless workflows. This avoids the infrastructure burden of setting up and monitoring servers and at the same time solves common issues related to serverless applications.

  • No need to monitor servers & scale your cluster.
  • No need for singleton patterns to avoid concurrency issues.
  • No need for cron or similar schedulers. Just create workflow with a loop.

Is it production ready?

Not yet.

The API may change and there are definitely some bugs that have to be fixed.

I'd be glad if someone really smart would help me make this library better.

Feedback is welcome!

Documentation

Constants

const MainThread = "_main_"

Variables

This section is empty.

Functions

func HandleCallback added in v0.9.0

func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)

func HandleEvent added in v0.9.0

func HandleEvent(ctx context.Context, name string, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)

HandleEvent is a shortcut for calling HandleCallback() by event name. Be careful: if several events with the same name are waiting, you have no control over which one is called back. If this matters to you, use HandleCallback() instead.
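The difference between addressing an event by name and by a full request can be illustrated with a small stand-alone sketch. The `callbackRequest` type mirrors a subset of the CallbackRequest fields listed in this documentation; `findByName` and `findExact` are hypothetical helpers, and picking the first match in `findByName` is an assumption made for the example, since the documentation only says that the choice is not under your control.

```go
package main

import "fmt"

// callbackRequest mirrors a subset of the documented CallbackRequest fields.
type callbackRequest struct {
	ThreadID string
	Name     string
	PC       int
}

// findByName returns the first waiting request with the given event name.
// With duplicate names the caller cannot choose which one is returned.
func findByName(waiting []callbackRequest, name string) (callbackRequest, bool) {
	for _, r := range waiting {
		if r.Name == name {
			return r, true
		}
	}
	return callbackRequest{}, false
}

// findExact returns the waiting request that matches thread, name and PC.
func findExact(waiting []callbackRequest, want callbackRequest) (callbackRequest, bool) {
	for _, r := range waiting {
		if r == want {
			return r, true
		}
	}
	return callbackRequest{}, false
}

func main() {
	waiting := []callbackRequest{
		{ThreadID: "_main_", Name: "approve", PC: 3},
		{ThreadID: "thread2", Name: "approve", PC: 1},
	}
	byName, _ := findByName(waiting, "approve")
	exact, _ := findExact(waiting, callbackRequest{ThreadID: "thread2", Name: "approve", PC: 1})
	fmt.Println(byName.ThreadID, exact.ThreadID) // prints "_main_ thread2"
}
```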

func Resume added in v0.9.0

func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error

Resume resumes the workflow after:

  • workflow creation
  • successful callback handling
  • a previously failed Resume() call

This method can be called multiple times. If there's nothing to resume - it will return 'nil'

func Walk added in v0.5.0

func Walk(s Stmt, f func(s Stmt) bool) (bool, error)

Types

type BreakStmt added in v0.5.0

type BreakStmt struct {
}

func Break added in v0.5.0

func Break() BreakStmt

Break exits the enclosing For loop.

func (BreakStmt) Resume added in v0.5.0

func (s BreakStmt) Resume(ctx *ResumeContext) (*Stop, error)

type CallbackRequest added in v0.5.0

type CallbackRequest struct {
	WorkflowID string
	ThreadID   string
	Name       string
	PC         int
	SetupData  json.RawMessage
}

type Checkpoint added in v0.7.0

type Checkpoint func(scheduleResume bool) error
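A Checkpoint is the hook through which state gets persisted and, when requested, a follow-up resume gets scheduled. The sketch below shows one way such a function could be built. It is an illustration only: `memoryStore` and `newCheckpoint` are hypothetical names, and the in-memory store stands in for whatever database and task queue you actually use.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkpoint mirrors the Checkpoint signature shown above.
type checkpoint func(scheduleResume bool) error

// memoryStore is a hypothetical in-memory replacement for a database
// plus a task queue.
type memoryStore struct {
	saved          []byte // last persisted state as JSON
	resumesPending int    // number of resume calls that would be scheduled
}

// newCheckpoint returns a checkpoint that persists state and, if asked,
// records that a resume should be scheduled.
func newCheckpoint(store *memoryStore, state interface{}) checkpoint {
	return func(scheduleResume bool) error {
		body, err := json.Marshal(state)
		if err != nil {
			return fmt.Errorf("marshal state: %w", err)
		}
		store.saved = body
		if scheduleResume {
			store.resumesPending++
		}
		return nil
	}
}

func main() {
	state := &struct{ User string }{User: "alice"}
	store := &memoryStore{}
	save := newCheckpoint(store, state)
	if err := save(true); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(store.saved), store.resumesPending) // prints {"User":"alice"} 1
}
```

Passing the state by pointer means each call serializes the current field values, so the same closure can be reused across several steps.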

type ForStmt added in v0.5.0

type ForStmt struct {
	CondLabel string
	Cond      bool // nil cond for infinite loop
	Stmt      Stmt
}

func (ForStmt) Resume added in v0.5.0

func (f ForStmt) Resume(ctx *ResumeContext) (*Stop, error)

type GoStmt added in v0.5.0

type GoStmt struct {
	// ID is needed to identify threads when there are many threads running with the same name.
	// (for example they were created in a loop)
	ID   func() string
	Name string
	Stmt Stmt
}
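The comment on ID explains that thread names may repeat, for example when threads are started in a loop. A small stand-alone sketch shows why a separate identifier is then needed. The `thread`, `threads` and `spawn` names are hypothetical, and rejecting duplicate IDs in `add` is an assumption about the intended behavior, not something this documentation states.

```go
package main

import "fmt"

// thread is a reduced, hypothetical version of Thread.
type thread struct {
	ID   string
	Name string
}

// threads holds running threads; IDs must be unique, names may repeat.
type threads []*thread

// add appends t and rejects a duplicate ID (an assumption for this sketch).
func (tt *threads) add(t *thread) error {
	for _, existing := range *tt {
		if existing.ID == t.ID {
			return fmt.Errorf("thread %q already exists", t.ID)
		}
	}
	*tt = append(*tt, t)
	return nil
}

// find returns the thread with the given ID.
func (tt *threads) find(id string) (*thread, bool) {
	for _, t := range *tt {
		if t.ID == id {
			return t, true
		}
	}
	return nil, false
}

// spawn starts n threads that share a name; makeID supplies an ID for each.
func spawn(tt *threads, name string, n int, makeID func(i int) string) error {
	for i := 0; i < n; i++ {
		if err := tt.add(&thread{ID: makeID(i), Name: name}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var tt threads
	err := spawn(&tt, "worker", 3, func(i int) string { return fmt.Sprintf("worker-%d", i) })
	fmt.Println(len(tt), err) // prints "3 <nil>"

	// Reusing an existing ID fails even though the name is allowed to repeat.
	err = spawn(&tt, "worker", 1, func(int) string { return "worker-0" })
	fmt.Println(err) // prints: thread "worker-0" already exists
}
```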

func Go added in v0.5.0

func Go(name string, body Stmt) GoStmt

func (GoStmt) Resume added in v0.5.0

func (s GoStmt) Resume(ctx *ResumeContext) (*Stop, error)

func (*GoStmt) WithID added in v0.9.0

func (s *GoStmt) WithID(id func() string)

type Handler added in v0.5.0

type Handler interface {
	Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error)

	Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)

	Teardown(ctx context.Context, req CallbackRequest) error
}
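To make the three Handler methods concrete, here is a hedged sketch of a custom handler. The CallbackRequest and Handler declarations are copied from this documentation so the example compiles on its own; the `approval` type and the `deliver` helper are hypothetical, and copying the Setup output into SetupData is an assumption about how the two relate.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// CallbackRequest and Handler are copied from the declarations in this
// documentation so that the sketch compiles without the package.
type CallbackRequest struct {
	WorkflowID string
	ThreadID   string
	Name       string
	PC         int
	SetupData  json.RawMessage
}

type Handler interface {
	Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error)
	Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)
	Teardown(ctx context.Context, req CallbackRequest) error
}

// approval is a hypothetical handler that waits for a yes/no decision.
type approval struct {
	Message    string
	OnDecision func(approved bool)
}

// Setup returns data describing the pending request.
func (a approval) Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error) {
	data, err := json.Marshal(map[string]string{"message": a.Message})
	return data, err
}

// Handle expects a bool input and passes it to OnDecision.
func (a approval) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error) {
	approved, ok := input.(bool)
	if !ok {
		return nil, fmt.Errorf("expected bool input, got %T", input)
	}
	a.OnDecision(approved)
	return "recorded", nil
}

// Teardown has nothing to clean up in this sketch.
func (a approval) Teardown(ctx context.Context, req CallbackRequest) error { return nil }

// deliver runs Setup, Handle and Teardown in that order for one event.
func deliver(h Handler, name string, input interface{}) (interface{}, error) {
	ctx := context.Background()
	req := CallbackRequest{Name: name}
	data, err := h.Setup(ctx, req)
	if err != nil {
		return nil, err
	}
	req.SetupData = data // assumption: setup output travels with the request
	out, err := h.Handle(ctx, req, input)
	if err != nil {
		return nil, err
	}
	return out, h.Teardown(ctx, req)
}

func main() {
	approved := false
	h := approval{Message: "plz approve", OnDecision: func(ok bool) { approved = ok }}
	out, err := deliver(h, "approve", true)
	fmt.Println(out, err, approved) // prints "recorded <nil> true"
}
```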

func FindHandler added in v0.8.6

func FindHandler(req CallbackRequest, sec Stmt) (Handler, error)

type ResumeContext added in v0.5.0

type ResumeContext struct {

	// Running means process is already resumed and we are executing statements.
	// process is not running - we are searching for the step we should resume from.
	Running bool

	// In case workflow is resumed by a callback
	Callback       CallbackRequest
	CallbackInput  interface{}
	CallbackOutput interface{}

	Return bool
	Break  bool
	// contains filtered or unexported fields
}

ResumeContext is used during workflow execution. It contains the resume input as well as the current state of the execution.

type ReturnStmt added in v0.5.0

type ReturnStmt struct {
}

func Return added in v0.5.0

func Return() ReturnStmt

Return stops this thread.

func (ReturnStmt) Resume added in v0.5.0

func (s ReturnStmt) Resume(ctx *ResumeContext) (*Stop, error)

type Section added in v0.5.0

type Section []Stmt

Section is similar to code block {} with a list of statements.

func S added in v0.5.0

func S(ss ...Stmt) Section

S is syntactic sugar to properly indent statement sections when using gofmt.

func (Section) Resume added in v0.5.0

func (s Section) Resume(ctx *ResumeContext) (*Stop, error)

type SelectStmt added in v0.5.0

type SelectStmt struct {
	Name  string
	Cases []WaitCond
}

func Wait added in v0.5.0

func Wait(name string, ss ...WaitCond) SelectStmt

Wait for multiple conditions and execute only one

func (SelectStmt) Resume added in v0.5.0

func (s SelectStmt) Resume(ctx *ResumeContext) (*Stop, error)

type State added in v0.5.0

type State struct {
	ID       string         // id of workflow instance
	Workflow string         // name of workflow definition. Used to choose proper state type to unmarshal & resume on
	Status   WorkflowStatus // current status
	Threads  Threads
	PC       int
}

func NewState added in v0.7.0

func NewState(id, name string) State

type Stmt added in v0.5.0

type Stmt interface {
	// Resume continues execution of the process, based on ResumeContext
	// If ctx.Running == true Resume should execute the statement or continue execution after.
	// If ctx.Running = false Resume should not execute the statement, but recursively search for the statement that needs to be resumed.
	// If it needs to be resumed - don't execute it, but continue execution from this statement.
	//
	// If stmt not found *Stop will be nil and ctx.Running will be false
	// If stmt is found, but process has finished - *Stop will be nil and ctx.Running will be true
	// Otherwise Resume should always return *Stop or err != nil
	Resume(ctx *ResumeContext) (*Stop, error)
}

Stmt is an async statement definition that supports workflow resuming and search.
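The search-then-run behavior described in the Resume comment can be modeled in a few lines. The sketch below is a simplified illustration, not the package's code: `resumeContext`, `step` and `resumeSection` are hypothetical reductions of ResumeContext, Step and Section, and it covers only the case where the step that is found still has to be executed.

```go
package main

import "fmt"

// resumeContext is a reduced, hypothetical version of ResumeContext.
type resumeContext struct {
	Running bool   // false while searching for the step to resume from
	CurStep string // name of the step to resume from
}

// step is a named action, standing in for a Step statement.
type step struct {
	Name   string
	Action func()
}

// resumeSection walks the steps in order. While ctx.Running is false it
// only searches for ctx.CurStep; once found, it executes that step and
// every step after it. If the step is never found, Running stays false.
func resumeSection(ctx *resumeContext, steps []step) {
	for _, s := range steps {
		if !ctx.Running {
			if s.Name != ctx.CurStep {
				continue // still searching, do not execute
			}
			ctx.Running = true
		}
		s.Action()
	}
}

func main() {
	var ran []string
	record := func(name string) step {
		return step{Name: name, Action: func() { ran = append(ran, name) }}
	}
	steps := []step{record("a"), record("b"), record("c")}

	ctx := &resumeContext{CurStep: "b"}
	resumeSection(ctx, steps)
	fmt.Println(ran, ctx.Running) // prints "[b c] true"
}
```

Skipping already completed steps this way is what lets a definition be rebuilt from scratch on every resume without repeating earlier actions.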

func FindStep added in v0.5.0

func FindStep(name string, sec Stmt) Stmt

func For added in v0.5.0

func For(cond bool, condLabel string, sec Stmt) Stmt

type StmtStep added in v0.5.0

type StmtStep struct {
	Name   string
	Action func() error
}

func Step added in v0.5.0

func Step(name string, action func() error) StmtStep

func (StmtStep) Resume added in v0.5.0

func (s StmtStep) Resume(ctx *ResumeContext) (*Stop, error)

type Stop added in v0.5.0

type Stop struct {
	Step   string      // execute step
	Select *SelectStmt // wait for Events
	Return bool        // stop this thread
}

Stop tells us that the synchronous part of the workflow has finished. It means we either execute a step, wait for events, or stop this thread, as the field comments above indicate.

type SwitchCase added in v0.5.0

type SwitchCase struct {
	CondLabel string
	Cond      bool
	Stmt      Stmt
}

func Case

func Case(cond bool, sec Stmt) SwitchCase

func Default added in v0.5.0

func Default(sec Stmt) SwitchCase

type SwitchStmt added in v0.5.0

type SwitchStmt []SwitchCase

func If added in v0.5.0

func If(cond bool, sec Stmt) SwitchStmt

func Switch added in v0.5.0

func Switch(ss ...SwitchCase) SwitchStmt

Switch executes statements based on conditions.

func (SwitchStmt) Resume added in v0.5.0

func (s SwitchStmt) Resume(ctx *ResumeContext) (*Stop, error)

type Thread

type Thread struct {
	ID         string
	Name       string
	Status     ThreadStatus // current status
	CurStep    string       // current step of the workflow
	WaitEvents []WaitEvent  // events workflow is waiting for. Valid only if Status = Waiting, otherwise should be empty.
	PC         int
}

func (*Thread) WaitingForCallback added in v0.6.0

func (t *Thread) WaitingForCallback(cb CallbackRequest) error

type ThreadStatus added in v0.5.0

type ThreadStatus string
const (
	ThreadExecuting ThreadStatus = "Executing" // next step for this thread is to execute "CurStep" step
	ThreadResuming  ThreadStatus = "Resuming"  // next step for this thread is to continue from "CurStep" step
	ThreadWaiting   ThreadStatus = "Waiting"   // thread is waiting for "CurStep" wait condition and will be resumed via OnCallback()
)

type Threads added in v0.5.0

type Threads []*Thread

func (*Threads) Add added in v0.5.0

func (tt *Threads) Add(t *Thread) error

func (*Threads) Find added in v0.5.0

func (tt *Threads) Find(id string) (*Thread, bool)

func (*Threads) Remove added in v0.5.0

func (tt *Threads) Remove(id string)

type WaitCond added in v0.5.0

type WaitCond struct {
	Callback CallbackRequest
	Handler  Handler
	Stmt     Stmt
}

func On added in v0.5.0

func On(event string, handler Handler, stmts ...Stmt) WaitCond

type WaitEvent added in v0.5.0

type WaitEvent struct {
	Req    CallbackRequest
	Status WaitEventStatus
	Error  string
}

type WaitEventStatus added in v0.6.0

type WaitEventStatus string
const (
	EventPending       WaitEventStatus = "Pending"       // event is just created
	EventSetup         WaitEventStatus = "Setup"         // event was successfully setup
	EventSetupError    WaitEventStatus = "SetupError"    // there was an error during setup
	EventTeardownError WaitEventStatus = "TeardownError" // there was an error during teardown
)

type WorkflowState added in v0.5.0

type WorkflowState interface {
	// Definition func may be called multiple times so it should be idempotent.
	// All actions should be done in callbacks or steps.
	Definition() Section
}

WorkflowState should be a Go struct that supports JSON unmarshalling. When a process is resumed, the current state is unmarshalled into it and then Definition() is called. This is needed to eliminate lazy parameters, i.e. instead of 'If( func() bool { return s.IsAvailable})' we can write 'If(s.IsAvailable)'.
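The reason plain values can replace lazy parameters is that the definition is rebuilt after the state has been restored. A stand-alone sketch of that order of operations follows. All names in it (`orderState`, `branch`, `definition`, `chosen`) are hypothetical, and `branch` is only a stand-in for an If statement.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderState is a hypothetical workflow state; it is restored from JSON
// before the definition is built.
type orderState struct {
	IsAvailable bool
}

// branch is a stand-in for an If statement: the condition is a plain bool
// that was evaluated when the definition was built.
type branch struct {
	Cond bool
	Name string
}

// definition builds the workflow description from the current field values.
func (s *orderState) definition() []branch {
	return []branch{
		{Cond: s.IsAvailable, Name: "ship"},
		{Cond: !s.IsAvailable, Name: "backorder"},
	}
}

// chosen restores the state from JSON, rebuilds the definition and returns
// the name of the first branch whose condition holds.
func chosen(saved []byte) (string, error) {
	var s orderState
	if err := json.Unmarshal(saved, &s); err != nil {
		return "", err
	}
	for _, b := range s.definition() {
		if b.Cond {
			return b.Name, nil
		}
	}
	return "", fmt.Errorf("no branch matched")
}

func main() {
	name, err := chosen([]byte(`{"IsAvailable":true}`))
	fmt.Println(name, err) // prints "ship <nil>"
}
```

This also shows why Definition() has to be idempotent: it runs again on every resume, so it should only describe the workflow and leave side effects to steps and callbacks.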

type WorkflowStatus added in v0.5.0

type WorkflowStatus string
const (
	WorkflowRunning  WorkflowStatus = "Running"
	WorkflowFinished WorkflowStatus = "Finished"
)
