Documentation
¶
Index ¶
- type ChanBroadcaster
- func NewBestEffortChanBroadcaster[T any](ctx context.Context, source <-chan T) *ChanBroadcaster[T]
- func NewChanBroadcaster[T any](ctx context.Context, source <-chan T, bufferSize int, ...) (*ChanBroadcaster[T], error)
- func NewSynchronousChanBroadcaster[T any](ctx context.Context, source <-chan T) *ChanBroadcaster[T]
- type DeliveryStrategy
- type NoSyncBroadcaster
- func (b *NoSyncBroadcaster[T]) AddSubscriber(sub chan T)
- func (b *NoSyncBroadcaster[T]) CloseAll()
- func (b *NoSyncBroadcaster[T]) Len() int
- func (b *NoSyncBroadcaster[T]) SendOrSkip(message T) int
- func (b *NoSyncBroadcaster[T]) SendOrUnsubscribe(message T) int
- func (b *NoSyncBroadcaster[T]) SendOrWait(ctx context.Context, message T) bool
- func (b *NoSyncBroadcaster[T]) Subscribe() <-chan T
- func (b *NoSyncBroadcaster[T]) Unsubscribe(sub <-chan T) bool
- type SyncBroadcaster
- func (b *SyncBroadcaster[T]) AddSubscriber(sub chan T)
- func (b *SyncBroadcaster[T]) CloseAll()
- func (b *SyncBroadcaster[T]) Len() int
- func (b *SyncBroadcaster[T]) SendOrSkip(message T) int
- func (b *SyncBroadcaster[T]) SendOrUnsubscribe(message T) int
- func (b *SyncBroadcaster[T]) SendOrWait(ctx context.Context, message T) bool
- func (b *SyncBroadcaster[T]) Subscribe() <-chan T
- func (b *SyncBroadcaster[T]) Unsubscribe(sub <-chan T) bool
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChanBroadcaster ¶
type ChanBroadcaster[T any] struct { // contains filtered or unexported fields }
ChanBroadcaster is a communication service with one sender and many recievers with all recievers (subscribers) getting every message sent by the sender. All communication happens via channels.
Example ¶
source := make(chan int, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() broadcast := NewBestEffortChanBroadcaster(ctx, source) sub1 := broadcast.Subscribe() sub2 := broadcast.Subscribe() source <- 5318008 fmt.Println("Sub1:", <-sub1) fmt.Println("Sub2:", <-sub2)
Output: Sub1: 5318008 Sub2: 5318008
func NewBestEffortChanBroadcaster ¶
func NewBestEffortChanBroadcaster[T any](ctx context.Context, source <-chan T) *ChanBroadcaster[T]
NewBestEffortChanBroadcaster creates a Broadcaster that will try to forward data from the source channel to subscribers. It will skip any channels that are full i.e. the subscribers are too slow at emptying it. All subscribers' channels will have the same capacity as source or 1 in case of unbuffered source.
func NewChanBroadcaster ¶
func NewChanBroadcaster[T any](ctx context.Context, source <-chan T, bufferSize int, deliveryStrategy DeliveryStrategy) (*ChanBroadcaster[T], error)
NewChanBroadcaster creates a Broadcaster that will forward all data from the source channel to subscribers. All subscribers' channels will have the capacity of bufferSize. Depending on the deliveryStrategy the Broadcaster will either * ensure that all messages are sent to all subscribers (even if that means waiting on unbuffered/full channels), * skip any channel whose buffer is full, * broadcast to empty channels and unsubscribe the rest.
func NewSynchronousChanBroadcaster ¶
func NewSynchronousChanBroadcaster[T any](ctx context.Context, source <-chan T) *ChanBroadcaster[T]
NewSynchronousChanBroadcaster creates a Broadcaster that will ensure all messages are not only sent, but delivered to subscribers.
func (*ChanBroadcaster[T]) Subscribe ¶
func (s *ChanBroadcaster[T]) Subscribe() <-chan T
Subscribe will return a read-only channel that will deliver all broadcast messages to a new subscriber.
func (*ChanBroadcaster[T]) Unsubscribe ¶
func (s *ChanBroadcaster[T]) Unsubscribe(channel <-chan T)
Unsubscribe will close the given channel and ensure it doesn't recieve any more updates.
type DeliveryStrategy ¶
type DeliveryStrategy int
const ( // Will try to push to channel and pass if channel is full. Skip DeliveryStrategy = iota // Will try to push to channel and wait if channel is full. Wait // Will try to push to channel and unsubscribe if channel is full. Unsubscribe )
type NoSyncBroadcaster ¶
type NoSyncBroadcaster[T any] struct { // contains filtered or unexported fields }
NoSyncBroadcaster is a broadcast service that allows a single sender to send messages to multiple recievers. It does not implement any synchronisation and can only be used if the sender is externally synchronised e.g. only used in one goroutine or using a mutex.
Example ¶
broadcast := NewNoSyncBroadcaster[int](10) sub := broadcast.Subscribe() broadcast.SendOrSkip(1) broadcast.SendOrSkip(3) fmt.Println(<-sub) fmt.Println(<-sub) broadcast.Unsubscribe(sub) if _, ok := <-sub; !ok { fmt.Println("Channel was closed and unsubscribed.") }
Output: 1 3 Channel was closed and unsubscribed.
func NewNoSyncBroadcaster ¶
func NewNoSyncBroadcaster[T any](bufferSize int) *NoSyncBroadcaster[T]
NewNoSyncBroadcaster creates a NoSyncBroadcaster where all subscribers will get a channel of capacity bufferSize when they subscribe.
func (*NoSyncBroadcaster[T]) AddSubscriber ¶
func (b *NoSyncBroadcaster[T]) AddSubscriber(sub chan T)
AddSubscriber gives subscribers the option to provide their own channel to receive updates on. In case they already have allocated one and want to reuse it or if the default bufferSize isn't OK for them.
func (*NoSyncBroadcaster[T]) CloseAll ¶
func (b *NoSyncBroadcaster[T]) CloseAll()
CloseAll will close and delete all subscribers' channels.
func (*NoSyncBroadcaster[T]) Len ¶
func (b *NoSyncBroadcaster[T]) Len() int
Len returns the number of subcribers that the service is send to.
func (*NoSyncBroadcaster[T]) SendOrSkip ¶
func (b *NoSyncBroadcaster[T]) SendOrSkip(message T) int
SendOrSkip will try to send message to all subscribers. If a subscriber's channel is full or unbuffered it will skip that channel and continue to the next one.
func (*NoSyncBroadcaster[T]) SendOrUnsubscribe ¶
func (b *NoSyncBroadcaster[T]) SendOrUnsubscribe(message T) int
SendOrUnsubscribe will try to send message to all subscribers. If a subscriber's channel is full or unbuffered it will unsubscribe that channel from further updates and close it.
func (*NoSyncBroadcaster[T]) SendOrWait ¶
func (b *NoSyncBroadcaster[T]) SendOrWait(ctx context.Context, message T) bool
SendOrWait will send message to all subscribers. If a subscriber's channel is full or unbuffered it will wait until a space in the channel frees up. It returns true if it managed to send the message to all subscribers before ctx expires.
func (*NoSyncBroadcaster[T]) Subscribe ¶
func (b *NoSyncBroadcaster[T]) Subscribe() <-chan T
Subscribe creates and returns a new channel that will receive all messages sent by the sender via this broadcast service.
func (*NoSyncBroadcaster[T]) Unsubscribe ¶
func (b *NoSyncBroadcaster[T]) Unsubscribe(sub <-chan T) bool
Unsubscribe will stop the service sending messages on this channel and close the channel. Returns true if the provided channel is a valid subscriber.
type SyncBroadcaster ¶
type SyncBroadcaster[T any] struct { // contains filtered or unexported fields }
SyncBroadcaster is a wrapper around NoSyncBroadcaster ensuring that all operations are properly synchronised using an internal mutex.
Example ¶
broadcast := NewSyncBroadcaster[int](10) sub := broadcast.Subscribe() broadcast.SendOrSkip(1) broadcast.SendOrSkip(3) fmt.Println(<-sub) fmt.Println(<-sub) broadcast.Unsubscribe(sub) if _, ok := <-sub; !ok { fmt.Println("Channel was closed and unsubscribed.") }
Output: 1 3 Channel was closed and unsubscribed.
func NewSyncBroadcaster ¶
func NewSyncBroadcaster[T any](bufferSize int) *SyncBroadcaster[T]
NewSyncBroadcaster creates a SyncBroadcaster where all subscribers will get a channel of capacity bufferSize when they subscribe.
func (*SyncBroadcaster[T]) AddSubscriber ¶
func (b *SyncBroadcaster[T]) AddSubscriber(sub chan T)
AddSubscriber gives subscribers the option to provide their own chanel to receive updates on. In case they already have allocated one and want to reuse it or if the default bufferSize isn't OK for them.
func (*SyncBroadcaster[T]) CloseAll ¶
func (b *SyncBroadcaster[T]) CloseAll()
CloseAll will close and delete all subscribers' channels.
func (*SyncBroadcaster[T]) Len ¶
func (b *SyncBroadcaster[T]) Len() int
Len returns the number of subcribers that the service is send to.
func (*SyncBroadcaster[T]) SendOrSkip ¶
func (b *SyncBroadcaster[T]) SendOrSkip(message T) int
SendOrSkip will try to send message to all subscribers. If a subscriber's channel is full or unbuffered it will skip that channel and continue to the next one.
func (*SyncBroadcaster[T]) SendOrUnsubscribe ¶
func (b *SyncBroadcaster[T]) SendOrUnsubscribe(message T) int
SendOrUnsubscribe will try to send message to all subscribers. If a subscriber's channel is full or unbuffered it will unsubscribe that channel from further updates and close it.
func (*SyncBroadcaster[T]) SendOrWait ¶
func (b *SyncBroadcaster[T]) SendOrWait(ctx context.Context, message T) bool
SendOrWait will send message to all subscribers. If a subscriber's channel is full or unbuffered it will wait until a space in the channel frees up. It returns true if it managed to send the message to all subscribers before ctx expires.
func (*SyncBroadcaster[T]) Subscribe ¶
func (b *SyncBroadcaster[T]) Subscribe() <-chan T
Subscribe creates and returns a new channel that will receive all messages send by the sender via this broadcast service.
func (*SyncBroadcaster[T]) Unsubscribe ¶
func (b *SyncBroadcaster[T]) Unsubscribe(sub <-chan T) bool
Unsubscribe will stop the service sending messages on this channel and close the channel. Returns true if the provided channel is a valid subscriber.