Documentation ¶
Overview ¶
Package multicast provides a Chan type that can multicast and replay messages to multiple receivers.
Multicast and Replay ¶
Native Go channels don't support multicasting the same message to multiple receivers and they don't support replaying previously sent messages.
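To make this limitation concrete, here is a small sketch that uses only the standard library. Two goroutines receive from one native channel, and each value reaches exactly one of them. The helper name `countDeliveries` is hypothetical and exists only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countDeliveries sends n values on a native unbuffered channel that is
// read by the given number of receivers, and returns how many values
// were received in total across all receivers.
func countDeliveries(n, receivers int) int {
	ch := make(chan int)
	var total int64
	var wg sync.WaitGroup
	wg.Add(receivers)
	for r := 0; r < receivers; r++ {
		go func() {
			defer wg.Done()
			for range ch {
				atomic.AddInt64(&total, 1)
			}
		}()
	}
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch)
	wg.Wait()
	return int(atomic.LoadInt64(&total))
}

func main() {
	// Multicasting 3 values to 2 receivers would yield 6 deliveries.
	// A native channel hands each value to one receiver only.
	fmt.Println(countDeliveries(3, 2)) // prints 3
}
```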
Unlike native Go channels, messages sent to this channel are multicast to all receiving endpoints. A new endpoint created while the channel is operational can choose to receive previously sent messages by specifying a replay count parameter, or 0 to indicate that it is only interested in new messages.
You can also limit playback to messages younger than a certain age because the channel stores a timestamp with each message you send to it.
Just like native Go channels, the channel exhibits blocking backpressure to the sender goroutines when the channel buffer is full. Total speed of the channel is dictated by the slowest receiver.
Lock and Goroutine free ¶
This multicast channel is different from other multicast implementations in that it uses only fast synchronization primitives like atomic operations to implement its features. Furthermore, it also doesn't use goroutines internally. This implementation is low-latency and has a high throughput.
If you are in a situation where you need to record and replay a stream of data or you need to split a stream of data into multiple identical streams, then this package offers a fast and simple solution.
Heterogeneous ¶
Heterogeneous simply means that you can mix types, which is very convenient but not type-safe. The Chan type provided in this package supports sending and receiving values of mixed type:
    ch := NewChan(128, 1)
    ch.Send("hello")
    ch.Send(42)
    ch.Send(1.6180)
    ch.Close(nil)
Regenerating this Package ¶
The implementation in this package is generated from a generic implementation of the Chan type found in the subdirectory "generic" inside this package. By replacing the place-holder type with "interface{}" a heterogeneous Chan type is created. To regenerate this channel implementation, run jig inside this package directory:
    go get -d github.com/reactivego/generics/cmd/jig
    go run github.com/reactivego/generics/cmd/jig -v
Example (FastSend1x2) ¶
    package main

    import (
    	"fmt"
    	"sync"

    	"github.com/reactivego/multicast"
    )

    func main() {
    	ch := multicast.NewChan(128, 2)

    	// FastSend allows only a single goroutine sending and does not store
    	// timestamps with messages.
    	ch.FastSend("Hello")
    	ch.FastSend("World!")
    	ch.Close(nil)
    	if ch.Closed() {
    		fmt.Println("channel closed")
    	}

    	print := func(value interface{}, err error, closed bool) bool {
    		switch {
    		case !closed:
    			fmt.Println(value)
    		case err != nil:
    			fmt.Println(err)
    		default:
    			fmt.Println("closed")
    		}
    		return true
    	}

    	var wg sync.WaitGroup
    	wg.Add(2)
    	ep1, _ := ch.NewEndpoint(multicast.ReplayAll)
    	go func() {
    		ep1.Range(print, 0)
    		wg.Done()
    	}()
    	ep2, _ := ch.NewEndpoint(multicast.ReplayAll)
    	go func() {
    		ep2.Range(print, 0)
    		wg.Done()
    	}()
    	wg.Wait()
    }
Output:

    channel closed
    Hello
    Hello
    World!
    World!
    closed
    closed
Example (Send2x2) ¶
    package main

    import (
    	"fmt"
    	"sync"

    	"github.com/reactivego/multicast"
    )

    func main() {
    	ch := multicast.NewChan(128, 2)

    	// Send supports multiple goroutines sending and stores a timestamp
    	// with every message sent.
    	var wgs sync.WaitGroup
    	wgs.Add(2)
    	go func() {
    		ch.Send("Hello")
    		wgs.Done()
    	}()
    	go func() {
    		ch.Send("World!")
    		wgs.Done()
    	}()

    	print := func(value interface{}, err error, closed bool) bool {
    		switch {
    		case !closed:
    			fmt.Println(value)
    		case err != nil:
    			fmt.Println(err)
    		default:
    			fmt.Println("closed")
    		}
    		return true
    	}

    	var wgr sync.WaitGroup
    	wgr.Add(2)
    	ep1, _ := ch.NewEndpoint(multicast.ReplayAll)
    	go func() {
    		ep1.Range(print, 0)
    		wgr.Done()
    	}()
    	ep2, _ := ch.NewEndpoint(multicast.ReplayAll)
    	go func() {
    		ep2.Range(print, 0)
    		wgr.Done()
    	}()

    	wgs.Wait()
    	ch.Close(nil)
    	if ch.Closed() {
    		fmt.Println("channel closed")
    	}
    	wgr.Wait()
    }
Output:

    Hello
    Hello
    World!
    World!
    closed
    closed
    channel closed
Constants ¶
const ErrOutOfEndpoints = ChannelError("out of endpoints")
ErrOutOfEndpoints is returned by NewEndpoint when the maximum number of endpoints has already been created.
    const (
    	// ReplayAll can be passed to NewEndpoint to retain as many of the
    	// previously sent messages as possible that are still in the buffer.
    	ReplayAll uint64 = math.MaxUint64
    )
Types ¶
type Chan ¶
type Chan struct {
// contains filtered or unexported fields
}
Chan is a fast, concurrent multi-(casting,sending,receiving) buffered channel. It is implemented using only sync/atomic operations. Spinlocks using runtime.Gosched() are used in situations where goroutines are waiting or contending for resources.
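As a rough illustration of the technique mentioned above, the sketch below builds a spinlock from a single atomic flag and yields with runtime.Gosched() while contended. The type `spinLock` and the helper `countWithSpinLock` are hypothetical and are not taken from this package's source. The sync.WaitGroup is only used to wait for the demo goroutines to finish.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// spinLock is a hypothetical lock built from one atomic flag:
// 0 means unlocked, 1 means locked.
type spinLock struct{ state int32 }

// Lock spins until it wins the compare-and-swap, yielding the
// processor to other goroutines between attempts.
func (l *spinLock) Lock() {
	for !atomic.CompareAndSwapInt32(&l.state, 0, 1) {
		runtime.Gosched()
	}
}

// Unlock releases the lock with an atomic store.
func (l *spinLock) Unlock() { atomic.StoreInt32(&l.state, 0) }

// countWithSpinLock increments a shared counter from several goroutines,
// guarding each increment with the spinlock, and returns the final count.
func countWithSpinLock(goroutines, iterations int) int {
	var (
		lock    spinLock
		counter int
		wg      sync.WaitGroup
	)
	wg.Add(goroutines)
	for g := 0; g < goroutines; g++ {
		go func() {
			defer wg.Done()
			for i := 0; i < iterations; i++ {
				lock.Lock()
				counter++
				lock.Unlock()
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithSpinLock(4, 1000)) // prints 4000
}
```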
func NewChan ¶
NewChan creates a new channel. The parameters bufferCapacity and endpointCapacity determine the size of the message buffer and maximum number of concurrent receiving endpoints respectively.
Note that bufferCapacity is always scaled up to a power of 2, so specifying 400, for example, creates a buffer of 512 (2^9). For the same reason, a bufferCapacity of 0 is scaled up to 1 (2^0).
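The rounding rule can be sketched with math/bits. The function name `roundUpPow2` is hypothetical, and leaving an exact power of 2 such as 512 unchanged is an assumption of this sketch; the text above only confirms the results for 400 and 0.

```go
package main

import (
	"fmt"
	"math/bits"
)

// roundUpPow2 returns the smallest power of 2 that is >= n,
// treating any n below 1 as 1 (2^0).
func roundUpPow2(n int) int {
	if n <= 1 {
		return 1
	}
	// bits.Len(x) is the number of bits needed to represent x, so
	// shifting 1 by bits.Len(n-1) gives the next power of 2 at or above n.
	return 1 << bits.Len(uint(n-1))
}

func main() {
	for _, n := range []int{0, 1, 400, 512} {
		fmt.Println(n, "->", roundUpPow2(n))
	}
	// Prints:
	// 0 -> 1
	// 1 -> 1
	// 400 -> 512
	// 512 -> 512
}
```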
func (*Chan) Close ¶
Close will close the channel. Pass in an error or nil. Endpoints continue to receive data until the buffer is empty. Only then will the close notification be delivered to the Range function.
func (*Chan) FastSend ¶
func (c *Chan) FastSend(value interface{})
FastSend can be used to send values to the channel from a SINGLE goroutine. Also, this does not record the time a message was sent, so the maxAge value passed to Range will be ignored.
Note that when the number of unread messages has reached bufferCapacity, the call to FastSend will block until the slowest Endpoint has read another message.
func (*Chan) Lock ¶
func (c *Chan) Lock()
Lock is an empty method that exists so that *Chan can be passed to sync.NewCond as a Locker.
func (*Chan) NewEndpoint ¶
NewEndpoint will create a new channel endpoint that can be used to receive from the channel. The argument keep specifies how many entries of the existing channel buffer to keep.
Endpoints created after Close has been called on the channel still receive the number of messages indicated by the keep parameter, followed by the close notification.
An endpoint that is canceled or read until it is exhausted (after channel was closed) will be reused by NewEndpoint.
type ChannelError ¶
type ChannelError string
func (ChannelError) Error ¶
func (e ChannelError) Error() string
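Declaring an error as a constant of a string-based type means it can be compared by value and cannot be reassigned by other packages. The sketch below re-declares the type locally to show the pattern. The body of Error is an assumption (only the signature is listed above), and `acquire` is a hypothetical stand-in for a function such as NewEndpoint that runs out of slots.

```go
package main

import (
	"errors"
	"fmt"
)

// ChannelError mirrors the string-based error type listed above.
type ChannelError string

// Error returns the string itself. This body is an assumption; the
// documentation above shows only the method signature.
func (e ChannelError) Error() string { return string(e) }

// ErrOutOfEndpoints is a constant, so its value cannot be changed.
const ErrOutOfEndpoints = ChannelError("out of endpoints")

// acquire is a hypothetical stand-in that fails once all slots are in use.
func acquire(inUse, capacity int) error {
	if inUse >= capacity {
		return ErrOutOfEndpoints
	}
	return nil
}

func main() {
	err := acquire(2, 2)
	fmt.Println(errors.Is(err, ErrOutOfEndpoints)) // true
	fmt.Println(err == ErrOutOfEndpoints)          // true
	fmt.Println(err)                               // out of endpoints
	fmt.Println(acquire(1, 2) == nil)              // true
}
```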
type Endpoint ¶
type Endpoint struct {
	*Chan
	// contains filtered or unexported fields
}
Endpoint is returned by a call to NewEndpoint on the channel. Every endpoint should be used by a single goroutine only; do not share an endpoint between goroutines.
func (*Endpoint) Cancel ¶
func (e *Endpoint) Cancel()
Cancel cancels the endpoint, making it available for reuse when NewEndpoint is called on the channel. The foreach function passed to Range is not notified of the cancellation; it is simply never called again.
func (*Endpoint) Range ¶
func (e *Endpoint) Range(foreach func(value interface{}, err error, closed bool) bool, maxAge time.Duration)
Range calls the passed-in foreach function with all the messages in the buffer, followed by all messages received afterwards. Range continues as long as foreach returns true; returning false has the same effect as calling Cancel, after which foreach is never called again. Passing a maxAge duration other than 0 skips messages that are older than maxAge.
When the channel is closed and the buffer has been exhausted, the close is reported, together with the optional error, by calling foreach one last time with the closed parameter set to true.
Directories ¶
Path | Synopsis |
---|---|
| | Package multicast provides generic MxN multicast channels for Go with buffering and time based buffer eviction. |
| | Package test provides examples, tests and benchmarks for the channel (specialized on type int). |