Documentation
¶
Overview ¶
Example ¶
package main import ( "context" "encoding/json" "log" ) type MyWorkflow struct { State State User string Approved bool Error string } func (s *MyWorkflow) Definition() Section { return S( Step("first action", func() error { s.User = "You can execute arbitrary code right in the workflow definition" log.Printf("No need for fancy definition/action separation") return nil }), If(s.User == "", "check1", Step("second action", func() error { log.Printf("use if/else to branch your workflow") return nil }), Return(), ), Wait("and then wait for some events", On("myEvent", MyEvent{ F: func() { log.Printf("this is executed synchronously when HandleEvent() is Called") }, }, Step("and then continue the workflow", func() error { return nil }), s.Finish("timed out"), ), ), Go("thread2", S( Step("you can also run parallel threads in your workflow", func() error { s.User = "this allows you to orchestrate your workflow and run multiple asnychrounous actions in parallel" return nil }), )), s.Finish("finished successfully"), ) } func (s *MyWorkflow) Finish(output string) Stmt { return S( Step("you can also build reusable workflows", func() error { return nil }), Step("for example if you want to execute same actions for multiple workflow steps", func() error { return nil }), Step("or make your workflow definion clean and readable", func() error { return nil }), Return(), ) } type MyEvent struct { F func() } func (t MyEvent) Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error) { return nil, nil } func (t MyEvent) Teardown(ctx context.Context, req CallbackRequest, handled bool) error { return nil } func (t MyEvent) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error) { t.F() return input, nil } func main() { wf := MyWorkflow{ State: NewState("1", "empty"), } err := Resume(context.Background(), &wf, &wf.State, func(scheduleResume bool) error { // this is callback to save updated &wf to persistent storage return nil }) if err != nil { log.Fatal(err) } _, err = HandleEvent(context.Background(), "myEvent", &wf, &wf.State, nil, func(scheduleResume bool) error { // this is callback to schedule another Resume() call and save updated &wf to persistent storage return nil }) if err != nil { log.Fatal(err) } }
Index ¶
- Constants
- func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, ...) (interface{}, error)
- func HandleEvent(ctx context.Context, name string, wf WorkflowState, s *State, ...) (interface{}, error)
- func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error
- func Validate(s Section) error
- func Walk(s Stmt, f func(s Stmt) bool) (bool, error)
- type BreakStmt
- type CallbackRequest
- type Checkpoint
- type Event
- type ForStmt
- type GoStmt
- type Handler
- type ReturnStmt
- type Section
- type State
- type Stmt
- type StmtStep
- type Stop
- type SwitchCase
- type SwitchStmt
- type Thread
- type ThreadStatus
- type Threads
- type WG
- type WaitCondStmt
- type WaitEvent
- type WaitEventStatus
- type WaitEventsStmt
- type WorkflowState
- type WorkflowStatus
Examples ¶
Constants ¶
const MainThread = "_main_"
Variables ¶
This section is empty.
Functions ¶
func HandleCallback ¶ added in v0.9.0
func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)
func HandleEvent ¶ added in v0.9.0
func HandleEvent(ctx context.Context, name string, wf WorkflowState, s *State, input interface{}, save Checkpoint) (interface{}, error)
HandleEvent is shortcut for calling HandleCallback() by event name. Be careful - if you're using goroutines - there may be multiple events waiting with the same name. Also, if you're waiting for event in a loop - event handlers from previous iterations could arrive late and trigger events for future iterations. For better control over which event will be called back - you should use OnCallback() instead and specify PC & Thread explicitly.
func Resume ¶ added in v0.9.0
func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error
Resume the workflow after - workflow creation - successful callback handling - previously failed Resume() call
This method can be called multiple times. If there's nothing to resume - it will return 'nil'
Types ¶
type BreakStmt ¶ added in v0.5.0
type BreakStmt struct { }
func (BreakStmt) MarshalJSON ¶ added in v0.9.9
type CallbackRequest ¶ added in v0.5.0
type Checkpoint ¶ added in v0.7.0
type Event ¶ added in v0.9.9
type Event struct { Callback CallbackRequest Handler Handler Stmt Stmt }
func (Event) MarshalJSON ¶ added in v0.9.9
type ForStmt ¶ added in v0.5.0
func (ForStmt) MarshalJSON ¶ added in v0.9.9
type GoStmt ¶ added in v0.5.0
type GoStmt struct { // ID is needed to identify threads when there are many threads running with the same name. // (for example they were created in a loop) ID func() string `json:"-"` Name string Stmt Stmt }
func (GoStmt) MarshalJSON ¶ added in v0.9.9
type Handler ¶ added in v0.5.0
type Handler interface { Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error) Teardown(ctx context.Context, req CallbackRequest, handled bool) error }
func FindHandler ¶ added in v0.8.6
func FindHandler(req CallbackRequest, sec Stmt) (Handler, error)
type ReturnStmt ¶ added in v0.5.0
type ReturnStmt struct { }
func (ReturnStmt) MarshalJSON ¶ added in v0.9.9
func (s ReturnStmt) MarshalJSON() ([]byte, error)
func (ReturnStmt) Resume ¶ added in v0.5.0
func (s ReturnStmt) Resume(ctx *resumeContext) (*Stop, error)
type Section ¶ added in v0.5.0
type Section []Stmt
Section is similar to code block {} with a list of statements.
func (Section) MarshalJSON ¶ added in v0.9.9
type State ¶ added in v0.5.0
type State struct { ID string // id of workflow instance Workflow string // name of workflow definition. Used to choose proper state type to unmarshal & resume on Status WorkflowStatus // current status Threads Threads PC int }
type Stmt ¶ added in v0.5.0
type Stmt interface { // Resume continues execution of the process, based on resumeContext // If ctx.Running == true Resume should execute the statement or continue execution after. // If ctx.Running = false Resume should not execute the statement, but recursively search for the statement that needs to be resumed. // If it needs to be resumed - don't execute it, but continue execution from this statement. // // If stmt not found *Stop will be nil and ctx.Running will be false // If stmt is found, but process has finished - *Stop will be nil and ctx.Running will be true // Otherwise Resume should always return *Stop or err != nil Resume(ctx *resumeContext) (*Stop, error) }
Stmt is async statement definition that should support workflow resuming & search.
type StmtStep ¶ added in v0.5.0
func (StmtStep) MarshalJSON ¶ added in v0.9.9
type Stop ¶ added in v0.5.0
type Stop struct { Step string // execute step WaitEvents *WaitEventsStmt // wait for Events Return bool // stop this thread Cond string // wait for cond }
Stop tells us that syncronous part of the workflow has finished. It means we either:
type SwitchCase ¶ added in v0.5.0
func Default ¶ added in v0.5.0
func Default(sec Stmt) SwitchCase
func (SwitchCase) MarshalJSON ¶ added in v0.9.9
func (s SwitchCase) MarshalJSON() ([]byte, error)
type SwitchStmt ¶ added in v0.5.0
type SwitchStmt struct {
Cases []SwitchCase
}
func Switch ¶ added in v0.5.0
func Switch(ss ...SwitchCase) *SwitchStmt
execute statements based on condition
func (*SwitchStmt) Else ¶ added in v0.9.5
func (s *SwitchStmt) Else(sec ...Stmt) *SwitchStmt
func (*SwitchStmt) ElseIf ¶ added in v0.9.5
func (s *SwitchStmt) ElseIf(cond bool, name string, sec ...Stmt) *SwitchStmt
func (SwitchStmt) MarshalJSON ¶ added in v0.9.9
func (s SwitchStmt) MarshalJSON() ([]byte, error)
func (*SwitchStmt) Resume ¶ added in v0.5.0
func (s *SwitchStmt) Resume(ctx *resumeContext) (*Stop, error)
type ThreadStatus ¶ added in v0.5.0
type ThreadStatus string
const ( ThreadExecuting ThreadStatus = "Executing" // next step for this thread is to execute "CurStep" step ThreadResuming ThreadStatus = "Resuming" // next step for this thread is to continue from "CurStep" step ThreadWaitingEvent ThreadStatus = "WaitingEvent" // thread is waiting for "CurStep" wait condition and will be resumed via OnCallback() ThreadWaitingCondition ThreadStatus = "WaitingCondition" // thread is waiting for condition to happen. i.e. it's waiting for other thread to update some data )
type WaitCondStmt ¶ added in v0.9.9
type WaitCondStmt struct { Name string Cond bool Handler func() `json:"-"` // executed when cond is true. }
func FindWaitingStep ¶ added in v0.9.6
func FindWaitingStep(name string, sec Stmt) (WaitCondStmt, error)
func WaitCond ¶ added in v0.5.0
func WaitCond(cond bool, label string, handler func()) WaitCondStmt
Wait statement wait for condition to be true.
func (WaitCondStmt) MarshalJSON ¶ added in v0.9.9
func (s WaitCondStmt) MarshalJSON() ([]byte, error)
func (WaitCondStmt) Resume ¶ added in v0.9.9
func (f WaitCondStmt) Resume(ctx *resumeContext) (*Stop, error)
type WaitEvent ¶ added in v0.5.0
type WaitEvent struct { Req CallbackRequest Status WaitEventStatus Handled bool Error string }
type WaitEventStatus ¶ added in v0.6.0
type WaitEventStatus string
const ( EventPendingSetup WaitEventStatus = "PendingSetup" // event is just created EventSetup WaitEventStatus = "Setup" // event was successfully setup EventPendingTeardown WaitEventStatus = "PendingTeardown" // event was successfully setup EventSetupError WaitEventStatus = "SetupError" // there was an error during setup EventTeardownError WaitEventStatus = "TeardownError" // there was an error during teardown )
type WaitEventsStmt ¶ added in v0.9.9
func Wait ¶ added in v0.5.0
func Wait(name string, ss ...Event) WaitEventsStmt
Wait for multiple events exclusively
func (WaitEventsStmt) MarshalJSON ¶ added in v0.9.9
func (s WaitEventsStmt) MarshalJSON() ([]byte, error)
func (WaitEventsStmt) Resume ¶ added in v0.9.9
func (s WaitEventsStmt) Resume(ctx *resumeContext) (*Stop, error)
type WorkflowState ¶ added in v0.5.0
type WorkflowState interface { // Definition func may be called multiple times so it should be idempotent. // All actions should done in callbacks or steps. Definition() Section }
WorkflowState should be a Go struct supporting JSON unmarshalling into it. When process is resumed - current state is unmarshalled into it and then Definition() is called. This is needed to eliminate lasy parameters and conditions i.e. instead of 'If( func() bool { return s.IsAvailable})' we can write 'If(s.IsAvailable)'.
type WorkflowStatus ¶ added in v0.5.0
type WorkflowStatus string
const ( WorkflowRunning WorkflowStatus = "Running" WorkflowFinished WorkflowStatus = "Finished" )