Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultCollectionName(s *State) string
- func DefaultQueueNameFunc(s *State) string
- func Walk(s Stmt, f func(s Stmt) bool) bool
- type ActionFunc
- type ActionResult
- type BreakStmt
- type CBHandler
- type CallbackRequest
- type DocWorkflow
- type Empty
- type ExecuteRequest
- type ForStmt
- type GoStmt
- type Handler
- type HandlerDocs
- type ResumeContext
- type ResumeRequest
- type ReturnStmt
- type Runner
- func (r *Runner) CallbackStateHandler(s *State, req CallbackRequest) error
- func (r *Runner) ExecStep(s *State, req ExecuteRequest) error
- func (r *Runner) LockWorkflow(ctx context.Context, id string, thread string, pc int) (*State, error)
- func (r *Runner) NewWorkflow(ctx context.Context, id, name string, state interface{}) error
- func (r *Runner) ResumeState(ctx *ResumeContext) (WorkflowState, error)
- func (r *Runner) ResumeStateHandler(s *State, req ResumeRequest) error
- func (r *Runner) Router() *mux.Router
- func (r *Runner) ScheduleTasks(s *State, tID string) error
- func (r *Runner) UnlockWorkflow(ctx context.Context, id string, s *State) error
- type RunnerConfig
- type Section
- type SelectStmt
- type State
- type Stmt
- type StmtStep
- type Stop
- type SwitchCase
- type SwitchStmt
- type Thread
- type ThreadStatus
- type Threads
- type WaitCond
- type Workflow
- type WorkflowDefinition
- type WorkflowState
- type WorkflowStatus
Constants ¶
const MaxWaittTill = 999999999999
Variables ¶
var ErrDuplicate = fmt.Errorf("PC doesn't match (duplicate/out of time event)")
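The error text suggests that an incoming event carries the program counter (PC) it was produced for, and that it is rejected when that PC no longer matches the stored one. Below is a minimal sketch of such a check. The names errDuplicate and checkPC are hypothetical stand-ins, not part of this package, and the actual comparison inside the library may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errDuplicate is a local stand-in for the package's ErrDuplicate.
var errDuplicate = errors.New("PC doesn't match (duplicate/out of time event)")

// checkPC is a hypothetical helper: it accepts an event only when the
// program counter it carries equals the one currently stored.
func checkPC(storedPC, eventPC int) error {
	if storedPC != eventPC {
		return errDuplicate
	}
	return nil
}

func main() {
	fmt.Println(checkPC(3, 3)) // event produced for the current PC
	fmt.Println(checkPC(4, 3)) // stale or repeated event
}
```

A guard of this kind lets a delivery that arrives twice, or after the workflow has already moved on, be dropped without side effects.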
Functions ¶
func DefaultCollectionName ¶ added in v0.5.0
func DefaultQueueNameFunc ¶ added in v0.5.0
Types ¶
type ActionFunc ¶ added in v0.5.0
type ActionFunc func() ActionResult
type ActionResult ¶ added in v0.5.0
type CBHandler ¶ added in v0.5.0
type CBHandler struct {
	Handler http.HandlerFunc
	Docs    HandlerDocs
}
type CallbackRequest ¶ added in v0.5.0
type DocWorkflow ¶ added in v0.5.0
type DocWorkflow struct {
	Handlers map[string]HandlerDocs
	Input    *jsonschema.Schema
	Output   *jsonschema.Schema
}
func Docs ¶ added in v0.5.0
func Docs(def WorkflowDefinition) DocWorkflow
type Empty ¶
type Empty struct { }
The following function signature is allowed for exceptional use cases, but its use is discouraged: CBHandler.
type ExecuteRequest ¶ added in v0.5.0
type Handler ¶ added in v0.5.0
type Handler interface{}
Handler is a generic function that is analyzed using reflection. It is a convenient way to specify the input and output types together with the implementation.
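To illustrate what analyzing a function with reflection can look like, here is a minimal sketch that lists the parameter and result types of an arbitrary function value. describeHandler and greet are hypothetical names used only for this example. The package's own ReflectDoc returns JSON schemas instead and may impose rules on handler signatures that are not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// describeHandler is a hypothetical helper: it inspects a function value
// and returns the names of its parameter and result types.
func describeHandler(handler interface{}) (in, out []string, err error) {
	t := reflect.TypeOf(handler)
	if t == nil || t.Kind() != reflect.Func {
		return nil, nil, errors.New("handler must be a function")
	}
	for i := 0; i < t.NumIn(); i++ {
		in = append(in, t.In(i).String())
	}
	for i := 0; i < t.NumOut(); i++ {
		out = append(out, t.Out(i).String())
	}
	return in, out, nil
}

// greet is a sample handler used only for this illustration.
func greet(name string, times int) (string, error) {
	return fmt.Sprintf("hello %s x%d", name, times), nil
}

func main() {
	in, out, err := describeHandler(greet)
	fmt.Println(in, out, err)
}
```

Because the handler is passed as interface{}, signature mistakes surface at run time rather than at compile time, which is why a check such as the Kind() test above is needed.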
type HandlerDocs ¶ added in v0.5.0
type HandlerDocs struct {
	Input  *jsonschema.Schema
	Output *jsonschema.Schema
}
func ReflectDoc ¶ added in v0.5.0
func ReflectDoc(handler Handler, inline bool) HandlerDocs
type ResumeContext ¶ added in v0.5.0
type ResumeContext struct {
	// Running means the process has already resumed and we are executing statements.
	// If the process is not running, we are searching for the step we should resume from.
	Running bool
	// In case we are resuming a Select, this is an index of the select case to resume.
	CallbackName string
	// In case we are resuming a Select with a callback event, this is the data to
	// unmarshal into the callback function parameters via reflect.
	CallbackInput json.RawMessage
	// In case we are resuming a Select with a callback event, this is the data to
	// marshal back to the client in case the workflow was successfully saved.
	CallbackOutput json.RawMessage
	// Used for loop management.
	Break bool
	// contains filtered or unexported fields
}
ResumeContext is used during workflow execution. It contains the resume input as well as the current state of the execution.
type ResumeRequest ¶ added in v0.5.0
type ReturnStmt ¶ added in v0.5.0
type ReturnStmt struct {
Value interface{}
}
func Return ¶ added in v0.5.0
func Return(v interface{}) ReturnStmt
func (ReturnStmt) Resume ¶ added in v0.5.0
func (s ReturnStmt) Resume(ctx *ResumeContext) (*Stop, error)
type Runner ¶ added in v0.5.0
type Runner struct {
// contains filtered or unexported fields
}
func NewRunner ¶ added in v0.5.0
func NewRunner(cfg RunnerConfig, db *firestore.Client, tasks *cloudtasks.Service) (*Runner, error)
func (*Runner) CallbackStateHandler ¶ added in v0.5.0
func (r *Runner) CallbackStateHandler(s *State, req CallbackRequest) error
CallbackStateHandler resumes the process using a callback.
func (*Runner) ExecStep ¶ added in v0.5.0
func (r *Runner) ExecStep(s *State, req ExecuteRequest) error
func (*Runner) LockWorkflow ¶ added in v0.5.0
func (*Runner) NewWorkflow ¶ added in v0.5.0
func (*Runner) ResumeState ¶ added in v0.5.0
func (r *Runner) ResumeState(ctx *ResumeContext) (WorkflowState, error)
ResumeState resumes the state with the specified thread. A thread can be resumed in the following cases:
1. The thread has just started.
2. The thread timed out, i.e. it was unblocked on a time select.
3. The thread was unblocked on a select, i.e. a handler/event was triggered.
4. A step has finished execution and we are resuming the thread from that spot.
func (*Runner) ResumeStateHandler ¶ added in v0.5.0
func (r *Runner) ResumeStateHandler(s *State, req ResumeRequest) error
ResumeStateHandler resumes the process (after a start, or after a step that finished executing).
func (*Runner) ScheduleTasks ¶ added in v0.5.0
type RunnerConfig ¶ added in v0.5.0
type RunnerConfig struct {
	QueueName  string
	Collection string
	BaseURL    string
	ProjectID  string
	LocationID string
	Workflows  map[string]Workflow
}
func (*RunnerConfig) SetDefaultsAndValidate ¶ added in v0.5.0
func (cfg *RunnerConfig) SetDefaultsAndValidate() error
type Section ¶ added in v0.5.0
type Section []Stmt
Section is similar to a code block {}: it holds a list of statements.
type SelectStmt ¶ added in v0.5.0
func Select ¶
func Select(name string, ss ...WaitCond) SelectStmt
func WaitEvent ¶ added in v0.5.0
func WaitEvent(event string, handler Handler, sec ...Stmt) SelectStmt
WaitEvent waits for an event to arrive and then resumes the workflow. If multiple conditions are specified, only the one that fires first takes effect.
func WaitFor ¶ added in v0.5.0
func WaitFor(name string, d time.Duration, sec ...Stmt) SelectStmt
WaitFor waits for the specified time and then resumes the workflow. If multiple conditions are specified, only the one that fires first takes effect.
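The "first condition to fire wins" rule resembles Go's native select statement. The sketch below shows that in-process analogue with a channel standing in for an event and time.After standing in for the timeout. waitFirst, shortWait and longWait are hypothetical names for this example. This is only an analogy: the package itself persists workflow state between resumes and does not block a goroutine like this.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical timeouts used by the example.
const (
	shortWait = 10 * time.Millisecond
	longWait  = time.Second
)

// waitFirst blocks until either an event arrives or the timeout elapses
// and reports which of the two conditions fired.
func waitFirst(events <-chan string, timeout time.Duration) string {
	select {
	case ev := <-events:
		return "event:" + ev
	case <-time.After(timeout):
		return "timeout"
	}
}

func main() {
	events := make(chan string, 1)
	events <- "approved"
	fmt.Println(waitFirst(events, longWait))  // the event is already queued
	fmt.Println(waitFirst(events, shortWait)) // no event left, the timer fires
}
```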
func (SelectStmt) Resume ¶ added in v0.5.0
func (s SelectStmt) Resume(ctx *ResumeContext) (*Stop, error)
type State ¶ added in v0.5.0
type State struct {
	ID       string         // id of workflow instance
	Workflow string         // name of workflow definition. Used to choose proper state type to unmarshal & resume on
	State    interface{}    // json body of workflow state
	Status   WorkflowStatus // current status
	Input    interface{}    // json input of the workflow
	Output   interface{}    // json output of the finished workflow. Valid only if Status = Finished
	LockTill time.Time
	Threads  Threads
	PC       int
}
type Stmt ¶ added in v0.5.0
type Stmt interface {
	// Resume continues execution of the process, based on ResumeContext.
	// It walks the tree searching for CurStep and then continues the process,
	// stopping at some point or exiting at the end of it.
	// If callback not found, *Stop will be nil and ctx.Running will be false.
	// If callback is found, but process has finished, *Stop will be nil and ctx.Running will be true.
	// Otherwise Resume should always return *Stop or err != nil.
	Resume(ctx *ResumeContext) (*Stop, error)
}
Stmt is an asynchronous statement definition that should support workflow resuming and search.
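The Resume contract described in the interface comment can be sketched with a pair of simplified, hypothetical types. Everything below (resumeCtx, stop, step, section, resumeFrom) is invented for illustration and is much smaller than the real ResumeContext, Stop, StmtStep and Section. Error handling and select statements are left out.

```go
package main

import "fmt"

// resumeCtx is a simplified, hypothetical stand-in for ResumeContext.
type resumeCtx struct {
	Running bool   // false while still searching for CurStep
	CurStep string // the step that has just finished
}

// stop is a simplified stand-in for Stop: the next step to execute.
type stop struct{ Step string }

type stmt interface {
	Resume(ctx *resumeCtx) *stop
}

// step is a leaf statement identified by its name.
type step string

func (s step) Resume(ctx *resumeCtx) *stop {
	if ctx.Running {
		// Already resumed: this is the next step to execute, so stop here.
		return &stop{Step: string(s)}
	}
	if string(s) == ctx.CurStep {
		// Found the finished step: execution continues right after it.
		ctx.Running = true
	}
	return nil
}

// section walks its children in order, like a code block.
type section []stmt

func (sec section) Resume(ctx *resumeCtx) *stop {
	for _, child := range sec {
		if st := child.Resume(ctx); st != nil {
			return st
		}
	}
	return nil
}

// resumeFrom returns the next step after the finished one and whether the
// finished step was found in the tree at all.
func resumeFrom(flow stmt, finished string) (next string, running bool) {
	ctx := &resumeCtx{CurStep: finished}
	if st := flow.Resume(ctx); st != nil {
		return st.Step, ctx.Running
	}
	return "", ctx.Running
}

func main() {
	flow := section{step("reserve"), section{step("charge"), step("ship")}}
	fmt.Println(resumeFrom(flow, "reserve")) // stops at the next step
	fmt.Println(resumeFrom(flow, "ship"))    // found, but nothing left to run
	fmt.Println(resumeFrom(flow, "missing")) // not found, never running
}
```

The three calls in main correspond to the three outcomes listed in the interface comment: a stop is returned, the process has finished with Running set to true, or the step was not found and Running stays false.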
type StmtStep ¶ added in v0.5.0
type StmtStep struct {
	Name   string
	Action ActionFunc
}
func Step ¶ added in v0.5.0
func Step(name string, action ActionFunc) StmtStep
type Stop ¶ added in v0.5.0
type Stop struct {
	Step   string      // waiting for step execution to complete
	Select *SelectStmt // waiting for event
	Return interface{} // returning from process
}
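Stop tells us that the synchronous part of the workflow has finished. It means we are either waiting for a step execution to complete, waiting for an event, or returning from the process.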
type SwitchCase ¶ added in v0.5.0
func Default ¶ added in v0.5.0
func Default(sec Stmt) SwitchCase
type SwitchStmt ¶ added in v0.5.0
type SwitchStmt []SwitchCase
func Switch ¶ added in v0.5.0
func Switch(ss ...SwitchCase) SwitchStmt
func (SwitchStmt) Resume ¶ added in v0.5.0
func (s SwitchStmt) Resume(ctx *ResumeContext) (*Stop, error)
type Thread ¶
type Thread struct {
	ID          string
	Name        string
	Status      ThreadStatus // current status
	CurStep     string       // current step of the workflow
	ExecRetries int
	ExecError   string
	ExecBackoff time.Time
	WaitEvents  []string // events workflow is waiting for. Valid only if Status = Waiting, otherwise should be empty.
	//Receive []*ReceiveOp
	//Send []*SendOp
	PC int
}
func (Thread) EventIndex ¶ added in v0.5.0
type ThreadStatus ¶ added in v0.5.0
type ThreadStatus string
const (
	ThreadExecuting ThreadStatus = "Executing"
	ThreadResuming  ThreadStatus = "Resuming"
	ThreadWaiting   ThreadStatus = "Waiting"
	ThreadPaused    ThreadStatus = "Paused"
)
type WaitCond ¶ added in v0.5.0
type WaitCond struct {
	CaseAfter    time.Duration // wait for time
	CallbackName string        // wait for event
	CaseRecv     string        // wait for receive channel
	CaseSend     string        // wait for send channels
	CaseWait     bool          // wait for custom condition. evaluated during func parsing
	SendData     json.RawMessage
	Handler      Handler
	Stmt         Stmt
}
func After ¶ added in v0.5.0
After waits for the specified time and then resumes the workflow. If multiple conditions are specified, only the one that fires first takes effect.
type Workflow ¶ added in v0.2.0
type Workflow struct {
	Name      string               // used to init proper workflow state from available workflows
	InitState func() WorkflowState // create new workflow state object - current workflow state will be unmarshalled into it.
}
Workflow defines how we create/resume our workflow state.
type WorkflowDefinition ¶ added in v0.3.3
type WorkflowState ¶ added in v0.5.0
type WorkflowState interface {
Definition() WorkflowDefinition // Return current workflow definition. This function can be called multiple times, so be careful with doing real code execution inside.
}
WorkflowState should be a Go struct that supports JSON unmarshalling. When the process is resumed, the current state is unmarshalled into it and then Definition() is called. With this technique, all uses of the receiver within the Definition() function refer to current values, so there is no need for lazy parameters, i.e. instead of 'If(func() bool { return s.IsAvailable })' we can write 'If(s.IsAvailable)'.
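The "unmarshal first, then build the definition" idea can be shown with a small standalone sketch. orderState, its definition method and resume are hypothetical names for this example only. The definition here returns a plain list of step names rather than a real WorkflowDefinition, whose structure is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderState is a hypothetical workflow state used only for illustration.
type orderState struct {
	IsAvailable bool
}

// definition builds the list of step names from the receiver's current
// field values, so no lazy func() bool wrappers are needed.
func (s *orderState) definition() []string {
	if s.IsAvailable {
		return []string{"reserve", "ship"}
	}
	return []string{"notify-out-of-stock"}
}

// resume unmarshals the stored JSON state first and only then asks the
// state for its definition.
func resume(raw []byte) ([]string, error) {
	s := &orderState{}
	if err := json.Unmarshal(raw, s); err != nil {
		return nil, err
	}
	return s.definition(), nil
}

func main() {
	steps, err := resume([]byte(`{"IsAvailable":true}`))
	fmt.Println(steps, err)
}
```

Since the definition is rebuilt on every resume, it should stay cheap and free of side effects, which matches the warning in the interface comment about Definition being called multiple times.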
type WorkflowStatus ¶ added in v0.5.0
type WorkflowStatus string
const (
	WorkflowRunning  WorkflowStatus = "Running"
	WorkflowFinished WorkflowStatus = "Finished"
)