pubsub

v0.0.0-...-8c1fed0
Warning

This package is not in the latest version of its module.

Published: Sep 8, 2020 License: Apache-2.0 Imports: 8 Imported by: 0

README

pubsub

Publish and subscribe management within a Go process.

Documentation

Constants

const (
	// Interval describes how frequently we should be scanning for new messages
	// from the hub.
	Interval time.Duration = time.Millisecond * 10
)

Variables

var (
	// ErrComplete is used as a sentinel error to identify when a worker has
	// been run.
	ErrComplete = errors.New("completed run")

	// ErrTimeout is a sentinel error to identify when a worker has timed out.
	ErrTimeout = errors.New("timed out")
)
var Exponential = func(max time.Duration) func(task.BackoffOptions) {
	return func(backoff task.BackoffOptions) {
		amount := 1
		backoff.SetBackoff(func(n int, t time.Duration) time.Duration {

			if n <= 1 {
				amount = n
				return t
			}

			base := 2.0
			b := t * time.Duration(math.Pow(base, float64(amount)))
			if b >= max {
				return max
			}
			amount = n
			return b
		})
	}
}

Exponential describes a backoff function that grows exponentially with time.

The max duration caps the backoff at an upper limit, to prevent stalling the hub completely.

Functions

func Any

func Any() func(string) bool

Any will always return true when matching a topic.

func Contains

func Contains(value string) func(string) bool

Contains will match a topic that contains a certain string.

func EndsWith

func EndsWith(suffix string) func(string) bool

EndsWith will match a topic that ends with a certain string.

func Forward

func Forward(hub *Hub, other *Hub) func(time.Duration) error

Forward forwards all messages from one hub to another. This is useful for crossing boundaries.

func ForwardMatcher

func ForwardMatcher(matcher TopicMatcher, hub *Hub, other *Hub) func(time.Duration) error

ForwardMatcher forwards all messages that match the matcher from one hub to another. This is useful for crossing boundaries.

func Loop

func Loop(fn task.Func) (func() error, error)

Loop takes a task.Func and iterates until cancel is called. This is useful for exhausting a subscriber without caring about a schedule or interval.

func Match

func Match(topic string) func(string) bool

Match will attempt to match a topic against another topic, succeeding only when the two are exactly the same.

func Multiplexer

func Multiplexer(hub *Hub, subs ...Sub) func(time.Duration) error

Multiplexer forwards multiple subscribers to a single hub.

func Never

func Never() func(string) bool

Never will never match a topic.

func StartsWith

func StartsWith(prefix string) func(string) bool

StartsWith will match a topic that starts with a certain string.
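The matcher functions above all return a func(string) bool, so they can be sketched as plain closures over the strings package. The example below is a standalone approximation under that assumption; startsWith, contains and both are hypothetical lower-case stand-ins, and both (which combines two matchers) is not something this package is documented to provide.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatcher mirrors the documented shape of TopicMatcher.
type topicMatcher func(string) bool

// startsWith is a hypothetical stand-in for a prefix matcher.
func startsWith(prefix string) topicMatcher {
	return func(topic string) bool { return strings.HasPrefix(topic, prefix) }
}

// contains is a hypothetical stand-in for a substring matcher.
func contains(value string) topicMatcher {
	return func(topic string) bool { return strings.Contains(topic, value) }
}

// both is a hypothetical combinator: it matches only when a and b both match.
func both(a, b topicMatcher) topicMatcher {
	return func(topic string) bool { return a(topic) && b(topic) }
}

func main() {
	m := both(startsWith("orders."), contains(".eu."))
	fmt.Println(m("orders.eu.created")) // true
	fmt.Println(m("users.eu.created"))  // false
}
```

Because a matcher is only a function value, composing matchers needs no extra types.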

Types

type Event

type Event struct {
	// contains filtered or unexported fields
}

Event represents a typed message which is dispatched within the hub to a set of subscribers of the hub.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group represents a group where all subscribers are treated fairly in terms of task consumption.

func NewGroup

func NewGroup(interval time.Duration) *Group

NewGroup creates a new group where all subscribers are treated equally in terms of fairness.

func (*Group) Close

func (g *Group) Close(timeout time.Duration) error

Close will close all underlying subscribers then stop all the tasks in the group. Starting the group again will be pointless as the hub will be left with no subscribers after closing.

func (*Group) Publish

func (g *Group) Publish(topic string, data interface{}) <-chan struct{}

Publish will notify all the subscribers that are interested by calling their handler function.

The data is passed through to each Subscriber untouched. Note that all subscribers are notified in parallel, so handlers must not modify the data or data races will occur.

The channel return value is closed when all the subscribers have been notified of the event.

func (*Group) Start

func (g *Group) Start(ctx context.Context) error

Start all the tasks in the group.

func (*Group) Stop

func (g *Group) Stop(timeout time.Duration) error

Stop all tasks in the group.

In case the given timeout expires before all tasks complete, this method exits immediately and returns an error, otherwise it returns nil.

func (*Group) Subscribe

func (g *Group) Subscribe(topic string, handler func(string, interface{})) *GroupSubscriber

Subscribe to a topic with a handler function. If the topic is the same as the published topic, the handler function is called with the published topic and the associated data.

The return value is a GroupSubscriber that can be used to unsubscribe the caller from the hub for this subscription.

func (*Group) SubscribeMatch

func (g *Group) SubscribeMatch(matcher TopicMatcher, handler func(string, interface{})) *GroupSubscriber

SubscribeMatch takes a function that determines whether the topic matches, and a handler function. If the matcher matches the published topic, the handler function is called with the published topic and the associated data.

The return value is a GroupSubscriber that can be used to unsubscribe the caller from the hub for this subscription.

type GroupSubscriber

type GroupSubscriber struct {
	// contains filtered or unexported fields
}

GroupSubscriber represents a subscription to the hub.

func (*GroupSubscriber) Close

func (s *GroupSubscriber) Close()

Close will ensure that any pending events will be closed out and the associated blocking actions are collapsed.

func (*GroupSubscriber) Unsubscribe

func (s *GroupSubscriber) Unsubscribe() error

Unsubscribe attempts to unsubscribe from the hub. If the subscriber is found within the hub, then an error is returned.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub provides the base functionality of dealing with subscribers, and the notification of subscribers of events.

func Forever

func Forever(topic string, handler func(string, interface{})) (*Hub, func(time.Duration) error)

Forever will attempt to subscribe to a hub forever.

This is a convenience around starting a task that will always consume messages when publishing events.

func New

func New(options ...HubOption) *Hub

New creates a Hub for others to utilise.

func (*Hub) Close

func (h *Hub) Close()

Close will close any outstanding subscriber messages and shut down each subscriber.

At the end, the hub will have no subscribers listening and can be seen as invalid for use.

func (*Hub) Publish

func (h *Hub) Publish(topic string, data interface{}) <-chan struct{}

Publish will notify all the subscribers that are interested by calling their handler function.

The data is passed through to each Subscriber untouched. Note that all subscribers are notified in parallel, so handlers must not modify the data or data races will occur.

The channel return value is closed when all the subscribers have been notified of the event.

func (*Hub) Subscribe

func (h *Hub) Subscribe(topic string, handler func(string, interface{})) *Subscriber

Subscribe to a topic with a handler function. If the topic is the same as the published topic, the handler function is called with the published topic and the associated data.

The return value is a Subscriber that can be used to unsubscribe the caller from the hub for this subscription.

func (*Hub) SubscribeMatch

func (h *Hub) SubscribeMatch(matcher TopicMatcher, handler func(string, interface{})) *Subscriber

SubscribeMatch takes a function that determines whether the topic matches, and a handler function. If the matcher matches the published topic, the handler function is called with the published topic and the associated data.

The return value is a Subscriber that can be used to unsubscribe the caller from the hub for this subscription.

type HubOption

type HubOption func(HubOptions)

HubOption captures a tweak that can be applied to the Hub.

type HubOptions

type HubOptions interface {
	SetMetrics(Metrics)
}

HubOptions represents a way to set optional values on a hub. The interface shows which options are available to change.
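HubOption and HubOptions together follow the functional options pattern: each option is a function that receives a setter interface. The sketch below reproduces that shape with hypothetical names (settings, option, withBufferSize); SetBufferSize is invented for the example and is not a setter this package is documented to have.

```go
package main

import "fmt"

// settings plays the role of HubOptions: it lists what an option may change.
// SetBufferSize is a hypothetical setter invented for this sketch.
type settings interface {
	SetBufferSize(int)
}

// option plays the role of HubOption.
type option func(settings)

// config is the concrete value that options mutate through the interface.
type config struct {
	bufferSize int
}

func (c *config) SetBufferSize(n int) { c.bufferSize = n }

// withBufferSize returns an option that overrides the default buffer size.
func withBufferSize(n int) option {
	return func(s settings) { s.SetBufferSize(n) }
}

// newConfig applies the options in order on top of the defaults.
func newConfig(opts ...option) *config {
	c := &config{bufferSize: 16}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().bufferSize)                   // default value
	fmt.Println(newConfig(withBufferSize(64)).bufferSize) // overridden value
}
```

Passing an interface rather than the concrete struct keeps the set of tweakable values explicit while leaving the underlying fields unexported.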

type Life

type Life int

Life describes the life cycle of a subscriber, to prevent a subscriber being used when it's dead.

const (
	// Alive states that the subscriber is available to use.
	Alive Life = iota

	// Dead states that the subscriber is dead and shouldn't be used any more.
	Dead
)

type Metrics

type Metrics interface {

	// Subscribe is called if a subscription to the hub is called.
	Subscribe()

	// Unsubscribe is called when a subscription is removed from the hub.
	Unsubscribe()

	// EventPublished reports when an event topic is published.
	//
	// The event topic can be used for a label value to allow pivoting of the
	// data. Alternatively it can just be used as a gauged counter.
	EventPublished(string)

	// EventHandled reports when an event topic is handled by the subscriber.
	//
	// The event topic can be used for a label value to allow pivoting of the
	// data. Alternatively it can just be used as a gauged counter.
	EventHandled(string)

	// RunDuration identifies how long a run takes in seconds.
	RunDuration(float64)
}

Metrics defines an interface for implementing different metric aggregators for a hub.

The interface allows the implementation of various metrics, either no-op metrics or Prometheus.
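As an illustration of the two ends of that spectrum, here is a sketch of a no-op implementation and a small counting implementation. The Metrics interface is copied from the declaration above so the example compiles on its own; nopMetrics and countingMetrics are hypothetical types, not part of this package.

```go
package main

import (
	"fmt"
	"sync"
)

// Metrics is copied from the documented interface so the sketch is
// self-contained.
type Metrics interface {
	Subscribe()
	Unsubscribe()
	EventPublished(string)
	EventHandled(string)
	RunDuration(float64)
}

// nopMetrics is a hypothetical implementation that discards everything.
type nopMetrics struct{}

func (nopMetrics) Subscribe()            {}
func (nopMetrics) Unsubscribe()          {}
func (nopMetrics) EventPublished(string) {}
func (nopMetrics) EventHandled(string)   {}
func (nopMetrics) RunDuration(float64)   {}

// countingMetrics is a hypothetical implementation that tallies published
// events per topic and inherits no-op behaviour for everything else.
type countingMetrics struct {
	nopMetrics
	mu        sync.Mutex
	published map[string]int
}

func (c *countingMetrics) EventPublished(topic string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.published[topic]++
}

// Compile-time checks that both types satisfy the interface.
var (
	_ Metrics = nopMetrics{}
	_ Metrics = (*countingMetrics)(nil)
)

func main() {
	c := &countingMetrics{published: make(map[string]int)}
	var m Metrics = c
	m.EventPublished("orders")
	m.EventPublished("orders")
	m.EventHandled("orders") // ignored by the embedded no-op
	fmt.Println(c.published["orders"])
}
```

Embedding the no-op type means a partial implementation only has to override the methods it cares about.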

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node represents a typed queue node. In other languages this would simply be a generic tuple of type <T>.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a single instance of a queue structure. Under the hood, it implements a first in, first out (FIFO), data structure.

The queue structure provides no concurrency guarantees; callers are expected to provide their own synchronization when interacting with the Queue.

func NewQueue

func NewQueue() *Queue

NewQueue creates a queue.

func (*Queue) Len

func (f *Queue) Len() int

Len returns the number of elements currently stored in the queue.

func (*Queue) Pop

func (f *Queue) Pop() (Node, bool)

Pop removes and returns the element from the front of the queue. Implements FIFO when used with Push().

Returns false if the queue is empty.

func (*Queue) Push

func (f *Queue) Push(event Node)

Push appends an element to the back of the queue. Implements FIFO when elements are removed with Pop().
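The Push, Pop and Len semantics can be sketched with a slice-backed queue, plus a thin mutex wrapper showing one way a caller might add the synchronization the Queue itself does not provide. The node, fifo and lockedFifo types are hypothetical; the fields of Node and the internals of Queue are unexported, so this is not a description of their implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for Node, whose fields are unexported.
type node struct {
	topic string
	data  interface{}
}

// fifo is a slice-backed first in, first out queue with no locking.
type fifo struct {
	items []node
}

// Push appends to the back of the queue.
func (q *fifo) Push(n node) { q.items = append(q.items, n) }

// Pop removes from the front and reports false when the queue is empty.
func (q *fifo) Pop() (node, bool) {
	if len(q.items) == 0 {
		return node{}, false
	}
	n := q.items[0]
	q.items[0] = node{} // clear the slot so the popped data can be collected
	q.items = q.items[1:]
	return n, true
}

// Len reports how many elements are currently stored.
func (q *fifo) Len() int { return len(q.items) }

// lockedFifo shows one way a caller might guard the queue with a mutex.
type lockedFifo struct {
	mu sync.Mutex
	q  fifo
}

func (l *lockedFifo) Push(n node) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.q.Push(n)
}

func (l *lockedFifo) Pop() (node, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.q.Pop()
}

func main() {
	var q lockedFifo
	q.Push(node{topic: "first"})
	q.Push(node{topic: "second"})
	for {
		n, ok := q.Pop()
		if !ok {
			break
		}
		fmt.Println(n.topic) // elements come out in insertion order
	}
}
```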

type Sub

type Sub interface {

	// Run creates a task and a schedule to perform the consumption of messages
	// sent to the subscriber from the origin.
	Run(interval time.Duration) (task.Func, task.Schedule)
}

Sub defines a type that the multiplexer can run.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber represents a subscription to the hub.

func (*Subscriber) Close

func (s *Subscriber) Close()

Close will ensure that any pending events will be closed out and the associated blocking actions are collapsed.

func (*Subscriber) Run

func (s *Subscriber) Run(interval time.Duration) (task.Func, task.Schedule)

Run creates a task and a schedule to perform the consumption of messages sent to the subscriber from the origin. The interval parameter allows each subscriber to be prioritized independently using the interval time duration. The aim is to provide fairness or unfairness at a user-defined API level. The downside is that the onus of managing a subscriber is then put on the caller.

func (*Subscriber) Unsubscribe

func (s *Subscriber) Unsubscribe() error

Unsubscribe attempts to unsubscribe from the hub. If the subscriber is found within the hub, then an error is returned.

type TopicMatcher

type TopicMatcher func(string) bool

TopicMatcher defines a type that can be used for matching topics when dispatching through the hub.
