fsm

package
v0.0.0-...-8e6450d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2020 License: MIT Imports: 22 Imported by: 0

Documentation

Overview

Package fsm layers an erlang/akka style finite state machine abstraction on top of SWF, and facilitates modeling your workflows as FSMs. The FSM will be responsible for handling the decision tasks in your workflow that implicitly model it.

The FSM takes care of serializing/deserializing and threading a data model through the workflow history for you, as well as serialization/deserialization of any payloads in events your workflows recieve, as well as optionally sending the data model snapshots to kinesis, to facilitate a CQRS style application where the query models will be built off the Kinesis stream.

From http://www.erlang.org/doc/design_principles/fsm.html, a finite state machine, or FSM, can be described as a set of relations of the form:

State(S) x Event(E) -> Actions(A), State(S')

Substituting the relevant SWF/swf4go concepts, we get

(Your main data struct) x (an swf.HistoryEvent) -> (zero or more swf.Decisions), (A possibly updated main data struct)

See the http://godoc.org/github.com/sclasen/swfsm/fsm#example-FSM for a complete usage example.

Index

Examples

Constants

View Source
const (
	FilterStatusAll                  = "ALL"                    // open + closed
	FilterStatusOpen                 = "OPEN"                   // open only
	FilterStatusOpenPriority         = "OPEN_PRIORITY"          // open (+ closed, only if open is totally empty)
	FilterStatusOpenPriorityWorkflow = "OPEN_PRIORITY_WORKFLOW" // open (+ closed, only if open not present workflow-by-workflow)
	FilterStatusClosed               = "CLOSED"                 // closed only
)
View Source
const (
	StateMarker       = "FSM.State"
	CorrelatorMarker  = "FSM.Correlator"
	ErrorMarker       = "FSM.Error"
	RepiarStateSignal = "FSM.RepairState"
	ContinueTimer     = "FSM.ContinueWorkflow"
	ContinueSignal    = "FSM.ContinueWorkflow"
	CompleteState     = "complete"
	CanceledState     = "canceled"
	FailedState       = "failed"
	ErrorState        = "error"
	//the FSM was not configured with a state named in an outcome.
	FSMErrorMissingState = "ErrorMissingFsmState"
	//the FSM encountered an erryor while serializaing stateData
	FSMErrorStateSerialization = "ErrorStateSerialization"
	//the FSM encountered an erryor while deserializaing stateData
	FSMErrorStateDeserialization = "ErrorStateDeserialization"
	//the FSM encountered an erryor while deserializaing stateData
	FSMErrorCorrelationDeserialization = "ErrorCorrelationDeserialization"
	//Signal sent when a Long Lived Worker Start()
	ActivityStartedSignal = "FSM.ActivityStarted"
	//Signal send when long Lived worker sends an update from Work()
	ActivityUpdatedSignal = "FSM.ActivityUpdated"
)

constants used as marker names or signal names

Variables

This section is empty.

Functions

func CloseDecisionIncompatableDecisionTypes

func CloseDecisionIncompatableDecisionTypes() []string

func CloseDecisionTypes

func CloseDecisionTypes() []string

func GetTagsIfTaggable

func GetTagsIfTaggable(data interface{}) []*string

func NewHistorySegmentor

func NewHistorySegmentor(c *client) *historySegmentor

func StartFSMWorkflowInput

func StartFSMWorkflowInput(serializer Serialization, data interface{}) *string

StartFSMWorkflowInput should be used to construct the input for any StartWorkflowExecutionRequests. This panics on errors cause really this should never err.

Types

type ActivityInfo

type ActivityInfo struct {
	ActivityId string
	*swf.ActivityType
	Input *string
}

ActivityInfo holds the ActivityId and ActivityType for an activity

type BoundedGoroutineDispatcher

type BoundedGoroutineDispatcher struct {
	NumGoroutines int
	// contains filtered or unexported fields
}

BoundedGoroutineDispatcher is a DecisionTaskDispatcher that uses a bounded number of goroutines to run decision handlers.

func (*BoundedGoroutineDispatcher) DispatchTask

DispatchTask calls sends the task on a channel that NumGoroutines goroutines are selecting on. Goroutines recieving a task run it in the same goroutine. note that this is unsynchronized as DispatchTask will only be called by the single poller goroutine.

type CallingGoroutineDispatcher

type CallingGoroutineDispatcher struct{}

CallingGoroutineDispatcher is a DecisionTaskDispatcher that runs the decision handler in the polling goroutine

func (*CallingGoroutineDispatcher) DispatchTask

DispatchTask calls the handler in the same goroutine.

type CancellationInfo

type CancellationInfo struct {
	Control    *string
	WorkflowId string
}

CancellationInfo holds the Control data and workflow that was being canceled

type ChildInfo

type ChildInfo struct {
	WorkflowId string
	Input      *string
	*swf.WorkflowType
}

ChildInfo holds the Input data and Workflow info for the child workflow being started

type ClientSWFOps

type ClientSWFOps interface {
	ListOpenWorkflowExecutions(req *swf.ListOpenWorkflowExecutionsInput) (resp *swf.WorkflowExecutionInfos, err error)
	ListClosedWorkflowExecutions(req *swf.ListClosedWorkflowExecutionsInput) (resp *swf.WorkflowExecutionInfos, err error)
	GetWorkflowExecutionHistory(req *swf.GetWorkflowExecutionHistoryInput) (resp *swf.GetWorkflowExecutionHistoryOutput, err error)
	GetWorkflowExecutionHistoryPages(input *swf.GetWorkflowExecutionHistoryInput, fn func(p *swf.GetWorkflowExecutionHistoryOutput, lastPage bool) (shouldContinue bool)) error
	SignalWorkflowExecution(req *swf.SignalWorkflowExecutionInput) (resp *swf.SignalWorkflowExecutionOutput, err error)
	StartWorkflowExecution(req *swf.StartWorkflowExecutionInput) (resp *swf.StartWorkflowExecutionOutput, err error)
	TerminateWorkflowExecution(req *swf.TerminateWorkflowExecutionInput) (resp *swf.TerminateWorkflowExecutionOutput, err error)
	RequestCancelWorkflowExecution(req *swf.RequestCancelWorkflowExecutionInput) (resp *swf.RequestCancelWorkflowExecutionOutput, err error)
}

type ComposedDecider

type ComposedDecider struct {
	// contains filtered or unexported fields
}

ComposedDecider can be used to build a decider out of a number of sub Deciders the sub deciders should return Pass when they dont wish to handle an event.

Example
//to reduce boilerplate you can create reusable components to compose Deciders with,
//that use functions that have the dataType of your FSM.
typedFuncs := Typed(new(TestingType))

//for example. reduced boilerplate for the retry of failed activities.
//first, you would have one of these typed DecisionFuncs for each activity decision type you create.
fooActivityDecision := func(ctx *FSMContext, h *swf.HistoryEvent, data *TestingType) *swf.Decision {
	return &swf.Decision{
		DecisionType: aws.String(swf.DecisionTypeScheduleActivityTask),
		ScheduleActivityTaskDecisionAttributes: &swf.ScheduleActivityTaskDecisionAttributes{
			ActivityType: &swf.ActivityType{Name: aws.String("foo-activity"), Version: aws.String("1")},
		},
	}
}

barActivityDecision := func(ctx *FSMContext, h *swf.HistoryEvent, data *TestingType) *swf.Decision {
	return &swf.Decision{
		DecisionType: aws.String(swf.DecisionTypeScheduleActivityTask),
		ScheduleActivityTaskDecisionAttributes: &swf.ScheduleActivityTaskDecisionAttributes{
			ActivityType: &swf.ActivityType{Name: aws.String("bar-activity"), Version: aws.String("1")},
		},
	}
}

// optionally a type alias for your 'typed' decision fn.
// if you dont do this the retryFailedActivities below will need to be
// func(activityName string, activityFn interface{})
// instead of
// func(activityName string, activityFn TestingTypeDecisionFunc)
type TestingTypeDecisionFunc func(*FSMContext, *swf.HistoryEvent, *TestingType) *swf.Decision

//now the retryFailedActivities function, which can be used for all activity funcs like the above.
retryFailedActivities := func(activityName string, activityFn TestingTypeDecisionFunc) Decider {
	typedDecisionFn := typedFuncs.DecisionFunc(activityFn)
	return func(ctx *FSMContext, h *swf.HistoryEvent, data interface{}) Outcome {
		switch *h.EventType {
		case swf.EventTypeActivityTaskFailed, swf.EventTypeActivityTaskTimedOut, swf.EventTypeActivityTaskCanceled:
			if *ctx.ActivityInfo(h).Name == activityName {
				decisions := ctx.EmptyDecisions()
				retry := typedDecisionFn(ctx, h, data)
				decisions = append(decisions, retry)
				return ctx.Stay(data, decisions)
			}
		}
		return ctx.Pass()
	}
}

//now build a decider out of the parts.
//the one thing you need to be careful of is having a unit test that executes the following
//since the type checking can only be done at initialization at runtime here.
decider := NewComposedDecider(
	retryFailedActivities("foo-activity", fooActivityDecision),
	retryFailedActivities("bar-activity", barActivityDecision),
	DefaultDecider(),
)

decider(new(FSMContext), &swf.HistoryEvent{}, new(TestData))
Output:

func (*ComposedDecider) Decide

func (c *ComposedDecider) Decide(ctx *FSMContext, h *swf.HistoryEvent, data interface{}) Outcome

Decide is the the Decider func for a ComposedDecider

type ComposedDecisionInterceptor

type ComposedDecisionInterceptor struct {
	// contains filtered or unexported fields
}

func (*ComposedDecisionInterceptor) AfterDecision

func (c *ComposedDecisionInterceptor) AfterDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)

func (*ComposedDecisionInterceptor) BeforeDecision

func (c *ComposedDecisionInterceptor) BeforeDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)

func (*ComposedDecisionInterceptor) BeforeTask

type Decider

type Decider func(*FSMContext, *swf.HistoryEvent, interface{}) Outcome

Decider decides an Outcome based on an event and the current data for an FSM. You can assert the interface{} parameter that is passed to the Decider as the type of the DataType field in the FSM. Alternatively, you can use TypedFuncs to create a typed decider to avoid having to do the assertion.

func AddDecision

func AddDecision(decisionFn DecisionFunc) Decider

AddDecision adds a single decision to a ContinueDecider outcome

func AddDecisions

func AddDecisions(signalFn MultiDecisionFunc) Decider

AddDecisions adds decisions to a ContinueDecider outcome

func CancelWorkflow

func CancelWorkflow(details *string) Decider

CancelWorkflow cancels the workflow

func CompleteWorkflow

func CompleteWorkflow() Decider

CompleteWorkflow completes the workflow

func DefaultDecider

func DefaultDecider() Decider

DefaultDecider is a 'catch-all' decider that simply logs the unhandled decision. You should place this or one like it as the last decider in your top level ComposableDecider.

func FailWorkflow

func FailWorkflow(details *string) Decider

FailWorkflow fails the workflow

func NewComposedDecider

func NewComposedDecider(deciders ...Decider) Decider

NewComposedDecider builds a Composed Decider from a list of sub Deciders. You can compose your fiinal composable decider from other composable deciders, but you should make sure that the final decider includes a 'catch-all' decider in last place you can use DefaultDecider() or your own.

func OnActivityCanceled

func OnActivityCanceled(activityName string, deciders ...Decider) Decider

OnActivityCanceled builds a composed decider that fires when a matching activity is canceled.

func OnActivityCompleted

func OnActivityCompleted(activityName string, deciders ...Decider) Decider

OnActivityCompleted builds a composed decider that fires when a matching activity completes.

func OnActivityEvents

func OnActivityEvents(activityName string, eventTypes []string, deciders ...Decider) Decider

func OnActivityFailed

func OnActivityFailed(activityName string, deciders ...Decider) Decider

OnActivityFailed builds a composed decider that fires when a matching activity fails.

func OnActivityFailedTimedOutCanceled

func OnActivityFailedTimedOutCanceled(activityName string, deciders ...Decider) Decider

OnActivityFailedTimedOutCanceled builds a composed decider that fires when a matching activity fails, times out, or is canceled.

func OnActivityHeartbeatTimeout

func OnActivityHeartbeatTimeout(activityName string, deciders ...Decider) Decider

func OnActivityScheduleToCloseTimeout

func OnActivityScheduleToCloseTimeout(activityName string, deciders ...Decider) Decider

func OnActivityScheduleToStartTimeout

func OnActivityScheduleToStartTimeout(activityName string, deciders ...Decider) Decider

func OnActivityStartToCloseTimeout

func OnActivityStartToCloseTimeout(activityName string, deciders ...Decider) Decider

func OnActivityStarted

func OnActivityStarted(activityName string, deciders ...Decider) Decider

OnActivityStarted builds a composed decider that fires when a matching activity starts.

func OnActivityTimedOut

func OnActivityTimedOut(activityName string, deciders ...Decider) Decider

OnActivityTimedOut builds a composed decider that fires when a matching activity times out.

func OnChildCompleted

func OnChildCompleted(deciders ...Decider) Decider

OnChildCompleted builds a composed decider that fires on EventTypeChildWorkflowExecutionCompleted.

func OnChildStartFailed

func OnChildStartFailed(deciders ...Decider) Decider

OnChildStarted builds a composed decider that fires on EventTypeStartChildWorkflowExecutionFailed.

func OnChildStartFailedAlreadyRunning

func OnChildStartFailedAlreadyRunning(deciders ...Decider) Decider

func OnChildStartFailedAndNotAlreadyRunning

func OnChildStartFailedAndNotAlreadyRunning(deciders ...Decider) Decider

OnChildStartFailedAndNotAlreadyRunning builds a composed decider that fires on EventTypeStartChildWorkflowExecutionFailed and Cause != "WORKFLOW_ALREADY_RUNNING".

func OnChildStarted

func OnChildStarted(deciders ...Decider) Decider

OnChildStarted builds a composed decider that fires on swf.EventTypeChildWorkflowExecutionStarted.

func OnChildStartedOrAlreadyRunning

func OnChildStartedOrAlreadyRunning(deciders ...Decider) Decider

OnChildStartedOrAlreadyRunning builds a composed decider that fires on EventTypeChildWorkflowExecutionStarted OR EventTypeStartChildWorkflowExecutionFailed with Cause == "WORKFLOW_ALREADY_RUNNING".

func OnContinueFailed

func OnContinueFailed(deciders ...Decider) Decider

func OnData

func OnData(predicate PredicateFunc, deciders ...Decider) Decider

OnData builds a composed decider that fires on when the PredicateFunc is satisfied.

func OnDataUnless

func OnDataUnless(predicate PredicateFunc, deciders ...Decider) Decider

OnDataUnless builds a composed decider that fires on when the PredicateFunc is NOT satisfied.

func OnExternalCancellationResponse

func OnExternalCancellationResponse(exitDecider Decider) Decider

func OnExternalWorkflowExecutionCancelRequested

func OnExternalWorkflowExecutionCancelRequested(deciders ...Decider) Decider

func OnRequestCancelExternalWorkflowExecutionFailed

func OnRequestCancelExternalWorkflowExecutionFailed(deciders ...Decider) Decider

func OnSignalFailed

func OnSignalFailed(signalName string, deciders ...Decider) Decider

OnSignalFailed builds a composed decider that fires on when a matching signal fails.

func OnSignalFailedAndNotUnknown

func OnSignalFailedAndNotUnknown(signalName string, deciders ...Decider) Decider

OnSignalFailedAndNotUnknown passes the event to OnSignalFailed only if the signal specified by signalName matches and the signalling was targeting a known Workflow ID.

func OnSignalReceived

func OnSignalReceived(signalName string, deciders ...Decider) Decider

OnSignalReceived builds a composed decider that fires on when a matching signal is received.

func OnSignalSent

func OnSignalSent(signalName string, deciders ...Decider) Decider

OnSignalSent builds a composed decider that fires on when a matching signal is received.

func OnSignalsReceived

func OnSignalsReceived(signalNames []string, deciders ...Decider) Decider

OnSignalsReceived builds a composed decider that fires on when one of the matching signal is received.

func OnStartTimerFailed

func OnStartTimerFailed(timer string, deciders ...Decider) Decider

OnStartTimerFailed builds a composed decider that fires on EventTypeStartTimerFailed.

func OnStarted

func OnStarted(deciders ...Decider) Decider

OnStarted builds a composed decider that fires on swf.EventTypeWorkflowExecutionStarted.

func OnTimerCanceled

func OnTimerCanceled(timer string, deciders ...Decider) Decider

OnTimerCanceled builds a composed decider that fires on EventTypeTimerCanceled.

func OnTimerFired

func OnTimerFired(timerId string, deciders ...Decider) Decider

OnTimerFired builds a composed decider that fires on when a matching timer is fired.

func OnUnknownWorkflowSignaled

func OnUnknownWorkflowSignaled(signalName string, deciders ...Decider) Decider

OnUnknownWorkflowSignaled builds a composed decider that fires if the signal specified by signalName is signaled on an unknown Workflow ID.

func OnWorkflowCancelRequested

func OnWorkflowCancelRequested(deciders ...Decider) Decider

func Stay

func Stay() Decider

Stay keeps the fsm in the same state, and terminates the decider.

func Transition

func Transition(toState string) Decider

Transition transitions the FSM to a new state, and terminates the decdier.

func UpdateState

func UpdateState(updateFunc StateFunc) Decider

UpdateState allows you to modicy the state data without generating decisions.

type DecisionErrorHandler

type DecisionErrorHandler func(ctx *FSMContext, event *swf.HistoryEvent, stateBeforeEvent interface{}, stateAfterError interface{}, err error) (*Outcome, error)

DecisionErrorHandler is the error handling contract for panics that occur in Deciders. If your DecisionErrorHandler does not return a non nil Outcome, any further attempt to process the decisionTask is abandoned and the task will time out.

type DecisionFunc

type DecisionFunc func(ctx *FSMContext, h *swf.HistoryEvent, data interface{}) *swf.Decision

DecisionFunc is a building block for composable deciders that returns a decision.

type DecisionInterceptor

type DecisionInterceptor interface {
	BeforeTask(decision *swf.PollForDecisionTaskOutput)
	BeforeDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
	AfterDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
}

DecisionInterceptor allows manipulation of the decision task and the outcome at key points in the task lifecycle.

func CloseWorkflowRemoveIncompatibleDecisionInterceptor

func CloseWorkflowRemoveIncompatibleDecisionInterceptor() DecisionInterceptor

CloseWorkflowRemoveIncompatibleDecisionInterceptor checks for incompatible decisions with a Complete workflow decision, and if found removes it from the outcome.

func DedupeDecisions

func DedupeDecisions(decisionType string) DecisionInterceptor

DedupeDecisions returns an interceptor that executes after a decision and removes any duplicate decisions of the specified type from the outcome. Duplicates are removed from the beginning of the input list, so that the last decision of the specified type is the one that remains in the list.

e.g. An outcome with a list of decisions [a, a, b, a, c] where the type to dedupe was 'a' would result in an outcome with a list of decisions [b, a, c]

func DedupeWorkflowCancellations

func DedupeWorkflowCancellations() DecisionInterceptor

DedupeWorkflowCancellations returns an interceptor that executes after a decision and removes any duplicate swf.DecisionTypeCancelWorkflowExecution decisions from the outcome. Duplicates are removed from the beginning of the input list, so that the last cancel decision is the one that remains in the list.

func DedupeWorkflowCloseDecisions

func DedupeWorkflowCloseDecisions() DecisionInterceptor

DedupeWorkflowCloseDecisions returns an interceptor that executes after a decision and removes any duplicate workflow close decisions (cancel, complete, fail) from the outcome. Duplicates are removed from the beginning of the input list, so that the last failure decision is the one that remains in the list.

func DedupeWorkflowCompletes

func DedupeWorkflowCompletes() DecisionInterceptor

DedupeWorkflowCompletes returns an interceptor that executes after a decision and removes any duplicate swf.DecisionTypeCompleteWorkflowExecution decisions from the outcome. Duplicates are removed from the beginning of the input list, so that the last complete decision is the one that remains in the list.

func DedupeWorkflowFailures

func DedupeWorkflowFailures() DecisionInterceptor

DedupeWorkflowFailures returns an interceptor that executes after a decision and removes any duplicate swf.DecisionTypeFailWorkflowExecution decisions from the outcome. Duplicates are removed from the beginning of the input list, so that the last failure decision is the one that remains in the list.

func ManagedContinuations

func ManagedContinuations(historySize int, workflowAgeInSec int, timerRetrySeconds int) DecisionInterceptor

ManagedContinuations is an interceptor that will handle most of the mechanics of automatically continuing workflows.

For workflows without persistent, heartbeating activities, it should do everything.

ContinueSignal: How to continue fsms with persistent activities. How it works FSM in steady+activity, listens for ContinueTimer. OnTimer, cancel activity, transition to continuing. In continuing, OnActivityCanceled send ContinueSignal. Interceptor handles ContinueSignal, if 0,0,0,0 Continue, else start ContinueTimer. In continuing, OnStarted re-starts the activity, transition back to steady.

func ManagedContinuationsWithJitter

func ManagedContinuationsWithJitter(historySize int, maxSizeJitter int, workflowAgeInSec int, maxAgeJitterInSec int, timerRetrySeconds int) DecisionInterceptor

To avoid stampedes of workflows that are started at the same time being continued at the same time ManagedContinuationsWithJitter will schedule the initial continue randomly between workflowAgeInSec and workflowAgeInSec + maxAgeJitterInSec and will attempt to continue workflows with more than between historySize and historySize + maxSizeJitter events

func MoveDecisionsToEnd

func MoveDecisionsToEnd(decisionType string) DecisionInterceptor

MoveDecisionsToEnd returns an interceptor that executes after a decision and moves any decisions of the specified type to the end of an outcome's decision list.

e.g. An outcome with a list of decisions [a, a, b, a, c] where the type to move was 'a' would result in an outcome with a list of decisions [b, c, a, a, a]

func MoveWorkflowCloseDecisionsToEnd

func MoveWorkflowCloseDecisionsToEnd() DecisionInterceptor

MoveWorkflowCloseDecisionsToEnd returns an interceptor that executes after a decision and moves any workflow close decisions (complete, fail, cancel) to the end of an outcome's decision list.

Note: SWF responds with a 400 error if a workflow close decision is not the last decision in the list of decisions.

func NewComposedDecisionInterceptor

func NewComposedDecisionInterceptor(interceptors ...DecisionInterceptor) DecisionInterceptor

func RemoveLowerPriorityDecisions

func RemoveLowerPriorityDecisions(prioritizedDecisionTypes ...string) DecisionInterceptor

RemoveLowerPriorityDecisions returns an interceptor that executes after a decision and removes any lower priority decisions from an outcome if a higher priority decision exists. The decisionTypes passed to this function should be listed in highest to lowest priority order.

e.g. An outcome with a list of decisions [a, a, b, a, c] where the priority was a > b > c would return [a, a, a]

func StartCancelInterceptor

func StartCancelInterceptor() DecisionInterceptor

type DecisionTaskDispatcher

type DecisionTaskDispatcher interface {
	DispatchTask(*swf.PollForDecisionTaskOutput, func(*swf.PollForDecisionTaskOutput))
}

DecisionTaskDispatcher is used by the FSM machinery to

func GoroutinePerWorkflowDispatcher

func GoroutinePerWorkflowDispatcher(maxPendingTasks int) DecisionTaskDispatcher

GoroutinePerWorkflowDispatcher allows a single goroutine per workflow execution (RunID) to run at a time. Tasks are queued for each workflow execution. Any workflow execution with maxPendingTasks can cause DispatchTask to block until at least one of them gets handled.

type EventCorrelator

type EventCorrelator struct {
	Activities          map[string]*ActivityInfo     // schedueledEventId -> info
	ActivityAttempts    map[string]int               // activityId -> attempts
	Signals             map[string]*SignalInfo       // schedueledEventId -> info
	SignalAttempts      map[string]int               // workflowId + signalName -> attempts
	Timers              map[string]*TimerInfo        // startedEventId -> info
	Cancellations       map[string]*CancellationInfo // schedueledEventId -> info
	CancelationAttempts map[string]int               // workflowId -> attempts
	Children            map[string]*ChildInfo        // initiatedEventID -> info
	ChildrenAttempts    map[string]int               // workflowID -> attempts
	Serializer          StateSerializer              `json:"-"`
}

EventCorrelator is a serialization-friendly struct that is automatically managed by the FSM machinery It tracks signal and activity correlation info, so you know how to react when an event that signals the end of an activity or signal hits your Decider. This is missing from the SWF api. Activities and Signals are string instead of int64 beacuse json.

func (*EventCorrelator) ActivityInfo

func (a *EventCorrelator) ActivityInfo(h *swf.HistoryEvent) *ActivityInfo

ActivityInfo returns the ActivityInfo that is correlates with a given event. The HistoryEvent is expected to be of type EventTypeActivityTaskCompleted,EventTypeActivityTaskFailed,EventTypeActivityTaskTimedOut.

func (*EventCorrelator) Attempts

func (a *EventCorrelator) Attempts(h *swf.HistoryEvent) int

func (*EventCorrelator) AttemptsForActivity

func (a *EventCorrelator) AttemptsForActivity(info *ActivityInfo) int

AttemptsForActivity returns the number of times a given activity has been attempted. It will return 0 if the activity has never failed, has been canceled, or has been completed successfully

func (*EventCorrelator) AttemptsForCancellation

func (a *EventCorrelator) AttemptsForCancellation(info *CancellationInfo) int

AttemptsForCancellation returns the number of times a given signal has been attempted. It will return 0 if the signal has never failed, or has been completed successfully

func (*EventCorrelator) AttemptsForChild

func (a *EventCorrelator) AttemptsForChild(info *ChildInfo) int

AttemptsForCancellation returns the number of times a given signal has been attempted. It will return 0 if the signal has never failed, or has been completed successfully

func (*EventCorrelator) AttemptsForSignal

func (a *EventCorrelator) AttemptsForSignal(signalInfo *SignalInfo) int

AttemptsForSignal returns the number of times a given signal has been attempted. It will return 0 if the signal has never failed, or has been completed successfully

func (*EventCorrelator) CancellationInfo

func (a *EventCorrelator) CancellationInfo(h *swf.HistoryEvent) *CancellationInfo

func (*EventCorrelator) ChildInfo

func (a *EventCorrelator) ChildInfo(h *swf.HistoryEvent) *ChildInfo

func (*EventCorrelator) Correlate

func (a *EventCorrelator) Correlate(h *swf.HistoryEvent)

Correlate establishes a mapping of eventId to ActivityType. The HistoryEvent is expected to be of type EventTypeActivityTaskScheduled.

func (*EventCorrelator) RemoveCorrelation

func (a *EventCorrelator) RemoveCorrelation(h *swf.HistoryEvent)

RemoveCorrelation gcs a mapping of eventId to ActivityType. The HistoryEvent is expected to be of type EventTypeActivityTaskCompleted,EventTypeActivityTaskFailed,EventTypeActivityTaskTimedOut.

func (*EventCorrelator) SignalInfo

func (a *EventCorrelator) SignalInfo(h *swf.HistoryEvent) *SignalInfo

SignalInfo returns the SignalInfo that is correlates with a given event. The HistoryEvent is expected to be of type EventTypeSignalExternalWorkflowExecutionFailed,EventTypeExternalWorkflowExecutionSignaled.

func (*EventCorrelator) TimerInfo

func (a *EventCorrelator) TimerInfo(h *swf.HistoryEvent) *TimerInfo

func (*EventCorrelator) TimerScheduled

func (a *EventCorrelator) TimerScheduled(timerId string) bool

func (*EventCorrelator) Track

func (a *EventCorrelator) Track(h *swf.HistoryEvent)

Track will add or remove entries based on the EventType. A new entry is added when there is a new ActivityTask, or an entry is removed when the ActivityTask is terminating.

type FSM

type FSM struct {
	//Name of the fsm. Used when emitting logs. Should probably be set to the name of the workflow associated with the fsm.
	Name string
	// Domain of the workflow associated with the FSM.
	Domain string
	// TaskList that the underlying poller will poll for decision tasks.
	TaskList string
	// Identity used in PollForDecisionTaskRequests, can be empty.
	Identity string
	// Client used to make SWF api requests.
	SWF SWFOps
	// Strategy for replication of state. Events may be delivered out of order.
	ReplicationHandler ReplicationHandler
	// DataType of the data struct associated with this FSM.
	// The data is automatically peristed to and loaded from workflow history by the FSM.
	DataType interface{}
	// Serializer used to serialize/deserialise fsm state data to/from workflow history.
	Serializer StateSerializer
	// Serializer used to serialize/deserialise in json the fsm managed marker recorded events to/from workflow history.
	SystemSerializer StateSerializer
	//PollerShutdownManager is used when the FSM is managing the polling
	ShutdownManager *poller.ShutdownManager
	//PollerCount is the number of DecisionTaskPollers to start when the FSM is started.
	//Default 1, if you increase this, be sure your DecisionTaskDispatcher is goroutine-safe.
	PollerCount int
	//DecisionTaskDispatcher determines the concurrency strategy for processing tasks in your fsm
	DecisionTaskDispatcher DecisionTaskDispatcher
	// DecisionInterceptor fsm will call BeforeDecision/AfterDecision.  If unset
	// will use DefaultDecisionInterceptor.
	DecisionInterceptor DecisionInterceptor
	//DecisionErrorHandler  is called whenever there is a panic in your decider.
	//if it returns a nil *Outcome, the attempt to handle the DecisionTask is abandoned.
	//fsm will then mark the workflow as being in error, by recording 3 markers. state, correlator and error
	//the error marker  contains an ErrorState which tracks the range of unprocessed events since the error occurred.
	//on subsequent decision tasks if the fsm detects an error state, it will get the ErrorEvent from the ErrorState
	//and call the DecisionErrorHandler again.
	//
	//If there are errors here a new ErrorMarker with the increased range of unprocessed events
	//will be recorded.
	//If there is a good outcome, then we use that as the starting point from which to grab and Decide on the range of unprocessed
	//events. If this works out fine, we then process the initiating decisionTask range of events.
	DecisionErrorHandler DecisionErrorHandler
	// TaskErrorHandler is called when an error occurs
	// outside of the Decider machinery.  When this handler is called the decision
	// task has been abandoned and the task will timeout without any further intervention.
	//
	// If unset, the DefaultTaskErrorHandler will be used.
	// If more "cleanup" is desired, set this field with a custom TaskErrorHandler.
	TaskErrorHandler TaskErrorHandler
	//FSMErrorReporter  is called whenever there is an error within the FSM, usually indicating bad state or configuration of your FSM.
	FSMErrorReporter FSMErrorReporter
	//AllowPanics is mainly for testing, it should be set to false in production.
	//when true, instead of recovering from panics in deciders, it allows them to propagate.
	AllowPanics bool
	// Logger is used for output on a FSM. If not set, will use log.Log
	Logger StdLogger
	// contains filtered or unexported fields
}

FSM models the decision handling logic a workflow in SWF

Example
// create with swf.NewClient
var client *swf.SWF
// data type that will be managed by the
type StateData struct {
	Message string `json:"message,omitempty"`
	Count   int    `json:"count,omitempty"`
}
//event type that will be signalled to the FSM with signal name "hello"
type Hello struct {
	Message string `json:"message,omitempty"`
}

//This is an example of building Deciders without using decider composition.
//the FSM we will create will oscillate between 2 states,
//waitForSignal -> will wait till the workflow is started or signalled, and update the StateData based on the Hello message received, set a timer, and transition to waitForTimer
//waitForTimer -> will wait till the timer set by waitForSignal fires, and will signal the workflow with a Hello message, and transition to waitFotSignal
waitForSignal := func(f *FSMContext, h *swf.HistoryEvent, d *StateData) Outcome {
	decisions := f.EmptyDecisions()
	switch *h.EventType {
	case swf.EventTypeWorkflowExecutionStarted, swf.EventTypeWorkflowExecutionSignaled:
		if *h.EventType == swf.EventTypeWorkflowExecutionSignaled && *h.WorkflowExecutionSignaledEventAttributes.SignalName == "hello" {
			hello := &Hello{}
			f.EventData(h, &Hello{})
			d.Count++
			d.Message = hello.Message
		}
		timeoutSeconds := "5" //swf uses stringy numbers in many places
		timerDecision := &swf.Decision{
			DecisionType: S(swf.DecisionTypeStartTimer),
			StartTimerDecisionAttributes: &swf.StartTimerDecisionAttributes{
				StartToFireTimeout: S(timeoutSeconds),
				TimerId:            S("timeToSignal"),
			},
		}
		decisions = append(decisions, timerDecision)
		return f.Goto("waitForTimer", d, decisions)
	}
	//if the event was unexpected just stay here
	return f.Stay(d, decisions)

}

waitForTimer := func(f *FSMContext, h *swf.HistoryEvent, d *StateData) Outcome {
	decisions := f.EmptyDecisions()
	switch *h.EventType {
	case swf.EventTypeTimerFired:
		//every time the timer fires, signal the workflow with a Hello
		message := strconv.FormatInt(time.Now().Unix(), 10)
		signalInput := &Hello{message}
		signalDecision := &swf.Decision{
			DecisionType: S(swf.DecisionTypeSignalExternalWorkflowExecution),
			SignalExternalWorkflowExecutionDecisionAttributes: &swf.SignalExternalWorkflowExecutionDecisionAttributes{
				SignalName: S("hello"),
				Input:      S(f.Serialize(signalInput)),
				RunId:      f.RunId,
				WorkflowId: f.WorkflowId,
			},
		}
		decisions = append(decisions, signalDecision)

		return f.Goto("waitForSignal", d, decisions)
	}
	//if the event was unexpected just stay here
	return f.Stay(d, decisions)
}

//These 2 deciders are the same as the ones above, but use composable decider bits.
typed := Typed(new(StateData))

updateState := typed.StateFunc(func(f *FSMContext, h *swf.HistoryEvent, d *StateData) {
	hello := &Hello{}
	f.EventData(h, &Hello{})
	d.Count++
	d.Message = hello.Message
})

setTimer := typed.DecisionFunc(func(f *FSMContext, h *swf.HistoryEvent, d *StateData) *swf.Decision {
	timeoutSeconds := "5" //swf uses stringy numbers in many places
	return &swf.Decision{
		DecisionType: S(swf.DecisionTypeStartTimer),
		StartTimerDecisionAttributes: &swf.StartTimerDecisionAttributes{
			StartToFireTimeout: S(timeoutSeconds),
			TimerId:            S("timeToSignal"),
		},
	}
})

sendSignal := typed.DecisionFunc(func(f *FSMContext, h *swf.HistoryEvent, d *StateData) *swf.Decision {
	message := strconv.FormatInt(time.Now().Unix(), 10)
	signalInput := &Hello{message}
	return &swf.Decision{
		DecisionType: S(swf.DecisionTypeSignalExternalWorkflowExecution),
		SignalExternalWorkflowExecutionDecisionAttributes: &swf.SignalExternalWorkflowExecutionDecisionAttributes{
			SignalName: S("hello"),
			Input:      S(f.Serialize(signalInput)),
			RunId:      f.RunId,
			WorkflowId: f.WorkflowId,
		},
	}
})

waitForSignalComposedDecider := NewComposedDecider(
	OnStarted(UpdateState(updateState), AddDecision(setTimer), Transition("waitForTimer")),
	OnSignalReceived("hello", UpdateState(updateState), AddDecision(setTimer), Transition("waitForTimer")),
	DefaultDecider(),
)

waitForTimerComposedDecider := NewComposedDecider(
	OnTimerFired("timeToSignal", AddDecision(sendSignal), Transition("waitForSignal")),
	DefaultDecider(),
)

//create the FSMState by passing the decider function through TypedDecider(),
//which lets you use d *StateData rather than d interface{} in your decider.
waitForSignalState := &FSMState{Name: "waitForSignal", Decider: typed.Decider(waitForSignal)}
waitForTimerState := &FSMState{Name: "waitForTimer", Decider: typed.Decider(waitForTimer)}
//or with the composed deciders
waitForSignalState = &FSMState{Name: "waitForSignal", Decider: waitForSignalComposedDecider}
waitForTimerState = &FSMState{Name: "waitForTimer", Decider: waitForTimerComposedDecider}
//wire it up in an fsm
fsm := &FSM{
	Name:       "example-fsm",
	SWF:        client,
	DataType:   StateData{},
	Domain:     "exaple-swf-domain",
	TaskList:   "example-decision-task-list-to-poll",
	Serializer: &JSONStateSerializer{},
}
//add states to FSM
fsm.AddInitialState(waitForSignalState)
fsm.AddState(waitForTimerState)

//start it up!
fsm.Start()

//To start workflows using this fsm
client.StartWorkflowExecution(&swf.StartWorkflowExecutionInput{
	Domain:     S("exaple-swf-domain"),
	WorkflowId: S("your-id"),
	//you will have previously regiestered a WorkflowType that this FSM will work.
	WorkflowType: &swf.WorkflowType{Name: S("the-name"), Version: S("the-version")},
	Input:        StartFSMWorkflowInput(fsm, &StateData{Count: 0, Message: "starting message"}),
})
Output:

func (*FSM) AddCanceledState

func (f *FSM) AddCanceledState(state *FSMState)

AddCanceledState adds a state to the FSM and uses it as the final state of a workflow. It will only receive events if you returned FSMContext.CancelWorkflow(...) and the workflow was unable to cancel.

func (*FSM) AddCompleteState

func (f *FSM) AddCompleteState(state *FSMState)

AddCompleteState adds a state to the FSM and uses it as the final state of a workflow. It will only receive events if you returned FSMContext.Complete(...) and the workflow was unable to complete.

func (*FSM) AddCompleteStateWithHandler

func (f *FSM) AddCompleteStateWithHandler(state *FSMState, handler DecisionErrorHandler)

AddCompleteStateWithHandler adds a state to the FSM and uses it as the final state of a workflow. it will only receive events if you returned FSMContext.Complete(...) and the workflow was unable to complete. It also adds a DecisionErrorHandler to the state.

func (*FSM) AddErrorHandler

func (f *FSM) AddErrorHandler(state string, handler DecisionErrorHandler)

AddErrorHandler adds a DecisionErrorHandler to a state in the FSM.

func (*FSM) AddFailedState

func (f *FSM) AddFailedState(state *FSMState)

AddFailedState adds a state to the FSM and uses it as the final state of a workflow. It will only receive events if you returned FSMContext.FailWorkflow(...) and the workflow was unable to fail.

func (*FSM) AddInitialState

func (f *FSM) AddInitialState(state *FSMState)

AddInitialState adds a state to the FSM and uses it as the initial state when a workflow execution is started.

func (*FSM) AddInitialStateWithHandler

func (f *FSM) AddInitialStateWithHandler(state *FSMState, handler DecisionErrorHandler)

AddInitialStateWithHandler adds a state to the FSM and uses it as the initial state when a workflow execution is started. it uses the FSM DefaultDecisionErrorHandler, which defaults to FSM.DefaultDecisionErrorHandler if unset.

func (*FSM) AddState

func (f *FSM) AddState(state *FSMState)

AddState adds a state to the FSM.

func (*FSM) DefaultCanceledState

func (f *FSM) DefaultCanceledState() *FSMState

DefaultCanceledState is the canceled state used in an FSM if one has not been set. It simply responds with a CancelWorkflow which attempts to Cancel the workflow. This state will only get events if you previously attempted to cancel the workflow and it failed.

func (*FSM) DefaultCompleteState

func (f *FSM) DefaultCompleteState() *FSMState

DefaultCompleteState is the complete state used in an FSM if one has not been set. It simply responds with a CompleteDecision which attempts to Complete the workflow. This state will only get events if you previously attempted to complete the workflow and it failed.

func (*FSM) DefaultDecisionErrorHandler

func (f *FSM) DefaultDecisionErrorHandler(ctx *FSMContext, event *swf.HistoryEvent, stateBeforeEvent interface{}, stateAfterError interface{}, err error) (*Outcome, error)

DefaultDecisionErrorHandler is the default DecisionErrorHandler that is used if a handler is not set on the FSM or a handler is not associated with the current state. This default handler simply logs the error and the decision task will timeout.

func (*FSM) DefaultDecisionInterceptor

func (f *FSM) DefaultDecisionInterceptor() DecisionInterceptor

DefaultDecisionInterceptor is an interceptor that handles removing duplicate close decisions, moving close decisions to the end of the decision list for an outcome, and making sure the highest priority close decision is the one returned to SWF.

Close decision types in priority order are: swf.DecisionTypeFailWorkflowExecution swf.DecisionTypeCompleteWorkflowExecution swf.DecisionTypeCancelWorkflowExecution

func (*FSM) DefaultFailedState

func (f *FSM) DefaultFailedState() *FSMState

DefaultFailedState is the failed state used in an FSM if one has not been set. It simply responds with a FailWorkflow which attempts to Fail the workflow. This state will only get events if you previously attempted to fail the workflow and the call failed.

func (*FSM) DefaultTaskErrorHandler

func (f *FSM) DefaultTaskErrorHandler(decisionTask *swf.PollForDecisionTaskOutput, err error)

DefaultTaskErrorHandler is the default TaskErrorHandler that is used if a TaskErrorHandler is not set on this FSM. DefaultTaskErrorHandler simply logs the error. With no further intervention the decision task will timeout.

func (*FSM) Deserialize

func (f *FSM) Deserialize(serialized string, data interface{})

Deserialize uses the FSM.Serializer to deserialize data from a string. If there is an error in deserialization this func will panic, so this should usually only be used inside Deciders where the panics are recovered and proper errors are recorded in the workflow.

func (*FSM) EmptyDecisions

func (f *FSM) EmptyDecisions() []*swf.Decision

EmptyDecisions is a helper method to give you an empty decisions array for use in your Deciders.

func (*FSM) ErrorDeserializingStateData

func (f *FSM) ErrorDeserializingStateData(decisionTask *swf.PollForDecisionTaskOutput, serializedStateData string, err error)

ErrorDeserializingStateData is part of the FSM implementation of FSMErrorReporter

func (*FSM) ErrorFindingCorrelator

func (f *FSM) ErrorFindingCorrelator(decisionTask *swf.PollForDecisionTaskOutput, err error)

ErrorFindingCorrelator is part of the FSM implementation of FSMErrorReporter

func (*FSM) ErrorFindingStateData

func (f *FSM) ErrorFindingStateData(decisionTask *swf.PollForDecisionTaskOutput, err error)

ErrorFindingStateData is part of the FSM implementation of FSMErrorReporter

func (*FSM) ErrorMissingFSMState

func (f *FSM) ErrorMissingFSMState(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome)

ErrorMissingFSMState is part of the FSM implementation of FSMErrorReporter

func (*FSM) ErrorSerializingStateData

func (f *FSM) ErrorSerializingStateData(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome, eventCorrelator EventCorrelator, err error)

ErrorSerializingStateData is part of the FSM implementation of FSMErrorReporter

func (*FSM) ErrorStateTick

func (f *FSM) ErrorStateTick(decisionTask *swf.PollForDecisionTaskOutput, error *SerializedErrorState, context *FSMContext, data interface{}) (*Outcome, error)

ErrorStateTick is called when the DecisionTaskPoller receives a PollForDecisionTaskResponse in its polling loop that contains an error marker in its history.

func (*FSM) EventData

func (f *FSM) EventData(event *swf.HistoryEvent, eventData interface{})

EventData works in combination with the FSM.Serializer to provide deserialization of data sent in a HistoryEvent. It is sugar around extracting the event payload from the proper field of the proper Attributes struct on the HistoryEvent

func (*FSM) Init

func (f *FSM) Init()

Init initializes any optional, unspecified values such as the error state, stop channel, serializer, PollerShutdownManager. it gets called by Start(), so you should only call this if you are manually managing polling for tasks, and calling Tick yourself.

func (*FSM) InitialState

func (f *FSM) InitialState() string

InitialState is the implementation of FSMSerializer.InitialState()

func (*FSM) Serialize

func (f *FSM) Serialize(data interface{}) string

Serialize uses the FSM.Serializer to serialize data to a string. If there is an error in serialization this func will panic, so this should usually only be used inside Deciders where the panics are recovered and proper errors are recorded in the workflow.

func (*FSM) Start

func (f *FSM) Start()

Start begins processing DecisionTasks with the FSM. It creates one or more DecisionTaskPollers and spawns a goroutine that continues polling until Stop() is called and any in-flight polls have completed. If you wish to manage polling and calling Tick() yourself, you dont need to start the FSM, just call Init().

func (*FSM) StateSerializer

func (f *FSM) StateSerializer() StateSerializer

StateSerializer is the implementation of FSMSerializer.StateSerializer()

func (*FSM) Stop

func (f *FSM) Stop()

Stop causes the DecisionTask select loop to exit, and to stop the DecisionTaskPoller

func (*FSM) Tick

func (f *FSM) Tick(decisionTask *swf.PollForDecisionTaskOutput) (*FSMContext, []*swf.Decision, *SerializedState, error)

Tick is called when the DecisionTaskPoller receives a PollForDecisionTaskResponse in its polling loop. On errors, a nil *SerializedState is returned, and an error Outcome is included in the Decision list. It is exported to facilitate testing.

type FSMClient

type FSMClient interface {
	GetState(id string) (string, interface{}, error)
	GetStateForRun(workflow, run string) (string, interface{}, error)
	GetSerializedStateForRun(workflow, run string) (*SerializedState, *swf.GetWorkflowExecutionHistoryOutput, error)
	Signal(id string, signal string, input interface{}) error
	Start(startTemplate swf.StartWorkflowExecutionInput, id string, input interface{}) (*swf.StartWorkflowExecutionOutput, error)
	RequestCancel(id string) error
	GetWorkflowExecutionHistoryPages(execution *swf.WorkflowExecution, fn func(p *swf.GetWorkflowExecutionHistoryOutput, lastPage bool) (shouldContinue bool)) error
	GetWorkflowExecutionHistoryFromReader(reader io.Reader) (*swf.GetWorkflowExecutionHistoryOutput, error)
	FindAll(input *FindInput) (output *FindOutput, err error)
	FindAllWalk(input *FindInput, fn func(info *swf.WorkflowExecutionInfo, done bool) (cont bool)) (err error)
	FindLatestByWorkflowID(workflowID string) (exec *swf.WorkflowExecution, err error)
	NewHistorySegmentor() HistorySegmentor
}

func NewFSMClient

func NewFSMClient(f *FSM, c ClientSWFOps) FSMClient

type FSMContext

type FSMContext struct {
	swf.WorkflowType
	swf.WorkflowExecution

	State string
	// contains filtered or unexported fields
}

FSMContext is populated by the FSM machinery and passed to Deciders.

func NewFSMContext

func NewFSMContext(
	serialization Serialization,
	wfType swf.WorkflowType, wfExec swf.WorkflowExecution,
	eventCorrelator *EventCorrelator,
	state string, stateData interface{}, stateVersion uint64) *FSMContext

NewFSMContext constructs an FSMContext.

func (*FSMContext) ActivitiesInfo

func (f *FSMContext) ActivitiesInfo() map[string]*ActivityInfo

ActivitiesInfo will return a map of scheduledId -> ActivityInfo for all in-flight activities in the workflow.

func (*FSMContext) ActivityInfo

func (f *FSMContext) ActivityInfo(h *swf.HistoryEvent) *ActivityInfo

ActivityInfo will find information for ActivityTasks being tracked. It can only be used when handling events related to ActivityTasks. ActivityTasks are automatically tracked after a EventTypeActivityTaskScheduled event. When there is no pending activity related to the event, nil is returned.

func (*FSMContext) Attempts

func (f *FSMContext) Attempts(h *swf.HistoryEvent) int

func (*FSMContext) CancelWorkflow

func (f *FSMContext) CancelWorkflow(data interface{}, details *string) Outcome

CancelWorkflow is a helper func to easily create a CompleteOutcome that sends a CancelWorklfow decision.

func (*FSMContext) CompleteWorkflow

func (f *FSMContext) CompleteWorkflow(data interface{}, decisions ...*swf.Decision) Outcome

CompleteWorkflow is a helper func to easily create a CompleteOutcome that sends a CompleteWorkflow decision.

func (*FSMContext) CompleteWorkflowDecision

func (f *FSMContext) CompleteWorkflowDecision(data interface{}) *swf.Decision

CompleteWorkflowDecision will build a CompleteWorkflowExecutionDecision decision that has the expected SerializedState marshalled to json as its result. This decision should be used when it is appropriate to Complete your workflow.

func (*FSMContext) ContinueDecider

func (f *FSMContext) ContinueDecider(data interface{}, decisions []*swf.Decision) Outcome

ContinueDecider is a helper func to easily create a ContinueOutcome.

func (*FSMContext) ContinueWorkflow

func (f *FSMContext) ContinueWorkflow(data interface{}, decisions ...*swf.Decision) Outcome

ContinueWorkflow is a helper func to easily create a CompleteOutcome that sends a ContinueWorklfow decision.

func (*FSMContext) ContinueWorkflowDecision

func (f *FSMContext) ContinueWorkflowDecision(continuedState string, data interface{}) *swf.Decision

ContinueWorkflowDecision will build a ContinueAsNewWorkflow decision that has the expected SerializedState marshalled to json as its input. This decision should be used when it is appropriate to Continue your workflow. You are unable to ContinueAsNew a workflow that has running activites, so you should assure there are none running before using this. As such there is no need to copy over the ActivityCorrelator. If the FSM Data Struct is Taggable, its tags will be used on the Continue Decisions

func (*FSMContext) Correlator

func (f *FSMContext) Correlator() *EventCorrelator

func (*FSMContext) Decide

func (f *FSMContext) Decide(h *swf.HistoryEvent, data interface{}, decider Decider) Outcome

Decide executes a decider making sure that Activity tasks are being tracked.

func (*FSMContext) Decision

func (f *FSMContext) Decision(d *swf.Decision) []*swf.Decision

EmptyDecisions is a helper to give you an empty Decision slice.

func (*FSMContext) Deserialize

func (f *FSMContext) Deserialize(serialized string, data interface{})

Deserialize will use the current fsm' Serializer to deserialize the given string into the given struct. It will panic on errors, which is ok in the context of a Decider. If you want to handle errors, use Serializer().Deserialize(...) instead.

func (*FSMContext) EmptyDecisions

func (f *FSMContext) EmptyDecisions() []*swf.Decision

EmptyDecisions is a helper to give you an empty Decision slice.

func (*FSMContext) EventData

func (f *FSMContext) EventData(h *swf.HistoryEvent, data interface{})

EventData will extract a payload from the given HistoryEvent and unmarshall it into the given struct.

func (*FSMContext) FailWorkflow

func (f *FSMContext) FailWorkflow(data interface{}, details *string) Outcome

FailWorkflow is a helper func to easily create a FailOutcome that sends a FailWorklfow decision.

func (*FSMContext) Goto

func (f *FSMContext) Goto(state string, data interface{}, decisions []*swf.Decision) Outcome

Goto is a helper func to easily create a TransitionOutcome.

func (*FSMContext) InitialState

func (f *FSMContext) InitialState() string

func (*FSMContext) Pass

func (f *FSMContext) Pass() Outcome

func (*FSMContext) Serialize

func (f *FSMContext) Serialize(data interface{}) string

Serialize will use the current fsm's Serializer to serialize the given struct. It will panic on errors, which is ok in the context of a Decider. If you want to handle errors, use Serializer().Serialize(...) instead.

func (*FSMContext) Serializer

func (f *FSMContext) Serializer() StateSerializer

Serializer returns the current fsm's Serializer.

func (*FSMContext) SignalInfo

func (f *FSMContext) SignalInfo(h *swf.HistoryEvent) *SignalInfo

SignalInfo will find information for ActivityTasks being tracked. It can only be used when handling events related to ActivityTasks. ActivityTasks are automatically tracked after a EventTypeActivityTaskScheduled event. When there is no pending activity related to the event, nil is returned.

func (*FSMContext) SignalsInfo

func (f *FSMContext) SignalsInfo() map[string]*SignalInfo

SignalsInfo will return a map of scheduledId -> ActivityInfo for all in-flight activities in the workflow.

func (*FSMContext) StateSerializer

func (f *FSMContext) StateSerializer() StateSerializer

func (*FSMContext) Stay

func (f *FSMContext) Stay(data interface{}, decisions []*swf.Decision) Outcome

Stay is a helper func to easily create a StayOutcome.

type FSMErrorReporter

type FSMErrorReporter interface {
	ErrorFindingStateData(decisionTask *swf.PollForDecisionTaskOutput, err error)
	ErrorFindingCorrelator(decisionTask *swf.PollForDecisionTaskOutput, err error)
	ErrorMissingFSMState(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome)
	ErrorDeserializingStateData(decisionTask *swf.PollForDecisionTaskOutput, serializedStateData string, err error)
	ErrorSerializingStateData(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome, eventCorrelator EventCorrelator, err error)
}

FSMErrorHandler is the error handling contract for errors in the FSM machinery itself. These are generally a misconfiguration of your FSM or mismatch between struct and serialized form and cant be resolved without config/code changes the paramaters to each method provide all availabe info at the time of the error so you can diagnose issues. Note that this is a diagnostic interface that basically leaks implementation details, and as such may change from release to release.

type FSMState

type FSMState struct {
	// Name is the name of the state. When returning an Outcome, the NextState should match the Name of an FSMState in your FSM.
	Name string
	// Decider decides an Outcome given the current state, data, and an event.
	Decider Decider
}

FSMState defines the behavior of one state of an FSM

type FindInput

type FindInput struct {
	MaximumPageSize *int64

	OpenNextPageToken   *string
	ClosedNextPageToken *string

	ReverseOrder *bool

	StatusFilter string

	StartTimeFilter *swf.ExecutionTimeFilter
	CloseTimeFilter *swf.ExecutionTimeFilter // only closed

	ExecutionFilter   *swf.WorkflowExecutionFilter
	TagFilter         *swf.TagFilter
	TypeFilter        *swf.WorkflowTypeFilter
	CloseStatusFilter *swf.CloseStatusFilter // only closed
}

type FindOutput

type FindOutput struct {
	ExecutionInfos      []*swf.WorkflowExecutionInfo
	OpenNextPageToken   *string
	ClosedNextPageToken *string
}

type Finder

type Finder interface {
	FindAll(*FindInput) (*FindOutput, error)
	FindLatestByWorkflowID(workflowID string) (*swf.WorkflowExecution, error)
	Reset()
}

func NewFinder

func NewFinder(domain string, c ClientSWFOps) Finder

type FuncInterceptor

type FuncInterceptor struct {
	BeforeTaskFn     func(decision *swf.PollForDecisionTaskOutput)
	BeforeDecisionFn func(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
	AfterDecisionFn  func(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
}

FuncInterceptor is a DecisionInterceptor that you can set handler funcs on. if any are unset, they are no-ops.

func (*FuncInterceptor) AfterDecision

func (i *FuncInterceptor) AfterDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)

AfterDecision runs the AfterDecisionFn if not nil

func (*FuncInterceptor) BeforeDecision

func (i *FuncInterceptor) BeforeDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)

BeforeDecision runs the BeforeDecisionFn if not nil

func (*FuncInterceptor) BeforeTask

func (i *FuncInterceptor) BeforeTask(decision *swf.PollForDecisionTaskOutput)

BeforeTask runs the BeforeTaskFn if not nil

type HistorySegment

type HistorySegment struct {
	State                   *HistorySegmentState
	Correlator              *EventCorrelator
	Error                   *SerializedErrorState
	Events                  []*HistorySegmentEvent
	ContinuedExecutionRunId *string
}

type HistorySegmentEvent

type HistorySegmentEvent struct {
	ID         *int64
	Timestamp  *time.Time
	Type       *string
	Attributes *map[string]interface{}
	References []*int64
}

type HistorySegmentState

type HistorySegmentState struct {
	ID        *int64
	Timestamp *time.Time
	Version   *uint64
	Name      *string
	Data      *interface{}
}

type HistorySegmentor

type HistorySegmentor interface {
	FromPage(p *swf.GetWorkflowExecutionHistoryOutput, lastPage bool) (shouldContinue bool)
	OnStart(fn func()) HistorySegmentor
	OnSegment(func(HistorySegment)) HistorySegmentor
	OnPage(fn func()) HistorySegmentor
	OnError(func(error)) HistorySegmentor
	OnFinish(fn func()) HistorySegmentor
}

type JSONStateSerializer

type JSONStateSerializer struct{}

JSONStateSerializer is a StateSerializer that uses go json serialization.

func (JSONStateSerializer) Deserialize

func (j JSONStateSerializer) Deserialize(serialized string, state interface{}) error

Deserialize unmarshalls the given (json) string into the given struct

func (JSONStateSerializer) Serialize

func (j JSONStateSerializer) Serialize(state interface{}) (string, error)

Serialize serializes the given struct to a json string.

type KinesisOps

type KinesisOps interface {
	PutRecord(*kinesis.PutRecordInput) (*kinesis.PutRecordOutput, error)
}

KinesisOps is the subset of kinesis.Kinesis ops required by KinesisReplication

type KinesisReplication

type KinesisReplication struct {
	KinesisStream     string
	KinesisReplicator KinesisReplicator
	KinesisOps        KinesisOps
}

KinesisReplication can be used as a ReplicationHandler by setting its Handler func as the FSM ReplicationHandler

func (*KinesisReplication) Handler

func (f *KinesisReplication) Handler(ctx *FSMContext, decisionTask *swf.PollForDecisionTaskOutput, completedDecision *swf.RespondDecisionTaskCompletedInput, state *SerializedState) error

Handler is a ReplicationHandler. to configure it on your FSM, do fsm.ReplicationHandler = &KinesisReplication{...).Handler

type KinesisReplicator

type KinesisReplicator func(fsm, workflowId string, put func() (*kinesis.PutRecordOutput, error)) (*kinesis.PutRecordOutput, error)

KinesisReplicator lets you customize the retry logic around Replicating State to Kinesis.

type MultiDecisionFunc

type MultiDecisionFunc func(ctx *FSMContext, h *swf.HistoryEvent, data interface{}) []*swf.Decision

MultiDecisionFunc is a building block for composable deciders that returns a [] of decision.

type NewGoroutineDispatcher

type NewGoroutineDispatcher struct {
}

NewGoroutineDispatcher is a DecisionTaskDispatcher that runs the decision handler in a new goroutine.

func (*NewGoroutineDispatcher) DispatchTask

DispatchTask calls the handler in a new goroutine.

type Outcome

type Outcome struct {
	//State is the desired next state in the FSM. the empty string ("") is a signal that you wish decision processing to continue
	//if the FSM machinery recieves the empty string as the state of a final outcome, it will substitute the current state.
	State     string
	Data      interface{}
	Decisions []*swf.Decision
}

Outcome is the result of a Decider processing a HistoryEvent

type PredicateFunc

type PredicateFunc func(data interface{}) bool

PredicateFunc is a building block for composable deciders, a predicate based on the FSM stateData.

type ReplicationHandler

ReplicationHandler can be configured on an FSM and will be called when a DecisionTask is successfully completed. Note that events can be delivered out of order to the ReplicationHandler.

type SWFOps

type SWFOps interface {
	PollForDecisionTaskPages(*swf.PollForDecisionTaskInput, func(*swf.PollForDecisionTaskOutput, bool) bool) error
	RespondDecisionTaskCompleted(*swf.RespondDecisionTaskCompletedInput) (*swf.RespondDecisionTaskCompletedOutput, error)
}

SWFOps is the subset of swf.SWF ops required by the fsm package

type Serialization

type Serialization interface {
	EventData(h *swf.HistoryEvent, data interface{})
	Serialize(data interface{}) string
	StateSerializer() StateSerializer
	Deserialize(serialized string, data interface{})
	InitialState() string
}

Serialization is the contract for de/serializing state inside an FSM, typically implemented by the FSM itself but serves to break the circular dep between FSMContext and FSM.

type SerializedActivityState

type SerializedActivityState struct {
	ActivityId string
	Input      *string
}

Payload of Signals ActivityStartedSignal and ActivityUpdatedSignal

type SerializedErrorState

type SerializedErrorState struct {
	Details                    string
	EarliestUnprocessedEventId int64
	LatestUnprocessedEventId   int64
	ErrorEvent                 *swf.HistoryEvent
}

ErrorState is used as the input to a marker that signifies that the workflow is in an error state.

type SerializedState

type SerializedState struct {
	StateVersion uint64 `json:"stateVersion"`
	StateName    string `json:"stateName"`
	StateData    string `json:"stateData"`
	WorkflowId   string `json:"workflowId"`
}

SerializedState is a wrapper struct that allows serializing the current state and current data for the FSM in a MarkerRecorded event in the workflow history. We also maintain an epoch, which counts the number of times a workflow has been continued, and the StartedId of the DecisionTask that generated this state. The epoch + the id provide a total ordering of state over the lifetime of different runs of a workflow.

type SignalInfo

type SignalInfo struct {
	SignalName string
	WorkflowId string
	Input      *string
}

SignalInfo holds the SignalName and Input for an activity

type StartCancelPair

type StartCancelPair struct {
	// contains filtered or unexported fields
}

type Stasher

type Stasher struct {
	// contains filtered or unexported fields
}

Stasher is used to take snapshots of StateData between each event so that we can have shap

func NewStasher

func NewStasher(dataType interface{}) *Stasher

func (*Stasher) Stash

func (s *Stasher) Stash(data interface{}) *bytes.Buffer

func (*Stasher) Unstash

func (s *Stasher) Unstash(stashed *bytes.Buffer, into interface{})

type StateFunc

type StateFunc func(ctx *FSMContext, h *swf.HistoryEvent, data interface{})

StateFunc is a building block for composable deciders mutates the FSM stateData.

type StateSerializer

type StateSerializer interface {
	Serialize(state interface{}) (string, error)
	Deserialize(serialized string, state interface{}) error
}

StateSerializer defines the interface for serializing state to and deserializing state from the workflow history.

type Taggable

type Taggable interface {
	Tags() []*string
}

FSM Data types that implement this interface will have the resulting tags used by FSMClient when starting workflows and by the FSMContext when calling ContinueWorkflow() it is []*string since thats what SWF api takes atm.

type TaskErrorHandler

type TaskErrorHandler func(decisionTask *swf.PollForDecisionTaskOutput, err error)

TaskErrorHandler is the error handling contract for errors that occur outside of the Decider machinery when handling receiving incoming tasks, sending outgoing decisions for tasks, or replicating state. This handler is called when a decision task has been abandoned and the task will timeout without any further intervention.

type TimerInfo

type TimerInfo struct {
	Control            *string
	TimerId            string
	StartToFireTimeout string
}

TimerInfo holds the Control data from a Timer

type TypedFuncs

type TypedFuncs struct {
	// contains filtered or unexported fields
}

TypedFuncs lets you construct building block for composable deciders, that have arguments that are checked against the type of your FSM stateData.

func Typed

func Typed(typed interface{}) *TypedFuncs

Typed allows you to create Typed building blocks for composable deciders. the type checking here is done on constriction at runtime, so be sure to have a unit test that constructs your funcs.

func (*TypedFuncs) Decider

func (t *TypedFuncs) Decider(decider interface{}) Decider

Decider builds a Decider from your typed Decider that verifies the right typing at construction time.

func (*TypedFuncs) DecisionFunc

func (t *TypedFuncs) DecisionFunc(decisionFunc interface{}) DecisionFunc

DecisionFunc builds a DecisionFunc from your typed DecisionFunc that verifies the right typing at construction time.

func (*TypedFuncs) MultiDecisionFunc

func (t *TypedFuncs) MultiDecisionFunc(decisionFunc interface{}) MultiDecisionFunc

MultiDecisionFunc builds a MultiDecisionFunc from your typed MultiDecisionFunc that verifies the right typing at construction time.

func (*TypedFuncs) PredicateFunc

func (t *TypedFuncs) PredicateFunc(stateFunc interface{}) PredicateFunc

PredicateFunc builds a PredicateFunc from your typed PredicateFunc that verifies the right typing at construction time.

func (*TypedFuncs) StateFunc

func (t *TypedFuncs) StateFunc(stateFunc interface{}) StateFunc

StateFunc builds a StateFunc from your typed StateFunc that verifies the right typing at construction time.

Directories

Path Synopsis
Package jsonpbserializer implements a fsm.StateSerializer that uses jsonpb as the underlying JSON serializer, rather than stdlibs.
Package jsonpbserializer implements a fsm.StateSerializer that uses jsonpb as the underlying JSON serializer, rather than stdlibs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL