Documentation
¶
Index ¶
- Constants
- Variables
- func Any() func(string) bool
- func Contains(value string) func(string) bool
- func EndsWith(suffix string) func(string) bool
- func Forward(hub *Hub, other *Hub) func(time.Duration) error
- func ForwardMatcher(matcher TopicMatcher, hub *Hub, other *Hub) func(time.Duration) error
- func Loop(fn task.Func) (func() error, error)
- func Match(topic string) func(string) bool
- func Multiplexer(hub *Hub, subs ...Sub) func(time.Duration) error
- func Never() func(string) bool
- func StartsWith(prefix string) func(string) bool
- type Event
- type Group
- func (g *Group) Close(timeout time.Duration) error
- func (g *Group) Publish(topic string, data interface{}) <-chan struct{}
- func (g *Group) Start(ctx context.Context) error
- func (g *Group) Stop(timeout time.Duration) error
- func (g *Group) Subscribe(topic string, handler func(string, interface{})) *GroupSubscriber
- func (g *Group) SubscribeMatch(matcher TopicMatcher, handler func(string, interface{})) *GroupSubscriber
- type GroupSubscriber
- type Hub
- type HubOption
- type HubOptions
- type Life
- type Metrics
- type Node
- type Queue
- type Sub
- type Subscriber
- type TopicMatcher
Constants ¶
const ( // Interval describes how frequently we should be scanning for new messages // from the hub. Interval time.Duration = time.Millisecond * 10 )
Variables ¶
var ( // ErrComplete is used as a sentinel error to identify when a worker has // been run. ErrComplete = errors.New("completed run") // ErrTimeout is a sentinel error to identify when a worker has timed out. ErrTimeout = errors.New("timed out") )
var Exponential = func(max time.Duration) func(task.BackoffOptions) { return func(backoff task.BackoffOptions) { amount := 1 backoff.SetBackoff(func(n int, t time.Duration) time.Duration { if n <= 1 { amount = n return t } base := 2.0 b := t * time.Duration(math.Pow(base, float64(amount))) if b >= max { return max } amount = n return b }) } }
Exponential describes a backoff function that grows exponentially with time.
The max time duration is to limit the duration to an upper cap, to prevent stalling the hub completely.
Functions ¶
func ForwardMatcher ¶
ForwardMatcher all messages that match the matcher from one hub to another. Useful to cross boundaries.
func Loop ¶
Loop takes a task.Func and iterates until cancel is called. This is useful for exhausting a subscriber without caring about a schedule or interval.
func Multiplexer ¶
Multiplexer forwards multiple subscribers to one singular hub.
func StartsWith ¶
StartsWith will match a topic that starts with a certain string
Types ¶
type Event ¶
type Event struct {
// contains filtered or unexported fields
}
Event represents a typed message when is dispatched with in the hub to a set of subscribers of the hub.
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
Group represents a group where all subscribers are treated fairly in terms of task consumption.
func NewGroup ¶
NewGroup creates a new hub where all subscribers are treated equally in terms of fairness.
func (*Group) Close ¶
Close will close all underlying subscribers then stop all the tasks in the group. Starting the group again will be pointless as the hub will be left with no subscribers after closing.
func (*Group) Publish ¶
Publish will notify all the subscribers that are interested by calling their handler function.
The data is passed through to each Subscriber untouched. Note that all subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.
The channel return value is closed when all the subscribers have been notified of the event.
func (*Group) Stop ¶
Stop all tasks in the group.
In case the given timeout expires before all tasks complete, this method exits immediately and returns an error, otherwise it returns nil.
func (*Group) Subscribe ¶
func (g *Group) Subscribe(topic string, handler func(string, interface{})) *GroupSubscriber
Subscribe to a topic with a handler function. If the topic is the same as the published topic, the handler function is called with the published topic and the associated data.
The return value is a Subscriber that will unsubscribe the caller from the hub, for this subscription.
func (*Group) SubscribeMatch ¶
func (g *Group) SubscribeMatch(matcher TopicMatcher, handler func(string, interface{})) *GroupSubscriber
SubscribeMatch takes a function that determins whether the topic matches, and a handler function. If the matcher matches the published topic, the handler function is called with the published topic and the associated data.
The return value is a Subscriber that will unsubscribe the caller from the hub, for this subscription.
type GroupSubscriber ¶
type GroupSubscriber struct {
// contains filtered or unexported fields
}
GroupSubscriber represents a subscription to the hub.
func (*GroupSubscriber) Close ¶
func (s *GroupSubscriber) Close()
Close will ensure that any pending events will be closed out and the associated blocking actions are collapsed.
func (*GroupSubscriber) Unsubscribe ¶
func (s *GroupSubscriber) Unsubscribe() error
Unsubscribe attempts to unsubscribe from the hub, if the subscriber is found within the hub, then a error is returned.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub provides the base functionality of dealing with subscribers, and the notification of subscribers of events.
func Forever ¶
Forever will attempt subscribe to a hub forever.
This is a convenience around starting a task that will always consume messages when publishing events.
func (*Hub) Close ¶
func (h *Hub) Close()
Close will close any outstanding subscriber messages and shut down each subscriber.
At the end, the hub will have no subscribers listening and can be seen as invalid for use.
func (*Hub) Publish ¶
Publish will notify all the subscribers that are interested by calling their handler function.
The data is passed through to each Subscriber untouched. Note that all subscribers are notified in parallel, and that no modification should be done to the data or data races will occur.
The channel return value is closed when all the subscribers have been notified of the event.
func (*Hub) Subscribe ¶
func (h *Hub) Subscribe(topic string, handler func(string, interface{})) *Subscriber
Subscribe to a topic with a handler function. If the topic is the same as the published topic, the handler function is called with the published topic and the associated data.
The return value is a Subscriber that will unsubscribe the caller from the hub, for this subscription.
func (*Hub) SubscribeMatch ¶
func (h *Hub) SubscribeMatch(matcher TopicMatcher, handler func(string, interface{})) *Subscriber
SubscribeMatch takes a function that determins whether the topic matches, and a handler function. If the matcher matches the published topic, the handler function is called with the published topic and the associated data.
The return value is a Subscriber that will unsubscribe the caller from the hub, for this subscription.
type HubOption ¶
type HubOption func(HubOptions)
HubOption captures a tweak that can be applied to the Hub.
type HubOptions ¶
type HubOptions interface {
SetMetrics(Metrics)
}
HubOptions represents a way to set optional values to a hub option. The HubOptions shows what options are available to change.
type Life ¶
type Life int
Life describes the life cycle of a subscriber, to prevent a subscriber being used when it's dead.
type Metrics ¶
type Metrics interface {
// Subscribe is called if a subscription to the hub is called.
Subscribe()
// Unsubscribe is called when a subscription is removed from the hub.
Unsubscribe()
// EventPublished reports when an event topic is published.
//
// The event topic can be used for a label value to allow pivoting of the
// data. Alternatively it can just be used as a gauged counter.
EventPublished(string)
// EventHandled reports when an event topic is handled by the subscriber.
//
// The event topic can be used for a label value to allow pivoting of the
// data. Alternatively it can just be used as a gauged counter.
EventHandled(string)
// RunDuration identifies how long a run takes in seconds.
RunDuration(float64)
}
Metrics defines an interface for implementing different metric aggregators for a hub.
The interface allows the implementation of various metrics, either no-op metrics or prometheus.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node represents a type queue node. In other languages this would be simply a generic tuple of type <T>
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue represents a single instance of a queue structure. Under the hood, it implements a first in, first out (FIFO), data structure.
No concurrency guarantees are created for the queue structure, it is expected to roll your own when interacting with the Queue.
type Sub ¶
type Sub interface {
// Run creates a task and a schedule to perform the consumption of messages
// sent to the subscriber from the origin.
Run(interval time.Duration) (task.Func, task.Schedule)
}
Sub defines a type that the multiplexer can run.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber represents a subscription to the hub.
func (*Subscriber) Close ¶
func (s *Subscriber) Close()
Close will ensure that any pending events will be closed out and the associated blocking actions are collapsed.
func (*Subscriber) Run ¶
Run creates a task and a schedule to perform the consumption of messages sent to the subscriber from the origin. The interval parameter allows for prioritization of each subscriber independently using the internval time duration. The aim it to provide fair-ness or unfair-ness at a user defined API level. The downside to all of this, is that management of a subscriber is then put on the onus of the callee.
func (*Subscriber) Unsubscribe ¶
func (s *Subscriber) Unsubscribe() error
Unsubscribe attempts to unsubscribe from the hub, if the subscriber is found within the hub, then a error is returned.
type TopicMatcher ¶
TopicMatcher defines a type that can be used for matching topics when dispatching through the hub.