Documentation ¶
Overview ¶
Package receive provides a stream demuxer for handling results where you need to send results to different channels based on some identity.
Here is an example:
// This is used to send int data to our demuxer. By making the
// channel hold 100 values, 100 goroutines will be used to do demuxing.
// This also means that no order is guaranteed on the output to the
// different channels we forward to. If you need guaranteed ordering,
// you can use the InOrder type. See the demux_test.go file's
// TestDemuxEtoE() for how to integrate it. However, unless you are doing
// complex middleware or need to prevent head of line blocking for the
// receiver, a value of 1 will often do.
input := make(chan int, 100)

// Here is a function to handle any stream errors. It receives the
// value "v" and the error. All errors are of type Error, so you
// can make decisions on what to do based on the error type.
// This example does nothing.
errHandle := func(v int, err error) {}

// getID returns a channel ID based on the value modulus 2.
// So it will always return 0 or 1. In more complex forwarding, the
// value "v" could simply hold a value "channelID" that the function could
// return.
getID := func(v int) int {
	return v % 2
}

// Sets up our demuxer that receives data on input. getID() will decide
// what channel to forward on. If there is an error, it is handled by
// errHandle.
demux, err := New(input, getID, errHandle)
if err != nil {
	panic(err)
}

// Here are the channels we will forward to.
output0 := make(chan int, 1)
output1 := make(chan int, 1)

// Add our channel receivers. This can be done at any time if you need
// dynamic handling for new channels. There is also a RemoveReceiver()
// to remove a receiver. Receivers can also be closed after they receive
// a single value or, if the values implement the CloseCher type, when a
// message signals that the channel should be closed.
demux.AddReceiver(0, output0)
demux.AddReceiver(1, output1)

// Send all of our values into the demuxer for input.
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	for i := 0; i < 10000; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
			input <- i
		}()
	}
}()

// Close the demuxer once every value has been sent.
go func() {
	wg.Wait()
	demux.Close()
}()

// Print everything that arrives on the two output channels.
dataProcessed := sync.WaitGroup{}
dataProcessed.Add(2)
go func() {
	defer dataProcessed.Done()
	for v := range output0 {
		fmt.Println(v)
	}
}()
go func() {
	defer dataProcessed.Done()
	for v := range output1 {
		fmt.Println(v)
	}
}()

// Wait for both readers to drain their channels.
dataProcessed.Wait()
For a more complex example that handles reordering messages, see the demux_test.go file's TestDemuxEtoE().
You can also apply an option (WithDelayMax()) that requires each send on an output channel to complete within some time.Duration; otherwise the value is handed to the error handler. This prevents head of line blocking.
We support middleware via WithMiddleware().
If ordering matters for a particular output but you still want concurrency, you can use the InOrder type to take the values coming out of the Demuxer and put them back into order.
Finally, we support types that implement CloseCher. If we receive a message where CloseCh() == true, we forward that message and close the channel. In this case, all messages going to that channel must be received in order, with the one indicating CloseCh() received last. Otherwise, there is a deadlock.
Types ¶
type CloseCher ¶
type CloseCher interface {
	// CloseCh indicates if this should close the channel the input
	// is routed to. Remember, if the input channel has more than 1
	// in capacity, order cannot be guaranteed and you could be closing
	// a channel that is still processing.
	CloseCh() bool
}
CloseCher is checked against every input we receive. If the input has this method and it returns true, we push the type onto the output channel and then close the channel.
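As a minimal sketch of a value type that satisfies this interface: the `closeCher` interface below is a local copy of the documented method set so the example compiles on its own, and `reply` with its `Last` field, along with the `shouldClose` helper, are hypothetical names that are not part of the package.

```go
package main

import "fmt"

// closeCher mirrors the documented CloseCher method set so this
// sketch is self-contained.
type closeCher interface {
	CloseCh() bool
}

// reply is a hypothetical message type. Last marks the final message
// destined for a given output channel.
type reply struct {
	ID   int
	Body string
	Last bool
}

// CloseCh reports whether the output channel should be closed after
// this message is forwarded.
func (r reply) CloseCh() bool { return r.Last }

// shouldClose reports whether v asks for its channel to be closed.
// Values that do not implement the interface never do.
func shouldClose(v any) bool {
	c, ok := v.(closeCher)
	return ok && c.CloseCh()
}

func main() {
	fmt.Println(shouldClose(reply{ID: 1, Body: "partial"}))
	fmt.Println(shouldClose(reply{ID: 1, Body: "done", Last: true}))
	fmt.Println(shouldClose(42))
}
```

Only the last message for a given ID should set `Last`, which matches the ordering requirement described in the Overview.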
type Demux ¶
type Demux[K comparable, V any] struct {
	// contains filtered or unexported fields
}
Demux is used to demux data received on a stream and put it on the proper receive channel. K represents a key that can be used to identify the channel; V represents the channel value that will be sent.
func New ¶
func New[K comparable, V any](in chan V, getID GetIDer[K, V], errHandle ErrHandler[K, V], options ...Option[K, V]) (*Demux[K, V], error)
New creates a new Demux. in is the channel that this Demux will receive values on. The number of values in can hold (its buffer size) equals the number of goroutines used to process incoming values; if in is unbuffered, 1 goroutine is used. Concurrent goroutines mean that order is not guaranteed. getID extracts an ID of type K from a value V. ID should be unique to that value V. The ID is used to map a received V to the output channel registered under that ID. errHandle is a function that decides what to do with an error for a value V received on input. IMPORTANT: Your input must be in ID order if your value type implements CloseCher. Otherwise, this will likely just lock up.
func (*Demux[K, V]) AddReceiver ¶
AddReceiver adds an ID and a channel; all values that have that ID are sent on the passed channel. This must occur before values with that ID are received.
func (*Demux[K, V]) Close ¶
Close closes our Demux and the input channel it takes in. If there are any open output channels, they are closed. This will block until all routing goroutines have exited.
func (*Demux[K, V]) RemoveReceiver ¶
func (s *Demux[K, V]) RemoveReceiver(id K)
RemoveReceiver removes a receiver.
type ErrHandler ¶
type ErrHandler[K comparable, V any] func(v V, err error)
ErrHandler is a function that determines what to do when value V was received which generated error err. All errors for processing are of our Error type and can be used to decide what to do with the value.
type ErrType ¶
type ErrType uint8
ErrType is the type of error that is being returned.
const (
	ETUnknown ErrType = iota
	// ETChanExists indicates you are creating an exit channel
	// that already exists.
	ETChanExists
	// ETChanNotFound indicates that an exit channel for the value could
	// not be found.
	ETChanNotFound
	// ETMiddleware indicates a piece of middleware gave an error. The
	// error given is wrapped in this error.
	ETMiddleware
	// ETChanTimeout indicates WithDelayMax was set and a value exceeded
	// this delay and is dropped.
	ETChanTimeout
)
type Error ¶
type Error struct {
	// Type is the type of error.
	Type ErrType
	// Message is the error's message.
	Message string
	// contains filtered or unexported fields
}
Error provides errors for this module.
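An error handler can branch on the Type field. The sketch below is self-contained, so it declares local copies of ErrType, its constants, and Error. It assumes that Error satisfies the error interface with a value receiver, which the documentation above does not state. The `decide` function and the policy strings it returns are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented types, so the sketch compiles alone.
type ErrType uint8

const (
	ETUnknown ErrType = iota
	ETChanExists
	ETChanNotFound
	ETMiddleware
	ETChanTimeout
)

type Error struct {
	Type    ErrType
	Message string
}

// Error makes the stand-in usable as an error. Assumption: the real
// type does something equivalent.
func (e Error) Error() string { return e.Message }

// decide maps an error to a hypothetical handling policy.
func decide(err error) string {
	var e Error
	if !errors.As(err, &e) {
		return "ignore" // not one of our errors (or nil)
	}
	switch e.Type {
	case ETChanNotFound:
		return "drop"
	case ETChanTimeout:
		return "retry"
	case ETMiddleware:
		return "reject"
	default:
		return "log"
	}
}

func main() {
	wrapped := fmt.Errorf("routing: %w", Error{Type: ETChanTimeout, Message: "send timed out"})
	fmt.Println(decide(wrapped))
	fmt.Println(decide(Error{Type: ETChanNotFound, Message: "no receiver"}))
	fmt.Println(decide(errors.New("something else")))
}
```

Using errors.As rather than a direct type assertion keeps the handler working if the error arrives wrapped.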
type GetIDer ¶
type GetIDer[K comparable, V any] func(v V) K
GetIDer represents a function that, when handed a value V, returns its unique ID of type K.
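For values that carry their own routing key, a GetIDer can simply return that field. The following is a sketch only: `GetIDer` is redeclared locally so the example is self-contained, while `packet`, `packetID`, and `route` are hypothetical names. `route` illustrates the general lookup-and-forward idea and is not the package's implementation.

```go
package main

import "fmt"

// GetIDer is a local copy of the documented function type.
type GetIDer[K comparable, V any] func(v V) K

// packet is a hypothetical value that carries its own routing key.
type packet struct {
	ChannelID string
	Payload   string
}

// packetID returns the routing key stored in the packet.
func packetID(p packet) string { return p.ChannelID }

// route looks up the output channel for v and sends v on it.
// It reports false when no channel is registered for v's ID.
func route[K comparable, V any](getID GetIDer[K, V], outs map[K]chan V, v V) bool {
	ch, ok := outs[getID(v)]
	if !ok {
		return false
	}
	ch <- v
	return true
}

func main() {
	var getID GetIDer[string, packet] = packetID
	outs := map[string]chan packet{"a": make(chan packet, 1)}

	fmt.Println(route(getID, outs, packet{ChannelID: "a", Payload: "hello"}))
	fmt.Println((<-outs["a"]).Payload)
	fmt.Println(route(getID, outs, packet{ChannelID: "b", Payload: "lost"}))
}
```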
type InOrder ¶
type InOrder[I constraints.Integer, V any] struct {
	// contains filtered or unexported fields
}
InOrder lets you funnel Demux channel output, which may arrive out of order, into a processor that buffers the messages and sends them out in order. InOrder must receive a value with ID 0 to start processing and expects IDs to ascend by 1 from there.
func NewInOrder ¶
func NewInOrder[I constraints.Integer, V any](getID GetIDer[I, V], out chan V) *InOrder[I, V]
NewInOrder makes a new InOrder processor. getID is a GetIDer that gets the message's order ID, which should start at 0 and increment.
func (*InOrder[I, V]) Add ¶
Add adds a value to be sorted. If the ID of v is less than or equal to the current internal counter, Add returns an error. Nothing is processed until a value with ID 0 is passed, then 1, then 2, and so on.
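The general reordering technique can be sketched as a buffer keyed by ID that releases values once the next expected ID has arrived. This is an illustration of the idea only, not the InOrder implementation: `reorderer`, `newReorderer`, and `add` are hypothetical names, output is collected in a slice instead of a channel, and the exact rule for rejecting stale IDs may differ from the real type.

```go
package main

import "fmt"

// reorderer buffers out-of-order values and releases them in
// ascending ID order, starting at 0.
type reorderer[V any] struct {
	next    int       // next ID to release
	pending map[int]V // values waiting for earlier IDs
	out     []V       // released values, in order
}

func newReorderer[V any]() *reorderer[V] {
	return &reorderer[V]{pending: make(map[int]V)}
}

// add stores v under id, then releases every consecutive value that
// is now ready. IDs that were already released are rejected.
func (r *reorderer[V]) add(id int, v V) error {
	if id < r.next {
		return fmt.Errorf("id %d was already released (next is %d)", id, r.next)
	}
	r.pending[id] = v
	for {
		ready, ok := r.pending[r.next]
		if !ok {
			return nil
		}
		delete(r.pending, r.next)
		r.out = append(r.out, ready)
		r.next++
	}
}

func main() {
	r := newReorderer[string]()
	r.add(2, "c") // buffered: 0 and 1 have not arrived yet
	r.add(0, "a") // releases "a"
	r.add(1, "b") // releases "b", then the buffered "c"
	fmt.Println(r.out) // prints [a b c]
}
```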
type Middleware ¶
type Middleware[K comparable, V any] func(v V) error
Middleware is a function that is executed on value v before it is routed. If error != nil, further Middleware is not executed and the ErrHandler is called for value v.
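The stop-on-first-error behavior described above can be sketched as a simple loop. This is an assumption-laden illustration: the local `Middleware` type drops the K type parameter for brevity, and `runAll`, `nonNegative`, `isEven`, and the two error values are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// Middleware is a simplified local version of the documented type
// (the K type parameter is omitted here).
type Middleware[V any] func(v V) error

var (
	errNegative = errors.New("negative value")
	errOdd      = errors.New("odd value")
)

// nonNegative rejects values below zero.
var nonNegative Middleware[int] = func(v int) error {
	if v < 0 {
		return errNegative
	}
	return nil
}

// isEven rejects odd values.
var isEven Middleware[int] = func(v int) error {
	if v%2 != 0 {
		return errOdd
	}
	return nil
}

// runAll executes each middleware in order and returns the first
// error. Later middleware is skipped once one fails.
func runAll[V any](v V, mws ...Middleware[V]) error {
	for _, m := range mws {
		if err := m(v); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(runAll[int](4, nonNegative, isEven))
	fmt.Println(runAll[int](-3, nonNegative, isEven))
	fmt.Println(runAll[int](3, nonNegative, isEven))
}
```

In the real package, the returned error would be what reaches the ErrHandler, wrapped as ETMiddleware according to the ErrType documentation.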
type MutexType ¶
type MutexType uint8
MutexType indicates the type of mutex to protect the internal datastore with.
const (
	// NoMutex, which is the default, says don't use a mutex. Instead we do a
	// copy of the internal data into a new map every time AddReceiver() is called.
	// The new map is stored in an atomic.Pointer. This is the fastest for reading, but
	// the slowest for writing. This is good when reads vastly outstrip writes
	// and the number of internal channels is low.
	NoMutex MutexType = iota
	// Mutex indicates to use the standard sync.Mutex. Good for when
	// channel adds and responses are symmetrical. So if you are doing a promise
	// type of response (waiting for a single response and then closing the forwarding
	// channel), then this is the best option.
	Mutex
	// RWMutex indicates to use the sync.RWMutex. Good when we add
	// new channels much less often than we forward to those channels.
	// If the number of forwarding channels is low, NoMutex is probably the best option.
	RWMutex
)
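The copy-on-write approach described for NoMutex can be sketched as follows. This illustrates the general technique and is not the package's code: `cowMap`, `get`, and `set` are hypothetical names, and serializing writers with a small mutex is an assumption made here to keep concurrent writes from losing updates. It requires Go 1.19 or later for atomic.Pointer.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cowMap is a copy-on-write map. Readers load the current map without
// locking; writers copy it, modify the copy, and swap the copy in.
type cowMap[K comparable, V any] struct {
	writeMu sync.Mutex // serializes writers only; readers never take it
	m       atomic.Pointer[map[K]V]
}

// get reads from the current snapshot.
func (c *cowMap[K, V]) get(k K) (V, bool) {
	p := c.m.Load()
	if p == nil {
		var zero V
		return zero, false
	}
	v, ok := (*p)[k]
	return v, ok
}

// set copies the current snapshot, adds the entry, and publishes the
// copy. Cost grows with the number of entries, so writes are slow.
func (c *cowMap[K, V]) set(k K, v V) {
	c.writeMu.Lock()
	defer c.writeMu.Unlock()

	next := make(map[K]V)
	if old := c.m.Load(); old != nil {
		for key, val := range *old {
			next[key] = val
		}
	}
	next[k] = v
	c.m.Store(&next)
}

func main() {
	var receivers cowMap[int, string]
	receivers.set(0, "output0")
	receivers.set(1, "output1")

	v, ok := receivers.get(1)
	fmt.Println(v, ok)
	_, ok = receivers.get(7)
	fmt.Println(ok)
}
```

Because a published map is never modified again, readers can use it without synchronization, which is why this trades write speed for read speed.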
type Option ¶
type Option[K comparable, V any] func(s *Demux[K, V])
Option provides optional arguments to New().
func WithCloseChan ¶
func WithCloseChan[K comparable, V any]() Option[K, V]
WithCloseChan says to close the found channel after putting a value on it. Useful for when the return channel represents a promise that gets a single value.
func WithDelayMax ¶
func WithDelayMax[K comparable, V any](d time.Duration) Option[K, V]
WithDelayMax tells the Demux to call errHandle for a value V that is received but can't be routed to its destination within d. This prevents head of line blocking.
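A bounded send of this kind is commonly written with select and a timer. The sketch below shows that general pattern under the assumption that a timed-out value is simply reported to the caller. `sendWithin` and `shortWait` are hypothetical names, and this is not necessarily how the package implements the option.

```go
package main

import (
	"fmt"
	"time"
)

// shortWait is an arbitrary delay chosen for the demonstration.
const shortWait = 20 * time.Millisecond

// sendWithin tries to send v on out and gives up after d.
// It reports whether the value was delivered.
func sendWithin[V any](out chan V, v V, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()

	select {
	case out <- v:
		return true
	case <-t.C:
		return false
	}
}

func main() {
	out := make(chan int, 1)

	// The buffer has room, so the first send succeeds immediately.
	fmt.Println(sendWithin(out, 1, shortWait))

	// The buffer is full and nobody is reading, so this send times
	// out instead of blocking the sender forever.
	fmt.Println(sendWithin(out, 2, shortWait))
}
```

A slow receiver then costs at most d per value instead of stalling every value queued behind it.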
func WithMiddleware ¶
func WithMiddleware[K comparable, V any](m ...Middleware[K, V]) Option[K, V]
WithMiddleware appends Middleware to be executed by the Demux.
func WithMutex ¶
func WithMutex[K comparable, V any](mt MutexType) Option[K, V]
WithMutex selects the type of mutex (see MutexType) used to protect the Demux's internal data. Without this, synchronization occurs by copying the internal map every time AddReceiver() is called, adding the new value to the copy, and storing the copy in an atomic.Pointer. Slow on write, but fast on reads. If