Documentation
¶
Index ¶
- Variables
- func AutoDisableClosedChannels() func(opt *PriorityChannelOptions)
- func ChannelWaitInterval(d time.Duration) func(opt *PriorityChannelOptions)
- func ProcessByFrequencyRatioWithGoroutines[T any](ctx context.Context, channelsWithFreqRatios []channels.ChannelWithFreqRatio[T], ...) error
- func Synchronized(val bool) func(opt *PriorityChannelOptions)
- func WithFrequencyMethod(method FrequencyMethod) func(opt *PriorityChannelOptions)
- func WithFrequencyMode(mode FrequencyMode) func(opt *PriorityChannelOptions)
- type ChannelConfig
- type ChannelNode
- type ChannelValidationError
- type ChannelWaitIntervalConfig
- type ChannelWaitIntervalUnitConfig
- type Configuration
- type Delivery
- type DuplicateChannelError
- type DynamicPriorityProcessor
- func (p *DynamicPriorityProcessor[T]) ActiveWorkersNum() int
- func (p *DynamicPriorityProcessor[T]) Done() <-chan struct{}
- func (p *DynamicPriorityProcessor[T]) Start() error
- func (p *DynamicPriorityProcessor[T]) Status() (stopped bool, reason ExitReason, channelName string)
- func (p *DynamicPriorityProcessor[T]) StopGracefully()
- func (p *DynamicPriorityProcessor[T]) StopImmediately(onMessageDrop func(msg T, channelName string))
- func (p *DynamicPriorityProcessor[T]) UpdatePriorityConfiguration(priorityConfiguration Configuration) error
- func (p *DynamicPriorityProcessor[T]) UpdateWorkersNum(newWorkersNum int) error
- func (p *DynamicPriorityProcessor[T]) WorkersNum() int
- type ExitReason
- type FrequencyMethod
- type FrequencyMode
- type InvalidPrioritizationMethodError
- type PrioritizationMethod
- type PrioritizationStrategy
- type PriorityChannel
- func CombineByFrequencyRatio[T any](ctx context.Context, ...) (*PriorityChannel[T], error)
- func CombineByHighestAlwaysFirst[T any](ctx context.Context, ...) (*PriorityChannel[T], error)
- func CombineByStrategy[T any, W any](ctx context.Context, strategy PrioritizationStrategy[W], ...) (*PriorityChannel[T], error)
- func CombineDynamicByPreconfiguredFrequencyRatios[T any](ctx context.Context, ...) (*PriorityChannel[T], error)
- func CombineDynamicByPreconfiguredStrategies[T any](ctx context.Context, ...) (*PriorityChannel[T], error)
- func NewByFrequencyRatio[T any](ctx context.Context, channelsWithFreqRatios []channels.ChannelWithFreqRatio[T], ...) (*PriorityChannel[T], error)
- func NewByHighestAlwaysFirst[T any](ctx context.Context, channelsWithPriorities []channels.ChannelWithPriority[T], ...) (*PriorityChannel[T], error)
- func NewByStrategy[T any, W any](ctx context.Context, strategy PrioritizationStrategy[W], ...) (*PriorityChannel[T], error)
- func NewDynamicByPreconfiguredFrequencyRatios[T any](ctx context.Context, ...) (*PriorityChannel[T], error)
- func NewDynamicByPreconfiguredStrategies[T any](ctx context.Context, ...) (*PriorityChannel[T], error)
- func NewFromConfiguration[T any](ctx context.Context, config Configuration, ...) (*PriorityChannel[T], error)
- func WrapAsPriorityChannel[T any](ctx context.Context, channelName string, msgsC <-chan T, ...) (*PriorityChannel[T], error)
- func (pc *PriorityChannel[T]) Close()
- func (pc *PriorityChannel[T]) Receive() (msg T, channelName string, ok bool)
- func (pc *PriorityChannel[T]) ReceiveEx() (msg T, details ReceiveDetails, ok bool)
- func (pc *PriorityChannel[T]) ReceiveWithContext(ctx context.Context) (msg T, channelName string, status ReceiveStatus)
- func (pc *PriorityChannel[T]) ReceiveWithContextEx(ctx context.Context) (msg T, details ReceiveDetails, status ReceiveStatus)
- func (pc *PriorityChannel[T]) ReceiveWithDefaultCase() (msg T, channelName string, status ReceiveStatus)
- func (pc *PriorityChannel[T]) ReceiveWithDefaultCaseEx() (msg T, details ReceiveDetails, status ReceiveStatus)
- type PriorityChannelConfig
- type PriorityChannelFrequencyMethodConfig
- type PriorityChannelFrequencyModeConfig
- type PriorityChannelMethodConfig
- type PriorityChannelOptions
- type PriorityChannelWithFreqRatio
- type PriorityChannelWithPriority
- type PriorityChannelWithWeight
- type PriorityConsumer
- func (c *PriorityConsumer[T]) Consume() (<-chan Delivery[T], error)
- func (c *PriorityConsumer[T]) ConsumeMessages() (<-chan T, error)
- func (c *PriorityConsumer[T]) Done() <-chan struct{}
- func (c *PriorityConsumer[T]) Status() (stopped bool, reason ExitReason, channelName string)
- func (c *PriorityConsumer[T]) StopGracefully()
- func (c *PriorityConsumer[T]) StopImmediately(onMessageDrop func(msg T, channelName string))
- func (c *PriorityConsumer[T]) UpdatePriorityConfiguration(priorityConfiguration Configuration) error
- type ReceiveDetails
- type ReceiveStatus
- func Select[T any](ctx context.Context, channelsWithPriorities []channels.ChannelWithPriority[T], ...) (msg T, channelName string, status ReceiveStatus, err error)
- func SelectWithDefaultCase[T any](channelsWithPriorities []channels.ChannelWithPriority[T], ...) (msg T, channelName string, status ReceiveStatus, err error)
- type UnsupportedFrequencyMethodForCombineError
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidFrequencyMode = errors.New("invalid frequency mode") ErrInvalidFrequencyMethod = errors.New("invalid frequency method") )
var ( ErrNoChannels = errors.New("no channels provided") ErrEmptyChannelName = errors.New("channel name is empty") )
Functions ¶
func AutoDisableClosedChannels ¶
func AutoDisableClosedChannels() func(opt *PriorityChannelOptions)
func ChannelWaitInterval ¶
func ChannelWaitInterval(d time.Duration) func(opt *PriorityChannelOptions)
func ProcessByFrequencyRatioWithGoroutines ¶
func ProcessByFrequencyRatioWithGoroutines[T any](ctx context.Context, channelsWithFreqRatios []channels.ChannelWithFreqRatio[T], onMessageReceived func(msg T, channelName string), onChannelClosed func(channelName string), onProcessingFinished func(reason ExitReason)) error
func Synchronized ¶
func Synchronized(val bool) func(opt *PriorityChannelOptions)
func WithFrequencyMethod ¶
func WithFrequencyMethod(method FrequencyMethod) func(opt *PriorityChannelOptions)
func WithFrequencyMode ¶
func WithFrequencyMode(mode FrequencyMode) func(opt *PriorityChannelOptions)
Types ¶
type ChannelConfig ¶
type ChannelConfig struct { Name string `json:"name"` Priority int `json:"priority,omitempty"` FreqRatio int `json:"freqRatio,omitempty"` Probability float64 `json:"probability,omitempty"` *PriorityChannelConfig `json:"priorityChannel,omitempty"` }
type ChannelNode ¶
type ChannelValidationError ¶
func (*ChannelValidationError) Error ¶
func (e *ChannelValidationError) Error() string
type ChannelWaitIntervalConfig ¶
type ChannelWaitIntervalConfig struct { Unit ChannelWaitIntervalUnitConfig `json:"unit"` Value int `json:"value"` }
type ChannelWaitIntervalUnitConfig ¶
type ChannelWaitIntervalUnitConfig string
const ( MicrosecondsChannelWaitIntervalUnitConfig ChannelWaitIntervalUnitConfig = "microseconds" MillisecondsChannelWaitIntervalUnitConfig ChannelWaitIntervalUnitConfig = "milliseconds" )
type Configuration ¶
type Configuration struct {
PriorityChannel *PriorityChannelConfig `json:"priorityChannel,omitempty"`
}
type Delivery ¶
type Delivery[T any] struct { Msg T ReceiveDetails ReceiveDetails }
type DuplicateChannelError ¶
type DuplicateChannelError struct {
ChannelName string
}
func (*DuplicateChannelError) Error ¶
func (e *DuplicateChannelError) Error() string
type DynamicPriorityProcessor ¶
type DynamicPriorityProcessor[T any] struct { // contains filtered or unexported fields }
func NewDynamicPriorityProcessor ¶
func NewDynamicPriorityProcessor[T any]( ctx context.Context, processFn func(Delivery[T]), channelNameToChannel map[string]<-chan T, priorityConfiguration Configuration, workersNum int) (*DynamicPriorityProcessor[T], error)
func (*DynamicPriorityProcessor[T]) ActiveWorkersNum ¶
func (p *DynamicPriorityProcessor[T]) ActiveWorkersNum() int
func (*DynamicPriorityProcessor[T]) Done ¶
func (p *DynamicPriorityProcessor[T]) Done() <-chan struct{}
func (*DynamicPriorityProcessor[T]) Start ¶
func (p *DynamicPriorityProcessor[T]) Start() error
func (*DynamicPriorityProcessor[T]) Status ¶
func (p *DynamicPriorityProcessor[T]) Status() (stopped bool, reason ExitReason, channelName string)
func (*DynamicPriorityProcessor[T]) StopGracefully ¶
func (p *DynamicPriorityProcessor[T]) StopGracefully()
func (*DynamicPriorityProcessor[T]) StopImmediately ¶
func (p *DynamicPriorityProcessor[T]) StopImmediately(onMessageDrop func(msg T, channelName string))
func (*DynamicPriorityProcessor[T]) UpdatePriorityConfiguration ¶
func (p *DynamicPriorityProcessor[T]) UpdatePriorityConfiguration(priorityConfiguration Configuration) error
func (*DynamicPriorityProcessor[T]) UpdateWorkersNum ¶
func (p *DynamicPriorityProcessor[T]) UpdateWorkersNum(newWorkersNum int) error
func (*DynamicPriorityProcessor[T]) WorkersNum ¶
func (p *DynamicPriorityProcessor[T]) WorkersNum() int
type ExitReason ¶
type ExitReason int
const ( UnknownExitReason ExitReason = iota ChannelClosed PriorityChannelClosed NoOpenChannels ContextCancelled )
func ProcessPriorityChannelMessages ¶
func ProcessPriorityChannelMessages[T any]( msgReceiver *PriorityChannel[T], msgProcessor func(ctx context.Context, msg T, channelName string)) ExitReason
type FrequencyMethod ¶
type FrequencyMethod int
const ( StrictOrderAcrossCycles FrequencyMethod = iota StrictOrderFully ProbabilisticByCaseDuplication ProbabilisticByMultipleRandCalls )
type FrequencyMode ¶
type FrequencyMode int
const ( StrictOrderMode FrequencyMode = iota ProbabilisticMode )
type InvalidPrioritizationMethodError ¶
type InvalidPrioritizationMethodError struct {
Method int
}
func (*InvalidPrioritizationMethodError) Error ¶
func (e *InvalidPrioritizationMethodError) Error() string
type PrioritizationMethod ¶
type PrioritizationMethod int
const ( ByHighestAlwaysFirst PrioritizationMethod = iota ByFrequencyRatio ByProbability )
type PrioritizationStrategy ¶
type PrioritizationStrategy[W any] interface { Initialize(weights []W) error NextSelectCasesRankedIndexes(upto int) ([]strategies.RankedIndex, bool) UpdateOnCaseSelected(index int) DisableSelectCase(index int) }
type PriorityChannel ¶
type PriorityChannel[T any] struct { // contains filtered or unexported fields }
func CombineByFrequencyRatio ¶
func CombineByFrequencyRatio[T any](ctx context.Context, priorityChannelsWithFreqRatio []PriorityChannelWithFreqRatio[T], options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func CombineByHighestAlwaysFirst ¶
func CombineByHighestAlwaysFirst[T any](ctx context.Context, priorityChannelsWithPriority []PriorityChannelWithPriority[T], options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func CombineByStrategy ¶
func CombineByStrategy[T any, W any](ctx context.Context, strategy PrioritizationStrategy[W], priorityChannelsWithWeight []PriorityChannelWithWeight[T, W], options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func CombineDynamicByPreconfiguredFrequencyRatios ¶
func CombineDynamicByPreconfiguredFrequencyRatios[T any](ctx context.Context, channelsWithDynamicFreqRatio []PriorityChannelWithWeight[T, map[string]int], currentStrategySelector func() string, options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func CombineDynamicByPreconfiguredStrategies ¶
func CombineDynamicByPreconfiguredStrategies[T any](ctx context.Context, dynamicPrioritizationMethods map[string]PrioritizationMethod, priorityChannelsWithDynamicWeights []PriorityChannelWithWeight[T, map[string]interface{}], currentStrategySelector func() string, options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func NewByFrequencyRatio ¶
func NewByFrequencyRatio[T any](ctx context.Context, channelsWithFreqRatios []channels.ChannelWithFreqRatio[T], options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func NewByHighestAlwaysFirst ¶
func NewByHighestAlwaysFirst[T any](ctx context.Context, channelsWithPriorities []channels.ChannelWithPriority[T], options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func NewByStrategy ¶
func NewByStrategy[T any, W any](ctx context.Context, strategy PrioritizationStrategy[W], channelsWithWeights []channels.ChannelWithWeight[T, W], options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func NewDynamicByPreconfiguredFrequencyRatios ¶
func NewDynamicByPreconfiguredFrequencyRatios[T any](ctx context.Context, channelsWithDynamicFreqRatio []channels.ChannelWithWeight[T, map[string]int], currentStrategySelector func() string, options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func NewDynamicByPreconfiguredStrategies ¶
func NewDynamicByPreconfiguredStrategies[T any](ctx context.Context, dynamicPrioritizationMethods map[string]PrioritizationMethod, channelsWithDynamicWeights []channels.ChannelWithWeight[T, map[string]interface{}], currentStrategySelector func() string, options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func NewFromConfiguration ¶
func NewFromConfiguration[T any](ctx context.Context, config Configuration, channelNameToChannel map[string]<-chan T) (*PriorityChannel[T], error)
func WrapAsPriorityChannel ¶
func WrapAsPriorityChannel[T any](ctx context.Context, channelName string, msgsC <-chan T, options ...func(*PriorityChannelOptions)) (*PriorityChannel[T], error)
func (*PriorityChannel[T]) Close ¶
func (pc *PriorityChannel[T]) Close()
func (*PriorityChannel[T]) Receive ¶
func (pc *PriorityChannel[T]) Receive() (msg T, channelName string, ok bool)
func (*PriorityChannel[T]) ReceiveEx ¶
func (pc *PriorityChannel[T]) ReceiveEx() (msg T, details ReceiveDetails, ok bool)
func (*PriorityChannel[T]) ReceiveWithContext ¶
func (pc *PriorityChannel[T]) ReceiveWithContext(ctx context.Context) (msg T, channelName string, status ReceiveStatus)
func (*PriorityChannel[T]) ReceiveWithContextEx ¶
func (pc *PriorityChannel[T]) ReceiveWithContextEx(ctx context.Context) (msg T, details ReceiveDetails, status ReceiveStatus)
func (*PriorityChannel[T]) ReceiveWithDefaultCase ¶
func (pc *PriorityChannel[T]) ReceiveWithDefaultCase() (msg T, channelName string, status ReceiveStatus)
func (*PriorityChannel[T]) ReceiveWithDefaultCaseEx ¶
func (pc *PriorityChannel[T]) ReceiveWithDefaultCaseEx() (msg T, details ReceiveDetails, status ReceiveStatus)
type PriorityChannelConfig ¶
type PriorityChannelConfig struct { Method PriorityChannelMethodConfig `json:"method"` Channels []ChannelConfig `json:"channels"` AutoDisableClosedChannels bool `json:"autoDisableClosedChannels,omitempty"` FrequencyMode PriorityChannelFrequencyModeConfig `json:"frequencyMode,omitempty"` FrequencyMethod PriorityChannelFrequencyMethodConfig `json:"frequencyMethod,omitempty"` ChannelWaitInterval *ChannelWaitIntervalConfig `json:"channelWaitInterval,omitempty"` }
type PriorityChannelFrequencyMethodConfig ¶
type PriorityChannelFrequencyMethodConfig string
const ( StrictOrderAcrossCyclesFrequencyMethodConfig PriorityChannelFrequencyMethodConfig = "strict-order-across-cycles" StrictOrderFullyFrequencyMethodConfig PriorityChannelFrequencyMethodConfig = "strict-order-fully" ProbabilisticByCaseDuplicationFrequencyMethodConfig PriorityChannelFrequencyMethodConfig = "case-duplication" ProbabilisticByMultipleRandCallsFrequencyMethodConfig PriorityChannelFrequencyMethodConfig = "by-probability" )
type PriorityChannelFrequencyModeConfig ¶
type PriorityChannelFrequencyModeConfig string
const ( StrictOrderModeFrequencyModeConfig PriorityChannelFrequencyModeConfig = "strict-order" ProbabilisticModeFrequencyModeConfig PriorityChannelFrequencyModeConfig = "probabilistic" )
type PriorityChannelMethodConfig ¶
type PriorityChannelMethodConfig string
const ( ByHighestAlwaysFirstMethodConfig PriorityChannelMethodConfig = "by-highest-always-first" ByFrequencyRatioMethodConfig PriorityChannelMethodConfig = "by-frequency-ratio" ByProbabilityMethodConfig PriorityChannelMethodConfig = "by-probability" )
type PriorityChannelOptions ¶
type PriorityChannelOptions struct {
// contains filtered or unexported fields
}
type PriorityChannelWithFreqRatio ¶
type PriorityChannelWithFreqRatio[T any] struct { // contains filtered or unexported fields }
func NewPriorityChannelWithFreqRatio ¶
func NewPriorityChannelWithFreqRatio[T any](name string, priorityChannel *PriorityChannel[T], freqRatio int) PriorityChannelWithFreqRatio[T]
func (*PriorityChannelWithFreqRatio[T]) FreqRatio ¶
func (c *PriorityChannelWithFreqRatio[T]) FreqRatio() int
func (*PriorityChannelWithFreqRatio[T]) Name ¶
func (c *PriorityChannelWithFreqRatio[T]) Name() string
func (*PriorityChannelWithFreqRatio[T]) PriorityChannel ¶
func (c *PriorityChannelWithFreqRatio[T]) PriorityChannel() *PriorityChannel[T]
type PriorityChannelWithPriority ¶
type PriorityChannelWithPriority[T any] struct { // contains filtered or unexported fields }
func NewPriorityChannelWithPriority ¶
func NewPriorityChannelWithPriority[T any](name string, priorityChannel *PriorityChannel[T], priority int) PriorityChannelWithPriority[T]
func (*PriorityChannelWithPriority[T]) Name ¶
func (c *PriorityChannelWithPriority[T]) Name() string
func (*PriorityChannelWithPriority[T]) Priority ¶
func (c *PriorityChannelWithPriority[T]) Priority() int
func (*PriorityChannelWithPriority[T]) PriorityChannel ¶
func (c *PriorityChannelWithPriority[T]) PriorityChannel() *PriorityChannel[T]
type PriorityChannelWithWeight ¶
func NewPriorityChannelWithWeight ¶
func NewPriorityChannelWithWeight[T any, W any](name string, priorityChannel *PriorityChannel[T], weight W) PriorityChannelWithWeight[T, W]
func (*PriorityChannelWithWeight[T, W]) Name ¶
func (c *PriorityChannelWithWeight[T, W]) Name() string
func (*PriorityChannelWithWeight[T, W]) PriorityChannel ¶
func (c *PriorityChannelWithWeight[T, W]) PriorityChannel() *PriorityChannel[T]
func (*PriorityChannelWithWeight[T, W]) Weight ¶
func (c *PriorityChannelWithWeight[T, W]) Weight() W
type PriorityConsumer ¶
type PriorityConsumer[T any] struct { // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumer[T any]( ctx context.Context, channelNameToChannel map[string]<-chan T, priorityConfiguration Configuration, ) (*PriorityConsumer[T], error)
func (*PriorityConsumer[T]) Consume ¶
func (c *PriorityConsumer[T]) Consume() (<-chan Delivery[T], error)
func (*PriorityConsumer[T]) ConsumeMessages ¶
func (c *PriorityConsumer[T]) ConsumeMessages() (<-chan T, error)
ConsumeMessages returns a stream of just the message payloads (T only), whereas Consume returns a stream of Delivery[T], which includes both the message payload and the receive details. This is useful when you either don't care about the receive details or they are already included in the message payload.
func (*PriorityConsumer[T]) Done ¶
func (c *PriorityConsumer[T]) Done() <-chan struct{}
Done returns a channel that is closed when the consumer is stopped.
func (*PriorityConsumer[T]) Status ¶
func (c *PriorityConsumer[T]) Status() (stopped bool, reason ExitReason, channelName string)
Status returns whether the consumer is stopped and, if so, the reason for stopping. If the reason is a closed channel, it also returns the name of the channel that was closed.
func (*PriorityConsumer[T]) StopGracefully ¶
func (c *PriorityConsumer[T]) StopGracefully()
StopGracefully stops the consumer with a graceful shutdown, draining the unprocessed messages before stopping.
func (*PriorityConsumer[T]) StopImmediately ¶
func (c *PriorityConsumer[T]) StopImmediately(onMessageDrop func(msg T, channelName string))
StopImmediately stops the consumer in a forced manner. onMessageDrop is called when a message is dropped. It is optional and can be nil, in which case the message will be silently dropped.
func (*PriorityConsumer[T]) UpdatePriorityConfiguration ¶
func (c *PriorityConsumer[T]) UpdatePriorityConfiguration(priorityConfiguration Configuration) error
type ReceiveDetails ¶
type ReceiveDetails struct { ChannelName string ChannelIndex int PathInTree []ChannelNode }
type ReceiveStatus ¶
type ReceiveStatus int
const ( ReceiveSuccess ReceiveStatus = iota ReceiveContextCancelled ReceiveDefaultCase ReceiveChannelClosed ReceivePriorityChannelClosed ReceiveNoOpenChannels ReceiveStatusUnknown )
func Select ¶
func Select[T any](ctx context.Context, channelsWithPriorities []channels.ChannelWithPriority[T], options ...func(*PriorityChannelOptions)) (msg T, channelName string, status ReceiveStatus, err error)
func SelectWithDefaultCase ¶
func SelectWithDefaultCase[T any]( channelsWithPriorities []channels.ChannelWithPriority[T], options ...func(*PriorityChannelOptions)) (msg T, channelName string, status ReceiveStatus, err error)
func (ReceiveStatus) ExitReason ¶
func (r ReceiveStatus) ExitReason() ExitReason
type UnsupportedFrequencyMethodForCombineError ¶
type UnsupportedFrequencyMethodForCombineError struct {
FrequencyMethod FrequencyMethod
}
func (*UnsupportedFrequencyMethodForCombineError) Error ¶
func (e *UnsupportedFrequencyMethodForCombineError) Error() string
Source Files
¶
- by_freq.go
- by_strategy.go
- combine_by_freq.go
- combine_by_highest_priority_first.go
- combine_by_strategy.go
- common.go
- configuration.go
- consumer.go
- dynamic.go
- dynamic_processor.go
- frequency_method.go
- highest_priority_first.go
- priority_channel.go
- priority_select.go
- validation.go
- wrapped_channel.go
- wrapped_priority_channel.go