multicast

package module
v0.0.0-...-cf40b75 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 2, 2020 License: MIT Imports: 7 Imported by: 0

README

multicast

import "github.com/reactivego/multicast"

Package multicast provides MxN multicast channels for Go with buffering and time based buffer eviction. It can be fed by multiple concurrent senders. It multicasts and replays messages to multiple concurrent receivers.

If you are in a situation where you need to record and replay a stream of data or need to split a stream of data into multiple identical streams, then this package offers a fast and simple implementation.

Example (Send 2x2)

Send from 2 goroutines and receive in 2 goroutines.

Code:

ch := multicast.NewChan(128, 2)

// Send suppports multiple goroutine sending and stores a timestamp with
// every message sent.

var wgs sync.WaitGroup
wgs.Add(2)
go func() {
	ch.Send("Hello")
	wgs.Done()
}()
go func() {
	ch.Send("World!")
	wgs.Done()
}()

print := func(value interface{}, err error, closed bool) bool {
	switch {
	case !closed:
		fmt.Println(value)
	case err != nil:
		fmt.Println(err)
	default:
		fmt.Println("closed")
	}
	return true
}

var wgr sync.WaitGroup
wgr.Add(2)
ep1, _ := ch.NewEndpoint(multicast.ReplayAll)
go func() {
	ep1.Range(print, 0)
	wgr.Done()
}()

ep2, _ := ch.NewEndpoint(multicast.ReplayAll)
go func() {
	ep2.Range(print, 0)
	wgr.Done()
}()

wgs.Wait()
ch.Close(nil)
if ch.Closed() {
	fmt.Println("channel closed")
}
wgr.Wait()

Unordered Output:

Hello
Hello
World!
World!
closed
closed
channel closed

Example (FastSend 1x2)

Send from 1 goroutine and receive in 2 goroutines Code:

ch := multicast.NewChan(128, 2)

// FastSend allows only a single goroutine sending and does not store
// timestamps with messages.

ch.FastSend("Hello")
ch.FastSend("World!")
ch.Close(nil)
if ch.Closed() {
    fmt.Println("channel closed")
}

print := func(value interface{}, err error, closed bool) bool {
    switch {
    case !closed:
        fmt.Println(value)
    case err != nil:
        fmt.Println(err)
    default:
        fmt.Println("closed")
    }
    return true
}

var wg sync.WaitGroup
wg.Add(2)
ep1, _ := ch.NewEndpoint(multicast.ReplayAll)
go func() {
    ep1.Range(print, 0)
    wg.Done()
}()

ep2, _ := ch.NewEndpoint(multicast.ReplayAll)
go func() {
    ep2.Range(print, 0)
    wg.Done()
}()
wg.Wait()

Unordered Output:

channel closed
Hello
Hello
World!
World!
closed
closed

Compared to Go channels

The standard Go channel cannot multicast the same message to multiple receivers and it cannot play back messages previously sent to it. The multicast.Chan type offered here does.

Additionally, you can even evict messages from the buffer that are past a certain age because multicast.Chan also stores a timestamp with each message sent.

Compared to other Multicast packages

This multicast channel is different from other multicast implementations.

  1. It uses only fast synchronization primitives like atomic operations to implement its features.
  2. It doesn't use goroutines internally.
  3. It uses internal struct padding to speed up CPU cache access.

This allows it to operate at a very high level of performance.

Regenerating this Package

This package is generated from generics in the sub-folder generic by the jig tool. You don't need to regenerate this package in order to use it. However, if you are interested in regenerating it, then read on.

The jig tool provides the parametric polymorphism capability that Go 1 is missing. It works by replacing place-holder types of generic functions and datatypes with interface{} (it can also generate statically typed code though).

To regenerate, change the current working directory to the package directory and run the jig tool as follows:

$ go get -d github.com/reactivego/jig
$ go run github.com/reactivego/jig -v

License

This library is licensed under the terms of the MIT License. See LICENSE file in this repository for copyright notice and exact wording.

Documentation

Overview

Package multicast provides a Chan type that can multicast and replay messages to multiple receivers.

Multicast and Replay

Native Go channels don't support multicasting the same message to multiple receivers and they don't support replaying previously sent messages.

Unlike native Go channels, messages send to this channel are multicast to all receiving endpoints. A new endpoint created while the channel is operational can choose to receive messages previously sent by specifying a replay count parameter, or 0 to indicate it is only interested in new messages.

You can also limit playback to messages younger than a certain age because the channel stores a timestamp with each message you send to it.

Just like native Go channels, the channel exhibits blocking backpressure to the sender goroutines when the channel buffer is full. Total speed of the channel is dictated by the slowest receiver.

Lock and Goroutine free

This multicast channel is different from other multicast implementations in that it uses only fast synchronization primitives like atomic operations to implement its features. Furthermore, it also doesn't use goroutines internally. This implementation is low-latency and has a high throughput.

If you are in a situation where you need to record and replay a stream of data or you need to split a stream of data into multiple identical streams, then this package offers a fast and simple solution.

Heterogeneous

Heterogeneous simply means that you can mix types, that is very convenient but not typesafe. The Chan type provided in this package supports sending and receiving values of mixed type:

ch := NewChan(128, 1)
ch.Send("hello")
ch.Send(42)
ch.Send(1.6180)
ch.Close(nil)

Regenerating this Package

The implementation in this package is generated from a generic implementation of the Chan type found in the subdirectory "generic" inside this package. By replacing the place-holder type with "interface{}" a heterogeneous Chan type is created. To regenerate this channel implementation, run jig inside this package directory:

go get -d github.com/reactivego/generics/cmd/jig
go run github.com/reactivego/generics/cmd/jig -v
Example (FastSend1x2)
package main

import (
	"fmt"
	"sync"

	"github.com/reactivego/multicast"
)

func main() {
	ch := multicast.NewChan(128, 2)

	// FastSend allows only a single goroutine sending and does not store
	// timestamps with messages.

	ch.FastSend("Hello")
	ch.FastSend("World!")
	ch.Close(nil)
	if ch.Closed() {
		fmt.Println("channel closed")
	}

	print := func(value interface{}, err error, closed bool) bool {
		switch {
		case !closed:
			fmt.Println(value)
		case err != nil:
			fmt.Println(err)
		default:
			fmt.Println("closed")
		}
		return true
	}

	var wg sync.WaitGroup
	wg.Add(2)
	ep1, _ := ch.NewEndpoint(multicast.ReplayAll)
	go func() {
		ep1.Range(print, 0)
		wg.Done()
	}()

	ep2, _ := ch.NewEndpoint(multicast.ReplayAll)
	go func() {
		ep2.Range(print, 0)
		wg.Done()
	}()
	wg.Wait()

}
Output:

channel closed
Hello
Hello
World!
World!
closed
closed
Example (Send2x2)
package main

import (
	"fmt"
	"sync"

	"github.com/reactivego/multicast"
)

func main() {
	ch := multicast.NewChan(128, 2)

	// Send suppports multiple goroutine sending and stores a timestamp with
	// every message sent.

	var wgs sync.WaitGroup
	wgs.Add(2)
	go func() {
		ch.Send("Hello")
		wgs.Done()
	}()
	go func() {
		ch.Send("World!")
		wgs.Done()
	}()

	print := func(value interface{}, err error, closed bool) bool {
		switch {
		case !closed:
			fmt.Println(value)
		case err != nil:
			fmt.Println(err)
		default:
			fmt.Println("closed")
		}
		return true
	}

	var wgr sync.WaitGroup
	wgr.Add(2)
	ep1, _ := ch.NewEndpoint(multicast.ReplayAll)
	go func() {
		ep1.Range(print, 0)
		wgr.Done()
	}()

	ep2, _ := ch.NewEndpoint(multicast.ReplayAll)
	go func() {
		ep2.Range(print, 0)
		wgr.Done()
	}()

	wgs.Wait()
	ch.Close(nil)
	if ch.Closed() {
		fmt.Println("channel closed")
	}
	wgr.Wait()

}
Output:

Hello
Hello
World!
World!
closed
closed
channel closed

Index

Examples

Constants

View Source
const ErrOutOfEndpoints = ChannelError("out of endpoints")

ErrOutOfEndpoints is returned by NewEndpoint when the maximum number of endpoints has already been created.

View Source
const (
	// ReplayAll can be passed to NewEndpoint to retain as many of the
	// previously sent messages as possible that are still in the buffer.
	ReplayAll uint64 = math.MaxUint64
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Chan

type Chan struct {
	// contains filtered or unexported fields
}

Chan is a fast, concurrent multi-(casting,sending,receiving) buffered channel. It is implemented using only sync/atomic operations. Spinlocks using runtime.Gosched() are used in situations where goroutines are waiting or contending for resources.

func NewChan

func NewChan(bufferCapacity int, endpointCapacity int) *Chan

NewChan creates a new channel. The parameters bufferCapacity and endpointCapacity determine the size of the message buffer and maximum number of concurrent receiving endpoints respectively.

Note that bufferCapacity is always scaled up to a power of 2 so e.g. specifying 400 will create a buffer of 512 (2^9). Also because of this a bufferCapacity of 0 is scaled up to 1 (2^0).

func (*Chan) Close

func (c *Chan) Close(err error)

Close will close the channel. Pass in an error or nil. Endpoints continue to receive data until the buffer is empty. Only then will the close notification be delivered to the Range function.

func (*Chan) Closed

func (c *Chan) Closed() bool

Closed returns true when the channel was closed using the Close method.

func (*Chan) FastSend

func (c *Chan) FastSend(value interface{})

FastSend can be used to send values to the channel from a SINGLE goroutine. Also, this does not record the time a message was sent, so the maxAge value passed to Range will be ignored.

Note, that when the number of unread messages has reached bufferCapacity, then the call to FastSend will block until the slowest Endpoint has read another message.

func (*Chan) Lock

func (c *Chan) Lock()

Lock, empty method so we can pass *Chan to sync.NewCond as a Locker.

func (*Chan) NewEndpoint

func (c *Chan) NewEndpoint(keep uint64) (*Endpoint, error)

NewEndpoint will create a new channel endpoint that can be used to receive from the channel. The argument keep specifies how many entries of the existing channel buffer to keep.

After Close is called on the channel, any endpoints created after that will still receive the number of messages as indicated in the keep parameter and then subsequently the close.

An endpoint that is canceled or read until it is exhausted (after channel was closed) will be reused by NewEndpoint.

func (*Chan) Send

func (c *Chan) Send(value interface{})

Send can be used by concurrent goroutines to send values to the channel.

Note, that when the number of unread messages has reached bufferCapacity, then the call to Send will block until the slowest Endpoint has read another message.

func (*Chan) Unlock

func (c *Chan) Unlock()

Unlock, empty method so we can pass *Chan to sync.NewCond as a Locker.

type ChannelError

type ChannelError string

func (ChannelError) Error

func (e ChannelError) Error() string

type Endpoint

type Endpoint struct {
	*Chan
	// contains filtered or unexported fields
}

Endpoint is returned by a call to NewEndpoint on the channel. Every endpoint should be used by only a single goroutine, so no sharing between goroutines.

func (*Endpoint) Cancel

func (e *Endpoint) Cancel()

Cancel cancels the endpoint, making it available to be reused when NewEndpoint is called on the channel. When canceled the foreach function passed to Range is not notified, instead just never called again.

func (*Endpoint) Range

func (e *Endpoint) Range(foreach func(value interface{}, err error, closed bool) bool, maxAge time.Duration)

Range will call the passed in foreach function with all the messages in the buffer, followed by all the messages received. When the foreach function returns true Range will continue, when you return false this is the same as calling Cancel. When canceled the foreach will never be called again. Passing a maxAge duration other than 0 will skip messages that are older than maxAge.

When the channel is closed, eventually when the buffer is exhausted the close with optional error will be notified by calling foreach one last time with the closed parameter set to true.

Directories

Path Synopsis
Package multicast provides generic MxN multicast channels for Go with buffering and time based buffer eviction.
Package multicast provides generic MxN multicast channels for Go with buffering and time based buffer eviction.
Package test provides examples, tests and benchmarks for the channel (specialized on type int).
Package test provides examples, tests and benchmarks for the channel (specialized on type int).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL