Documentation ¶
Index ¶
- func AsChan(vs ...interface{}) <-chan interface{}
- func BCasterEventTransformFn(b *BCaster, input interface{}) interface{}
- func Bridge(done <-chan interface{}, chanStream <-chan <-chan interface{}) chan interface{}
- func CloseChannel(ch chan interface{})
- func FanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{}
- func FanInRec(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{}
- func FilterEventTransformFn(f *Filter, input interface{}) interface{}
- func Or(channels ...<-chan interface{}) <-chan interface{}
- func OrDone(done, c <-chan interface{}) chan interface{}
- func OrDoneChanParamFn(done, c <-chan interface{}, fn func(param chan interface{}), ...) chan interface{}
- func OrDoneChanParamsFn(done, c <-chan interface{}, fn func(params ...chan interface{}), ...) chan interface{}
- func OrDoneFn(done, c <-chan interface{}, fn func()) chan interface{}
- func OrDoneParamFn(done, c <-chan interface{}, fn func(param interface{}), param interface{}) chan interface{}
- func OrDoneParamsFn(done, c <-chan interface{}, fn func(params ...interface{}), ...) chan interface{}
- func ProcessorEventTransformFn(p *Processor, input interface{}, result interface{}) interface{}
- func Repeat(done <-chan interface{}, values ...interface{}) chan interface{}
- func RepeatChanParamFn(done, c <-chan interface{}, fn func(param chan interface{}) interface{}, ...) chan interface{}
- func RepeatChanParamsFn(done, c <-chan interface{}, fn func(params ...chan interface{}) interface{}, ...) chan interface{}
- func RepeatFn(done <-chan interface{}, fn func() interface{}) chan interface{}
- func RepeatParamFn(done <-chan interface{}, fn func(param interface{}) interface{}, ...) chan interface{}
- func RepeatParamsFn(done, c <-chan interface{}, fn func(params ...interface{}) interface{}, ...) chan interface{}
- func Route(done <-chan interface{}, in <-chan interface{}, size int) (channels []chan interface{})
- func Take(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{}
- func ToString(done <-chan interface{}, valueStream <-chan interface{}) <-chan string
- type BCaster
- type BCasterOption
- type DoneHandler
- type DoneHandlerOption
- type DoneManager
- func (dm *DoneManager) AddDoneHandler(dh *DoneHandler, layer int)
- func (dm *DoneManager) AddNewDoneHandler(layer int, opts ...DoneHandlerOption) *DoneHandler
- func (dm *DoneManager) Deadline() *time.Time
- func (dm *DoneManager) Done() chan interface{}
- func (dm *DoneManager) Err() error
- func (dm *DoneManager) GetDoneFunc() func()
- func (dm *DoneManager) GetDoneHandler(opts ...QueryDoneHandlerOption) (*DoneHandler, int, bool)
- func (dm *DoneManager) ID() string
- func (dm *DoneManager) RemoveDoneHandler(opts ...QueryDoneHandlerOption) bool
- type DoneManagerOption
- type Event
- type EventMap
- type EventSortedMap
- type Filter
- func (f *Filter) Filter(fn func(input interface{}, params ...interface{}) bool, params ...interface{})
- func (f *Filter) GetState() State
- func (f *Filter) HasValidInputChan() bool
- func (f *Filter) ID() string
- func (f *Filter) InputChannel() chan interface{}
- func (f *Filter) OutputChannel() chan interface{}
- func (f *Filter) Sequence() interface{}
- func (f *Filter) Start()
- func (f *Filter) Stop()
- type FilterOption
- type Map
- func (cm *Map) Delete(key interface{})
- func (cm *Map) Get(key interface{}) (interface{}, bool)
- func (cm *Map) GetKeyByItem(item interface{}) (interface{}, bool)
- func (cm *Map) Iter() <-chan MapItem
- func (cm *Map) IterWithCancel(cancel chan interface{}) <-chan MapItem
- func (cm *Map) Len() int
- func (cm *Map) Set(key, value interface{})
- type MapItem
- type Message
- type MsgMultiplexer
- func (mp *MsgMultiplexer) Get(key interface{}) (chan interface{}, bool)
- func (mp *MsgMultiplexer) ID() string
- func (mp *MsgMultiplexer) Iter() chan interface{}
- func (mp *MsgMultiplexer) Sequence() interface{}
- func (mp *MsgMultiplexer) Set(key interface{}, value chan interface{})
- func (mp *MsgMultiplexer) Start()
- type MsgMultiplexerOption
- func MsgMultiplexerGetChannelItemKeyFn(fn func(v interface{}) int64) MsgMultiplexerOption
- func MsgMultiplexerGetChannelItemSequenceFn(fn func(v interface{}) interface{}) MsgMultiplexerOption
- func MsgMultiplexerGetLastRegKeyFn(fn func() int64) MsgMultiplexerOption
- func MsgMultiplexerSequence(seq interface{}) MsgMultiplexerOption
- func MsgMultiplexerTransformFn(fn func(mp *MsgMultiplexer, sm *SortedMap) interface{}) MsgMultiplexerOption
- type MultiMsgMultiplexer
- func (mp *MultiMsgMultiplexer) AddItemToMap(v interface{}, m *SortedMap)
- func (mp *MultiMsgMultiplexer) Get(key interface{}) (chan interface{}, bool)
- func (mp *MultiMsgMultiplexer) ID() string
- func (mp *MultiMsgMultiplexer) Iter() chan interface{}
- func (mp *MultiMsgMultiplexer) Sequence() interface{}
- func (mp *MultiMsgMultiplexer) Set(key interface{}, value chan interface{})
- func (mp *MultiMsgMultiplexer) Start()
- type MultiMsgMultiplexerOption
- func MultiMsgMultiplexerBufferSize(bufferSize int) MultiMsgMultiplexerOption
- func MultiMsgMultiplexerItemKeyFn(fn func(v interface{}) int64) MultiMsgMultiplexerOption
- func MultiMsgMultiplexerSendPeriod(d *time.Duration) MultiMsgMultiplexerOption
- func MultiMsgMultiplexerSequence(seq interface{}) MultiMsgMultiplexerOption
- func MultiMsgMultiplexerTransformFn(fn func(mp *MultiMsgMultiplexer, sm *SortedMap) interface{}) MultiMsgMultiplexerOption
- func MultiMsgMultiplexerWaitForAll(waitforall bool) MultiMsgMultiplexerOption
- type MultiMsgResultItem
- type Processor
- func (p *Processor) GetState() State
- func (p *Processor) HasValidInputChan() bool
- func (p *Processor) ID() string
- func (p *Processor) InputChannel() chan interface{}
- func (p *Processor) OutputChannel() chan interface{}
- func (p *Processor) Process(f func(input interface{}, params ...interface{}) interface{}, ...)
- func (p *Processor) Sequence() interface{}
- func (p *Processor) Start()
- func (p *Processor) Stop()
- type ProcessorOption
- type QueryDoneHandler
- type QueryDoneHandlerOption
- type Slice
- func (cs *Slice) Append(item interface{})
- func (cs *Slice) Cap() int
- func (cs *Slice) GetItemAtIndex(index int) interface{}
- func (cs *Slice) IndexOf(item interface{}) int
- func (cs *Slice) Iter() <-chan SliceItem
- func (cs *Slice) IterWithCancel(cancel chan interface{}) <-chan SliceItem
- func (cs *Slice) Len() int
- func (cs *Slice) RemoveItemAtIndex(index int)
- type SliceItem
- type SortedMap
- func (sm *SortedMap) Delete(key interface{})
- func (sm *SortedMap) Get(key interface{}) (interface{}, bool)
- func (sm *SortedMap) GetByIndex(index int) (interface{}, bool)
- func (sm *SortedMap) GetKeyByIndex(index int) (interface{}, bool)
- func (sm *SortedMap) GetKeyByItem(item interface{}) (interface{}, bool)
- func (sm *SortedMap) GetMapItemByIndex(index int) (*SortedMapItem, bool)
- func (sm *SortedMap) Iter() <-chan SortedMapItem
- func (sm *SortedMap) IterWithCancel(cancel chan interface{}) <-chan SortedMapItem
- func (sm *SortedMap) Len() int
- func (sm *SortedMap) Set(key, value interface{})
- type SortedMapItem
- type SortedSlice
- func (cs *SortedSlice) Append(item interface{})
- func (cs *SortedSlice) Cap() int
- func (cs *SortedSlice) GetItemAtIndex(index int) interface{}
- func (cs *SortedSlice) IndexOf(item interface{}) int
- func (cs *SortedSlice) Iter() <-chan SortedSliceItem
- func (cs *SortedSlice) IterWithCancel(cancel chan interface{}) <-chan SortedSliceItem
- func (cs *SortedSlice) Len() int
- func (cs *SortedSlice) RemoveItemAtIndex(index int)
- type SortedSliceItem
- type State
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AsChan ¶
func AsChan(vs ...interface{}) <-chan interface{}
AsChan - sends the contents of a slice through a channel
func BCasterEventTransformFn ¶
func BCasterEventTransformFn(b *BCaster, input interface{}) interface{}
BCasterEventTransformFn - Gets the bcaster and input message and returns the output in the form of an event
func Bridge ¶
func Bridge(done <-chan interface{}, chanStream <-chan <-chan interface{}) chan interface{}
Bridge is a way to present a single-channel facade over a channel of channels. It is used to consume values from a sequence of channels (channel of channels) doing an ordered write from different sources. By bridging the channels it destructures the channel of channels into a simple channel, allowing to multiplex the input and simplify the consumption. With this pattern we can use the channel of channels from within a single range statement and focus on our loop’s logic.
func CloseChannel ¶
func CloseChannel(ch chan interface{})
CloseChannel - Checks if the channel is not closed and closes it
func FanIn ¶
func FanIn(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{}
FanIn - combines multiple results in the form of a slice of channels into one channel. This implementation uses a waitgroup in order to multiplex all the results of the slice of channels. The output is not produced in sequence. This pattern is good when order is not important.
func FanInRec ¶
func FanInRec(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{}
FanInRec - combines multiple results in the form of a slice of channels into one channel. This implementation uses a recursive approach in order to multiplex all the results of the slice of channels. The output is not produced in sequence. This pattern is good when order is not important.
func FilterEventTransformFn ¶
func FilterEventTransformFn(f *Filter, input interface{}) interface{}
FilterEventTransformFn - Gets the input event and returns it in the form of an output event with the sequence of the filter
func Or ¶
func Or(channels ...<-chan interface{}) <-chan interface{}
Or - returns the value of the fastest channel.
func OrDone ¶
func OrDone(done, c <-chan interface{}) chan interface{}
OrDone - Wraps a channel with a select statement that also selects from a done channel. Allows to cancel the channel avoiding go-routine leaks
func OrDoneChanParamFn ¶
func OrDoneChanParamFn(done, c <-chan interface{}, fn func(param chan interface{}), param chan interface{}) chan interface{}
OrDoneChanParamFn - Wraps a channel with a select statement that also selects from a done channel with a function with a channel as parameter that can be run before returning. Allows to cancel the channel avoiding go-routine leaks.
func OrDoneChanParamsFn ¶
func OrDoneChanParamsFn(done, c <-chan interface{}, fn func(params ...chan interface{}), params ...chan interface{}) chan interface{}
OrDoneChanParamsFn - Wraps a channel with a select statement that also selects from a done channel with a function with a list of channels as parameters that can be run before returning. Allows to cancel the channel avoiding go-routine leaks.
func OrDoneFn ¶
func OrDoneFn(done, c <-chan interface{}, fn func()) chan interface{}
OrDoneFn - Wraps a channel with a select statement that also selects from a done channel with a function that can be run before returning. Allows to cancel the channel avoiding go-routine leaks
func OrDoneParamFn ¶
func OrDoneParamFn(done, c <-chan interface{}, fn func(param interface{}), param interface{}) chan interface{}
OrDoneParamFn - Wraps a channel with a select statement that also selects from a done channel with a function with parameter that can be run before returning. Allows to cancel the channel avoiding go-routine leaks.
func OrDoneParamsFn ¶
func OrDoneParamsFn(done, c <-chan interface{}, fn func(params ...interface{}), params ...interface{}) chan interface{}
OrDoneParamsFn - Wraps a channel with a select statement that also selects from a done channel with a function with a list of parameters that can be run before returning. Allows to cancel the channel avoiding go-routine leaks.
func ProcessorEventTransformFn ¶
func ProcessorEventTransformFn(p *Processor, input interface{}, result interface{}) interface{}
ProcessorEventTransformFn - Gets the processor, input event and result message and returns the processed output in the form of an event
func Repeat ¶
func Repeat(done <-chan interface{}, values ...interface{}) chan interface{}
Repeat - Generator that repeats the values defined on the values slice indefinitely.
func RepeatChanParamFn ¶
func RepeatChanParamFn(done, c <-chan interface{}, fn func(param chan interface{}) interface{}, param chan interface{}) chan interface{}
RepeatChanParamFn - Generator that repeats a function with a channel as parameter indefinitely.
func RepeatChanParamsFn ¶
func RepeatChanParamsFn(done, c <-chan interface{}, fn func(params ...chan interface{}) interface{}, params ...chan interface{}) chan interface{}
RepeatChanParamsFn - Generator that repeats a function with a list of channels as parameters indefinitely.
func RepeatFn ¶
func RepeatFn(done <-chan interface{}, fn func() interface{}) chan interface{}
RepeatFn - Generator that repeats a function indefinitely.
func RepeatParamFn ¶
func RepeatParamFn(done <-chan interface{}, fn func(param interface{}) interface{}, param interface{}) chan interface{}
RepeatParamFn - Generator that repeats a function with one parameter indefinitely.
func RepeatParamsFn ¶
func RepeatParamsFn(done, c <-chan interface{}, fn func(params ...interface{}) interface{}, params ...interface{}) chan interface{}
RepeatParamsFn - Generator that repeats a function with a list of parameters indefinitely.
func Route ¶
func Route(done <-chan interface{}, in <-chan interface{}, size int) (channels []chan interface{})
Route - Representation of the tee pattern. Takes a single input channel and an arbitrary number of output channels and duplicates each input into every output. When the input channel is closed, all output channels are closed. It allows to route or split an input into multiple outputs.
Types ¶
type BCaster ¶
type BCaster struct { MsgType string // contains filtered or unexported fields }
BCaster - Is a broadcaster that allows to send events of type concurrency.Event to registered listeners using go concurrency patterns. Listeners are chan interface{} allowing for go concurrent communication. Closure of BCaster is handled by a concurrency.DoneHandler that allows to control the way a set of go routines are closed in order to prevent deadlocks and unwanted behaviour. It detects when listeners are done and performs the required cleanup to ensure that events are sent to the active listeners.
func NewBCaster ¶
func NewBCaster(dh *DoneHandler, MsgType string, opts ...BCasterOption) *BCaster
NewBCaster - Constructor
func (*BCaster) AddListener ¶
func (b *BCaster) AddListener(dh *DoneHandler) chan interface{}
AddListener - creates a listener as chan interface{} with a DoneHandler in order to manage its closure and pass it to the requestor so it can be used in order to consume events from the Bcaster
func (*BCaster) Broadcast ¶
func (b *BCaster) Broadcast(msg interface{})
Broadcast - Transforms a message into a concurrency.Event and broadcasts it to all the active registered listeners
func (*BCaster) RemoveListener ¶
func (b *BCaster) RemoveListener(listenerCh chan interface{})
RemoveListener - removes a listener
func (*BCaster) RemoveListenerByKey ¶
func (b *BCaster) RemoveListenerByKey(key interface{})
RemoveListenerByKey - Removes a listener by its key value
type BCasterOption ¶
type BCasterOption func(*BCaster)
BCasterOption - option to initialize the bcaster
func BCasterTransformFn ¶
func BCasterTransformFn(fn func(b *BCaster, input interface{}) interface{}) BCasterOption
BCasterTransformFn - option to add a function to transform the output into the desired output structure to the BCaster
type DoneHandler ¶
type DoneHandler struct {
// contains filtered or unexported fields
}
DoneHandler - Handles when an object is done and it is ready to be closed.
func NewDoneHandler ¶
func NewDoneHandler(opts ...DoneHandlerOption) *DoneHandler
NewDoneHandler - Constructor
func (*DoneHandler) Deadline ¶
func (dh *DoneHandler) Deadline() *time.Time
Deadline - retrieves the Deadline of the DoneHandler
func (*DoneHandler) Done ¶
func (dh *DoneHandler) Done() chan interface{}
Done - retrieves the Done channel of the DoneHandler
func (*DoneHandler) Err ¶
func (dh *DoneHandler) Err() error
Err - retrieves the Error of the DoneHandler
func (*DoneHandler) GetDoneFunc ¶
func (dh *DoneHandler) GetDoneFunc() func()
GetDoneFunc - retrieves the GetDone Function of the DoneHandler
func (*DoneHandler) ID ¶
func (dh *DoneHandler) ID() string
ID - retrieves the Id of the DoneHandler
type DoneHandlerOption ¶
type DoneHandlerOption func(*DoneHandler)
DoneHandlerOption - option to initialize the DoneHandler
func DoneHandlerWithDeadline ¶
func DoneHandlerWithDeadline(deadline time.Time) DoneHandlerOption
DoneHandlerWithDeadline - option to add a deadline value to the DoneHandler
func DoneHandlerWithTimeout ¶
func DoneHandlerWithTimeout(timeout time.Duration) DoneHandlerOption
DoneHandlerWithTimeout - option to add a timeout value to the DoneHandler. It sets a deadline with the value time.Now + timeout
type DoneManager ¶
type DoneManager struct {
// contains filtered or unexported fields
}
DoneManager - It manages a set of DoneHandlers with a layered approach, allowing to structure the way components are closed. It has an ID, a Done channel, a SortedMap of layers whose key is the number of the layer and whose item is represented with a SortedMap of DoneHandlers whose item key is the DoneHandler.ID. It also has a donefn, a deadline property to setup a deadline to the entire set that the DoneManager is handling, a delay to space the closure of layers within the DoneManager, an err and a lock to ensure that the operations are threadsafe.
func NewDoneManager ¶
func NewDoneManager(opts ...DoneManagerOption) *DoneManager
NewDoneManager - Constructor
func (*DoneManager) AddDoneHandler ¶
func (dm *DoneManager) AddDoneHandler(dh *DoneHandler, layer int)
AddDoneHandler - adds a DoneHandler to the sorted map of the specified layer
func (*DoneManager) AddNewDoneHandler ¶
func (dm *DoneManager) AddNewDoneHandler(layer int, opts ...DoneHandlerOption) *DoneHandler
AddNewDoneHandler - Creates a new DoneHandler with the relevant DoneHandlerOptions and adds it to the sorted map of the specified layer
func (*DoneManager) Deadline ¶
func (dm *DoneManager) Deadline() *time.Time
Deadline - retrieves the Deadline of the DoneManager
func (*DoneManager) Done ¶
func (dm *DoneManager) Done() chan interface{}
Done - retrieves the Done channel of the DoneManager
func (*DoneManager) Err ¶
func (dm *DoneManager) Err() error
Err - retrieves the Error of the DoneManager
func (*DoneManager) GetDoneFunc ¶
func (dm *DoneManager) GetDoneFunc() func()
GetDoneFunc - retrieves the GetDone Function of the DoneManager
func (*DoneManager) GetDoneHandler ¶
func (dm *DoneManager) GetDoneHandler(opts ...QueryDoneHandlerOption) (*DoneHandler, int, bool)
GetDoneHandler - Retrieves a DoneHandler that meets the query defined within the QueryDoneHandlerOptions
func (*DoneManager) ID ¶
func (dm *DoneManager) ID() string
ID - retrieves the Id of the DoneManager
func (*DoneManager) RemoveDoneHandler ¶
func (dm *DoneManager) RemoveDoneHandler(opts ...QueryDoneHandlerOption) bool
RemoveDoneHandler - Removes a DoneHandler that meets the query defined within the QueryDoneHandlerOptions. If item is Not found it returns false
type DoneManagerOption ¶
type DoneManagerOption func(*DoneManager)
DoneManagerOption - option to initialize the DoneManager
func DoneManagerWithDeadline ¶
func DoneManagerWithDeadline(deadline time.Time) DoneManagerOption
DoneManagerWithDeadline - option to add a deadline value to the DoneManager
func DoneManagerWithDelay ¶
func DoneManagerWithDelay(delay time.Duration) DoneManagerOption
DoneManagerWithDelay - option to add a delay value to the DoneManager in order to space in time the closure of the different layers of the DoneHandler
func DoneManagerWithTimeout ¶
func DoneManagerWithTimeout(timeout time.Duration) DoneManagerOption
DoneManagerWithTimeout - option to add a timeout value to the DoneManager. It sets a deadline with the value time.Now + timeout
type Event ¶
type Event struct { InitMessage *Message InMessageSequence Slice //Slice of Messages - we can keep track of the entire flow OutMessage *Message Sequence interface{} }
Event - Struct that represents an event in the context of the concurrency package. Contains an InitMessage, an InMessageSequence to give traceability of the Event, an OutMessage, and a sequence number that is useful to define the processing order of the event
type EventSortedMap ¶
EventSortedMap - A concurrency.SortedMap of events with a common InitMessage
type Filter ¶
type Filter struct { Name string // contains filtered or unexported fields }
Filter - Unit that listens to an input channel (inputChan) and filters work. Closing the inputChan channel needs to be managed outside the Filter using a DoneHandler. It has a DoneHandler to manage the lifecycle of the filter, a sequence to determine the order in which the filter might be run, an id of the filter, the name of the filter, the state of the filter and an output channel that emits the processed results for consumption.
func NewFilter ¶
func NewFilter(name string, dh *DoneHandler, opts ...FilterOption) *Filter
NewFilter - Constructor
func (*Filter) Filter ¶
func (f *Filter) Filter(fn func(input interface{}, params ...interface{}) bool, params ...interface{})
Filter - When the filter is in Processing state filters a defined function. When the Filter is in stop state the filter will still consume messages from the input channel and it will output the input event as no filter will be involved.
func (*Filter) HasValidInputChan ¶
HasValidInputChan - checks if the input channel is valid and not nil.
func (*Filter) InputChannel ¶
func (f *Filter) InputChannel() chan interface{}
InputChannel - retrieves the InputChannel of the Filter
func (*Filter) OutputChannel ¶
func (f *Filter) OutputChannel() chan interface{}
OutputChannel - retrieves the OutputChannel of the Filter
type FilterOption ¶
type FilterOption func(*Filter)
FilterOption - option to initialize the filter
func FilterTransformFn ¶
func FilterTransformFn(fn func(fr *Filter, input interface{}) interface{}) FilterOption
FilterTransformFn - option to add a function to transform the output into the desired output structure to the Filter
func FilterWithInputChannel ¶
func FilterWithInputChannel(in chan interface{}) FilterOption
FilterWithInputChannel - option to add an inputchannel to the filter
func FilterWithSequence ¶
func FilterWithSequence(seq interface{}) FilterOption
FilterWithSequence - option to add a sequence value to the filter
type Map ¶
type Map struct {
// contains filtered or unexported fields
}
Map is a map type that can be safely shared between goroutines that require read/write access to a map
func (*Map) Delete ¶
func (cm *Map) Delete(key interface{})
Delete removes the value/key pair of a concurrent map item
func (*Map) GetKeyByItem ¶
GetKeyByItem - retrieves the key for a concurrent map item by item
func (*Map) Iter ¶
Iter iterates over the items in a concurrent map. Each item is sent over a channel, so that we can iterate over the map using the builtin range keyword.
func (*Map) IterWithCancel ¶
IterWithCancel iterates over the items in a concurrent map. Each item is sent over a channel, so that we can iterate over the map using the builtin range keyword. It allows to pass a cancel chan to make the iteration cancelable.
type MapItem ¶
type MapItem struct { Key interface{} Value interface{} }
MapItem contains a key/value pair item of a concurrent map
type Message ¶
Message - Struct that represents a message in the context of the concurrency package. Contains the ID of the message, the Message, the time that was produced and the type of the message
type MsgMultiplexer ¶
type MsgMultiplexer struct {
// contains filtered or unexported fields
}
MsgMultiplexer - The default implementation MsgMultiplexer allows to create complex patterns where a Broadcaster can emit an event to multiple processors (consumers) that can potentially represent multiple processing systems, do the relevant calculation and multiplex the multiple outputs into a single channel for simplified consumption. Its main function is to Multiplex a set of parallel processors that process a common initial concurrency.Event/Message converging them into one channel, where the output is an Event whose Event.OutMessage is a sortedmap of the output values of the processors grouped by initial concurrency.Event/Message and ordered by sequence value of each processor. Closure of MsgMultiplexer is handled by a concurrency.DoneHandler that allows to control the way a set of go routines are closed in order to prevent deadlocks and unwanted behaviour. MsgMultiplexer outputs the multiplexed result in one channel using the channel bridge pattern. MsgMultiplexer default behaviour can be overridden by providing a MsgMultiplexerGetChannelItemKeyFn to provide the comparison key of the items of a channel, and a MsgMultiplexerGetLastRegKeyFn which should give the key to compare to. With these two items MsgMultiplexer has an algorithm to group the processed messages related to the same source into a SortedMap. MsgMultiplexerGetChannelItemSequenceFn allows to get the sequence order of the relevant channel and MsgMultiplexerTransformFn allows to transform the output into the desired structure.
func NewMsgMultiplexer ¶
func NewMsgMultiplexer(dh *DoneHandler, opts ...MsgMultiplexerOption) *MsgMultiplexer
NewMsgMultiplexer - Constructor
func (*MsgMultiplexer) Get ¶
func (mp *MsgMultiplexer) Get(key interface{}) (chan interface{}, bool)
Get - Retrieves a channel registered in the MsgMultiplexer by key
func (*MsgMultiplexer) ID ¶
func (mp *MsgMultiplexer) ID() string
ID - retrieves the Id of the MsgMultiplexer
func (*MsgMultiplexer) Iter ¶
func (mp *MsgMultiplexer) Iter() chan interface{}
Iter iterates over the items in the Multiplexer. Each item is sent over a channel, so that we can iterate over it using the builtin range keyword.
func (*MsgMultiplexer) Sequence ¶
func (mp *MsgMultiplexer) Sequence() interface{}
Sequence - retrieves the Sequence of the MsgMultiplexer
func (*MsgMultiplexer) Set ¶
func (mp *MsgMultiplexer) Set(key interface{}, value chan interface{})
Set - Registers a channel in the MsgMultiplexer, starts processing it and logs the length of the registered channels map.
func (*MsgMultiplexer) Start ¶
func (mp *MsgMultiplexer) Start()
Start - starts the main process of the MsgMultiplexer
type MsgMultiplexerOption ¶
type MsgMultiplexerOption func(*MsgMultiplexer)
MsgMultiplexerOption - option to initialize the MsgMultiplexer
func MsgMultiplexerGetChannelItemKeyFn ¶
func MsgMultiplexerGetChannelItemKeyFn(fn func(v interface{}) int64) MsgMultiplexerOption
MsgMultiplexerGetChannelItemKeyFn - option to add a function to resolve the key value of an item of the channel to the MsgMultiplexer
func MsgMultiplexerGetChannelItemSequenceFn ¶
func MsgMultiplexerGetChannelItemSequenceFn(fn func(v interface{}) interface{}) MsgMultiplexerOption
MsgMultiplexerGetChannelItemSequenceFn - option to add a function to resolve the sequence value of an item of the channel to the MsgMultiplexer
func MsgMultiplexerGetLastRegKeyFn ¶
func MsgMultiplexerGetLastRegKeyFn(fn func() int64) MsgMultiplexerOption
MsgMultiplexerGetLastRegKeyFn - option to add a function to resolve the last registered key value for later comparison with the key of an item of the channel to the MsgMultiplexer
func MsgMultiplexerSequence ¶
func MsgMultiplexerSequence(seq interface{}) MsgMultiplexerOption
MsgMultiplexerSequence - option to add a sequence value to the MsgMultiplexer
func MsgMultiplexerTransformFn ¶
func MsgMultiplexerTransformFn(fn func(mp *MsgMultiplexer, sm *SortedMap) interface{}) MsgMultiplexerOption
MsgMultiplexerTransformFn - option to add a function to transform the SortedMap output into the desired output structure to the MsgMultiplexer
type MultiMsgMultiplexer ¶
type MultiMsgMultiplexer struct { BufferSize int MsgType string // contains filtered or unexported fields }
MultiMsgMultiplexer - The default implementation MultiMsgMultiplexer allows to create complex patterns where multiple Broadcasters can emit an event to multiple processors (consumers) that can potentially represent multiple processing systems, do the relevant calculation and multiplex the multiple outputs into a single channel for simplified consumption. Its main function is to Multiplex a set of multiple messages that can be parallel processed and converge the set of initial concurrency.Event/Message into a SortedMap ordered by messageType that can be sent on one channel. values of the processors grouped by initial concurrency.Event/Message and ordered by sequence value of each processor. Closure of MultiMsgMultiplexer is handled by a concurrency.DoneHandler that allows to control the way a set of go routines are closed in order to prevent deadlocks and unwanted behaviour. MultiMsgMultiplexer outputs the multiplexed result in one channel using the channel bridge pattern. MultiMsgMultiplexer has several modes, the first one is to output the structure every time a BCaster emits a message, giving an output of the last received message per BCaster. The second one is by using a timer to specify the sendPeriod, where the output represents the state of the last received messages at the specific point of time of the tick of the period. MultiMsgMultiplexer has also a waitForAll property that when true will just start emitting an output when the MultiMsgMultiplexer has at least received one message of each of the BCasters. MultiMsgMultiplexer has also a BufferSize property (default value is 1) where we can send the n number of last messages sent by each BCaster. MultiMsgMultiplexer default behaviour can be overridden by providing a MultiMsgMultiplexerItemKeyFn to provide the key of the items of a channel within the output SortedMap for a specific MessageType and MultiMsgMultiplexerTransformFn allows to transform the output into the desired structure.
func NewMultiMsgMultiplexer ¶
func NewMultiMsgMultiplexer(dh *DoneHandler, msgType string, opts ...MultiMsgMultiplexerOption) *MultiMsgMultiplexer
NewMultiMsgMultiplexer - Constructor
func (*MultiMsgMultiplexer) AddItemToMap ¶
func (mp *MultiMsgMultiplexer) AddItemToMap(v interface{}, m *SortedMap)
AddItemToMap - Adds an item to the SortedMap that is going to be sent as part of the output, the SortedMap length is defined by the BufferSize property, allowing to retrieve the last n messages for a specific Messagetype.
func (*MultiMsgMultiplexer) Get ¶
func (mp *MultiMsgMultiplexer) Get(key interface{}) (chan interface{}, bool)
Get - Retrieves a channel registered in the MultiMsgMultiplexer by key
func (*MultiMsgMultiplexer) ID ¶
func (mp *MultiMsgMultiplexer) ID() string
ID - retrieves the Id of the MultiMsgMultiplexer
func (*MultiMsgMultiplexer) Iter ¶
func (mp *MultiMsgMultiplexer) Iter() chan interface{}
Iter iterates over the items in the MultiMsgMultiplexer. Each item is sent over a channel, so that we can iterate over it using the builtin range keyword.
func (*MultiMsgMultiplexer) Sequence ¶
func (mp *MultiMsgMultiplexer) Sequence() interface{}
Sequence - retrieves the Sequence of the MultiMsgMultiplexer
func (*MultiMsgMultiplexer) Set ¶
func (mp *MultiMsgMultiplexer) Set(key interface{}, value chan interface{})
Set - Registers a channel in the MultiMsgMultiplexer and starts processing it
func (*MultiMsgMultiplexer) Start ¶
func (mp *MultiMsgMultiplexer) Start()
Start - starts the main process of the MultiMsgMultiplexer
type MultiMsgMultiplexerOption ¶ added in v1.0.1
type MultiMsgMultiplexerOption func(*MultiMsgMultiplexer)
MultiMsgMultiplexerOption - option to initialize the MultiMsgMultiplexer
func MultiMsgMultiplexerBufferSize ¶
func MultiMsgMultiplexerBufferSize(bufferSize int) MultiMsgMultiplexerOption
MultiMsgMultiplexerBufferSize - option to add a buffersize value to the MultiMsgMultiplexer
func MultiMsgMultiplexerItemKeyFn ¶ added in v1.0.1
func MultiMsgMultiplexerItemKeyFn(fn func(v interface{}) int64) MultiMsgMultiplexerOption
MultiMsgMultiplexerItemKeyFn - option to add a function to resolve the set key value of an item of the channel to the map of the MultiMsgMultiplexer specific to a message
func MultiMsgMultiplexerSendPeriod ¶
func MultiMsgMultiplexerSendPeriod(d *time.Duration) MultiMsgMultiplexerOption
MultiMsgMultiplexerSendPeriod - option to add a send period value to the MultiMsgMultiplexer
func MultiMsgMultiplexerSequence ¶
func MultiMsgMultiplexerSequence(seq interface{}) MultiMsgMultiplexerOption
MultiMsgMultiplexerSequence - option to add a sequence value to the MultiMsgMultiplexer
func MultiMsgMultiplexerTransformFn ¶ added in v1.0.1
func MultiMsgMultiplexerTransformFn(fn func(mp *MultiMsgMultiplexer, sm *SortedMap) interface{}) MultiMsgMultiplexerOption
MultiMsgMultiplexerTransformFn - option to add a function to transform the SortedMap output into the desired output structure to the MultiMsgMultiplexer
func MultiMsgMultiplexerWaitForAll ¶
func MultiMsgMultiplexerWaitForAll(waitforall bool) MultiMsgMultiplexerOption
MultiMsgMultiplexerWaitForAll - option to add a waitforall value to the MultiMsgMultiplexer
type MultiMsgResultItem ¶
type MultiMsgResultItem struct {
// contains filtered or unexported fields
}
MultiMsgResultItem - The result item to be stored in the output SortedMap
type Processor ¶
type Processor struct { Name string // contains filtered or unexported fields }
Processor - Unit that listens to an input channel (inputChan) and processes work. Closing the inputChan channel needs to be managed outside the Processor using a DoneHandler. It has a DoneHandler to manage the lifecycle of the processor, a sequence to determine the order in which the processor output results might be stored in a multiplexed pattern, an id of the processor, the name of the processor, the state of the processor and an output channel that emits the processed results for consumption.
func NewProcessor ¶
func NewProcessor(name string, dh *DoneHandler, opts ...ProcessorOption) *Processor
NewProcessor - Constructor
func (*Processor) HasValidInputChan ¶
HasValidInputChan - checks if the input channel is valid and not nil.
func (*Processor) InputChannel ¶
func (p *Processor) InputChannel() chan interface{}
InputChannel - retrieves the InputChannel of the Processor
func (*Processor) OutputChannel ¶
func (p *Processor) OutputChannel() chan interface{}
OutputChannel - retrieves the OutputChannel of the Processor
func (*Processor) Process ¶
func (p *Processor) Process(f func(input interface{}, params ...interface{}) interface{}, params ...interface{})
Process - When the processor is in the Processing state, it processes a defined function. When the processor is in the stop state, it will still consume messages from the input channel but it will produce a nil output as no process will be involved.
type ProcessorOption ¶
type ProcessorOption func(*Processor)
ProcessorOption - option to initialize the processor
func ProcessorTransformFn ¶
func ProcessorTransformFn(fn func(pr *Processor, input interface{}, result interface{}) interface{}) ProcessorOption
ProcessorTransformFn - option to add a function to transform the output into the desired output structure to the Processor
func ProcessorWithInputChannel ¶
func ProcessorWithInputChannel(in chan interface{}) ProcessorOption
ProcessorWithInputChannel - option to add an inputchannel to the processor
func ProcessorWithSequence ¶
func ProcessorWithSequence(seq interface{}) ProcessorOption
ProcessorWithSequence - option to add a sequence value to the processor
type QueryDoneHandler ¶
type QueryDoneHandler struct {
// contains filtered or unexported fields
}
QueryDoneHandler - Struct with a key and a layer that is used to query the DoneManager by specifying the key of a DoneHandler
type QueryDoneHandlerOption ¶
type QueryDoneHandlerOption func(*QueryDoneHandler)
QueryDoneHandlerOption - option to initialize the QueryDoneHandler
func QueryDoneHandlerWithKey ¶
func QueryDoneHandlerWithKey(key interface{}) QueryDoneHandlerOption
QueryDoneHandlerWithKey - option to add a key value to the QueryDoneHandler.
func QueryDoneHandlerWithLayer ¶
func QueryDoneHandlerWithLayer(layer int) QueryDoneHandlerOption
QueryDoneHandlerWithLayer - option to add a layer value to the QueryDoneHandler.
type Slice ¶
type Slice struct {
// contains filtered or unexported fields
}
Slice type that can be safely shared between goroutines
func (*Slice) Append ¶
func (cs *Slice) Append(item interface{})
Append adds an item to the concurrent slice
func (*Slice) GetItemAtIndex ¶
GetItemAtIndex - Get item at index
func (*Slice) Iter ¶
Iter iterates over the items in the concurrent slice. Each item is sent over a channel, so that we can iterate over the slice using the builtin range keyword.
func (*Slice) IterWithCancel ¶
IterWithCancel iterates over the items in the concurrent slice. Each item is sent over a channel, so that we can iterate over the slice using the builtin range keyword. It allows passing a cancel chan to make the iteration cancelable.
func (*Slice) RemoveItemAtIndex ¶
RemoveItemAtIndex removes the item at the specified index
type SliceItem ¶
type SliceItem struct { Index int Value interface{} }
SliceItem contains the index/value pair of an item in a concurrent slice
type SortedMap ¶
type SortedMap struct {
// contains filtered or unexported fields
}
SortedMap is a sorted map type that can be safely shared between goroutines that require read/write access to a map
func (*SortedMap) Delete ¶
func (sm *SortedMap) Delete(key interface{})
Delete removes the value/key pair of a concurrent sorted map item
func (*SortedMap) GetByIndex ¶
GetByIndex retrieves the value for a concurrent map item given the index
func (*SortedMap) GetKeyByIndex ¶
GetKeyByIndex retrieves the key for a concurrent map item given the index
func (*SortedMap) GetKeyByItem ¶
GetKeyByItem retrieves the key for a concurrent map item
func (*SortedMap) GetMapItemByIndex ¶
func (sm *SortedMap) GetMapItemByIndex(index int) (*SortedMapItem, bool)
GetMapItemByIndex retrieves the SortedMapItem for a concurrent map item given the index
func (*SortedMap) Iter ¶
func (sm *SortedMap) Iter() <-chan SortedMapItem
Iter iterates over the items in a concurrent map. Each item is sent over a channel, so that we can iterate over the map using the builtin range keyword.
func (*SortedMap) IterWithCancel ¶
func (sm *SortedMap) IterWithCancel(cancel chan interface{}) <-chan SortedMapItem
IterWithCancel iterates over the items in a concurrent map. Each item is sent over a channel, so that we can iterate over the map using the builtin range keyword. It allows passing a cancel chan to make the iteration cancelable.
type SortedMapItem ¶
type SortedMapItem struct { Key interface{} Value interface{} }
SortedMapItem contains a key/value pair item of a concurrent map
type SortedSlice ¶
type SortedSlice struct {
// contains filtered or unexported fields
}
SortedSlice type that can be safely shared between goroutines
func NewSortedSlice ¶
func NewSortedSlice() *SortedSlice
NewSortedSlice creates a new concurrent slice
func (*SortedSlice) Append ¶
func (cs *SortedSlice) Append(item interface{})
Append adds an item to the concurrent slice
func (*SortedSlice) GetItemAtIndex ¶
func (cs *SortedSlice) GetItemAtIndex(index int) interface{}
GetItemAtIndex - Get item at index
func (*SortedSlice) IndexOf ¶
func (cs *SortedSlice) IndexOf(item interface{}) int
IndexOf returns the index of a specific item
func (*SortedSlice) Iter ¶
func (cs *SortedSlice) Iter() <-chan SortedSliceItem
Iter iterates over the items in the concurrent slice. Each item is sent over a channel, so that we can iterate over the slice using the builtin range keyword.
func (*SortedSlice) IterWithCancel ¶
func (cs *SortedSlice) IterWithCancel(cancel chan interface{}) <-chan SortedSliceItem
IterWithCancel iterates over the items in the concurrent slice. Each item is sent over a channel, so that we can iterate over the slice using the builtin range keyword. It allows passing a cancel chan to make the iteration cancelable.
func (*SortedSlice) RemoveItemAtIndex ¶
func (cs *SortedSlice) RemoveItemAtIndex(index int)
RemoveItemAtIndex removes the item at the specified index
type SortedSliceItem ¶
type SortedSliceItem struct { Index int Value interface{} }
SortedSliceItem contains the index/value pair of an item in a concurrent slice