Documentation
¶
Overview ¶
Example ¶
package main
import (
"context"
"encoding/json"
"log"
)
type MyWorkflow struct {
State State
User string
Approved bool
Error string
}
func (s *MyWorkflow) Definition() Section {
return S(
Step("first action", func() error {
s.User = "You can execute arbitrary code right in the workflow definition"
log.Printf("No need for fancy definition/action separation")
return nil
}),
If(s.User == "", "check1",
Step("second action", func() error {
log.Printf("use if/else to branch your workflow")
return nil
}),
Return(),
),
Wait("and then wait for some events",
On("myEvent", MyEvent{
F: func() {
log.Printf("this is executed synchronously when HandleEvent() is Called")
},
},
Step("and then continue the workflow", func() error {
return nil
}),
s.Finish("timed out"),
),
),
Go("thread2", S(
Step("you can also run parallel threads in your workflow", func() error {
s.User = "this allows you to orchestrate your workflow and run multiple asnychrounous actions in parallel"
return nil
}),
)),
s.Finish("finished successfully"),
)
}
func (s *MyWorkflow) Finish(output string) Stmt {
return S(
Step("you can also build reusable workflows", func() error {
return nil
}),
Step("for example if you want to execute same actions for multiple workflow steps", func() error {
return nil
}),
Step("or make your workflow definion clean and readable", func() error {
return nil
}),
Return(),
)
}
type MyEvent struct {
F func()
}
func (t MyEvent) Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error) {
return nil, nil
}
func (t MyEvent) Teardown(ctx context.Context, req CallbackRequest, handled bool) error {
return nil
}
func (t MyEvent) Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error) {
t.F()
return input, nil
}
func main() {
wf := MyWorkflow{
State: NewState("1", "empty"),
}
err := Resume(context.Background(), &wf, &wf.State, func(scheduleResume bool) error {
// this is callback to save updated &wf to persistent storage
return nil
})
if err != nil {
log.Fatal(err)
}
_, err = HandleEvent(context.Background(), "myEvent", &wf, &wf.State, func(scheduleResume bool) error {
// this is callback to schedule another Resume() call and save updated &wf to persistent storage
return nil
})
if err != nil {
log.Fatal(err)
}
}
Index ¶
- Constants
- func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, ...) (interface{}, error)
- func HandleEvent(ctx context.Context, name string, wf WorkflowState, s *State, ...) (interface{}, error)
- func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error
- func Validate(s Section) error
- func Walk(s Stmt, f func(s Stmt) bool) (bool, error)
- type BreakStmt
- type CallbackRequest
- type Checkpoint
- type ContinueStmt
- type Event
- type ForStmt
- type GoStmt
- type Handler
- type ReturnStmt
- type Section
- type State
- type Stmt
- type StmtStep
- type Stop
- type SwitchCase
- type SwitchStmt
- type Thread
- type ThreadStatus
- type Threads
- type WG
- type WaitCondStmt
- type WaitEvent
- type WaitEventStatus
- type WaitEventsStmt
- type WorkflowState
- type WorkflowStatus
Examples ¶
Constants ¶
const MainThread = "_main_"
Variables ¶
This section is empty.
Functions ¶
func HandleCallback ¶ added in v0.9.0
func HandleCallback(ctx context.Context, req CallbackRequest, wf WorkflowState, s *State, input interface{}) (interface{}, error)
func HandleEvent ¶ added in v0.9.0
func HandleEvent(ctx context.Context, name string, wf WorkflowState, s *State, input interface{}) (interface{}, error)
HandleEvent is shortcut for calling HandleCallback() by event name. Be careful - if you're using goroutines - there may be multiple events waiting with the same name. Also, if you're waiting for event in a loop - event handlers from previous iterations could arrive late and trigger events for future iterations. For better control over which event will be called back - you should use OnCallback() instead and specify PC & Thread explicitly.
func Resume ¶ added in v0.9.0
func Resume(ctx context.Context, wf WorkflowState, s *State, save Checkpoint) error
Resume the workflow after - workflow creation - successful callback handling - previously failed Resume() call
This method can be called multiple times. If there's nothing to resume - it will return 'nil'
Types ¶
type BreakStmt ¶ added in v0.5.0
type BreakStmt struct {
}
func (BreakStmt) MarshalJSON ¶ added in v0.9.9
type CallbackRequest ¶ added in v0.5.0
type Checkpoint ¶ added in v0.7.0
type ContinueStmt ¶ added in v0.10.0
type ContinueStmt struct {
}
func (ContinueStmt) MarshalJSON ¶ added in v0.10.0
func (s ContinueStmt) MarshalJSON() ([]byte, error)
func (ContinueStmt) Resume ¶ added in v0.10.0
func (s ContinueStmt) Resume(ctx *resumeContext) (*Stop, error)
type Event ¶ added in v0.9.9
type Event struct {
Callback CallbackRequest
Handler Handler
Stmt Stmt
}
func (Event) MarshalJSON ¶ added in v0.9.9
type ForStmt ¶ added in v0.5.0
func (ForStmt) MarshalJSON ¶ added in v0.9.9
type GoStmt ¶ added in v0.5.0
type GoStmt struct {
// ID is needed to identify threads when there are many threads running with the same name.
// (for example they were created in a loop)
ID func() string `json:"-"`
Name string
Stmt Stmt
}
func (GoStmt) MarshalJSON ¶ added in v0.9.9
type Handler ¶ added in v0.5.0
type Handler interface {
Setup(ctx context.Context, req CallbackRequest) (json.RawMessage, error)
Handle(ctx context.Context, req CallbackRequest, input interface{}) (interface{}, error)
Teardown(ctx context.Context, req CallbackRequest, handled bool) error
}
func FindHandler ¶ added in v0.8.6
func FindHandler(req CallbackRequest, sec Stmt) (Handler, error)
type ReturnStmt ¶ added in v0.5.0
type ReturnStmt struct {
}
func (ReturnStmt) MarshalJSON ¶ added in v0.9.9
func (s ReturnStmt) MarshalJSON() ([]byte, error)
func (ReturnStmt) Resume ¶ added in v0.5.0
func (s ReturnStmt) Resume(ctx *resumeContext) (*Stop, error)
type Section ¶ added in v0.5.0
type Section []Stmt
Section is similar to code block {} with a list of statements.
type State ¶ added in v0.5.0
type State struct {
ID string // id of workflow instance
Workflow string // name of workflow definition. Used to choose proper state type to unmarshal & resume on
Status WorkflowStatus // current status
Threads Threads
PC int
}
type Stmt ¶ added in v0.5.0
type Stmt interface {
// Resume continues execution of the process, based on resumeContext
// If ctx.Running == true Resume should execute the statement or continue execution after.
// If ctx.Running = false Resume should not execute the statement, but recursively search for the statement that needs to be resumed.
// If it needs to be resumed - don't execute it, but continue execution from this statement.
//
// If stmt not found *Stop will be nil and ctx.Running will be false
// If stmt is found, but process has finished - *Stop will be nil and ctx.Running will be true
// Otherwise Resume should always return *Stop or err != nil
Resume(ctx *resumeContext) (*Stop, error)
}
Stmt is async statement definition that should support workflow resuming & search.
type StmtStep ¶ added in v0.5.0
func (StmtStep) MarshalJSON ¶ added in v0.9.9
type Stop ¶ added in v0.5.0
type Stop struct {
Step string // execute step
WaitEvents *WaitEventsStmt // wait for Events
Return bool // stop this thread
Cond string // wait for cond
}
Stop tells us that syncronous part of the workflow has finished. It means we either:
type SwitchCase ¶ added in v0.5.0
func Default ¶ added in v0.5.0
func Default(sec Stmt) SwitchCase
func (SwitchCase) MarshalJSON ¶ added in v0.9.9
func (s SwitchCase) MarshalJSON() ([]byte, error)
type SwitchStmt ¶ added in v0.5.0
type SwitchStmt struct {
Cases []SwitchCase
}
func Switch ¶ added in v0.5.0
func Switch(ss ...SwitchCase) *SwitchStmt
execute statements based on condition
func (*SwitchStmt) Else ¶ added in v0.9.5
func (s *SwitchStmt) Else(sec ...Stmt) *SwitchStmt
func (*SwitchStmt) ElseIf ¶ added in v0.9.5
func (s *SwitchStmt) ElseIf(cond bool, name string, sec ...Stmt) *SwitchStmt
func (SwitchStmt) MarshalJSON ¶ added in v0.9.9
func (s SwitchStmt) MarshalJSON() ([]byte, error)
func (*SwitchStmt) Resume ¶ added in v0.5.0
func (s *SwitchStmt) Resume(ctx *resumeContext) (*Stop, error)
type ThreadStatus ¶ added in v0.5.0
type ThreadStatus string
const ( ThreadExecuting ThreadStatus = "Executing" // next step for this thread is to execute "CurStep" step ThreadResuming ThreadStatus = "Resuming" // next step for this thread is to continue from "CurStep" step ThreadWaitingEvent ThreadStatus = "WaitingEvent" // thread is waiting for "CurStep" wait condition and will be resumed via OnCallback() ThreadWaitingCondition ThreadStatus = "WaitingCondition" // thread is waiting for condition to happen. i.e. it's waiting for other thread to update some data )
type WaitCondStmt ¶ added in v0.9.9
type WaitCondStmt struct {
Name string
Cond bool
Handler func() `json:"-"` // executed when cond is true.
}
func FindWaitingStep ¶ added in v0.9.6
func FindWaitingStep(name string, sec Stmt) (WaitCondStmt, error)
func WaitFor ¶ added in v0.5.0
func WaitFor(label string, cond bool, handler func()) WaitCondStmt
Wait statement wait for condition to be true.
func (WaitCondStmt) MarshalJSON ¶ added in v0.9.9
func (s WaitCondStmt) MarshalJSON() ([]byte, error)
func (WaitCondStmt) Resume ¶ added in v0.9.9
func (f WaitCondStmt) Resume(ctx *resumeContext) (*Stop, error)
type WaitEvent ¶ added in v0.5.0
type WaitEvent struct {
Req CallbackRequest
Status WaitEventStatus
Handled bool
Error string
}
type WaitEventStatus ¶ added in v0.6.0
type WaitEventStatus string
const ( EventPendingSetup WaitEventStatus = "PendingSetup" // event is just created EventSetup WaitEventStatus = "Setup" // event was successfully setup EventPendingTeardown WaitEventStatus = "PendingTeardown" // event was successfully setup EventSetupError WaitEventStatus = "SetupError" // there was an error during setup EventTeardownError WaitEventStatus = "TeardownError" // there was an error during teardown )
type WaitEventsStmt ¶ added in v0.9.9
func Wait ¶ added in v0.5.0
func Wait(name string, ss ...Event) WaitEventsStmt
Wait for multiple events exclusively
func (WaitEventsStmt) MarshalJSON ¶ added in v0.9.9
func (s WaitEventsStmt) MarshalJSON() ([]byte, error)
func (WaitEventsStmt) Resume ¶ added in v0.9.9
func (s WaitEventsStmt) Resume(ctx *resumeContext) (*Stop, error)
type WorkflowState ¶ added in v0.5.0
type WorkflowState interface {
// Definition func may be called multiple times so it should be idempotent.
// All actions should done in callbacks or steps.
Definition() Section
}
WorkflowState should be a Go struct supporting JSON unmarshalling into it. When process is resumed - current state is unmarshalled into it and then Definition() is called. This is needed to eliminate lasy parameters and conditions i.e. instead of 'If( func() bool { return s.IsAvailable})' we can write 'If(s.IsAvailable)'.
type WorkflowStatus ¶ added in v0.5.0
type WorkflowStatus string
const ( WorkflowRunning WorkflowStatus = "Running" WorkflowFinished WorkflowStatus = "Finished" )