activity

package
v0.0.0-...-8e6450d
Warning

This package is not in the latest version of its module.
Published: Jun 12, 2020 License: MIT Imports: 15 Imported by: 0

Documentation

Constants

const (
	TaskGone      = "Unknown activity"
	ExecutionGone = "Unknown execution"
)
const (
	FailureReasonMaxChars = 256
)

Various constants defined by SWF
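
One plausible use of FailureReasonMaxChars is trimming an error message before it is reported as a failure reason, since SWF limits the reason field to 256 characters. The sketch below assumes that use; truncateReason is a hypothetical helper, not a function exported by this package.

```go
package main

import (
	"fmt"
	"strings"
)

// FailureReasonMaxChars mirrors the package constant shown above.
const FailureReasonMaxChars = 256

// truncateReason is a hypothetical helper (not part of the package API).
// It trims reason to at most FailureReasonMaxChars runes, so multi-byte
// characters are never cut in half.
func truncateReason(reason string) string {
	runes := []rune(reason)
	if len(runes) <= FailureReasonMaxChars {
		return reason
	}
	return string(runes[:FailureReasonMaxChars])
}

func main() {
	long := strings.Repeat("x", 300)
	fmt.Println(len(truncateReason(long)))
	fmt.Println(truncateReason("short reason"))
}
```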

Variables

This section is empty.

Functions

This section is empty.

Types

type ActivityHandler

type ActivityHandler struct {
	Activity    string
	HandlerFunc ActivityHandlerFunc
	Input       interface{}
}

func NewActivityHandler

func NewActivityHandler(activity string, handler interface{}) *ActivityHandler

func (*ActivityHandler) ZeroInput

func (a *ActivityHandler) ZeroInput() interface{}

type ActivityHandlerFunc

type ActivityHandlerFunc func(activityTask *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error)

type ActivityInterceptor

type ActivityInterceptor interface {
	BeforeTask(*swf.PollForActivityTaskOutput)
	AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
	AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{})
	AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error)
	AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string)
}

ActivityInterceptor allows manipulation of the activity task and its outcome at key points in the task lifecycle.

func NewComposedDecisionInterceptor

func NewComposedDecisionInterceptor(interceptors ...ActivityInterceptor) ActivityInterceptor
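
To illustrate what composing interceptors can look like, here is a minimal sketch using only the standard library. Task stands in for *swf.PollForActivityTaskOutput, Interceptor is a reduced two-method version of ActivityInterceptor, and the in-order chaining is an assumption, not a statement about how NewComposedDecisionInterceptor is implemented.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is a stand-in for *swf.PollForActivityTaskOutput.
type Task struct{ ActivityID string }

// Interceptor is a reduced, hypothetical version of ActivityInterceptor.
type Interceptor interface {
	BeforeTask(t *Task)
	AfterTask(t *Task, result interface{}, err error) (interface{}, error)
}

// composed calls each interceptor in registration order (an assumption).
type composed struct{ interceptors []Interceptor }

func (c composed) BeforeTask(t *Task) {
	for _, i := range c.interceptors {
		i.BeforeTask(t)
	}
}

// AfterTask threads the result and error through every interceptor, so a
// later interceptor sees what an earlier one returned.
func (c composed) AfterTask(t *Task, result interface{}, err error) (interface{}, error) {
	for _, i := range c.interceptors {
		result, err = i.AfterTask(t, result, err)
	}
	return result, err
}

// swallow replaces any error with a fallback result.
type swallow struct{}

func (swallow) BeforeTask(*Task) {}
func (swallow) AfterTask(_ *Task, r interface{}, err error) (interface{}, error) {
	if err != nil {
		return "fallback", nil
	}
	return r, nil
}

// tag prefixes string results and leaves everything else alone.
type tag struct{}

func (tag) BeforeTask(*Task) {}
func (tag) AfterTask(_ *Task, r interface{}, err error) (interface{}, error) {
	if s, ok := r.(string); ok {
		return "tagged:" + s, err
	}
	return r, err
}

var errBoom = errors.New("boom")

func main() {
	c := composed{[]Interceptor{swallow{}, tag{}}}
	r, err := c.AfterTask(&Task{ActivityID: "a1"}, nil, errBoom)
	fmt.Println(r, err)
}
```

Because the result and error are passed along the chain, the order of registration matters: here swallow converts the failure first, and tag then decorates the fallback value.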

type ActivityTaskCanceledError

type ActivityTaskCanceledError struct {
	// contains filtered or unexported fields
}

func (ActivityTaskCanceledError) Details

func (e ActivityTaskCanceledError) Details() *string

func (ActivityTaskCanceledError) Error

type ActivityTaskDispatcher

type ActivityTaskDispatcher interface {
	DispatchTask(*swf.PollForActivityTaskOutput, func(*swf.PollForActivityTaskOutput))
}

ActivityTaskDispatcher is used by the ActivityWorker machinery to dispatch the handling of ActivityTasks. Different implementations can provide different concurrency models.
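
As a rough illustration of how different implementations give different concurrency models, the sketch below defines two dispatchers against a stand-in Task type (in place of *swf.PollForActivityTaskOutput). The names inline, spawn, newSpawn, and runAll are hypothetical and exist only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a stand-in for *swf.PollForActivityTaskOutput.
type Task struct{ ActivityID string }

// Dispatcher mirrors the shape of ActivityTaskDispatcher with the stand-in type.
type Dispatcher interface {
	DispatchTask(t *Task, handler func(*Task))
}

// inline runs the handler on the caller's goroutine, so the caller blocks
// until the handler returns.
type inline struct{}

func (inline) DispatchTask(t *Task, handler func(*Task)) { handler(t) }

// spawn runs each handler on its own goroutine. The WaitGroup is only here
// so the example can wait for completion.
type spawn struct{ wg *sync.WaitGroup }

func (s spawn) DispatchTask(t *Task, handler func(*Task)) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		handler(t)
	}()
}

// newSpawn returns a spawn dispatcher and a function that waits for its handlers.
func newSpawn() (spawn, func()) {
	wg := &sync.WaitGroup{}
	return spawn{wg: wg}, wg.Wait
}

// runAll dispatches n tasks, waits, and returns how many handlers ran.
func runAll(d Dispatcher, wait func(), n int) int {
	var mu sync.Mutex
	count := 0
	for i := 0; i < n; i++ {
		d.DispatchTask(&Task{ActivityID: fmt.Sprint(i)}, func(*Task) {
			mu.Lock()
			count++
			mu.Unlock()
		})
	}
	wait()
	mu.Lock()
	defer mu.Unlock()
	return count
}

func main() {
	fmt.Println(runAll(inline{}, func() {}, 3))
	s, wait := newSpawn()
	fmt.Println(runAll(s, wait, 3))
}
```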

type ActivityWorker

type ActivityWorker struct {
	Serializer       fsm.StateSerializer
	SystemSerializer fsm.StateSerializer
	// Domain of the workflow associated with the FSM.
	Domain string
	// TaskList that the underlying poller will poll for activity tasks.
	TaskList string
	// Identity used in PollForActivityTaskRequests, can be empty.
	Identity string
	// Client used to make SWF api requests.
	SWF SWFOps

	// ShutdownManager
	ShutdownManager *poller.ShutdownManager
	// ActivityTaskDispatcher
	ActivityTaskDispatcher ActivityTaskDispatcher
	// ActivityInterceptor
	ActivityInterceptor ActivityInterceptor
	// allow panics in activities rather than recovering and failing the activity, useful for testing
	AllowPanics bool
	// reads the EventCorrelator and backs off based on what retry # the activity is.
	BackoffOnFailure bool
	// maximum backoff sleep on retries that fail.
	MaxBackoffSeconds int
	// contains filtered or unexported fields
}
Example
var swfOps SWFOps

taskList := "aTaskListSharedBetweenTaskOneAndTwo"

handleTask1 := func(task *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error) {
	return input, nil
}

handleTask2 := func(task *swf.PollForActivityTaskOutput, input interface{}) (interface{}, error) {
	return input, nil
}

handler1 := &ActivityHandler{Activity: "one", HandlerFunc: handleTask1}

handler2 := &ActivityHandler{Activity: "two", HandlerFunc: handleTask2}

worker := &ActivityWorker{
	Domain:     "swf-domain",
	Serializer: fsm.JSONStateSerializer{},
	TaskList:   taskList,
	SWF:        swfOps,
	Identity:   "test-activity-worker",
}

worker.AddHandler(handler1)

worker.AddHandler(handler2)

go worker.Start()

func (*ActivityWorker) AddCoordinatedHandler

func (w *ActivityWorker) AddCoordinatedHandler(heartbeatInterval, tickMinInterval time.Duration, handler *CoordinatedActivityHandler)

AddCoordinatedHandler automatically takes care of sending back heartbeats and updating state on workflows for an activity task. tickMinInterval determines the max rate at which the CoordinatedActivityHandler.Tick function will be called.

For example, when the Tick function returns quickly (e.g. it is a no-op) and tickMinInterval is 1 * time.Second, Tick is guaranteed to be called at most once per second. The rate can be slower if Tick takes longer than tickMinInterval to complete.
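
The rate limit described above can be pictured with a small loop. This is a sketch under stated assumptions, not the package's implementation: runTicks and demo are hypothetical names, and the tick callback is reduced to a function that reports whether to continue.

```go
package main

import (
	"fmt"
	"time"
)

// runTicks is a hypothetical loop. It calls tick until tick returns false,
// starting consecutive calls at least minInterval apart. If tick itself takes
// longer than minInterval, the next call starts right away, so the effective
// rate is slower than one call per minInterval.
func runTicks(minInterval time.Duration, tick func() bool) (calls int) {
	for {
		started := time.Now()
		calls++
		if !tick() {
			return calls
		}
		if rest := minInterval - time.Since(started); rest > 0 {
			time.Sleep(rest)
		}
	}
}

// demo runs a no-op tick the given number of times and reports whether the
// total elapsed time respected the minimum interval between calls.
func demo(ticks, minMs int) (calls int, rateHeld bool) {
	minInterval := time.Duration(minMs) * time.Millisecond
	n := 0
	start := time.Now()
	calls = runTicks(minInterval, func() bool {
		n++
		return n < ticks
	})
	// ticks calls are separated by ticks-1 waits of at least minInterval each.
	return calls, time.Since(start) >= time.Duration(ticks-1)*minInterval
}

func main() {
	calls, held := demo(4, 20)
	fmt.Println(calls, held)
}
```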

func (*ActivityWorker) AddHandler

func (a *ActivityWorker) AddHandler(handler *ActivityHandler)

func (*ActivityWorker) HandleActivityTask

func (a *ActivityWorker) HandleActivityTask(activityTask *swf.PollForActivityTaskOutput)

HandleActivityTask is the callback passed into the registered ActivityTaskDispatcher. It is exposed so that users can handle polling themselves and call DispatchTask directly with this as the callback.

e.g. activityWorker.ActivityTaskDispatcher.DispatchTask(activityTask, activityWorker.HandleWithRecovery(activityWorker.HandleActivityTask))

Note: You will need to handle recovering from panics if you call this directly without wrapping with HandleWithRecovery.

func (*ActivityWorker) HandleWithRecovery

func (h *ActivityWorker) HandleWithRecovery(handler func(*swf.PollForActivityTaskOutput)) func(*swf.PollForActivityTaskOutput)

HandleWithRecovery is used to wrap handler functions (such as HandleActivityTask) so they gracefully recover from panics.
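
The wrapping pattern can be sketched with a deferred recover. The code below is an illustration only: Task stands in for *swf.PollForActivityTaskOutput, withRecovery and recoveredValue are hypothetical names, and what the real HandleWithRecovery does after recovering (for example, failing the activity) is not shown here.

```go
package main

import "fmt"

// Task is a stand-in for *swf.PollForActivityTaskOutput.
type Task struct{ ActivityID string }

// withRecovery is a hypothetical analogue of HandleWithRecovery. It returns a
// handler that calls onPanic instead of crashing when the wrapped handler panics.
func withRecovery(handler func(*Task), onPanic func(*Task, interface{})) func(*Task) {
	return func(t *Task) {
		defer func() {
			if r := recover(); r != nil {
				onPanic(t, r)
			}
		}()
		handler(t)
	}
}

// recoveredValue runs handler once under withRecovery and returns the
// recovered panic value, or nil if the handler did not panic.
func recoveredValue(handler func(*Task)) interface{} {
	var got interface{}
	wrapped := withRecovery(handler, func(_ *Task, r interface{}) { got = r })
	wrapped(&Task{ActivityID: "a1"})
	return got
}

func main() {
	fmt.Println(recoveredValue(func(*Task) { panic("boom") }))
	fmt.Println(recoveredValue(func(*Task) {}))
}
```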

func (*ActivityWorker) Init

func (a *ActivityWorker) Init()

func (*ActivityWorker) Start

func (a *ActivityWorker) Start()

type BoundedGoroutineDispatcher

type BoundedGoroutineDispatcher struct {
	NumGoroutines int
	// contains filtered or unexported fields
}

BoundedGoroutineDispatcher is an ActivityTaskDispatcher that uses a bounded number of goroutines to run activity handlers.

func (*BoundedGoroutineDispatcher) DispatchTask

DispatchTask sends the task on a channel that NumGoroutines goroutines are selecting on. The goroutine that receives a task runs it itself. Note that this is unsynchronized, as DispatchTask will only be called by the single poller goroutine.
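
A bounded dispatcher of this kind can be sketched as a fixed set of worker goroutines reading from one unbuffered channel. The code is a hypothetical illustration (boundedPool, job, and maxConcurrency are not package names, and Task stands in for *swf.PollForActivityTaskOutput); it shows the technique rather than the package's internals.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Task is a stand-in for *swf.PollForActivityTaskOutput.
type Task struct{ ActivityID string }

type job struct {
	task    *Task
	handler func(*Task)
}

// boundedPool is a hypothetical sketch of a bounded dispatcher.
type boundedPool struct {
	jobs chan job
	wg   sync.WaitGroup
}

// newBoundedPool starts n workers that each run received jobs themselves.
func newBoundedPool(n int) *boundedPool {
	p := &boundedPool{jobs: make(chan job)}
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for j := range p.jobs {
				j.handler(j.task)
			}
		}()
	}
	return p
}

// DispatchTask blocks until one of the workers is free to receive the job.
func (p *boundedPool) DispatchTask(t *Task, handler func(*Task)) {
	p.jobs <- job{t, handler}
}

// Close stops the workers after in-flight handlers finish.
func (p *boundedPool) Close() {
	close(p.jobs)
	p.wg.Wait()
}

// maxConcurrency pushes tasks through a pool and returns the peak number of
// handlers that were running at the same time.
func maxConcurrency(workers, tasks int) int32 {
	var running, peak int32
	p := newBoundedPool(workers)
	for i := 0; i < tasks; i++ {
		p.DispatchTask(&Task{}, func(*Task) {
			n := atomic.AddInt32(&running, 1)
			for {
				old := atomic.LoadInt32(&peak)
				if n <= old || atomic.CompareAndSwapInt32(&peak, old, n) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond)
			atomic.AddInt32(&running, -1)
		})
	}
	p.Close()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println(maxConcurrency(2, 8) <= 2)
}
```

Because the channel is unbuffered, a slow set of handlers applies backpressure to the caller, which is one reason a single poller goroutine pairs naturally with this design.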

type CallingGoroutineDispatcher

type CallingGoroutineDispatcher struct{}

CallingGoroutineDispatcher is an ActivityTaskDispatcher that runs the activity handler in the polling goroutine.

func (*CallingGoroutineDispatcher) DispatchTask

DispatchTask calls the handler in the same goroutine.

type ComposedDecisionInterceptor

type ComposedDecisionInterceptor struct {
	// contains filtered or unexported fields
}

func (*ComposedDecisionInterceptor) AfterTask

func (c *ComposedDecisionInterceptor) AfterTask(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)

func (*ComposedDecisionInterceptor) AfterTaskCanceled

func (c *ComposedDecisionInterceptor) AfterTaskCanceled(t *swf.PollForActivityTaskOutput, details string)

func (*ComposedDecisionInterceptor) AfterTaskComplete

func (c *ComposedDecisionInterceptor) AfterTaskComplete(t *swf.PollForActivityTaskOutput, result interface{})

func (*ComposedDecisionInterceptor) AfterTaskFailed

func (c *ComposedDecisionInterceptor) AfterTaskFailed(t *swf.PollForActivityTaskOutput, err error)

func (*ComposedDecisionInterceptor) BeforeTask

type CoordinatedActivityHandler

type CoordinatedActivityHandler struct {
	// Start is called when a new activity is ready to be handled.
	Start CoordinatedActivityHandlerStartFunc

	// Tick is called regularly to process a running activity.
	// Returning true, nil, nil means the job is still running.
	// Returning true, &SomeStruct{}, nil means the job is still running; an 'ActivityUpdated' signal is also sent back to the FSM with SomeStruct{} as the Input.
	// Returning false, &SomeStruct{}, nil means the job/activity is done; SomeStruct{} is sent back as the result and heartbeating stops.
	// Returning false, nil, nil means the job is done; no result is sent back and heartbeating stops.
	// Returning false, nil, err means the job/activity failed; err is sent back as the reason and heartbeating stops.
	Tick CoordinatedActivityHandlerTickFunc

	// Cancel is called when a running activity receives a request to cancel
	// via heartbeat update.
	Cancel CoordinatedActivityHandlerCancelFunc

	// Finish is called at the end of handling every activity.
	// It is called no matter the outcome, eg if Start fails,
	// Tick decides to stop continuing, or the activity is canceled.
	Finish CoordinatedActivityHandlerFinishFunc

	Input    interface{}
	Activity string
}
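
The Tick return-value combinations documented in the struct comments can be summarized as a small decision function. This is a reading aid, not package code: describeTick is a hypothetical helper, and treating every non-nil error as a failure (regardless of the other two values) is an assumption, since the comments only describe the false, nil, err case.

```go
package main

import (
	"errors"
	"fmt"
)

// describeTick is a hypothetical helper that maps the three values returned
// by a Tick function to the outcome described in the struct comments.
func describeTick(cont bool, result interface{}, err error) string {
	switch {
	case err != nil:
		// Assumption: any error fails the activity.
		return "fail activity: " + err.Error()
	case cont && result == nil:
		return "still running"
	case cont:
		return "still running, send ActivityUpdated signal"
	case result == nil:
		return "done, no result, stop heartbeating"
	default:
		return "done, send result, stop heartbeating"
	}
}

var errFailed = errors.New("job failed")

func main() {
	type update struct{ Percent int }
	fmt.Println(describeTick(true, nil, nil))
	fmt.Println(describeTick(true, &update{Percent: 50}, nil))
	fmt.Println(describeTick(false, &update{Percent: 100}, nil))
	fmt.Println(describeTick(false, nil, nil))
	fmt.Println(describeTick(false, nil, errFailed))
}
```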

func NewCoordinatedActivityHandler

func NewCoordinatedActivityHandler(activity string, start interface{}, tick interface{}, cancel interface{}, finish interface{}) *CoordinatedActivityHandler

type CoordinatedActivityHandlerCancelFunc

type CoordinatedActivityHandlerCancelFunc func(*swf.PollForActivityTaskOutput, interface{}) error

type CoordinatedActivityHandlerFinishFunc

type CoordinatedActivityHandlerFinishFunc func(*swf.PollForActivityTaskOutput, interface{}) error

type CoordinatedActivityHandlerStartFunc

type CoordinatedActivityHandlerStartFunc func(*swf.PollForActivityTaskOutput, interface{}) (interface{}, error)

type CoordinatedActivityHandlerTickFunc

type CoordinatedActivityHandlerTickFunc func(*swf.PollForActivityTaskOutput, interface{}) (bool, interface{}, error)

type CountdownGoroutineDispatcher

type CountdownGoroutineDispatcher struct {
	Stop    chan bool
	StopAck chan bool
	// contains filtered or unexported fields
}

CountdownGoroutineDispatcher is a dispatcher that you can register with a ShutdownManager. Used in your ActivityWorkers, it counts in-flight activities. It doesn't ack shutdowns until the number of in-flight activities is zero.
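
The idea of delaying shutdown until in-flight work drains can be sketched as follows. This example deliberately swaps in a sync.WaitGroup instead of the Stop and StopAck channels the real type exposes, so it shows the counting behavior only; countdown and finishedBeforeShutdown are hypothetical names, and Task stands in for *swf.PollForActivityTaskOutput.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Task is a stand-in for *swf.PollForActivityTaskOutput.
type Task struct{}

// countdown is a hypothetical dispatcher that tracks in-flight handlers.
type countdown struct{ inFlight sync.WaitGroup }

// DispatchTask counts the task as in flight, then runs it on a new goroutine.
func (c *countdown) DispatchTask(t *Task, handler func(*Task)) {
	c.inFlight.Add(1)
	go func() {
		defer c.inFlight.Done()
		handler(t)
	}()
}

// Shutdown blocks until the number of in-flight handlers reaches zero.
func (c *countdown) Shutdown() { c.inFlight.Wait() }

// finishedBeforeShutdown dispatches slow tasks and returns how many had
// completed by the time Shutdown returned.
func finishedBeforeShutdown(tasks int) int {
	var mu sync.Mutex
	done := 0
	c := &countdown{}
	for i := 0; i < tasks; i++ {
		c.DispatchTask(&Task{}, func(*Task) {
			time.Sleep(5 * time.Millisecond)
			mu.Lock()
			done++
			mu.Unlock()
		})
	}
	c.Shutdown()
	mu.Lock()
	defer mu.Unlock()
	return done
}

func main() {
	fmt.Println(finishedBeforeShutdown(4))
}
```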

func RegisterNewCountdownGoroutineDispatcher

func RegisterNewCountdownGoroutineDispatcher(mgr poller.ShutdownManager) *CountdownGoroutineDispatcher

RegisterNewCountdownGoroutineDispatcher constructs a new CountdownGoroutineDispatcher, starts it, and registers it with the given ShutdownManager.

func (*CountdownGoroutineDispatcher) DispatchTask

func (*CountdownGoroutineDispatcher) Start

func (m *CountdownGoroutineDispatcher) Start()

type FuncInterceptor

type FuncInterceptor struct {
	BeforeTaskFn        func(*swf.PollForActivityTaskOutput)
	AfterTaskFn         func(t *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)
	AfterTaskCompleteFn func(t *swf.PollForActivityTaskOutput, result interface{})
	AfterTaskFailedFn   func(t *swf.PollForActivityTaskOutput, err error)
	AfterTaskCanceledFn func(t *swf.PollForActivityTaskOutput, details string)
}

FuncInterceptor is an ActivityInterceptor that you can set handler funcs on. If any are unset, they are no-ops.
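
The nil-check pattern behind an interceptor built from optional function fields can be sketched like this. funcHooks is a reduced, hypothetical analogue of FuncInterceptor with a stand-in Task type; in particular, having an unset AfterTaskFn pass the result and error through unchanged is an assumption about what "no-op" means for that hook.

```go
package main

import "fmt"

// Task is a stand-in for *swf.PollForActivityTaskOutput.
type Task struct{ ActivityID string }

// funcHooks is a reduced, hypothetical analogue of FuncInterceptor.
type funcHooks struct {
	BeforeTaskFn func(*Task)
	AfterTaskFn  func(t *Task, result interface{}, err error) (interface{}, error)
}

// BeforeTask runs BeforeTaskFn if it is set and does nothing otherwise.
func (h *funcHooks) BeforeTask(t *Task) {
	if h.BeforeTaskFn != nil {
		h.BeforeTaskFn(t)
	}
}

// AfterTask runs AfterTaskFn if it is set. When unset, it passes the result
// and error through unchanged (an assumption for this sketch).
func (h *funcHooks) AfterTask(t *Task, result interface{}, err error) (interface{}, error) {
	if h.AfterTaskFn == nil {
		return result, err
	}
	return h.AfterTaskFn(t, result, err)
}

func main() {
	seen := ""
	h := &funcHooks{BeforeTaskFn: func(t *Task) { seen = t.ActivityID }}
	h.BeforeTask(&Task{ActivityID: "a1"})
	r, err := h.AfterTask(&Task{}, 42, nil)
	fmt.Println(seen, r, err)
}
```

Only the hooks a caller cares about need to be set, which keeps small interceptors (logging, metrics) short.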

func (*FuncInterceptor) AfterTask

func (i *FuncInterceptor) AfterTask(activity *swf.PollForActivityTaskOutput, result interface{}, err error) (interface{}, error)

func (*FuncInterceptor) AfterTaskCanceled

func (i *FuncInterceptor) AfterTaskCanceled(activity *swf.PollForActivityTaskOutput, details string)

AfterTaskCanceled runs the AfterTaskCanceledFn if not nil

func (*FuncInterceptor) AfterTaskComplete

func (i *FuncInterceptor) AfterTaskComplete(activity *swf.PollForActivityTaskOutput, result interface{})

AfterTaskComplete runs the AfterTaskCompleteFn if not nil

func (*FuncInterceptor) AfterTaskFailed

func (i *FuncInterceptor) AfterTaskFailed(activity *swf.PollForActivityTaskOutput, err error)

AfterTaskFailed runs the AfterTaskFailedFn if not nil

func (*FuncInterceptor) BeforeTask

func (i *FuncInterceptor) BeforeTask(activity *swf.PollForActivityTaskOutput)

BeforeTask runs the BeforeTaskFn if not nil

type NewGoroutineDispatcher

type NewGoroutineDispatcher struct {
}

NewGoroutineDispatcher is an ActivityTaskDispatcher that runs the activity handler in a new goroutine.

func (*NewGoroutineDispatcher) DispatchTask

DispatchTask calls the handler in a new goroutine.
