Documentation
¶
Overview ¶
Package pubsub implements an unbounded channel for the pub/sub pattern.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FilterFunc ¶
FilterFunc is a filter function for any type. If the function returns true, the message will be sent to the subscriber. If the function is nil, all messages will be sent to the subscriber.
type Queue ¶
type Queue[T any] struct { // contains filtered or unexported fields }
Queue implements a simple first-in-first-out queue. It is not thread-safe. A zero-value Queue is a valid queue.
func NewQueueWithCapacity ¶
NewQueueWithCapacity creates a new queue with the given capacity. The capacity is set to be at least 16.
func (*Queue[T]) Dequeue ¶
func (q *Queue[T]) Dequeue()
Dequeue pops the first element from the queue and discards it.
func (*Queue[T]) Pending ¶
Pending returns the pending item in the queue and a boolean indicating whether the item was found.
func (*Queue[T]) PendingOrZero ¶
func (q *Queue[T]) PendingOrZero() T
PendingOrZero returns the pending item in the queue or the zero value if the queue is empty.
type Subscriber ¶
type Subscriber[T any] struct { // contains filtered or unexported fields }
Subscriber is a subscriber that subscribes to a Pipe. A zero-value Subscriber is a valid Subscriber.
func NewSubscriber ¶
func NewSubscriber[T any]() *Subscriber[T]
NewSubscriber creates a new Subscriber.
func (*Subscriber[T]) Listen ¶
func (s *Subscriber[T]) Listen(ctx context.Context, src <-chan T) error
Listen starts broadcasting messages received from the given src channel. It blocks until the src channel is closed or ctx is canceled.
func (*Subscriber[T]) Subscribe ¶
func (s *Subscriber[T]) Subscribe(ch chan<- T, filter FilterFunc[T])
Subscribe subscribes ch to incoming messages from the given recipient. Calls to Subscribe should always be paired with Unsubscribe. It is recommended to use defer.
Subscribe panics if it's called on a Subscriber w/ a src that's already closed.
func (*Subscriber[T]) Unsubscribe ¶
func (s *Subscriber[T]) Unsubscribe(ch chan<- T)
Unsubscribe unsubscribes ch from incoming messages from all its recipients. Once unsubscribed, ch will be closed.