broadcast

package
v1.0.0-pre3
Published: Oct 24, 2023 License: MIT Imports: 3 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChanBroadcaster

type ChanBroadcaster[T any] struct {
	// contains filtered or unexported fields
}

ChanBroadcaster is a communication service with one sender and many receivers, in which every receiver (subscriber) gets every message sent by the sender. All communication happens via channels.

Example
source := make(chan int, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
broadcast := NewBestEffortChanBroadcaster(ctx, source)
sub1 := broadcast.Subscribe()
sub2 := broadcast.Subscribe()
source <- 5318008
fmt.Println("Sub1:", <-sub1)
fmt.Println("Sub2:", <-sub2)
Output:

Sub1: 5318008
Sub2: 5318008

func NewBestEffortChanBroadcaster

func NewBestEffortChanBroadcaster[T any](ctx context.Context, source <-chan T) *ChanBroadcaster[T]

NewBestEffortChanBroadcaster creates a Broadcaster that will try to forward data from the source channel to subscribers. It skips any subscriber channel that is full, i.e. whose subscriber is too slow at emptying it. Every subscriber's channel has the same capacity as source, or 1 if source is unbuffered.
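The capacity rule above can be written down in a few lines. This is a minimal sketch using only built-ins; subscriberCap is a hypothetical helper named for illustration and is not part of the package's API.

```go
package main

import "fmt"

// subscriberCap mirrors the documented rule: a subscriber channel gets the
// capacity of the source, or 1 when the source is unbuffered.
// Hypothetical helper, not part of the package.
func subscriberCap[T any](source <-chan T) int {
	if c := cap(source); c > 0 {
		return c
	}
	return 1
}

func main() {
	fmt.Println(subscriberCap[int](make(chan int)))    // unbuffered source
	fmt.Println(subscriberCap[int](make(chan int, 8))) // buffered source
}
```

A capacity of at least 1 matters for a best-effort broadcaster: a non-blocking send to an unbuffered channel only succeeds when a receiver is already waiting.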

func NewChanBroadcaster

func NewChanBroadcaster[T any](ctx context.Context, source <-chan T, bufferSize int, deliveryStrategy DeliveryStrategy) (*ChanBroadcaster[T], error)

NewChanBroadcaster creates a Broadcaster that will forward all data from the source channel to subscribers. All subscribers' channels will have the capacity of bufferSize. Depending on the deliveryStrategy, the Broadcaster will do one of the following:

- ensure that all messages are sent to all subscribers (even if that means waiting on unbuffered/full channels),
- skip any channel whose buffer is full,
- broadcast to channels that are not full and unsubscribe the rest.

func NewSynchronousChanBroadcaster

func NewSynchronousChanBroadcaster[T any](ctx context.Context, source <-chan T) *ChanBroadcaster[T]

NewSynchronousChanBroadcaster creates a Broadcaster that will ensure all messages are not only sent, but delivered to subscribers.

func (*ChanBroadcaster[T]) Subscribe

func (s *ChanBroadcaster[T]) Subscribe() <-chan T

Subscribe will return a read-only channel that will deliver all broadcast messages to a new subscriber.

func (*ChanBroadcaster[T]) Unsubscribe

func (s *ChanBroadcaster[T]) Unsubscribe(channel <-chan T)

Unsubscribe will close the given channel and ensure it doesn't receive any more updates.

type DeliveryStrategy

type DeliveryStrategy int
const (
	// Will try to push to channel and skip it if channel is full.
	Skip DeliveryStrategy = iota
	// Will try to push to channel and wait if channel is full.
	Wait
	// Will try to push to channel and unsubscribe if channel is full.
	Unsubscribe
)

type NoSyncBroadcaster

type NoSyncBroadcaster[T any] struct {
	// contains filtered or unexported fields
}

NoSyncBroadcaster is a broadcast service that allows a single sender to send messages to multiple receivers. It does not implement any synchronisation and can only be used if the sender is externally synchronised, e.g. it is only used from one goroutine or is guarded by a mutex.

Example
broadcast := NewNoSyncBroadcaster[int](10)
sub := broadcast.Subscribe()
broadcast.SendOrSkip(1)
broadcast.SendOrSkip(3)
fmt.Println(<-sub)
fmt.Println(<-sub)
broadcast.Unsubscribe(sub)
if _, ok := <-sub; !ok {
	fmt.Println("Channel was closed and unsubscribed.")
}
Output:

1
3
Channel was closed and unsubscribed.

func NewNoSyncBroadcaster

func NewNoSyncBroadcaster[T any](bufferSize int) *NoSyncBroadcaster[T]

NewNoSyncBroadcaster creates a NoSyncBroadcaster where all subscribers will get a channel of capacity bufferSize when they subscribe.

func (*NoSyncBroadcaster[T]) AddSubscriber

func (b *NoSyncBroadcaster[T]) AddSubscriber(sub chan T)

AddSubscriber lets subscribers provide their own channel to receive updates on, for example if they have already allocated one and want to reuse it, or if the default bufferSize doesn't suit them.

func (*NoSyncBroadcaster[T]) CloseAll

func (b *NoSyncBroadcaster[T]) CloseAll()

CloseAll will close and delete all subscribers' channels.

func (*NoSyncBroadcaster[T]) Len

func (b *NoSyncBroadcaster[T]) Len() int

Len returns the number of subscribers that the service is sending to.

func (*NoSyncBroadcaster[T]) SendOrSkip

func (b *NoSyncBroadcaster[T]) SendOrSkip(message T) int

SendOrSkip will try to send message to all subscribers. If a subscriber's channel is full or unbuffered it will skip that channel and continue to the next one.
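The skip behaviour corresponds to Go's non-blocking send (a select with a default case). The sketch below is not the package's code; sendOrSkip is a hypothetical free function over a slice, and it assumes the returned int counts successful deliveries, which the documentation above does not state.

```go
package main

import "fmt"

// sendOrSkip tries a non-blocking send to every subscriber and returns how
// many sends succeeded (an assumed meaning for the int result).
// Hypothetical helper for illustration only.
func sendOrSkip[T any](subs []chan T, msg T) int {
	sent := 0
	for _, ch := range subs {
		select {
		case ch <- msg:
			sent++
		default: // full, or unbuffered with no waiting receiver: skip
		}
	}
	return sent
}

func main() {
	roomy := make(chan string, 1)
	full := make(chan string, 1)
	full <- "old"                   // buffer already occupied
	unbuffered := make(chan string) // no receiver is waiting

	fmt.Println(sendOrSkip([]chan string{roomy, full, unbuffered}, "new")) // 1
	fmt.Println(<-roomy)                                                   // new
}
```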

func (*NoSyncBroadcaster[T]) SendOrUnsubscribe

func (b *NoSyncBroadcaster[T]) SendOrUnsubscribe(message T) int

SendOrUnsubscribe will try to send message to all subscribers. If a subscriber's channel is full or unbuffered it will unsubscribe that channel from further updates and close it.
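A minimal sketch of this drop-slow-subscribers behaviour follows. It assumes subscribers are kept in a map and that the returned int counts successful sends; both are assumptions made for illustration, and sendOrUnsubscribe here is a hypothetical free function rather than the package's method.

```go
package main

import "fmt"

// sendOrUnsubscribe tries a non-blocking send to every subscriber. A
// subscriber that cannot accept the message is removed and its channel closed.
// Returns the number of successful sends (assumed meaning).
// Hypothetical helper for illustration only.
func sendOrUnsubscribe[T any](subs map[chan T]struct{}, msg T) int {
	sent := 0
	for ch := range subs {
		select {
		case ch <- msg:
			sent++
		default:
			delete(subs, ch) // deleting the current key during range is safe in Go
			close(ch)
		}
	}
	return sent
}

func main() {
	fast := make(chan int, 1)
	slow := make(chan int, 1)
	slow <- 0 // slow has not drained its buffer
	subs := map[chan int]struct{}{fast: {}, slow: {}}

	sent := sendOrUnsubscribe(subs, 42)
	fmt.Println(sent, len(subs)) // 1 1

	<-slow          // drain the stale value
	_, ok := <-slow // the channel is now closed
	fmt.Println(ok) // false
}
```

A closed channel still yields any values left in its buffer, so a dropped subscriber can read its backlog before it sees the close.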

func (*NoSyncBroadcaster[T]) SendOrWait

func (b *NoSyncBroadcaster[T]) SendOrWait(ctx context.Context, message T) bool

SendOrWait will send message to all subscribers. If a subscriber's channel is full or unbuffered it will wait until a space in the channel frees up. It returns true if it managed to send the message to all subscribers before ctx expires.

func (*NoSyncBroadcaster[T]) Subscribe

func (b *NoSyncBroadcaster[T]) Subscribe() <-chan T

Subscribe creates and returns a new channel that will receive all messages sent by the sender via this broadcast service.

func (*NoSyncBroadcaster[T]) Unsubscribe

func (b *NoSyncBroadcaster[T]) Unsubscribe(sub <-chan T) bool

Unsubscribe will stop the service sending messages on this channel and close the channel. Returns true if the provided channel is a valid subscriber.

type SyncBroadcaster

type SyncBroadcaster[T any] struct {
	// contains filtered or unexported fields
}

SyncBroadcaster is a wrapper around NoSyncBroadcaster ensuring that all operations are properly synchronised using an internal mutex.
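The wrapper pattern described here can be sketched as follows. Both types are hypothetical stand-ins written for illustration (unsafeRegistry for an unsynchronised type, safeRegistry for the mutex-guarded wrapper); the package's actual fields are unexported and not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// unsafeRegistry stands in for an unsynchronised subscriber registry.
type unsafeRegistry[T any] struct {
	subs map[chan T]struct{}
}

func (r *unsafeRegistry[T]) subscribe(size int) <-chan T {
	ch := make(chan T, size)
	r.subs[ch] = struct{}{} // map write: unsafe if called concurrently
	return ch
}

func (r *unsafeRegistry[T]) count() int { return len(r.subs) }

// safeRegistry serialises every call to the wrapped value with a mutex.
type safeRegistry[T any] struct {
	mu    sync.Mutex
	inner unsafeRegistry[T]
}

func newSafeRegistry[T any]() *safeRegistry[T] {
	return &safeRegistry[T]{inner: unsafeRegistry[T]{subs: make(map[chan T]struct{})}}
}

func (s *safeRegistry[T]) subscribe(size int) <-chan T {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.inner.subscribe(size)
}

func (s *safeRegistry[T]) count() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.inner.count()
}

func main() {
	reg := newSafeRegistry[int]()

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			reg.subscribe(1) // safe to call from many goroutines
		}()
	}
	wg.Wait()
	fmt.Println(reg.count()) // 10
}
```

Without the mutex, the concurrent map writes in subscribe would be a data race.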

Example
broadcast := NewSyncBroadcaster[int](10)
sub := broadcast.Subscribe()
broadcast.SendOrSkip(1)
broadcast.SendOrSkip(3)
fmt.Println(<-sub)
fmt.Println(<-sub)
broadcast.Unsubscribe(sub)
if _, ok := <-sub; !ok {
	fmt.Println("Channel was closed and unsubscribed.")
}
Output:

1
3
Channel was closed and unsubscribed.

func NewSyncBroadcaster

func NewSyncBroadcaster[T any](bufferSize int) *SyncBroadcaster[T]

NewSyncBroadcaster creates a SyncBroadcaster where all subscribers will get a channel of capacity bufferSize when they subscribe.

func (*SyncBroadcaster[T]) AddSubscriber

func (b *SyncBroadcaster[T]) AddSubscriber(sub chan T)

AddSubscriber lets subscribers provide their own channel to receive updates on, for example if they have already allocated one and want to reuse it, or if the default bufferSize doesn't suit them.

func (*SyncBroadcaster[T]) CloseAll

func (b *SyncBroadcaster[T]) CloseAll()

CloseAll will close and delete all subscribers' channels.

func (*SyncBroadcaster[T]) Len

func (b *SyncBroadcaster[T]) Len() int

Len returns the number of subscribers that the service is sending to.

func (*SyncBroadcaster[T]) SendOrSkip

func (b *SyncBroadcaster[T]) SendOrSkip(message T) int

SendOrSkip will try to send message to all subscribers. If a subscriber's channel is full or unbuffered it will skip that channel and continue to the next one.

func (*SyncBroadcaster[T]) SendOrUnsubscribe

func (b *SyncBroadcaster[T]) SendOrUnsubscribe(message T) int

SendOrUnsubscribe will try to send message to all subscribers. If a subscriber's channel is full or unbuffered it will unsubscribe that channel from further updates and close it.

func (*SyncBroadcaster[T]) SendOrWait

func (b *SyncBroadcaster[T]) SendOrWait(ctx context.Context, message T) bool

SendOrWait will send message to all subscribers. If a subscriber's channel is full or unbuffered it will wait until a space in the channel frees up. It returns true if it managed to send the message to all subscribers before ctx expires.

func (*SyncBroadcaster[T]) Subscribe

func (b *SyncBroadcaster[T]) Subscribe() <-chan T

Subscribe creates and returns a new channel that will receive all messages sent by the sender via this broadcast service.

func (*SyncBroadcaster[T]) Unsubscribe

func (b *SyncBroadcaster[T]) Unsubscribe(sub <-chan T) bool

Unsubscribe will stop the service sending messages on this channel and close the channel. Returns true if the provided channel is a valid subscriber.
