Version: v0.6.3

This package is not in the latest version of its module.

Published: Jun 26, 2021 License: MIT Imports: 11 Imported by: 1




const (
	DefaultMaxWorkers        = 10
	DefaultMessageBuffer     = 10
	DefaultMinWorkers        = 1
	DefaultIdleness          = time.Minute
	DefaultMessageAcceptWait = time.Second / 2
)




type Action

type Action interface {
	Do(ctx context.Context, j Job)
}

type ActionFunc

type ActionFunc func(ctx context.Context, job Job)

func (ActionFunc) Do

func (a ActionFunc) Do(ctx context.Context, job Job)
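ActionFunc appears to follow the same adapter pattern as http.HandlerFunc: a plain function is converted to ActionFunc so that it satisfies Action. The sketch below illustrates this with local stand-in types. The trimmed Job struct, the body of Do, and the dispatch and demo helpers are assumptions made for illustration, not the package's actual code.

```go
package main

import (
	"context"
	"fmt"
)

// Job is a trimmed stand-in for the package's Job type (assumption:
// only the To field is mirrored here).
type Job struct {
	To string
}

// Action mirrors the documented interface.
type Action interface {
	Do(ctx context.Context, j Job)
}

// ActionFunc mirrors the documented adapter type.
type ActionFunc func(ctx context.Context, job Job)

// Do calls the underlying function (assumed behaviour), which is what
// lets an ActionFunc be used wherever an Action is expected.
func (a ActionFunc) Do(ctx context.Context, job Job) { a(ctx, job) }

// dispatch is a hypothetical helper that accepts any Action.
func dispatch(ctx context.Context, a Action, j Job) { a.Do(ctx, j) }

// demo runs a plain function through the Action interface and reports
// which address the function saw.
func demo(to string) string {
	var seen string
	record := ActionFunc(func(ctx context.Context, job Job) { seen = job.To })
	dispatch(context.Background(), record, Job{To: to})
	return seen
}

func main() {
	fmt.Println(demo("users.create"))
}
```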

type ActionHub

type ActionHub struct {
	Pubsub         sabuhp.MessageBus
	Relay          *sabuhp.BusRelay
	Logger         sabuhp.Logger
	Injector       *injectors.Injector
	WorkRegistry   *WorkerTemplateRegistry
	Context        context.Context
	CancelFn       context.CancelFunc
	EscalationFunc EscalationNotification
	// contains filtered or unexported fields
}

ActionHub is a worker pool supervisor that manages the execution of functions deployed to process commands asynchronously. Each function may respond with another message if needed.

func NewActionHub

func NewActionHub(
	ctx context.Context,
	escalationHandler EscalationNotification,
	templates *WorkerTemplateRegistry,
	injector *injectors.Injector,
	pubsub sabuhp.MessageBus,
	relay *sabuhp.BusRelay,
	logger sabuhp.Logger,
) *ActionHub

NewActionHub returns a new instance of an ActionHub.

func (*ActionHub) Do

func (ah *ActionHub) Do(actionName string, creator WorkGroupCreator, slaves ...SlaveWorkerRequest) error

Do registers the given action (identified by actionName) with the pubsub, using the action name as the address on which it receives commands.

Actions are generally unique, self-contained pure functions that describe special-case behaviours: they perform some action and can, if required, respond with other commands in return. Their payloads are specific and known at implementation time, so the contract of what an action does is clear.

Depending on the configuration of their work group templates, actions scale where possible to meet processing needs within the worker group's constraints. Hence, unless special cases occur, creating multiple worker group instances of the same action is not necessary.

Be aware that the actionName must be unique: it must differ from every other action name, including those in the worker template registry.

func (*ActionHub) DoAlias

func (ah *ActionHub) DoAlias(
	actionName string,
	pubsubTopic string,
	creator WorkGroupCreator,
	slaves ...SlaveWorkerRequest,
) error

DoAlias registers a WorkGroupCreator under the action name but, instead of using the action name as the pubsub topic, uses the provided pubsubTopic. This allows you to link an action work group to an external, pre-defined topic.

We always encourage using the actionName as the topic, because actions can scale based on their configuration without the need to spawn separate work groups. There will be cases, however, where you need to handle outside events whose topic cannot be named after the action, either because you lack control over the topic or because of an integration. DoAlias exists for these cases.

The general rule: when you can control the topic name, use the actionName. If you don't know which to use, it's a safe bet to use the action name, and hence ActionHub.Do.
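The relationship between Do and DoAlias can be sketched with a small in-memory stand-in. The hub type below is hypothetical and tracks only which topic each action listens on. Having Do delegate to DoAlias, and rejecting a duplicate action name with an error, are assumptions drawn from the descriptions above rather than the package's confirmed behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// hub is a hypothetical stand-in for ActionHub that records only the
// topic each action name is bound to.
type hub struct {
	topics map[string]string // action name -> pubsub topic
}

func newHub() *hub { return &hub{topics: map[string]string{}} }

// DoAlias binds actionName to an explicit topic. Rejecting duplicates is
// an assumption based on the uniqueness requirement for action names.
func (h *hub) DoAlias(actionName, pubsubTopic string) error {
	if _, ok := h.topics[actionName]; ok {
		return errors.New("action already registered: " + actionName)
	}
	h.topics[actionName] = pubsubTopic
	return nil
}

// Do uses the action name itself as the topic.
func (h *hub) Do(actionName string) error {
	return h.DoAlias(actionName, actionName)
}

func main() {
	h := newHub()
	fmt.Println(h.Do("orders.create"))
	fmt.Println(h.DoAlias("billing.sync", "vendor.events.v1"))
	fmt.Println(h.topics["orders.create"], h.topics["billing.sync"])
	fmt.Println(h.Do("orders.create") != nil) // duplicate name is rejected
}
```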

func (*ActionHub) Start

func (ah *ActionHub) Start()

func (*ActionHub) Stats

func (ah *ActionHub) Stats() []WorkerStat

func (*ActionHub) Stop

func (ah *ActionHub) Stop()

Stop ends the action hub and all of its operations. This also makes the instance unusable.

func (*ActionHub) Wait

func (ah *ActionHub) Wait()

type BehaviourType

type BehaviourType int
const (
	DoNothing BehaviourType = iota

type Escalation

type Escalation struct {
	// Err is the generated error with tracing data.
	Err error

	// Additional data to be attached, could be the returned value
	// of recover() for a panic protocol.
	Data interface{}

	// WorkerProtocol is the escalation protocol communicated by the worker.
	WorkerProtocol WorkerProtocol

	// GroupProtocol is the escalation protocol being used by the worker group
	// for handling the worker protocol.
	GroupProtocol EscalationProtocol

	// PendingMessages are the commands left to be processed when
	// an escalation occurred. It's only set when it's a KillAndEscalate
	// protocol.
	PendingMessages chan *sabuhp.Message

	// OffendingMessage is the message which caused the PanicProtocol.
	// It only has a value when it's a PanicProtocol.
	OffendingMessage *sabuhp.Message
}

type EscalationNotification

type EscalationNotification func(escalation Escalation, hub *ActionHub)
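An escalation handler may want to recover the commands left in Escalation.PendingMessages. The sketch below drains such a channel without blocking. Message and Escalation here are trimmed local stand-ins, drainPending is a hypothetical helper, and whether the real channel is buffered or closed by the worker group is not stated in this documentation, so the code tolerates both as well as a nil channel.

```go
package main

import "fmt"

// Message is a trimmed stand-in for sabuhp.Message.
type Message struct {
	Topic string
}

// Escalation mirrors only the fields used in this sketch.
type Escalation struct {
	Err             error
	PendingMessages chan *Message
}

// drainPending collects whatever is currently buffered without blocking.
// A nil channel (no pending messages set) yields an empty result because
// the select falls through to the default case.
func drainPending(e Escalation) []*Message {
	var out []*Message
	for {
		select {
		case m, ok := <-e.PendingMessages:
			if !ok {
				return out // channel was closed
			}
			out = append(out, m)
		default:
			return out // nothing buffered right now
		}
	}
}

func main() {
	pending := make(chan *Message, 2)
	pending <- &Message{Topic: "orders.create"}
	pending <- &Message{Topic: "orders.cancel"}

	left := drainPending(Escalation{PendingMessages: pending})
	fmt.Println(len(left), left[0].Topic, left[1].Topic)
}
```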

type EscalationProtocol

type EscalationProtocol int
const (
	RestartProtocol EscalationProtocol = iota

type InstanceType

type InstanceType int
const (
	NullInstance InstanceType = iota

type Job

type Job struct {
	To        string
	DI        *injectors.Injector
	Msg       *sabuhp.Message
	Transport sabuhp.Transport
}

type MasterWorkerGroup

type MasterWorkerGroup struct {
	Master *WorkerGroup
	Slaves map[string]*WorkerGroup
}

MasterWorkerGroup implements a group of worker-group nodes in which a master node is grouped together with its dependent action groups (slaves). The death of the master leads to the death of the slaves.
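The master/slave lifetime coupling can be illustrated with a minimal sketch. The group and masterGroup types are hypothetical stand-ins that only record whether Stop was called. Keying slaves by name and stopping the master before the slaves are assumptions for illustration.

```go
package main

import "fmt"

// group is a hypothetical stand-in for WorkerGroup that only records
// whether it has been stopped.
type group struct {
	name    string
	stopped bool
}

func (g *group) Stop() { g.stopped = true }

// masterGroup mirrors the documented shape of MasterWorkerGroup.
type masterGroup struct {
	Master *group
	Slaves map[string]*group
}

// AddSlave indexes the slave by its name (keying by name is an assumption).
func (mg *masterGroup) AddSlave(s *group) {
	if mg.Slaves == nil {
		mg.Slaves = map[string]*group{}
	}
	mg.Slaves[s.name] = s
}

// Stop stops the master and then every slave, so that no slave outlives
// its master.
func (mg *masterGroup) Stop() {
	mg.Master.Stop()
	for _, s := range mg.Slaves {
		s.Stop()
	}
}

func main() {
	mg := &masterGroup{Master: &group{name: "orders.create"}}
	audit := &group{name: "orders.create/slaves/audit"}
	mg.AddSlave(audit)

	mg.Stop()
	fmt.Println(mg.Master.stopped, audit.stopped)
}
```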

func (*MasterWorkerGroup) AddSlave

func (mg *MasterWorkerGroup) AddSlave(slave *WorkerGroup)

func (*MasterWorkerGroup) Start

func (mg *MasterWorkerGroup) Start()

func (*MasterWorkerGroup) Stats

func (mg *MasterWorkerGroup) Stats() []WorkerStat

func (*MasterWorkerGroup) Stop

func (mg *MasterWorkerGroup) Stop()

type SlaveWorkerRequest

type SlaveWorkerRequest struct {
	// ActionName represents the action name for this
	// request's work group. When PubSubAddress has no
	// value, it is also the topic to listen on for
	// commands.
	ActionName string

	GroupName string

	// WorkerCreator is the creation function which will be supplied
	// the initial WorkerConfig which should be populated accordingly
	// by the creator to create and return a WorkerGroup which will
	// service all commands for this action.
	Action Action

	// Attributes which are allowed for configuration in the WorkerGroupCreator.
	// Because slaves are very special, users do not have the right to decide
	// their behaviour and instance types.
	MessageBufferSize   int
	MinWorker           int
	MaxWorkers          int
	MaxIdleness         time.Duration
	MessageDeliveryWait time.Duration
}

type WorkGroupCreator

type WorkGroupCreator func(config WorkerConfig) *WorkerGroup

WorkGroupCreator is a function that takes a WorkerConfig, which it can modify as it sees fit, and returns a WorkerGroup that will be managed by an ActionHub.

type WorkRequest

type WorkRequest struct {
	Ctx       context.Context
	Message   *sabuhp.Message
	Transport sabuhp.Transport
}

type WorkerConfig

type WorkerConfig struct {
	ActionName             string
	Addr                   string
	MessageBufferSize      int
	Action                 Action
	MinWorker              int
	MaxWorkers             int
	Injector               *injectors.Injector
	Behaviour              BehaviourType
	Instance               InstanceType
	Context                context.Context
	EscalationNotification WorkerEscalationNotification
	MaxIdleness            time.Duration
	MessageDeliveryWait    time.Duration
}

WorkerConfig contains the properties required by a Worker.

Important: ensure that any blocking operation in WorkerConfig.EscalationNotification is run in its own goroutine, otherwise it will block the WorkerGroup's internal run loop. This responsibility is left to the user so that the notification's behaviour stays explicit and predictable.
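One way to keep an escalation callback from blocking is to wrap it so the slow part runs in its own goroutine. The sketch below uses local stand-ins for Escalation and WorkerGroup. The inBackground wrapper and the demo function are hypothetical helpers, and the WaitGroup is included only so the example can wait for the background work to finish.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Escalation and WorkerGroup are trimmed stand-ins for the package types.
type Escalation struct {
	Err error
}

type WorkerGroup struct{}

// notification has the same shape as WorkerEscalationNotification.
type notification func(e *Escalation, wk *WorkerGroup)

// inBackground wraps a potentially blocking handler so that the returned
// callback hands the work to a goroutine and returns immediately.
func inBackground(wg *sync.WaitGroup, slow notification) notification {
	return func(e *Escalation, wk *WorkerGroup) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			slow(e, wk)
		}()
	}
}

// demo shows that the wrapped callback returns while the handler is still
// blocked, then reports how many escalations were eventually handled.
func demo() int {
	var wg sync.WaitGroup
	release := make(chan struct{})
	handled := 0

	slow := func(e *Escalation, wk *WorkerGroup) {
		<-release // stands in for a blocking operation
		handled++
	}

	notify := inBackground(&wg, slow)
	notify(&Escalation{Err: errors.New("worker panicked")}, &WorkerGroup{})

	close(release) // reached only because notify did not block
	wg.Wait()
	return handled
}

func main() {
	fmt.Println(demo())
}
```

If the handler were called directly instead of through inBackground, demo would never reach close(release), which is the same kind of stall the run loop would suffer.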

type WorkerEscalationNotification

type WorkerEscalationNotification func(escalation *Escalation, wk *WorkerGroup)

type WorkerGroup

type WorkerGroup struct {
	// contains filtered or unexported fields
}

WorkerGroup embodies a small action-based work group which, in its default state, scales functions for execution up to its maximum allowed range. WorkerGroup provides other settings, such as SingleInstance, where only one function is allowed, or the OneTime instance type, where a function runs once and dies off.

func NewWorkGroup

func NewWorkGroup(config WorkerConfig) *WorkerGroup

func (*WorkerGroup) Ctx

func (w *WorkerGroup) Ctx() context.Context

func (*WorkerGroup) HandleMessage

func (w *WorkerGroup) HandleMessage(ctx context.Context, message sabuhp.Message, t sabuhp.Transport) error

func (*WorkerGroup) Start

func (w *WorkerGroup) Start()

func (*WorkerGroup) Stats

func (w *WorkerGroup) Stats() WorkerStat

func (*WorkerGroup) Stop

func (w *WorkerGroup) Stop()

func (*WorkerGroup) Wait

func (w *WorkerGroup) Wait()

Wait blocks until the group is stopped or killed.

func (*WorkerGroup) WaitRestart

func (w *WorkerGroup) WaitRestart()

WaitRestart will block if there is a restart process occurring when it's called.

type WorkerProtocol

type WorkerProtocol int
const (
	PanicProtocol WorkerProtocol = iota

type WorkerRequest

type WorkerRequest struct {
	// Err provides a means of response detailing possible error
	// that occurred during the processing of creating this worker.
	Err chan error

	// ActionName represents the action name for this
	// request's work group. When PubSubTopic has no
	// value, it is also the topic to listen on for
	// commands.
	ActionName string

	// PubSubTopic provides alternate means of redirecting
	// a specific unique topic to this action worker, where
	// commands for these are piped accordingly.
	PubSubTopic string

	// PubSubGroup indicates the target group for which the
	// action will be added into. Similar to a Kafka ConsumerGroup.
	PubSubGroup string

	// WorkerCreator is the creation function which will be supplied
	// the initial WorkerConfig which should be populated accordingly
	// by the creator to create and return a WorkerGroup which will
	// service all commands for this action.
	WorkerCreator WorkGroupCreator

	// Slaves are personalized workers generated along with this worker.
	// Slave names follow a strict format which is generated automatically
	// internally.
	// Name format: [MasterActionName]/slaves/[SlaveActionName]
	// WARNING: Use sparingly; most use cases are doable with shared
	// work groups.
	// There are cases where a work group needs a dedicated worker to
	// which it can hand off specific tasks for very special cases.
	// Slaves allow this kind of specific, limited, non-generalist use.
	// An example is a dangerous operation that is not intrinsic to
	// the behaviour of the action but must still be done: the dangerous
	// or non-secure task can be offloaded to the slaves, which are always
	// running and never die permanently because they are restarted
	// and respawned.
	// They exist for as long as the master exists.
	Slaves []SlaveWorkerRequest
}

WorkerRequest defines a request template used by the ActionHub to request the creation of a worker group for a given action.
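Two conventions from the field comments above can be sketched directly: the topic falls back to ActionName when PubSubTopic is empty, and slave names follow the [MasterActionName]/slaves/[SlaveActionName] format. The request type, topicFor, and slaveName are hypothetical names used only for this illustration. The package generates slave names internally and may do so differently.

```go
package main

import "fmt"

// request mirrors only the two WorkerRequest fields used in this sketch.
type request struct {
	ActionName  string
	PubSubTopic string
}

// topicFor returns PubSubTopic when it is set and falls back to
// ActionName otherwise, following the field comments.
func topicFor(r request) string {
	if r.PubSubTopic != "" {
		return r.PubSubTopic
	}
	return r.ActionName
}

// slaveName builds a name in the documented
// [MasterActionName]/slaves/[SlaveActionName] format.
func slaveName(master, slave string) string {
	return master + "/slaves/" + slave
}

func main() {
	fmt.Println(topicFor(request{ActionName: "orders.create"}))
	fmt.Println(topicFor(request{ActionName: "billing.sync", PubSubTopic: "vendor.events.v1"}))
	fmt.Println(slaveName("orders.create", "audit"))
}
```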

func NewWorker added in v0.5.0

func NewWorker(name string, topic string, grp string, action Action) WorkerRequest

type WorkerStat

type WorkerStat struct {
	Addr                    string
	MaxWorkers              int
	MinWorkers              int
	TotalMessageReceived    int
	TotalMessageProcessed   int
	TotalEscalations        int
	TotalPanics             int
	TotalRestarts           int
	AvailableWorkerCapacity int
	TotalCurrentWorkers     int
	TotalCreatedWorkers     int
	TotalKilledWorkers      int
	TotalIdledWorkers       int
	Instance                InstanceType
	BehaviourType           BehaviourType
}

func (WorkerStat) EncodeObject

func (w WorkerStat) EncodeObject(encoder npkg.ObjectEncoder)

type WorkerStats

type WorkerStats []WorkerStat

func (WorkerStats) EncodeList

func (items WorkerStats) EncodeList(encoder npkg.ListEncoder)

type WorkerTemplateRegistry

type WorkerTemplateRegistry struct {
	// contains filtered or unexported fields
}

WorkerTemplateRegistry implements a registry of WorkerRequest templates which predefine specific ActionWorkerRequests that define what should be created to handle work for such actions.

It provides the ActionHub a predefined set of templates to boot up action worker groups based on commands for specific addresses.

func NewWorkerTemplateRegistry

func NewWorkerTemplateRegistry() *WorkerTemplateRegistry

NewWorkerTemplateRegistry returns a new instance of the WorkerTemplateRegistry.

func (*WorkerTemplateRegistry) Delete

func (ar *WorkerTemplateRegistry) Delete(actionName string)

func (*WorkerTemplateRegistry) Has

func (ar *WorkerTemplateRegistry) Has(actionName string) bool

func (*WorkerTemplateRegistry) Register

func (ar *WorkerTemplateRegistry) Register(template WorkerRequest)

func (*WorkerTemplateRegistry) Template

func (ar *WorkerTemplateRegistry) Template(actionName string) WorkerRequest
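The registry's method set (Register, Has, Template, Delete) suggests a map keyed by action name. The sketch below is a minimal stand-in under that assumption. The registry type, its mutex, and the choice to return a zero-value WorkerRequest for an unknown name are illustrative guesses, not the package's documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// WorkerRequest mirrors only the fields used in this sketch.
type WorkerRequest struct {
	ActionName  string
	PubSubTopic string
}

// registry is a hypothetical stand-in for WorkerTemplateRegistry.
type registry struct {
	mu        sync.RWMutex
	templates map[string]WorkerRequest
}

func newRegistry() *registry {
	return &registry{templates: map[string]WorkerRequest{}}
}

// Register stores the template under its action name.
func (r *registry) Register(t WorkerRequest) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.templates[t.ActionName] = t
}

// Has reports whether a template exists for actionName.
func (r *registry) Has(actionName string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	_, ok := r.templates[actionName]
	return ok
}

// Template returns the stored template, or a zero value if none exists
// (the zero-value fallback is an assumption).
func (r *registry) Template(actionName string) WorkerRequest {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.templates[actionName]
}

// Delete removes the template for actionName, if any.
func (r *registry) Delete(actionName string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.templates, actionName)
}

func main() {
	reg := newRegistry()
	reg.Register(WorkerRequest{ActionName: "orders.create", PubSubTopic: "orders"})
	fmt.Println(reg.Has("orders.create"), reg.Template("orders.create").PubSubTopic)
	reg.Delete("orders.create")
	fmt.Println(reg.Has("orders.create"))
}
```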
