poller

package
v0.0.0-...-8e6450d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2020 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package poller contains TaskPollers and PollerShutdownManager to help workaround the 'phantom' task assignment in SWF, by alowing your pollers to wait until any in-flight polls are done to shut down.

TaskPollers

DecisionTaskPoller and ActivityTaskPoller facilitate proper usage of the PollForDecisionTask and PollForActivityTask endpoints in the SWF API. These endpoints are used by DecisionTask and ActivityTask workers to claim tasks on which to work. The endpoints use long polling. SWF will keep the request open for up to 60 seconds before returning an 'empty' response. If a task is generated before that time, a non-empty task is delivered (and assigned to) a particular polling client.

There is an unfortunate bug in SWF that occurs when a long-polling request gets terminated client side, rather than waiting for the SWF API to respond. SWF does not recognize this condition so it can result in assigning a task to a disconnected worker, which will subsequently cause the task to timeout. This is not terrible if the task has a short timeout but can cause big delays if the task does have a long timeout.

Both types of pollers allow you to manage polling yourself by calling Poll() directly. However it is recommended that you use the PollUntilShutdownBy(...) function, which works in concert with a PollerShutdownManager to await all in-flight polls to complete. This facilitates clean shutdown of end user processes.

PollerShutdownManager

When PollerShutdownManager.ShutdownPollers() is called, it will signal any registered pollers to shut down once any in-flight polls have completed, and block until this happens. The shutdown process can take up to 60 seconds due to the length of SWF long polls before an empty response is returned.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActivityOps

type ActivityOps interface {
	PollForActivityTask(req *swf.PollForActivityTaskInput) (resp *swf.PollForActivityTaskOutput, err error)
}

type ActivityTaskPoller

type ActivityTaskPoller struct {
	Identity string
	Domain   string
	TaskList string
	// contains filtered or unexported fields
}

ActivityTaskPoller polls a given task list in a domain for activity tasks, and sends tasks on its Tasks channel.

func NewActivityTaskPoller

func NewActivityTaskPoller(awc ActivityOps, domain string, identity string, taskList string) *ActivityTaskPoller

NewActivityTaskPoller returns an ActivityTaskPoller.

func (*ActivityTaskPoller) Poll

Poll polls the task list for a task. If there is no task, nil is returned. If an error is encountered, no task is returned.

func (*ActivityTaskPoller) PollUntilShutdownBy

func (p *ActivityTaskPoller) PollUntilShutdownBy(mgr *ShutdownManager, pollerName string, onTask func(*swf.PollForActivityTaskOutput))

PollUntilShutdownBy will poll until signaled to shutdown by the ShutdownManager. this func blocks, so run it in a goroutine if necessary. The implementation calls Poll() and invokes the callback whenever a valid PollForActivityTaskResponse is received.

type DecisionOps

type DecisionOps interface {
	PollForDecisionTaskPages(*swf.PollForDecisionTaskInput, func(*swf.PollForDecisionTaskOutput, bool) bool) error
}

SWFOps is the subset of the swf.SWF api used by pollers

type DecisionTaskPoller

type DecisionTaskPoller struct {
	Identity string
	Domain   string
	TaskList string
	// contains filtered or unexported fields
}

DecisionTaskPoller polls a given task list in a domain for decision tasks.

func NewDecisionTaskPoller

func NewDecisionTaskPoller(dwc DecisionOps, domain string, identity string, taskList string) *DecisionTaskPoller

NewDecisionTaskPoller returns a DecisionTaskPoller whick can be used to poll the given task list.

func (*DecisionTaskPoller) Poll

Poll polls the task list for a task. If there is no task available, nil is returned. If an error is encountered, no task is returned.

func (*DecisionTaskPoller) PollUntilShutdownBy

func (p *DecisionTaskPoller) PollUntilShutdownBy(mgr *ShutdownManager, pollerName string, onTask func(*swf.PollForDecisionTaskOutput), taskReady func(*swf.PollForDecisionTaskOutput) bool)

PollUntilShutdownBy will poll until signaled to shutdown by the PollerShutdownManager. this func blocks, so run it in a goroutine if necessary. The implementation calls Poll() and invokes the callback whenever a valid PollForDecisionTaskResponse is received.

type ShutdownManager

type ShutdownManager struct {
	// contains filtered or unexported fields
}

ShutdownManager facilitates cleanly shutting down pollers when the application decides to exit. When StopPollers() is called it will send to each of the stopChan that have been registered, then recieve from each of the ackChan that have been registered. At this point StopPollers() returns.

func NewShutdownManager

func NewShutdownManager() *ShutdownManager

NewShutdownManager creates a ShutdownManager

func (*ShutdownManager) Deregister

func (p *ShutdownManager) Deregister(name string)

Deregister removes a registered pair of channels from the shutdown manager.

func (*ShutdownManager) Register

func (p *ShutdownManager) Register(name string, stopChan chan bool, ackChan chan bool)

Register registers a named pair of channels to the shutdown manager. Buffered channels please!

func (*ShutdownManager) StopPollers

func (p *ShutdownManager) StopPollers()

StopPollers blocks until it is able to stop all the registered pollers, which can take up to 60 seconds. the registered pollers are cleared once all pollers have acked the stop.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL