pubsub

package
v0.10.9
Published: Jan 14, 2024 License: Apache-2.0 Imports: 10 Imported by: 7

Documentation

Overview

Package pubsub provides a message broker for one-to-many or many-to-many message distribution. In addition, pubsub includes generic deque and queue implementations suited to concurrent use.

Constants

const (
	// ErrQueueFull is returned by the Add method of a queue when the queue has
	// reached its hard capacity limit.
	ErrQueueFull = ers.Error("queue is full")

	// ErrQueueNoCredit is returned by the Add method of a queue when the queue has
	// exceeded its soft quota and there is insufficient burst credit.
	ErrQueueNoCredit = ers.Error("insufficient burst credit")
)

Variables

var (
	// ErrQueueClosed is returned by the Add method of a closed queue, and by
	// the Wait method of a closed empty queue.
	ErrQueueClosed = fmt.Errorf("queue is closed: %w", io.EOF)
)
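
Because ErrQueueClosed wraps io.EOF with the %w verb, callers can match it either against the sentinel itself or against io.EOF. The sketch below re-declares the value locally (as the hypothetical errQueueClosed) so that it compiles without the package; isEndOfStream is an illustrative helper, not part of pubsub.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errQueueClosed re-declares the package's sentinel locally so this sketch
// compiles on its own; real code would reference pubsub.ErrQueueClosed.
var errQueueClosed = fmt.Errorf("queue is closed: %w", io.EOF)

// isEndOfStream reports whether err is, or wraps, io.EOF.
func isEndOfStream(err error) bool {
	return errors.Is(err, io.EOF)
}

func main() {
	// A hypothetical consumer error that adds context around the sentinel.
	wrapped := fmt.Errorf("worker 3: %w", errQueueClosed)

	fmt.Println(errors.Is(wrapped, errQueueClosed)) // true
	fmt.Println(isEndOfStream(wrapped))             // true
}
```

Matching with errors.Is rather than == keeps the check working when intermediate layers add their own context to the error.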

Functions

This section is empty.

Types

type Broker

type Broker[T any] struct {
	// contains filtered or unexported fields
}

Broker is a simple message broker that provides a usable interface for distributing messages to an arbitrary group of channels.

func MakeDistributorBroker added in v0.7.0

func MakeDistributorBroker[T any](ctx context.Context, dist Distributor[T], opts BrokerOptions) *Broker[T]

MakeDistributorBroker constructs a Broker that uses the provided distributor to handle the buffering between the sending half and the receiving half.

In general, you should configure the distributor to provide whatever buffering requirements you have, and

func NewBroker

func NewBroker[T any](ctx context.Context, opts BrokerOptions) *Broker[T]

NewBroker constructs a broker with a simple distribution scheme: the incoming and outgoing messages are not buffered, and the client subscription channels are also unbuffered.

All brokers respect the BrokerOptions, which control how messages are sent to subscribers. The specific configuration of these settings can have profound impacts on the semantics and ordering of messages in the broker.

func NewDequeBroker

func NewDequeBroker[T any](ctx context.Context, deque *Deque[T], opts BrokerOptions) *Broker[T]

NewDequeBroker constructs a broker that uses the provided deque to buffer incoming requests if subscribers are slow to process requests. The semantics of the Deque depend a bit on the configuration of its limits and capacity.

This broker distributes messages in a FIFO order, dropping older messages to make room for new messages.

func NewLIFOBroker

func NewLIFOBroker[T any](ctx context.Context, opts BrokerOptions, capacity int) *Broker[T]

NewLIFOBroker constructs a broker that uses a deque to buffer incoming requests if subscribers are slow to process requests. The semantics of the Deque depend a bit on the configuration of its limits and capacity.

This broker distributes messages in LIFO order, dropping older messages to make room for new messages. The capacity of the queue is fixed and must be a positive integer; NewLIFOBroker panics if the capacity is less than or equal to 0.

func NewQueueBroker

func NewQueueBroker[T any](ctx context.Context, queue *Queue[T], opts BrokerOptions) *Broker[T]

NewQueueBroker constructs a broker that uses the queue object to buffer incoming requests if subscribers are slow to process requests. Queues have a system for shedding load when the queue's limits have been exceeded. In general the messages are distributed in FIFO order, and Publish calls will drop messages if the queue is full.

All brokers respect the BrokerOptions, which control the size of the worker pool used to send messages to subscribers and whether the Broker should use non-blocking sends. All channels between the broker and the subscribers are unbuffered.

func (*Broker[T]) Populate added in v0.10.3

func (b *Broker[T]) Populate(iter *fun.Iterator[T]) fun.Worker

Populate creates a fun.Worker function that publishes items from the input iterator to the broker, returning when its context expires or the iterator is closed (propagating its error).

Callers should avoid using an iterator that will retain input items in memory.

func (*Broker[T]) Publish

func (b *Broker[T]) Publish(ctx context.Context, msg T)

Publish distributes a message to all subscribers.

func (*Broker[T]) Stats added in v0.8.0

func (b *Broker[T]) Stats(ctx context.Context) BrokerStats

Stats provides introspection into the current state of the broker.

func (*Broker[T]) Stop

func (b *Broker[T]) Stop()

Stop cancels the broker, allowing background work to stop.

func (*Broker[T]) Subscribe

func (b *Broker[T]) Subscribe(ctx context.Context) chan T

Subscribe generates a new subscription channel, of the specified buffer size. You *must* call Unsubscribe on this channel when you are no longer listening to it.

Subscription channels are *not* closed and should never be closed by the caller. Closing a subscription channel will cause an unhandled panic.

func (*Broker[T]) Unsubscribe

func (b *Broker[T]) Unsubscribe(ctx context.Context, msgCh chan T)

Unsubscribe removes a channel from the broker.
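
The Subscribe, Publish, and Unsubscribe lifecycle described above can be sketched with plain channels. This is a minimal illustration of the fan-out idea and of why subscription channels are forgotten rather than closed; it is not the package's implementation, and fanOut, newFanOut, and deliverOnce are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fanOut is a hypothetical stand-in for a broker: it tracks subscriber
// channels and copies each published message to all of them.
type fanOut[T any] struct {
	mu   sync.Mutex
	subs map[chan T]struct{}
}

func newFanOut[T any]() *fanOut[T] {
	return &fanOut[T]{subs: make(map[chan T]struct{})}
}

// Subscribe registers and returns a new unbuffered channel.
func (f *fanOut[T]) Subscribe() chan T {
	ch := make(chan T)
	f.mu.Lock()
	defer f.mu.Unlock()
	f.subs[ch] = struct{}{}
	return ch
}

// Unsubscribe forgets the channel. The channel is never closed, so a
// concurrent Publish cannot panic by sending on a closed channel.
func (f *fanOut[T]) Unsubscribe(ch chan T) {
	f.mu.Lock()
	defer f.mu.Unlock()
	delete(f.subs, ch)
}

// Publish sends msg to every current subscriber, one at a time, and gives
// up early if ctx is canceled.
func (f *fanOut[T]) Publish(ctx context.Context, msg T) {
	f.mu.Lock()
	targets := make([]chan T, 0, len(f.subs))
	for ch := range f.subs {
		targets = append(targets, ch)
	}
	f.mu.Unlock()

	for _, ch := range targets {
		select {
		case ch <- msg:
		case <-ctx.Done():
			return
		}
	}
}

// deliverOnce publishes msg to n subscribers and returns the sum of the
// values they received.
func deliverOnce(n, msg int) int {
	f := newFanOut[int]()
	results := make(chan int, n)
	for i := 0; i < n; i++ {
		ch := f.Subscribe()
		go func() {
			results <- <-ch
			f.Unsubscribe(ch)
		}()
	}
	f.Publish(context.Background(), msg)
	sum := 0
	for i := 0; i < n; i++ {
		sum += <-results
	}
	return sum
}

func main() {
	fmt.Println(deliverOnce(3, 7)) // 21
}
```

Publish copies the subscriber set before sending so that a subscriber can call Unsubscribe while a publish is in flight without deadlocking on the mutex.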

func (*Broker[T]) Wait

func (b *Broker[T]) Wait(ctx context.Context)

Wait blocks until either the context has been canceled, or all work has been completed.

type BrokerOptions

type BrokerOptions struct {
	// BufferSize controls the buffer size of the internal broker
	// channels that handle subscription creation and deletion
	// (unsubscribe.) Buffering
	BufferSize int
	// ParallelDispatch, when true, sends each message to
	// subscribers in parallel, and waits for all messages to be
	// delivered before continuing.
	ParallelDispatch bool
	// WorkerPoolSize controls the number of go routines used for
	// sending messages to subscribers, when using Queue-backed
	// brokers. If unset this defaults to 1.
	//
	// When this value is larger than 1, the order of messages
	// observed by individual subscribers will not be consistent.
	WorkerPoolSize int
}

BrokerOptions configures the semantics of a broker. The zero-values produce a blocking unbuffered queue message broker with every message distributed to every subscriber. While the default settings make it possible for one subscriber to block another subscriber, they guarantee that all messages will be delivered. Buffered brokers may lose messages.
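
To make the ParallelDispatch trade-off concrete, the following sketch contrasts serial and parallel delivery of one message to a set of subscriber channels. It only illustrates the behavior the option describes; dispatchSerial, dispatchParallel, and countDelivered are hypothetical helpers and do not reflect the broker's internals.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// dispatchSerial sends msg to each subscriber in turn; a slow subscriber
// delays everyone after it.
func dispatchSerial[T any](ctx context.Context, msg T, subs []chan T) {
	for _, ch := range subs {
		select {
		case ch <- msg:
		case <-ctx.Done():
			return
		}
	}
}

// dispatchParallel starts one goroutine per subscriber and returns only
// after every send has completed or ctx has been canceled.
func dispatchParallel[T any](ctx context.Context, msg T, subs []chan T) {
	var wg sync.WaitGroup
	for _, ch := range subs {
		wg.Add(1)
		go func(ch chan T) {
			defer wg.Done()
			select {
			case ch <- msg:
			case <-ctx.Done():
			}
		}(ch)
	}
	wg.Wait()
}

// countDelivered dispatches one message to n buffered subscriber channels
// and reports how many of them hold a message afterwards.
func countDelivered(n int, parallel bool) int {
	subs := make([]chan string, n)
	for i := range subs {
		subs[i] = make(chan string, 1)
	}
	if parallel {
		dispatchParallel(context.Background(), "hello", subs)
	} else {
		dispatchSerial(context.Background(), "hello", subs)
	}
	delivered := 0
	for _, ch := range subs {
		delivered += len(ch)
	}
	return delivered
}

func main() {
	fmt.Println(countDelivered(4, false), countDelivered(4, true)) // 4 4
}
```

Both variants deliver every message; the parallel form only changes how long one slow subscriber can hold up the others for a given message.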

type BrokerStats added in v0.8.0

type BrokerStats struct {
	Subscriptions int
	BufferDepth   int
}

BrokerStats is a data struct used to report on the internal state of the broker.

type Deque

type Deque[T any] struct {
	// contains filtered or unexported fields
}

Deque provides a basic double-ended queue backed by a doubly linked list, with features to support a maximum capacity, burstable limits and soft quotas, as well as iterators, that is safe for access from multiple concurrent goroutines. Furthermore, the implementation safely handles multiple concurrent blocking operations (e.g. Wait, iterators).

Use the NewDeque constructor to instantiate a Deque object.

func NewDeque

func NewDeque[T any](opts DequeOptions) (*Deque[T], error)

NewDeque constructs a Deque according to the options, and errors if there are any problems with the configuration.

func NewUnlimitedDeque added in v0.9.4

func NewUnlimitedDeque[T any]() *Deque[T]

NewUnlimitedDeque constructs an unbounded Deque.

func (*Deque[T]) Close

func (dq *Deque[T]) Close() error

Close marks the deque as closed, after which point all iterators will stop and no more operations will succeed. The error value is not used in the current operation.

func (*Deque[T]) Distributor added in v0.9.4

func (dq *Deque[T]) Distributor() Distributor[T]

Distributor produces a Distributor instance with Send/Receive operations that block if Deque is full or empty (respectively). Receive operations always remove the element from the Deque.

func (*Deque[T]) DistributorNonBlocking added in v0.10.0

func (dq *Deque[T]) DistributorNonBlocking() Distributor[T]

DistributorNonBlocking produces a distributor instance that always accepts sent items: if the deque is full, it removes one element from the front of the queue before adding the new item to the back.

func (*Deque[T]) ForcePushBack

func (dq *Deque[T]) ForcePushBack(it T) error

ForcePushBack is the same as PushBack, except that if the deque is at capacity, it removes one item from the front of the deque and then, having made room, appends the item. Returns an error if the deque is closed.

func (*Deque[T]) ForcePushFront

func (dq *Deque[T]) ForcePushFront(it T) error

ForcePushFront is the same as PushFront, except that if the deque is at capacity, it removes one item from the back of the deque and then, having made room, prepends the item. Returns an error if the deque is closed.
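
The eviction behavior of the two ForcePush methods can be sketched with a slice-backed deque. This is an illustration under simplifying assumptions (no locking, no closed state, a positive capacity); boundedDeque, fillBack, and fillFront are hypothetical names and the real Deque is a linked list.

```go
package main

import "fmt"

// boundedDeque is a hypothetical slice-backed deque with a fixed, positive
// capacity. It is not safe for concurrent use.
type boundedDeque[T any] struct {
	items    []T
	capacity int
}

// forcePushBack appends item, first evicting the front element if full.
func (d *boundedDeque[T]) forcePushBack(item T) {
	if len(d.items) >= d.capacity {
		d.items = d.items[1:]
	}
	d.items = append(d.items, item)
}

// forcePushFront prepends item, first evicting the back element if full.
func (d *boundedDeque[T]) forcePushFront(item T) {
	if len(d.items) >= d.capacity {
		d.items = d.items[:len(d.items)-1]
	}
	d.items = append([]T{item}, d.items...)
}

// fillBack force-pushes 1..n onto the back and returns what remains.
func fillBack(capacity, n int) []int {
	d := &boundedDeque[int]{capacity: capacity}
	for i := 1; i <= n; i++ {
		d.forcePushBack(i)
	}
	return d.items
}

// fillFront force-pushes 1..n onto the front and returns what remains.
func fillFront(capacity, n int) []int {
	d := &boundedDeque[int]{capacity: capacity}
	for i := 1; i <= n; i++ {
		d.forcePushFront(i)
	}
	return d.items
}

func main() {
	fmt.Println(fillBack(3, 5))  // [3 4 5]
	fmt.Println(fillFront(3, 5)) // [5 4 3]
}
```

In both directions the newest items survive; the item evicted is always at the end opposite to the one being pushed.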

func (*Deque[T]) Iterator

func (dq *Deque[T]) Iterator() *fun.Iterator[T]

Iterator starts at the front of the queue and iterates towards the back. When the iterator reaches the end of the queue it ends.

func (*Deque[T]) IteratorReverse

func (dq *Deque[T]) IteratorReverse() *fun.Iterator[T]

IteratorReverse starts at the back of the queue and iterates towards the front. When the iterator reaches the beginning of the queue it ends.

func (*Deque[T]) Len

func (dq *Deque[T]) Len() int

Len returns the length of the queue. This is an O(1) operation in this implementation.

func (*Deque[T]) PopBack

func (dq *Deque[T]) PopBack() (T, bool)

PopBack removes the last (tail) item of the queue, with the second value being false if the queue is empty or closed.

func (*Deque[T]) PopFront

func (dq *Deque[T]) PopFront() (T, bool)

PopFront removes the first (head) item of the queue, with the second value being false if the queue is empty or closed.

func (*Deque[T]) Producer added in v0.10.0

func (dq *Deque[T]) Producer() fun.Producer[T]

Producer exposes the deque to a single-function interface for iteration. The producer function operation will not modify the contents of the Deque, but will produce elements from the deque, front to back.

func (*Deque[T]) ProducerBlocking added in v0.10.0

func (dq *Deque[T]) ProducerBlocking() fun.Producer[T]

ProducerBlocking exposes the deque to a single-function interface for iteration. The producer function operation will not modify the contents of the Deque, but will produce elements from the deque, front to back. If the deque is empty or the producer reaches the end, the operation blocks until another item is added.

func (*Deque[T]) ProducerReverse added in v0.10.0

func (dq *Deque[T]) ProducerReverse() fun.Producer[T]

ProducerReverse exposes the deque to a single-function interface for iteration. The producer function operation will not modify the contents of the Deque, but will produce elements from the deque, back to front.

func (*Deque[T]) ProducerReverseBlocking added in v0.10.0

func (dq *Deque[T]) ProducerReverseBlocking() fun.Producer[T]

ProducerReverseBlocking exposes the deque to a single-function interface for iteration. The producer function operation will not modify the contents of the Deque, but will produce elements from the deque, back to front. If the deque is empty or the producer reaches the end, the operation blocks until another item is added.

func (*Deque[T]) PushBack

func (dq *Deque[T]) PushBack(it T) error

PushBack adds an item to the back or end of the deque, and errors if the queue is closed, at capacity, or has reached its limit.

func (*Deque[T]) PushFront

func (dq *Deque[T]) PushFront(it T) error

PushFront adds an item to the front or head of the deque, and errors if the queue is closed, at capacity, or has reached its limit.

func (*Deque[T]) WaitBack

func (dq *Deque[T]) WaitBack(ctx context.Context) (T, error)

WaitBack pops the last (tail) item in the deque, and if the queue is empty, will block until an item is added, returning an error if the context is canceled or the queue is closed.

func (*Deque[T]) WaitFront

func (dq *Deque[T]) WaitFront(ctx context.Context) (v T, err error)

WaitFront pops the first (head) item in the deque, and if the queue is empty, will block until an item is added, returning an error if the context is canceled or the queue is closed.

func (*Deque[T]) WaitPushBack added in v0.6.0

func (dq *Deque[T]) WaitPushBack(ctx context.Context, it T) error

WaitPushBack performs a blocking add to the deque: if the deque is at capacity, this operation blocks until the deque is closed or there is capacity to add an item. The new item is added to the back of the deque.

func (*Deque[T]) WaitPushFront added in v0.6.0

func (dq *Deque[T]) WaitPushFront(ctx context.Context, it T) error

WaitPushFront performs a blocking add to the deque: if the deque is at capacity, this operation blocks until the deque is closed or there is capacity to add an item. The new item is added to the front of the deque.

type DequeOptions

type DequeOptions struct {
	Unlimited    bool
	Capacity     int
	QueueOptions *QueueOptions
}

DequeOptions configure the semantics of the deque. The Validate() method ensures that you do not produce a configuration that is impossible.

Capacity puts a firm upper cap on the number of items in the deque, while the Unlimited options

func (*DequeOptions) Validate

func (opts *DequeOptions) Validate() error

Validate ensures that the options are consistent. Exported as a convenience function. All errors have ErrConfigurationMalformed as their root.

type Distributor added in v0.7.0

type Distributor[T any] struct {
	// contains filtered or unexported fields
}

Distributor provides a layer of indirection above queue-like implementations (e.g. Queue, Deque, and channels) for buffering and queuing objects for use by higher level pubsub mechanisms like the Broker.

Distributors returned by the pubsub package provide iterators that are destructive, and

func DistributorChanOp added in v0.10.3

func DistributorChanOp[T any](ch fun.ChanOp[T]) Distributor[T]

DistributorChanOp constructs a Distributor from the channel operator type constructed by the root package's Blocking() and NonBlocking() functions.

func DistributorChannel added in v0.7.0

func DistributorChannel[T any](ch chan T) Distributor[T]

DistributorChannel provides a bridge between channels and distributors, and has expected FIFO semantics with blocking reads and writes.
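
The blocking, FIFO, context-aware behavior that a channel-backed distributor is described as having can be sketched with two select statements. The helpers below (send, receive, roundTrip, blockedSend) are hypothetical and stand in for the Send and Receive operations; mapping a closed channel to io.EOF is an assumption made for this sketch.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// send blocks until v is accepted by ch or ctx is done.
func send[T any](ctx context.Context, ch chan T, v T) error {
	select {
	case ch <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// receive blocks until a value arrives on ch or ctx is done. Reporting a
// closed channel as io.EOF is a choice made for this sketch.
func receive[T any](ctx context.Context, ch chan T) (T, error) {
	var zero T
	select {
	case v, ok := <-ch:
		if !ok {
			return zero, io.EOF
		}
		return v, nil
	case <-ctx.Done():
		return zero, ctx.Err()
	}
}

// roundTrip pushes the inputs through a buffered channel and reads them
// back, returning them in arrival order.
func roundTrip(inputs ...int) ([]int, error) {
	ctx := context.Background()
	ch := make(chan int, len(inputs))
	for _, v := range inputs {
		if err := send(ctx, ch, v); err != nil {
			return nil, err
		}
	}
	out := make([]int, 0, len(inputs))
	for range inputs {
		v, err := receive(ctx, ch)
		if err != nil {
			return nil, err
		}
		out = append(out, v)
	}
	return out, nil
}

// blockedSend reports the error from a send that can never proceed: the
// channel is unbuffered with no reader and the context is already canceled.
func blockedSend() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return send(ctx, make(chan int), 1)
}

func main() {
	fmt.Println(roundTrip(1, 2, 3)) // [1 2 3] <nil>
	fmt.Println(blockedSend())      // context canceled
}
```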

func MakeDistributor added in v0.10.0

func MakeDistributor[T any](
	processor fun.Processor[T],
	producer fun.Producer[T],
	length func() int,
) Distributor[T]

MakeDistributor builds a distributor from producer and processor functions.

func (Distributor[T]) Iterator added in v0.9.4

func (d Distributor[T]) Iterator() *fun.Iterator[T]

Iterator allows iterator-like access to a distributor. These iterators are blocking and destructive. The iterator's close method does *not* close the distributor's underlying structure.

func (Distributor[T]) Len added in v0.8.0

func (d Distributor[T]) Len() int

Len returns the length of the underlying storage for the distributor.

func (Distributor[T]) Processor added in v0.10.0

func (d Distributor[T]) Processor() fun.Processor[T]

Processor provides convenient access to the "send" side of the distributor as a fun.Processor function.

func (Distributor[T]) Producer added in v0.10.0

func (d Distributor[T]) Producer() fun.Producer[T]

Producer provides convenient access to the "receive" side of the distributor as a fun.Producer function.

func (Distributor[T]) Receive added in v0.7.0

func (d Distributor[T]) Receive(ctx context.Context) (T, error)

Receive pulls an object from the distributor.

func (Distributor[T]) Send added in v0.7.0

func (d Distributor[T]) Send(ctx context.Context, in T) error

Send pushes an object into the distributor.

func (Distributor[T]) WithInputFilter added in v0.10.5

func (d Distributor[T]) WithInputFilter(filter func(T) bool) Distributor[T]

WithInputFilter returns a copy of the distributor where all items pass through a filter before being written/passed to Send. Items for which the filter returns true are propagated; all others are skipped.

func (Distributor[T]) WithOutputFilter added in v0.10.5

func (d Distributor[T]) WithOutputFilter(filter func(T) bool) Distributor[T]

WithOutputFilter returns a copy of the distributor where all items pass through the provided filter before being delivered to readers/Receive. Items for which the filter returns true are propagated; all others are skipped.
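
The two filters wrap opposite sides of the distributor, which a sketch over plain function types makes visible. The types sendFunc and recvFunc and the helpers filterInput, filterOutput, evensOnly, and atLeast are hypothetical; they mirror the shape of Send and Receive but are not the package's API. Whether a skipped input reports an error is not stated above, so returning nil for it is an assumption here.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

type sendFunc[T any] func(context.Context, T) error
type recvFunc[T any] func(context.Context) (T, error)

// filterInput wraps send so that rejected items are dropped before they
// reach the underlying storage. Dropped items report no error (assumption).
func filterInput[T any](send sendFunc[T], keep func(T) bool) sendFunc[T] {
	return func(ctx context.Context, v T) error {
		if !keep(v) {
			return nil
		}
		return send(ctx, v)
	}
}

// filterOutput wraps recv so that it keeps reading until an item passes
// the filter or the underlying receive fails.
func filterOutput[T any](recv recvFunc[T], keep func(T) bool) recvFunc[T] {
	return func(ctx context.Context) (T, error) {
		for {
			v, err := recv(ctx)
			if err != nil || keep(v) {
				return v, err
			}
		}
	}
}

// evensOnly sends 1..n through an input filter and returns what was stored.
func evensOnly(n int) []int {
	var stored []int
	store := func(_ context.Context, v int) error {
		stored = append(stored, v)
		return nil
	}
	send := filterInput[int](store, func(v int) bool { return v%2 == 0 })
	for i := 1; i <= n; i++ {
		_ = send(context.Background(), i)
	}
	return stored
}

// atLeast reads src through an output filter until the source is exhausted.
func atLeast(src []int, min int) []int {
	i := 0
	next := func(context.Context) (int, error) {
		if i >= len(src) {
			return 0, io.EOF
		}
		v := src[i]
		i++
		return v, nil
	}
	recv := filterOutput[int](next, func(v int) bool { return v >= min })
	var out []int
	for {
		v, err := recv(context.Background())
		if err != nil {
			return out
		}
		out = append(out, v)
	}
}

func main() {
	fmt.Println(evensOnly(6))                // [2 4 6]
	fmt.Println(atLeast([]int{1, 5, 2, 8}, 5)) // [5 8]
}
```

An input filter keeps rejected items out of storage entirely, while an output filter still consumes them from storage and simply never hands them to the reader.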

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

A Queue is a limited-capacity FIFO queue of arbitrary data items.

A queue has a soft quota and a hard limit on the number of items that may be contained in the queue. Adding items in excess of the hard limit will fail unconditionally.

For items in excess of the soft quota, a credit system applies: Each queue maintains a burst credit score. Adding an item in excess of the soft quota costs 1 unit of burst credit. If there is not enough burst credit, the add will fail.

The initial burst credit is assigned when the queue is constructed. Removing items from the queue adds additional credit if the resulting queue length is less than the current soft quota. Burst credit is capped by the hard limit.

A Queue is safe for concurrent use by multiple goroutines.
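
The interaction between the hard limit, the soft quota, and burst credit can be sketched with a small single-goroutine model. This is a simplified illustration of the rules stated above, not the package's algorithm: the amount of credit regained per removal is not specified in this text, so the sketch assumes one unit, and it keeps the soft quota fixed rather than adjusting it dynamically. All names (creditQueue, errFull, errNoCredit) are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errFull     = errors.New("queue is full")
	errNoCredit = errors.New("insufficient burst credit")
)

// creditQueue models the quota rules only; it is not safe for concurrent use.
type creditQueue[T any] struct {
	items     []T
	hardLimit int
	softQuota int
	credit    float64
}

// Add enforces the hard limit first, then charges one unit of burst credit
// for every item added at or beyond the soft quota.
func (q *creditQueue[T]) Add(item T) error {
	if len(q.items) >= q.hardLimit {
		return errFull
	}
	if len(q.items) >= q.softQuota {
		if q.credit < 1 {
			return errNoCredit
		}
		q.credit--
	}
	q.items = append(q.items, item)
	return nil
}

// Remove pops the oldest item. When the queue drops below the soft quota it
// regains credit (one unit per removal is an assumption), capped at the
// hard limit.
func (q *creditQueue[T]) Remove() (T, bool) {
	var zero T
	if len(q.items) == 0 {
		return zero, false
	}
	item := q.items[0]
	q.items = q.items[1:]
	if len(q.items) < q.softQuota {
		q.credit++
		if q.credit > float64(q.hardLimit) {
			q.credit = float64(q.hardLimit)
		}
	}
	return item, true
}

func main() {
	q := &creditQueue[int]{hardLimit: 4, softQuota: 2, credit: 1}
	for i := 1; i <= 4; i++ {
		// The first three adds succeed; the fourth fails with errNoCredit.
		fmt.Println(i, q.Add(i))
	}
}
```

With a soft quota of 2 and one unit of credit, the third item spends the credit and the fourth is rejected even though the hard limit of 4 has not been reached.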

func NewQueue

func NewQueue[T any](opts QueueOptions) (*Queue[T], error)

NewQueue constructs a new empty queue with the specified options. It reports an error if any of the option values are invalid.

func NewUnlimitedQueue

func NewUnlimitedQueue[T any]() *Queue[T]

NewUnlimitedQueue produces an unbounded queue.

func (*Queue[T]) Add

func (q *Queue[T]) Add(item T) error

Add adds item to the back of the queue. It reports an error and does not enqueue the item if the queue is full or closed, or if it exceeds its soft quota and there is not enough burst credit.

func (*Queue[T]) BlockingAdd added in v0.6.0

func (q *Queue[T]) BlockingAdd(ctx context.Context, item T) error

BlockingAdd attempts to add an item to the queue, as with Add, but if the queue is full, blocks until the queue has capacity, is closed, or the context is canceled. Returns an error if the context is canceled or the queue is closed.

func (*Queue[T]) Close

func (q *Queue[T]) Close() error

Close closes the queue. After closing, any further Add calls will report an error, but items that were added to the queue prior to closing will still be available for Remove and Wait. Wait will report an error without blocking if it is called on a closed, empty queue.

func (*Queue[T]) Distributor added in v0.9.4

func (q *Queue[T]) Distributor() Distributor[T]

Distributor creates an object used to process the items in the queue: items yielded by the Distributor's iterator are removed from the queue.

func (*Queue[T]) Iterator

func (q *Queue[T]) Iterator() *fun.Iterator[T]

Iterator produces an iterator implementation that wraps the underlying queue linked list. The iterator respects the Queue's mutex and is safe for concurrent access and concurrent queue operations, without additional locking. The iterator does not modify or remove items from the queue, and will only terminate when the queue has been closed via the Close() method.

To create a "consuming" iterator, use a Distributor.

func (*Queue[T]) Len added in v0.8.0

func (q *Queue[T]) Len() int

Len returns the number of items in the queue. Because the queue tracks its size this is a constant time operation.

func (*Queue[T]) Producer added in v0.10.0

func (q *Queue[T]) Producer() fun.Producer[T]

Producer returns a function that produces items from the queue, iteratively. It's not destructive, and has the same semantics as the Iterator.

func (*Queue[T]) Remove

func (q *Queue[T]) Remove() (out T, _ bool)

Remove removes and returns the frontmost (oldest) item in the queue and reports whether an item was available. If the queue is empty, Remove returns the zero value of T and false.

func (*Queue[T]) Wait

func (q *Queue[T]) Wait(ctx context.Context) (out T, _ error)

Wait blocks until q is non-empty or closed, and then returns the frontmost (oldest) item from the queue. If ctx ends before an item is available, Wait returns the zero value of T and a context error. If the queue is closed while it is still empty, Wait returns the zero value of T and ErrQueueClosed.

Wait is destructive: every item returned is removed from the queue.
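
The combined Close and Wait semantics (items added before closing remain available, and a closed empty queue fails without blocking) can be sketched as follows. This is one possible way to get that behavior with a mutex and a notification channel, not the package's implementation; waitQueue, errClosed, and closeThenDrain are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
)

// errClosed mirrors the shape of ErrQueueClosed for this sketch.
var errClosed = fmt.Errorf("queue is closed: %w", io.EOF)

type waitQueue[T any] struct {
	mu      sync.Mutex
	items   []T
	closed  bool
	changed chan struct{} // closed and replaced on every state change
}

func newWaitQueue[T any]() *waitQueue[T] {
	return &waitQueue[T]{changed: make(chan struct{})}
}

// wake releases all current waiters; callers must hold q.mu.
func (q *waitQueue[T]) wake() {
	close(q.changed)
	q.changed = make(chan struct{})
}

// Add appends v, or fails if the queue has been closed.
func (q *waitQueue[T]) Add(v T) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errClosed
	}
	q.items = append(q.items, v)
	q.wake()
	return nil
}

// Close rejects future adds but leaves queued items available.
func (q *waitQueue[T]) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.closed {
		q.closed = true
		q.wake()
	}
}

// Wait removes and returns the oldest item, blocking while the queue is
// open and empty. It fails if ctx ends or the queue is closed and empty.
func (q *waitQueue[T]) Wait(ctx context.Context) (T, error) {
	var zero T
	for {
		q.mu.Lock()
		if len(q.items) > 0 {
			v := q.items[0]
			q.items = q.items[1:]
			q.mu.Unlock()
			return v, nil
		}
		if q.closed {
			q.mu.Unlock()
			return zero, errClosed
		}
		changed := q.changed
		q.mu.Unlock()

		select {
		case <-changed:
		case <-ctx.Done():
			return zero, ctx.Err()
		}
	}
}

// closeThenDrain adds one item, closes the queue, and waits twice.
func closeThenDrain() (string, error, error) {
	q := newWaitQueue[string]()
	_ = q.Add("a")
	q.Close()
	v, first := q.Wait(context.Background())
	_, second := q.Wait(context.Background())
	return v, first, second
}

func main() {
	v, first, second := closeThenDrain()
	fmt.Println(v, first)                     // a <nil>
	fmt.Println(errors.Is(second, errClosed)) // true
}
```

Capturing the notification channel while holding the lock, and selecting on it only after unlocking, avoids missing a wake-up that happens between the emptiness check and the wait.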

type QueueOptions

type QueueOptions struct {
	// The maximum number of items the queue will ever be
	// permitted to hold. This value must be positive, and greater
	// than or equal to SoftQuota. The hard limit is fixed and
	// does not change as the queue is used.
	//
	// The hard limit should be chosen to exceed the largest burst
	// size expected under normal operating conditions.
	HardLimit int

	// The initial expected maximum number of items the queue
	// should contain on an average workload. If this value is
	// zero, it is initialized to the hard limit. The soft quota
	// is adjusted from the initial value dynamically as the queue
	// is used.
	SoftQuota int

	// The initial burst credit score.  This value must be greater
	// than or equal to zero. If it is zero, the soft quota is
	// used.
	BurstCredit float64
}

QueueOptions are the initial settings for a Queue or Deque.

func (*QueueOptions) Validate

func (opts *QueueOptions) Validate() error

Validate ensures that the options are consistent. Exported as a convenience function. All errors have ErrConfigurationMalformed as their root.
