Documentation ¶
Overview ¶
Package pubsub provides a message broker for one-to-many or many-to-many message distribution. In addition, pubsub includes generic deque and queue implementations suited to concurrent use.
Index ¶
- Variables
- func Populate[T any](ctx context.Context, iter fun.Iterator[T], broker *Broker[T]) error
- type Broker
- func NewBroker[T any](ctx context.Context, opts BrokerOptions) *Broker[T]
- func NewDequeBroker[T any](ctx context.Context, opts BrokerOptions, deque *Deque[T]) *Broker[T]
- func NewLIFOBroker[T any](ctx context.Context, opts BrokerOptions, capacity int) (*Broker[T], error)
- func NewQueueBroker[T any](ctx context.Context, opts BrokerOptions, queue *Queue[T]) *Broker[T]
- type BrokerOptions
- type Deque
- func (dq *Deque[T]) Close() error
- func (dq *Deque[T]) ForcePushBack(it T) error
- func (dq *Deque[T]) ForcePushFront(it T) error
- func (dq *Deque[T]) Iterator() fun.Iterator[T]
- func (dq *Deque[T]) IteratorBlocking() fun.Iterator[T]
- func (dq *Deque[T]) IteratorBlockingReverse() fun.Iterator[T]
- func (dq *Deque[T]) IteratorReverse() fun.Iterator[T]
- func (dq *Deque[T]) Len() int
- func (dq *Deque[T]) PopBack() (T, bool)
- func (dq *Deque[T]) PopFront() (T, bool)
- func (dq *Deque[T]) PushBack(it T) error
- func (dq *Deque[T]) PushFront(it T) error
- func (dq *Deque[T]) WaitBack(ctx context.Context) (T, error)
- func (dq *Deque[T]) WaitFront(ctx context.Context) (T, error)
- type DequeOptions
- type Queue
- type QueueOptions
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrQueueFull is returned by the Add method of a queue when the queue has
	// reached its hard capacity limit.
	ErrQueueFull = errors.New("queue is full")

	// ErrQueueNoCredit is returned by the Add method of a queue when the queue has
	// exceeded its soft quota and there is insufficient burst credit.
	ErrQueueNoCredit = errors.New("insufficient burst credit")

	// ErrQueueClosed is returned by the Add method of a closed queue, and by
	// the Wait method of a closed empty queue.
	ErrQueueClosed = errors.New("queue is closed")
)
queue implements a dynamic FIFO queue with a fixed upper bound and a flexible quota mechanism to handle bursty load.
var ErrConfigurationMalformed = errors.New("configuration error")
ErrConfigurationMalformed is returned by the queue and deque constructors if their configurations are malformed.
Functions ¶
Types ¶
type Broker ¶
type Broker[T any] struct {
	// contains filtered or unexported fields
}
Broker is a simple message broker that provides a usable interface for distributing messages to an arbitrary group of channels.
func NewBroker ¶
func NewBroker[T any](ctx context.Context, opts BrokerOptions) *Broker[T]
NewBroker constructs a broker with a simple distribution scheme: incoming messages are buffered in a channel, and then distributed to subscribers' channels, which may also be buffered.
All brokers respect the BrokerOptions, which control how messages are sent to subscribers (e.g. concurrently with a worker pool, with buffered channels, and whether sends can be non-blocking). Consider how these settings may interact.
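The distribution scheme described above can be sketched with plain channels. This is an illustration only and not the package's implementation: `fanOut` is a hypothetical helper, and the buffer sizes are arbitrary.

```go
package main

import "fmt"

// fanOut copies every message from in to each subscriber channel, in
// order, and returns once in has been closed and drained. It is a
// hypothetical helper and not part of the pubsub package.
func fanOut[T any](in <-chan T, subs []chan T) {
	for msg := range in {
		for _, sub := range subs {
			sub <- msg // blocks until this subscriber has room
		}
	}
}

func main() {
	in := make(chan string, 4) // buffered intake channel
	subs := []chan string{
		make(chan string, 4), // buffered subscriber channels
		make(chan string, 4),
	}

	in <- "a"
	in <- "b"
	close(in)
	fanOut(in, subs)

	// Every subscriber observes every message, in publication order.
	for i, sub := range subs {
		fmt.Println(i, <-sub, <-sub)
	}
}
```

Because each send blocks, a single slow subscriber stalls delivery to all the others in this sketch, which is the trade-off the broker options are meant to tune.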
func NewDequeBroker ¶
NewDequeBroker constructs a broker that uses the provided Deque to buffer incoming requests if subscribers are slow to process requests. The semantics of the Deque depend on the configuration of its limits and capacity.
This broker distributes messages in FIFO order, dropping older messages to make room for new messages.
All brokers respect the BrokerOptions, which control the size of the worker pool used to send messages to subscribers and whether the Broker uses non-blocking sends. All channels between the broker and the subscribers are unbuffered.
func NewLIFOBroker ¶
func NewLIFOBroker[T any](ctx context.Context, opts BrokerOptions, capacity int) (*Broker[T], error)
NewLIFOBroker constructs a broker that uses a deque of the given capacity to buffer incoming requests if subscribers are slow to process requests. The semantics of the Deque depend on the configuration of its limits and capacity.
This broker distributes messages in LIFO order, dropping older messages to make room for new messages. The capacity of the queue is fixed, and must be a positive integer.
All brokers respect the BrokerOptions, which control the size of the worker pool used to send messages to subscribers and whether the Broker uses non-blocking sends. All channels between the broker and the subscribers are unbuffered.
func NewQueueBroker ¶
NewQueueBroker constructs a broker that uses the queue object to buffer incoming requests if subscribers are slow to process requests. Queues have a system for shedding load when the queue's limits have been exceeded. In general, messages are distributed in FIFO order, and Publish calls will drop messages if the queue is full.
All brokers respect the BrokerOptions, which control the size of the worker pool used to send messages to subscribers and whether the Broker uses non-blocking sends. All channels between the broker and the subscribers are unbuffered.
func (*Broker[T]) Stop ¶
func (b *Broker[T]) Stop()
Stop cancels the broker, allowing background work to stop.
func (*Broker[T]) Subscribe ¶
Subscribe generates a new subscription channel of the specified buffer size. You *must* call Unsubscribe on this channel when you are no longer listening to it.
Subscription channels are *not* closed and should never be closed by the caller.
func (*Broker[T]) Unsubscribe ¶
Unsubscribe removes a channel from the broker.
type BrokerOptions ¶
type BrokerOptions struct {
	// NonBlockingSubscriptions, when true, allows the broker to
	// skip sending messages to subscriber channels that are full. In
	// this system, some subscribers will miss some messages based
	// on their own processing time.
	NonBlockingSubscriptions bool

	// BufferSize controls the buffer size of all internal broker
	// channels and channel subscriptions. When using a queue or
	// deque backed broker, the buffer size is only used for the
	// subscription channels.
	BufferSize int

	// ParallelDispatch, when true, sends each message to
	// subscribers in parallel, and (pending the behavior of
	// NonBlockingSubscriptions) waits for all messages to be
	// delivered before continuing.
	ParallelDispatch bool

	// WorkerPoolSize controls the number of goroutines used for
	// sending messages to subscribers, when using Queue-backed
	// brokers. If unset this defaults to 1.
	//
	// When this value is larger than 1, the order of messages
	// observed by individual subscribers will not be consistent.
	WorkerPoolSize int
}
BrokerOptions configures the semantics of a broker. The zero-values produce a blocking unbuffered queue message broker with every message distributed to every subscriber. While the default settings make it possible for one subscriber to block another subscriber, they guarantee that all messages will be delivered. NonBlocking and Buffered brokers may lose messages.
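To make the trade-off between blocking and non-blocking delivery concrete, here is a minimal sketch using only the standard library. The helpers `trySend` and `dispatch` are hypothetical and do not reflect how the package is implemented; they only show why a non-blocking send can skip a subscriber whose channel is full, and what waiting for a parallel dispatch looks like.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// trySend attempts to deliver msg without blocking and reports whether
// the subscriber channel accepted it. Hypothetical helper.
func trySend[T any](sub chan<- T, msg T) bool {
	select {
	case sub <- msg:
		return true
	default:
		return false // channel is full: this subscriber misses msg
	}
}

// dispatch sends msg to every subscriber in its own goroutine, waits
// for all sends to finish, and returns how many were delivered.
func dispatch[T any](subs []chan T, msg T, nonBlocking bool) int {
	var (
		wg        sync.WaitGroup
		delivered atomic.Int64
	)
	for _, sub := range subs {
		wg.Add(1)
		go func(sub chan T) {
			defer wg.Done()
			if nonBlocking {
				if trySend[T](sub, msg) {
					delivered.Add(1)
				}
				return
			}
			sub <- msg // blocking send: waits for room, never drops
			delivered.Add(1)
		}(sub)
	}
	wg.Wait()
	return int(delivered.Load())
}

func main() {
	full := make(chan string, 1)
	full <- "stale" // this subscriber has fallen behind
	idle := make(chan string, 1)

	n := dispatch([]chan string{full, idle}, "fresh", true)
	fmt.Println("delivered to", n, "of 2 subscribers")
}
```

With `nonBlocking` set to false the same call would wait until the lagging subscriber drained its channel, so nothing is lost but every subscriber is held to the pace of the slowest one.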
type Deque ¶
type Deque[T any] struct {
	// contains filtered or unexported fields
}
Deque provides a basic double-ended queue backed by a doubly linked list, with features to support a maximum capacity, burstable limits and soft quotas, as well as iterators, that is safe for access from multiple concurrent goroutines. Furthermore, the implementation safely handles multiple concurrent blocking operations (e.g. Wait, iterators).
Use the NewDeque constructor to instantiate a Deque object.
func NewDeque ¶
func NewDeque[T any](opts DequeOptions) (*Deque[T], error)
NewDeque constructs a Deque according to the options, and errors if there are any problems with the configuration.
func (*Deque[T]) Close ¶
Close marks the deque as closed, after which point all iterators will stop and no more operations will succeed. The error value is not used in the current operation.
func (*Deque[T]) ForcePushBack ¶
ForcePushBack is the same as PushBack, except that if the deque is at capacity, it removes one item from the front of the deque and then, having made room, appends the item. Returns an error if the deque is closed.
func (*Deque[T]) ForcePushFront ¶
ForcePushFront is the same as PushFront, except that if the deque is at capacity, it removes one item from the back of the deque and then, having made room, prepends the item. Returns an error if the deque is closed.
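The eviction behavior of the two force-push methods can be modeled with a small slice-backed type. This sketch assumes that pushing to the back evicts from the front and pushing to the front evicts from the back; `boundedDeque` and its methods are hypothetical, are not safe for concurrent use, and ignore the closed state that the real methods report as an error.

```go
package main

import "fmt"

// boundedDeque is a hypothetical, non-concurrent stand-in for a
// capacity-limited deque. Index 0 is the front. The capacity is
// assumed to be positive.
type boundedDeque[T any] struct {
	items    []T
	capacity int
}

// forcePushBack adds it at the back, first evicting the front item if
// the deque is already at capacity.
func (d *boundedDeque[T]) forcePushBack(it T) {
	if len(d.items) >= d.capacity {
		d.items = d.items[1:]
	}
	d.items = append(d.items, it)
}

// forcePushFront adds it at the front, first evicting the back item if
// the deque is already at capacity.
func (d *boundedDeque[T]) forcePushFront(it T) {
	if len(d.items) >= d.capacity {
		d.items = d.items[:len(d.items)-1]
	}
	d.items = append([]T{it}, d.items...)
}

func main() {
	d := &boundedDeque[int]{capacity: 3}
	for i := 1; i <= 4; i++ {
		d.forcePushBack(i) // the fourth push evicts 1 from the front
	}
	fmt.Println(d.items)

	d.forcePushFront(0) // evicts the item at the back
	fmt.Println(d.items)
}
```

Pushing at one end while evicting at the other is what keeps the newest items available when producers outpace consumers.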
func (*Deque[T]) Iterator ¶
Iterator starts at the front of the queue and iterates towards the back. When the iterator reaches the end of the queue it ends.
func (*Deque[T]) IteratorBlocking ¶
IteratorBlocking starts at the front of the deque, iterates to the end and then waits for a new item to be pushed to the back of the queue, the queue to be closed or the context to be canceled.
func (*Deque[T]) IteratorBlockingReverse ¶
IteratorBlockingReverse starts at the back of the deque and iterates toward the front, and then waits for a new item to be pushed to the front of the queue, the queue to be closed or the context to be canceled.
func (*Deque[T]) IteratorReverse ¶
IteratorReverse starts at the back of the queue and iterates towards the front. When the iterator reaches the front of the queue it ends.
func (*Deque[T]) Len ¶
Len returns the length of the queue. This is an O(1) operation in this implementation.
func (*Deque[T]) PopBack ¶
PopBack removes the last (tail) item of the queue, with the second value being false if the queue is empty or closed.
func (*Deque[T]) PopFront ¶
PopFront removes the first (head) item of the queue, with the second value being false if the queue is empty or closed.
func (*Deque[T]) PushBack ¶
PushBack adds an item to the back or end of the deque, and returns an error if the queue is closed, at capacity, or has reached its limit.
func (*Deque[T]) PushFront ¶
PushFront adds an item to the front or head of the deque, and returns an error if the queue is closed, at capacity, or has reached its limit.
type DequeOptions ¶
type DequeOptions struct {
	Unlimited    bool
	Capacity     int
	QueueOptions *QueueOptions
}
DequeOptions configure the semantics of the deque. The Validate() method ensures that you do not produce a configuration that is impossible.
Capacity puts a firm upper cap on the number of items in the deque, while the Unlimited options
func (*DequeOptions) Validate ¶
func (opts *DequeOptions) Validate() error
Validate ensures that the options are consistent. Exported as a convenience function. All errors have ErrConfigurationMalformed as their root.
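A caller can rely on the shared root error to tell configuration problems apart from other failures. The sketch below assumes the root is attached by wrapping, so that `errors.Is` matches it. `errConfigMalformed` stands in for ErrConfigurationMalformed, and `validateCapacity` is a hypothetical check, not the package's validation logic.

```go
package main

import (
	"errors"
	"fmt"
)

// errConfigMalformed stands in for pubsub.ErrConfigurationMalformed.
var errConfigMalformed = errors.New("configuration error")

// validateCapacity is a hypothetical validation step. It wraps the
// sentinel with %w so the detail message and the root both survive.
func validateCapacity(capacity int) error {
	if capacity <= 0 {
		return fmt.Errorf("capacity %d must be positive: %w", capacity, errConfigMalformed)
	}
	return nil
}

// isConfigError reports whether err has the configuration sentinel
// anywhere in its wrap chain.
func isConfigError(err error) bool {
	return errors.Is(err, errConfigMalformed)
}

func main() {
	err := validateCapacity(-1)
	fmt.Println(err)
	fmt.Println(isConfigError(err))
}
```

Checking with `errors.Is` rather than `==` keeps the caller working even when the returned error carries extra context around the root.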
type Queue ¶
type Queue[T any] struct {
	// contains filtered or unexported fields
}
A Queue is a limited-capacity FIFO queue of arbitrary data items.
A queue has a soft quota and a hard limit on the number of items that may be contained in the queue. Adding items in excess of the hard limit will fail unconditionally.
For items in excess of the soft quota, a credit system applies: Each queue maintains a burst credit score. Adding an item in excess of the soft quota costs 1 unit of burst credit. If there is not enough burst credit, the add will fail.
The initial burst credit is assigned when the queue is constructed. Removing items from the queue adds additional credit if the resulting queue length is less than the current soft quota. Burst credit is capped by the hard limit.
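The quota rules above can be modeled in a few lines. This is a sketch and not the package's code: `creditQueue` is hypothetical and not safe for concurrent use, and the amount of credit restored on removal (one unit here) is an assumption, since the text does not state it.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errFull     = errors.New("queue is full")
	errNoCredit = errors.New("insufficient burst credit")
)

// creditQueue is a hypothetical model of the soft quota, hard limit
// and burst credit rules.
type creditQueue struct {
	items     []int
	hardLimit int
	softQuota int
	credit    float64
}

// add enqueues v unless the hard limit is reached, or the soft quota
// is exceeded and there is less than one unit of burst credit left.
func (q *creditQueue) add(v int) error {
	if len(q.items) >= q.hardLimit {
		return errFull
	}
	if len(q.items) >= q.softQuota {
		if q.credit < 1 {
			return errNoCredit
		}
		q.credit-- // a burst add costs one unit of credit
	}
	q.items = append(q.items, v)
	return nil
}

// remove dequeues the oldest item and restores credit when the queue
// has dropped below its soft quota.
func (q *creditQueue) remove() (int, bool) {
	if len(q.items) == 0 {
		return 0, false
	}
	v := q.items[0]
	q.items = q.items[1:]
	if len(q.items) < q.softQuota {
		q.credit++ // assumption: one unit restored per removal
		if q.credit > float64(q.hardLimit) {
			q.credit = float64(q.hardLimit) // capped by the hard limit
		}
	}
	return v, true
}

func main() {
	q := &creditQueue{hardLimit: 4, softQuota: 2, credit: 1}
	for i := 1; i <= 4; i++ {
		fmt.Println(i, q.add(i))
	}
}
```

In this run the first two adds fit within the soft quota, the third spends the only unit of credit, and the fourth is rejected for lack of credit even though the hard limit has not been reached.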
A Queue is safe for concurrent use by multiple goroutines.
func NewQueue ¶
func NewQueue[T any](opts QueueOptions) (*Queue[T], error)
NewQueue constructs a new empty queue with the specified options. It reports an error if any of the option values are invalid.
func NewUnlimitedQueue ¶
NewUnlimitedQueue produces an unbounded queue.
func (*Queue[T]) Add ¶
Add adds item to the back of the queue. It reports an error and does not enqueue the item if the queue is full or closed, or if it exceeds its soft quota and there is not enough burst credit.
func (*Queue[T]) Close ¶
Close closes the queue. After closing, any further Add calls will report an error, but items that were added to the queue prior to closing will still be available for Remove and Wait. Wait will report an error without blocking if it is called on a closed, empty queue.
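The closing semantics can be illustrated with a small model: adds fail after closing, queued items can still be drained, and a wait on a closed, empty queue fails immediately. `waitQueue` and its methods are hypothetical and use a notification channel that is just one possible way to implement blocking; nothing here is taken from the package's source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("queue is closed")

// waitQueue is a hypothetical model of a closable FIFO queue.
type waitQueue struct {
	mu     sync.Mutex
	items  []string
	closed bool
	signal chan struct{} // closed and replaced on every state change
}

func newWaitQueue() *waitQueue {
	return &waitQueue{signal: make(chan struct{})}
}

// wake releases all current waiters. The caller must hold mu.
func (q *waitQueue) wake() {
	close(q.signal)
	q.signal = make(chan struct{})
}

func (q *waitQueue) add(it string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return errClosed
	}
	q.items = append(q.items, it)
	q.wake()
	return nil
}

func (q *waitQueue) shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.wake()
}

// wait returns the oldest item, blocking until one is available, the
// queue is closed and empty, or ctx is done.
func (q *waitQueue) wait(ctx context.Context) (string, error) {
	for {
		q.mu.Lock()
		if len(q.items) > 0 {
			it := q.items[0]
			q.items = q.items[1:]
			q.mu.Unlock()
			return it, nil
		}
		if q.closed {
			q.mu.Unlock()
			return "", errClosed
		}
		sig := q.signal
		q.mu.Unlock()

		select {
		case <-sig: // state changed: re-check under the lock
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// demo closes a queue holding two items, then drains it.
func demo() (drained []string, addErr, waitErr error) {
	q := newWaitQueue()
	_ = q.add("a")
	_ = q.add("b")
	q.shutdown()
	addErr = q.add("c") // rejected: the queue is closed
	for {
		it, err := q.wait(context.Background())
		if err != nil {
			waitErr = err
			break
		}
		drained = append(drained, it)
	}
	return drained, addErr, waitErr
}

func main() {
	fmt.Println(demo())
}
```

Closing therefore acts as an end-of-stream marker rather than a discard: consumers finish the backlog and then see the closed error instead of blocking forever.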
func (*Queue[T]) Iterator ¶
Iterator produces an iterator implementation that wraps the underlying queue linked list. The iterator respects the Queue's mutex and is safe for concurrent access and concurrent queue operations, without additional locking. The iterator does not modify the contents of the queue, and will only terminate when the queue has been closed via the Close() method.
func (*Queue[T]) Remove ¶
Remove removes and returns the frontmost (oldest) item in the queue and reports whether an item was available. If the queue is empty, Remove returns the zero value of T and false.
type QueueOptions ¶
type QueueOptions struct {
	// The maximum number of items the queue will ever be
	// permitted to hold. This value must be positive, and greater
	// than or equal to SoftQuota. The hard limit is fixed and
	// does not change as the queue is used.
	//
	// The hard limit should be chosen to exceed the largest burst
	// size expected under normal operating conditions.
	HardLimit int

	// The initial expected maximum number of items the queue
	// should contain on an average workload. If this value is
	// zero, it is initialized to the hard limit. The soft quota
	// is adjusted from the initial value dynamically as the queue
	// is used.
	SoftQuota int

	// The initial burst credit score. This value must be greater
	// than or equal to zero. If it is zero, the soft quota is
	// used.
	BurstCredit float64
}
QueueOptions are the initial settings for a Queue or Deque.
func (*QueueOptions) Validate ¶
func (opts *QueueOptions) Validate() error
Validate ensures that the options are consistent. Exported as a convenience function. All errors have ErrConfigurationMalformed as their root.