async

v0.5.3
Published: Jun 1, 2021 | License: GPL-3.0 | Imports: 14 | Imported by: 5

README

Gorchestrate Go SDK

Usage example: https://github.com/gorchestrate/pizzaapp

Architecture

Using async.Manage, your service publishes its API to Gorchestrate Core and executes new or existing processes. A process is defined as a Go struct with methods that are called when the process starts or unblocks. On the initial run, the 'Start' method is called.

When a method is called, it can:

  • execute arbitrary code
  • call other Gorchestrate APIs (aka func())
  • specify blocking conditions (aka select{})
  • create channels (aka make(chan type))
  • create new threads for an existing process (aka go func(){})
  • finish the process with a result. This stops execution of the process and unblocks all selects acquired by this process

All semantics for these operations are similar to their Go counterparts and have the same guarantees.

  • The select operation is exclusive and has the same linearized consistency guarantees (all events are strictly ordered using an HLC clock).
  • You can close channels, create buffered channels, and pass channels inside channels.
  • You can have multiple threads (goroutines) running in your process. They are always executed (unblocked) exclusively, one after another.
  • You can specify <-time.After() conditions in select statement.

All state management is done on the Gorchestrate Core side. A process is locked, sent for execution, processed by the service, and then unlocked.

Documentation

Index

Constants

const MaxWaittTill = 999999999999

Variables

var ErrDuplicate = fmt.Errorf("PC doesn't match (duplicate/out of time event)")

Functions

func DefaultCollectionName added in v0.5.0

func DefaultCollectionName(s *State) string

func DefaultQueueNameFunc added in v0.5.0

func DefaultQueueNameFunc(s *State) string

func Walk added in v0.5.0

func Walk(s Stmt, f func(s Stmt) bool) bool

Types

type ActionFunc added in v0.5.0

type ActionFunc func() ActionResult

type ActionResult added in v0.5.0

type ActionResult struct {
	Success       bool
	Error         string
	Retries       int
	RetryInterval time.Duration
}

type BreakStmt added in v0.5.0

type BreakStmt struct {
}

func Break added in v0.5.0

func Break() BreakStmt

func (BreakStmt) Resume added in v0.5.0

func (s BreakStmt) Resume(ctx *ResumeContext) (*Stop, error)

type CBHandler added in v0.5.0

type CBHandler struct {
	Handler http.HandlerFunc
	Docs    HandlerDocs
}

type CallbackRequest added in v0.5.0

type CallbackRequest struct {
	WorkflowID string
	PC         int    // Make sure no actions were made while waiting for timeout
	ThreadID   string // Thread to resume
	Callback   string
}

type DocWorkflow added in v0.5.0

type DocWorkflow struct {
	Handlers map[string]HandlerDocs
	Input    *jsonschema.Schema
	Output   *jsonschema.Schema
}

func Docs added in v0.5.0

type Empty

type Empty struct {
}

The following function signature is allowed for exceptional use cases, but its use is discouraged: CBHandler

type ExecuteRequest added in v0.5.0

type ExecuteRequest struct {
	WorkflowID string
	PC         int    // Make sure no actions were made while waiting for Execute
	ThreadID   string // Thread to resume
	Step       string // step to execute
}

type ForStmt added in v0.5.0

type ForStmt struct {
	CondLabel string
	Cond      bool // nil cond for infinite loop
	Stmt      Stmt
}

func (ForStmt) Resume added in v0.5.0

func (f ForStmt) Resume(ctx *ResumeContext) (*Stop, error)

type GoStmt added in v0.5.0

type GoStmt struct {
	ID   func() string
	Name string // name of goroutine
	Stmt Stmt
}

func Go added in v0.5.0

func Go(name string, body Stmt, id func() string) GoStmt

func (GoStmt) Resume added in v0.5.0

func (s GoStmt) Resume(ctx *ResumeContext) (*Stop, error)

When a Go statement is encountered, we simply create the threads and continue execution.

type Handler added in v0.5.0

type Handler interface{}

Handler is a generic function that is analyzed using reflection. It's a convenient way to specify the input/output types as well as the implementation.
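How a function value can be analyzed with reflection is sketched below using only the standard `reflect` package. `describe`, `Order` and `Receipt` are hypothetical names for illustration; the SDK's actual analysis (for example in ReflectDoc) may work differently.

```go
package main

import (
	"fmt"
	"reflect"
)

// Handler mirrors the documented type: any function value.
type Handler interface{}

// describe is a hypothetical helper that lists the parameter and result
// types of a handler, or fails if the value is not a function.
func describe(h Handler) (in, out []string, err error) {
	t := reflect.TypeOf(h)
	if t == nil || t.Kind() != reflect.Func {
		return nil, nil, fmt.Errorf("handler must be a function, got %T", h)
	}
	for i := 0; i < t.NumIn(); i++ {
		in = append(in, t.In(i).String())
	}
	for i := 0; i < t.NumOut(); i++ {
		out = append(out, t.Out(i).String())
	}
	return in, out, nil
}

// Order and Receipt are illustrative input/output types.
type Order struct{ ID string }
type Receipt struct{ Total int }

func main() {
	in, out, err := describe(func(o Order) (Receipt, error) { return Receipt{}, nil })
	fmt.Println(in, out, err)
}
```

The same type information is what a schema generator would need in order to derive input and output documentation from a handler.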

type HandlerDocs added in v0.5.0

type HandlerDocs struct {
	Input  *jsonschema.Schema
	Output *jsonschema.Schema
}

func ReflectDoc added in v0.5.0

func ReflectDoc(handler Handler, inline bool) HandlerDocs

type ResumeContext added in v0.5.0

type ResumeContext struct {
	Running bool // Running means process is already resumed and we are executing statements. If process is not running - we are searching for the step we should resume from.

	CallbackName   string          // In case we are resuming a Select - this is an index of the select case to resume
	CallbackInput  json.RawMessage // In case we are resuming a Select with a callback event - this is the data to unmarshal into callback function parameters via reflect.
	CallbackOutput json.RawMessage // In case we are resuming a Select with a callback event - this is the data to marshal back to client in case workflow was successfully saved.
	Break          bool            // Used for loop management
	// contains filtered or unexported fields
}

ResumeContext is used during workflow execution. It contains the resume input as well as the current state of the execution.

type ResumeRequest added in v0.5.0

type ResumeRequest struct {
	WorkflowID string
	ThreadID   string
	PC         int
}

type ReturnStmt added in v0.5.0

type ReturnStmt struct {
	Value interface{}
}

func Return added in v0.5.0

func Return(v interface{}) ReturnStmt

func (ReturnStmt) Resume added in v0.5.0

func (s ReturnStmt) Resume(ctx *ResumeContext) (*Stop, error)

type Runner added in v0.5.0

type Runner struct {
	// contains filtered or unexported fields
}

func NewRunner added in v0.5.0

func NewRunner(cfg RunnerConfig, db *firestore.Client, tasks *cloudtasks.Service) (*Runner, error)

func (*Runner) CallbackStateHandler added in v0.5.0

func (r *Runner) CallbackStateHandler(s *State, req CallbackRequest) error

CallbackStateHandler resumes a process using a callback.

func (*Runner) ExecStep added in v0.5.0

func (r *Runner) ExecStep(s *State, req ExecuteRequest) error

func (*Runner) LockWorkflow added in v0.5.0

func (r *Runner) LockWorkflow(ctx context.Context, id string, thread string, pc int) (*State, error)

func (*Runner) NewWorkflow added in v0.5.0

func (r *Runner) NewWorkflow(ctx context.Context, id, name string, state interface{}) error

func (*Runner) ResumeState added in v0.5.0

func (r *Runner) ResumeState(ctx *ResumeContext) (WorkflowState, error)

ResumeState resumes the state with the specified thread. A thread can be resumed in the following cases:

  1. The thread has just started.
  2. The thread timed out, i.e. it unblocked on a time select.
  3. The thread unblocked on a select, i.e. a handler/event was triggered.
  4. A step has finished execution and we are resuming the thread from that spot.

func (*Runner) ResumeStateHandler added in v0.5.0

func (r *Runner) ResumeStateHandler(s *State, req ResumeRequest) error

ResumeStateHandler resumes a process (after start, or after a step has finished executing).

func (*Runner) Router added in v0.5.0

func (r *Runner) Router() *mux.Router

TODO: new workflow!?

func (*Runner) ScheduleTasks added in v0.5.0

func (r *Runner) ScheduleTasks(s *State, tID string) error

func (*Runner) UnlockWorkflow added in v0.5.0

func (r *Runner) UnlockWorkflow(ctx context.Context, id string, s *State) error

type RunnerConfig added in v0.5.0

type RunnerConfig struct {
	QueueName  string
	Collection string
	BaseURL    string
	ProjectID  string
	LocationID string
	Workflows  map[string]Workflow
}

func (*RunnerConfig) SetDefaultsAndValidate added in v0.5.0

func (cfg *RunnerConfig) SetDefaultsAndValidate() error

type Section added in v0.5.0

type Section []Stmt

Section is similar to a code block {} with a list of statements.

func S added in v0.5.0

func S(ss ...Stmt) Section

S is syntactic sugar for properly aligning statement sections.

func (Section) Resume added in v0.5.0

func (s Section) Resume(ctx *ResumeContext) (*Stop, error)

For a block of code, simply try to resume/execute all statements until we get blocked somewhere.
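The "run statements until one blocks" behavior can be sketched as follows. This is a simplified model, not the SDK's implementation: the types are cut-down local copies, it ignores the search phase that happens while ctx.Running is false, and `logStmt` and `blockStmt` are hypothetical statements made up for the example.

```go
package main

import "fmt"

// Cut-down local copies of the documented types.
type Stop struct{ Step string }

type ResumeContext struct{ Running bool }

type Stmt interface {
	Resume(ctx *ResumeContext) (*Stop, error)
}

type Section []Stmt

// Resume executes statements in order and returns as soon as one of them
// blocks (non-nil *Stop) or fails.
func (s Section) Resume(ctx *ResumeContext) (*Stop, error) {
	for _, stmt := range s {
		stop, err := stmt.Resume(ctx)
		if err != nil || stop != nil {
			return stop, err
		}
	}
	return nil, nil
}

// logStmt is a hypothetical statement that never blocks.
type logStmt struct {
	log *[]string
	msg string
}

func (l logStmt) Resume(*ResumeContext) (*Stop, error) {
	*l.log = append(*l.log, l.msg)
	return nil, nil
}

// blockStmt is a hypothetical statement that always blocks on a step.
type blockStmt string

func (b blockStmt) Resume(*ResumeContext) (*Stop, error) {
	return &Stop{Step: string(b)}, nil
}

func main() {
	var log []string
	sec := Section{logStmt{&log, "a"}, blockStmt("charge"), logStmt{&log, "b"}}
	stop, err := sec.Resume(&ResumeContext{Running: true})
	fmt.Println(stop.Step, log, err) // prints "charge [a] <nil>"
}
```

The statement after the blocking one is not executed, which matches the idea that the synchronous part of the workflow ends at the first stop.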

type SelectStmt added in v0.5.0

type SelectStmt struct {
	Name  string
	Cases []WaitCond
}

func Select

func Select(name string, ss ...WaitCond) SelectStmt

func WaitEvent added in v0.5.0

func WaitEvent(event string, handler Handler, sec ...Stmt) SelectStmt

WaitEvent waits for an event to arrive and then resumes the workflow. If multiple conditions are specified, only the one that fires first takes effect.

func WaitFor added in v0.5.0

func WaitFor(name string, d time.Duration, sec ...Stmt) SelectStmt

WaitFor waits for the specified duration and then resumes the workflow. If multiple conditions are specified, only the one that fires first takes effect.

func (SelectStmt) Resume added in v0.5.0

func (s SelectStmt) Resume(ctx *ResumeContext) (*Stop, error)

type State added in v0.5.0

type State struct {
	ID       string         // id of workflow instance
	Workflow string         // name of workflow definition. Used to choose proper state type to unmarshal & resume on
	State    interface{}    // json body of workflow state
	Status   WorkflowStatus // current status
	Input    interface{}    // json input of the workflow
	Output   interface{}    // json output of the finished workflow. Valid only if Status = Finished
	LockTill time.Time
	Threads  Threads
	PC       int
}

type Stmt added in v0.5.0

type Stmt interface {
	// Resume continues execution of the process, based on ResumeContext
	// It walks the tree searching for CurStep and then continues the process
	// stopping at some point or exiting at the end of it.
	// If callback not found *Stop will be nil and ctx.Running will be false
	// If callback is found, but process has finished - *Stop will be nil and ctx.Running will be true
	// Otherwise Resume should always return *Stop or err != nil
	Resume(ctx *ResumeContext) (*Stop, error)
}

Stmt is an async statement definition that should support workflow resuming and search.

func FindStep added in v0.5.0

func FindStep(name string, sec Stmt) Stmt

func For added in v0.5.0

func For(cond bool, condLabel string, sec Stmt) Stmt

type StmtStep added in v0.5.0

type StmtStep struct {
	Name   string
	Action ActionFunc
}

func Step added in v0.5.0

func Step(name string, action ActionFunc) StmtStep

func (StmtStep) Resume added in v0.5.0

func (s StmtStep) Resume(ctx *ResumeContext) (*Stop, error)

type Stop added in v0.5.0

type Stop struct {
	Step   string      // waiting for step execution to complete
	Select *SelectStmt // waiting for event
	Return interface{} // returning from process

}

Stop tells us that the synchronous part of the workflow has finished. It means we either:

type SwitchCase added in v0.5.0

type SwitchCase struct {
	CondLabel string
	Cond      bool
	Stmt      Stmt
}

func Case

func Case(cond bool, condLabel string, sec Stmt) SwitchCase

func Default added in v0.5.0

func Default(sec Stmt) SwitchCase

type SwitchStmt added in v0.5.0

type SwitchStmt []SwitchCase

func If added in v0.5.0

func If(cond bool, condLabel string, sec Stmt) SwitchStmt

func Switch added in v0.5.0

func Switch(ss ...SwitchCase) SwitchStmt

func (SwitchStmt) Resume added in v0.5.0

func (s SwitchStmt) Resume(ctx *ResumeContext) (*Stop, error)

type Thread

type Thread struct {
	ID          string
	Name        string
	Status      ThreadStatus // current status
	CurStep     string       // current step of the workflow
	ExecRetries int
	ExecError   string
	ExecBackoff time.Time
	WaitEvents  []string // events workflow is waiting for. Valid only if Status = Waiting, otherwise should be empty.
	//Receive     []*ReceiveOp
	//Send        []*SendOp
	PC int
}

func (Thread) EventIndex added in v0.5.0

func (t Thread) EventIndex(name string) int

type ThreadStatus added in v0.5.0

type ThreadStatus string
const (
	ThreadExecuting ThreadStatus = "Executing"
	ThreadResuming  ThreadStatus = "Resuming"
	ThreadWaiting   ThreadStatus = "Waiting"
	ThreadPaused    ThreadStatus = "Paused"
)

type Threads added in v0.5.0

type Threads []*Thread

func (*Threads) Add added in v0.5.0

func (tt *Threads) Add(t *Thread)

func (*Threads) Find added in v0.5.0

func (tt *Threads) Find(id string) (*Thread, bool)

func (*Threads) Remove added in v0.5.0

func (tt *Threads) Remove(id string)

type WaitCond added in v0.5.0

type WaitCond struct {
	CaseAfter    time.Duration // wait for time
	CallbackName string        // wait for event
	CaseRecv     string        // wait for receive channel
	CaseSend     string        // wait for send channels
	CaseWait     bool          // wait for custom condition. evaluated during func parsing
	SendData     json.RawMessage

	Handler Handler

	Stmt Stmt
}

func After added in v0.5.0

func After(d time.Duration, sec Stmt) WaitCond

After waits for the specified duration and then resumes the workflow. If multiple conditions are specified, only the one that fires first takes effect.

func On added in v0.5.0

func On(event string, handler Handler, sec Stmt) WaitCond

On waits for an event to arrive and then resumes the workflow. If multiple conditions are specified, only the one that fires first takes effect.

func Wait added in v0.5.0

func Wait(event string, cond bool, sec Stmt) WaitCond

type Workflow added in v0.2.0

type Workflow struct {
	Name      string               // used to init proper workflow state from available workflows
	InitState func() WorkflowState // create new workflow state object - current workflow state will be unmarshalled into it.
}

Workflow defines how we create/resume our workflow state.

type WorkflowDefinition added in v0.3.3

type WorkflowDefinition struct {
	// New is called when the workflow is created
	// It's also used to construct the API definition for input/output
	New Handler

	// Body is the asynchronous part of the workflow
	Body Section
}

type WorkflowState added in v0.5.0

type WorkflowState interface {
	Definition() WorkflowDefinition // Return current workflow definition. This function can be called multiple times, so be careful with doing real code execution inside.
}

WorkflowState should be a Go struct that supports JSON unmarshalling. When a process is resumed, the current state is unmarshalled into it and then Definition() is called. With this technique, all uses of the receiver within the Definition() function refer to current values, so there's no need for lazy parameters, i.e. instead of 'If(func() bool { return s.IsAvailable })' we can write 'If(s.IsAvailable)'.
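Why eager conditions are safe can be shown with the standard library alone: the saved state is unmarshalled first, so any field read afterwards already holds the current value. In this sketch `PizzaOrder`, `branch` and `resume` are hypothetical names, and `branch` merely stands in for a Definition() method that reads receiver fields directly.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PizzaOrder is a hypothetical workflow state.
type PizzaOrder struct {
	IsAvailable bool
}

// branch stands in for a definition method: it reads the receiver's fields
// directly, with no closure needed to defer the evaluation.
func (s *PizzaOrder) branch() string {
	if s.IsAvailable {
		return "cook"
	}
	return "reject"
}

// resume unmarshals the saved state and only then evaluates the branch,
// so the condition always sees the current values.
func resume(saved []byte) (string, error) {
	var s PizzaOrder
	if err := json.Unmarshal(saved, &s); err != nil {
		return "", err
	}
	return s.branch(), nil
}

func main() {
	b, err := resume([]byte(`{"IsAvailable": true}`))
	fmt.Println(b, err) // prints "cook <nil>"
}
```

The trade-off is that the definition is rebuilt on every resume, which is why the interface comment warns against doing real work inside it.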

type WorkflowStatus added in v0.5.0

type WorkflowStatus string
const (
	WorkflowRunning  WorkflowStatus = "Running"
	WorkflowFinished WorkflowStatus = "Finished"
)
