demux

package
v0.0.0-...-b995670 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2023 License: MIT Imports: 9 Imported by: 2

Documentation

Overview

Package receive provides a stream demuxer for handling results where you need to send results to different channels based on some identity.

Here is an example:

// This is uses to send int data to our demuxer. By making the
// channel hold 100 values, 100 goroutines will be used to do demuxing.
// This also means that no order is guranteed on the output to the
// different channels we forward to. If you need guaranteed ordering,
// you can use the InOrder type. See the demux_test.go file's
// TestDemuxEtoE() for how to integrate it. However, unless doing complex
// middleware or the need to prevent head of line blocking for the receiver,
// a value of 1 will often do.
input := make(chan int, 100)

// Here is a function to handle any stream errors. It receives the
// value "v" and the error. All errors are of type Error, so you
// can make decisions on what to do based on the error type.
// This example does nothing.
errHandle := func(v int, err error){}

// getID returns a channel ID based on the value modulus 2.
// So it will always return 0 or 1. In more complex forwarding, the
// value "v" could simply hold a value "channelID" that the function could
// return.
getID := func(v int) int {
	return v % 2
}

// Sets up our demuxer that receives data on input. getID() will decide
// what channel to forward on. If there is an error, it is handled by
// errHandle.
demux, err := New(input, getID, errHandle)
if err != nil {
	panic(err)
}

// Here is channels we will forward to.
output0 := make(chan int, 1)
output1 := make(chan int, 1)

// Add our channel receivers. This can be done anytime if you have
// the need for dynamic handling for new channels. There is also
// a Remove() to remove a receiver. Also, receivers can be closed
// when they receive a single value or if the values implement the
// CloseCher type, the message can signal to close the channel.
demux.AddReceiver(0, output0)
demux.AddReceiver(1, output1)

// Send all of our values into the demuxer for input.
wg.Add(1)
go func() {
	defer wg.Done()

	for i := 0; i < 10000; i++ {
		i := i

		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
			input <- i
		}()
	}
}()

go func() {
	wg.Wait()
	demux.Close()
}()

dataProcessed := sync.WaitGroup{}
dataProcessed.Add(1)
go func() {
	defer dataProcessed.Done()
	for v := range output0 {
		fmt.Println(v)
	}
}()
go func() {
	defer dataProcessed.Done()
	for v := range output1 {
		fmt.Println(v)
	}
}()
wg.Wait()

For a more complex example that handles reordering messages, see the demux_test.go file's TestDemuxEtoE().

You can also apply options that require sending on a channel to happen in some time.Duration or error (prevents head of line blocking).

We support middleware via WithMiddleware().

And if ordering of output matters for particular output but you want concurrency, you can use the InOrder type to get the values out of the Demuxer and back into order.

Finally, we support a types that implement CloseCher. If we receive a message where CloseCh() == true, we forward that message and close the channel. In this case, all messages going to that channel must be received in order with the one indicating CloseCh() received last. Otherwise, there is a deadlock.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CloseCher

type CloseCher interface {
	// CloseCh indicates if this should close the channel the input
	// is routed to. Remember, if the input channel has more than 1
	// in capacity, order cannot be guaranteed and you could be closin
	// a channel that is still processing.
	CloseCh() bool
}

CloseCher is checked against every input we receive. If the input has this method and it returns true, we push the type onto the output channel and then close the channel.

type Demux

type Demux[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Demux is used to unmux data received on a stream and put it on the proper receive channel. K represents a key that can be used to identify the channel, V represents the channel value that will be sent.

func New

func New[K comparable, V any](in chan V, getID GetIDer[K, V], errHandle ErrHandler[K, V], options ...Option[K, V]) (*Demux[K, V], error)

New creates a new Demux. in is the channel that this Demux will receive values on. The len(in) is equal to the number of goroutines that will be used to process incoming values. If len(in) == 0, 1 goroutine is used. Concurrent goroutines means that order is not guaranteed. getID extracts an ID of type K from a value V. ID should be unique to that value V. ID is used to map a received V to a channel K. errHandle provides a function that handles what to do with an error for value V received on input. IMPORTANT: Your input must be in ID order if implementing a value type with CloseCher(). Otherwise, this will likely just lock up.

func (*Demux[K, V]) AddReceiver

func (s *Demux[K, V]) AddReceiver(id K, ch chan V) error

AddReceiver an ID and a channel to send all values that have that ID onto passed channel. This must occur before receiving values of id.

func (*Demux[K, V]) Close

func (s *Demux[K, V]) Close() error

Close closes our Demux and the input channel it takes in. If there are any open outpu channels, they are closed. This will block if until all routing goroutines are closed.

func (*Demux[K, V]) RemoveReceiver

func (s *Demux[K, V]) RemoveReceiver(id K)

RemoveReceiver removes a receiver.

type ErrHandler

type ErrHandler[K comparable, V any] func(v V, err error)

ErrHandler is a function that determines what to do when value V was received which generated error err. All errors for processing are of our Error type and can be used to decide what to do with the value.

type ErrType

type ErrType uint8

ErrType is the type of error that is being returned.

const (
	ETUnknown ErrType = iota
	// ETValueExists indicates you are creating an exit channel
	// that already exists.
	ETChanExists
	// ETChanNotFound indicates that an exit channel for the value could
	// not be found.
	ETChanNotFound
	// ETMiddleware indicates a piece of middleware gave an error. The
	// error given is wrapped in this error.
	ETMiddleware
	// ETChanTimeout indicates WithDelayMax was set and a value exceeded
	// this delay and is dropped.
	ETChanTimeout
)

func (ErrType) String

func (i ErrType) String() string

type Error

type Error struct {
	// Type is the type of error.
	Type ErrType
	// Message is the errors message.
	Message string
	// contains filtered or unexported fields
}

Error provides errors for this module.

func (Error) Error

func (e Error) Error() string

func (Error) Unwrap

func (e Error) Unwrap() error

func (Error) Wrap

func (e Error) Wrap(err error) Error

type GetIDer

type GetIDer[K comparable, V any] func(v V) K

GetIDer represents a function that handed value V will return the unique ID of K.

type InOrder

type InOrder[I constraints.Integer, V any] struct {
	// contains filtered or unexported fields
}

InOrder allows you to funnel the Demux channel output that will receive out of order messages into InOrder which will reprocess the messages and send them out in order. InOrder must always receive a value of 0 to start processing and expects messages to ascend by 1 from there.

func NewInOrder

func NewInOrder[I constraints.Integer, V any](getID GetIDer[I, V], out chan V) *InOrder[I, V]

NewInOrder makes a new InOrder processor. getID is a GetIDer that gets the message's order ID, which should start at 0 and increment.

func (*InOrder[I, V]) Add

func (n *InOrder[I, V]) Add(v V) error

Add adds a value to be sorted. If the id of v is <= to the current internal counter, this will return an error. Nothing is processed until a value with id 0 is passed, then 1, then 2.

func (*InOrder[I, V]) Close

func (n *InOrder[I, V]) Close()

Close closes InOrder and closes the out channel.

func (*InOrder[I, V]) Len

func (n *InOrder[I, V]) Len() int

Len returns the length of the internal queue.

type Middleware

type Middleware[K comparable, V any] func(v V) error

Middleware is a function that is executed on value v before it is routed. If error != nil, further Middleware is not executed and the ErrHandler is called for value v.

type MutexType

type MutexType uint8

MutexType indicates the type of mutex to protect the internal datastore with.

const (
	// NoMutex, which is the default, says don't use a mutex. Instead we do a
	// copy of the internal data into a new map every time AddReceiver() is called.
	// The new map is stored in an atomic.Pointer. This is the fastest for reading, but
	// the slowest for writing. This is good when reads vastly outstrip writes
	// and the number of internal channels is low.
	NoMutex MutexType = iota
	// Mutex indicates to use the standard sync.Mutex. Good for when there
	// are symetrical channel adds to responses. So if you are doing a promise
	// type of response (waiting for a single response and then close the forwarding
	// channel), then this is the best option.
	Mutex
	// RWMutex indicates to use the sync.RWMutex. Good when we add
	// new channels much less than we forward to those channels.
	// If the number of forwarding channels is low, NoMutex is probaly the best option.
	RWMutex
)

func (MutexType) String

func (i MutexType) String() string

type Option

type Option[K comparable, V any] func(s *Demux[K, V])

Option provide optional arguments to New().

func WithCloseChan

func WithCloseChan[K comparable, V any]() Option[K, V]

WithCloseChan says to close the found channel after putting a value on it. Useful for when the return channel represents a promise that gets a single value.

func WithDelayMax

func WithDelayMax[K comparable, V any](d time.Duration) Option[K, V]

WithDelayMax tells Stream to call errHandler for a value V that is received but can't be router to the destination after d time. This prevents head of line blocking.

func WithMiddleware

func WithMiddleware[K comparable, V any](m ...Middleware[K, V]) Option[K, V]

WithMiddleware appends Middleware to be executed by the Stream.

func WithMutex

func WithMutex[K comparable, V any](mt MutexType) Option[K, V]

WithMutex indicates to use an RWMutex with the Stream. Without this, syncronization occurs by having an internal map copied every time an AddReceiver() is called adding the new value and storing it in an atomic.Pointer. Slow on write, but fast on reads. If

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL