Documentation ¶
Overview ¶
Package chansplit distributes the data sent to one channel to multiple output channels. It ensures that every output channel consumer receives messages as soon as possible and that the consumers do not block each other. This is achieved with internal buffers that store the channel values the consumers cannot receive right away.
The core component of this package is the Splitter structure, which is initialized with an existing input channel. Its GetOutputCh method returns a new output channel and can be called multiple times.
Closing the input channel automatically triggers the closing of the output channels. All messages still in the internal buffer of the Splitter are sent to each output channel before that channel is closed. To wait until all the output channels have been closed, call the Wait method of the Splitter.
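The buffering described above can be pictured with a small, self-contained sketch. It is not the package's implementation: forward and split are hypothetical helpers written only for illustration, and the real Splitter may be organised quite differently. The sketch assumes one goroutine per output with an unbounded slice buffer, so a slow consumer delays neither the sender nor the other outputs, and each output is closed only after its buffer has been drained.

```go
package main

import "fmt"

// forward is a hypothetical helper, not part of chansplit. It copies values
// from in to the returned channel through an unbounded slice buffer, so a
// slow reader of the returned channel never delays the sender on in.
func forward[T any](in <-chan T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		var buf []T // grows for as long as the consumer lags behind
		for in != nil || len(buf) > 0 {
			// Enable the send case only when there is something to send;
			// a send on a nil channel is never selected.
			var send chan<- T
			var next T
			if len(buf) > 0 {
				send = out
				next = buf[0]
			}
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // input closed: stop receiving, keep draining
					continue
				}
				buf = append(buf, v)
			case send <- next:
				buf = buf[1:]
			}
		}
	}()
	return out
}

// split is a hypothetical stand-in for the Splitter: every value received
// from in is delivered to each of the n returned channels.
func split[T any](in <-chan T, n int) []<-chan T {
	feeds := make([]chan T, n)
	outs := make([]<-chan T, n)
	for i := range feeds {
		feeds[i] = make(chan T)
		outs[i] = forward[T](feeds[i])
	}
	go func() {
		for v := range in {
			for _, f := range feeds {
				f <- v // returns quickly: forward accepts input at any time
			}
		}
		for _, f := range feeds {
			close(f)
		}
	}()
	return outs
}

func main() {
	in := make(chan int)
	outs := split[int](in, 2)
	go func() {
		for i := 1; i <= 3; i++ {
			in <- i
		}
		close(in)
	}()
	// The outputs are read one after another on purpose: the second output
	// keeps its messages buffered until the first one has been drained.
	// Prints 1, 2, 3 for output 1, then 1, 2, 3 for output 2.
	for i, out := range outs {
		for v := range out {
			fmt.Printf("output %d got %d\n", i+1, v)
		}
	}
}
```

Because the buffer in this sketch is a plain slice, it keeps growing for as long as a consumer reads more slowly than the producer writes.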
Known shortcomings ¶
This package is not the best choice every time data has to be forwarded to multiple channels. In some cases it is easier to run multiple instances of the producer instead. The typical use case is a producer whose work to obtain the payload sent to the channel takes a significant amount of computation time or memory, so that we do not want to duplicate it.
When the consumers take a long time to process a message and the producer sends messages at a high frequency, the memory used by the Splitter grows gradually, because its internal buffers store all the unprocessed messages. There is currently no mechanism that suppresses this behavior or limits the memory usage.
Example ¶
// Split one input channel into two output channels and wait for them to be closed before finishing.
ch := make(chan int)
s := New(ch)
outCh1 := s.GetOutputCh()
outCh2 := s.GetOutputCh()

// Start two consuming goroutines, one for each output channel.
go func() {
	for num := range outCh1 {
		fmt.Printf("Consumer 1 got message: %d\n", num)
		time.Sleep(10 * time.Millisecond) // simulate some work
	}
}()
go func() {
	for num := range outCh2 {
		fmt.Printf("Consumer 2 got message: %d\n", num)
		time.Sleep(100 * time.Millisecond) // simulate some work
	}
}()

ch <- 1
ch <- 2

// Closing the input channel closes all the output channels as well, after all the messages have been received.
close(ch)

// Wait until all the output channels have been successfully closed. Beware that this does not wait for the
// work of the consuming goroutines to be finished. To ensure this, we would have to use a sync.WaitGroup.
s.Wait()
Output:
Consumer 1 got message: 1
Consumer 2 got message: 1 - the order of the first two lines is non-deterministic
Consumer 1 got message: 2
Consumer 2 got message: 2 - this line might not be printed due to the missing sync.WaitGroup
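The example notes that Wait does not cover the consumers' own work and that a sync.WaitGroup would be needed for that. A minimal sketch of the pattern follows. The consumeAll helper is hypothetical and not part of chansplit, and the two pre-filled buffered channels are only stand-ins for channels that GetOutputCh would return, so that the program runs without the package.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll is a hypothetical helper, not part of chansplit. It starts one
// goroutine per output channel and returns only after every goroutine has
// drained its channel and finished handling the last message.
func consumeAll(outs []<-chan int, handle func(consumer, msg int)) {
	var wg sync.WaitGroup
	for i, out := range outs {
		wg.Add(1)
		go func(id int, ch <-chan int) {
			defer wg.Done()
			for msg := range ch {
				handle(id, msg)
			}
		}(i+1, out)
	}
	wg.Wait()
}

func main() {
	// Stand-ins for two Splitter outputs: buffered channels that already
	// hold both messages and are closed, as the outputs would be once the
	// input channel has been closed.
	a := make(chan int, 2)
	b := make(chan int, 2)
	for _, ch := range []chan int{a, b} {
		ch <- 1
		ch <- 2
		close(ch)
	}

	var mu sync.Mutex
	handled := 0
	consumeAll([]<-chan int{a, b}, func(consumer, msg int) {
		fmt.Printf("Consumer %d got message: %d\n", consumer, msg)
		mu.Lock()
		handled++
		mu.Unlock()
	})

	// consumeAll returned, so both consumers are done and the count is 4.
	fmt.Println("messages handled:", handled)
}
```

The order of the "Consumer" lines still varies between runs, but none of them can be lost, because the program does not continue until wg.Wait has returned.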
Types ¶
type Splitter ¶
type Splitter[T any] struct {
	// contains filtered or unexported fields
}
Splitter provides a way to distribute messages sent onto one channel to multiple output channels. These channels are completely independent and do not block each other. A message sent onto the input channel never blocks the caller. If the consumers of the output channels are occupied when a new message arrives, the message is stored in an internal buffer. When the input channel is closed, the Splitter closes all the output channels as well, but only after all the messages currently in the buffer have been consumed.
func (*Splitter[T]) GetOutputCh ¶
func (a *Splitter[T]) GetOutputCh() <-chan T
GetOutputCh returns a new output channel connected to the receiver's input channel.