Documentation ¶
Index ¶
- func ContextWithBus(ctx context.Context, bus *Bus) context.Context
- func FilterFmt(kind string, labels ...Label) string
- type Bus
- func (b *Bus) Name() string
- func (b *Bus) Pub(v Messager, labels ...Label)
- func (b *Bus) SetDefaultSubscriptionQueueSize(i uint64)
- func (b *Bus) SetDrainChanDuration(duration time.Duration)
- func (b *Bus) Start(ctx context.Context)
- func (b *Bus) Stop()
- func (b *Bus) Sub(name string, options ...interface{}) *Subscription
- type ErrSubscriptionIDNotFound
- type Label
- type Labels
- type Messager
- type Msg
- type QueueSizer
- type Subscription
- type SubscriptionError
- type SubscriptionQueueThreshold
- type Timeout
- type Timeouter
- type WithQueueSize
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContextWithBus ¶
ContextWithBus stores the bus in the context and returns the new context.
Types ¶
type Bus ¶
func BusFromContext ¶
func (*Bus) Pub ¶
Pub posts a new Publication on the bus. The labels are added to existing v labels, so a subscriber can retrieve message publication labels from the received message.
func (*Bus) SetDefaultSubscriptionQueueSize ¶
SetDefaultSubscriptionQueueSize overrides the default queue size of subscribers for not yet started bus.
It panics if called on started bus.
func (*Bus) SetDrainChanDuration ¶
SetDrainChanDuration overrides defaultDrainChanDuration for not yet started bus.
It panics if called on started bus.
func (*Bus) Sub ¶
func (b *Bus) Sub(name string, options ...interface{}) *Subscription
Sub function requires a new Subscription on the bus.
Used options: Timeouter, QueueSizer
when Timeouter, it sets the subscriber timeout to pull each message, subscriber with exceeded timeout notification are automatically dropped, and SubscriptionError message is sent on bus. defaults is no timeout
when QueueSizer, it sets the subscriber queue size. default value is bus dependent (see SetDefaultSubscriptionQueueSize)
type ErrSubscriptionIDNotFound ¶
type ErrSubscriptionIDNotFound struct {
// contains filtered or unexported fields
}
func (ErrSubscriptionIDNotFound) Error ¶
func (e ErrSubscriptionIDNotFound) Error() string
type Labels ¶
Labels allow message routing filtering based on key/value matching
func (Labels) Keys ¶
Keys returns all the permutations of all lengths of the labels ex:
keys of l1=foo l2=foo l3=foo: {l1=foo} {l2=foo} {l3=foo} {l1=foo}{l2=foo} {l1=foo}{l3=foo} {l2=foo}{l3=foo} {l2=foo}{l1=foo} {l3=foo}{l1=foo} {l3=foo}{l2=foo} {l1=foo}{l2=foo}{l3=foo} {l1=foo}{l3=foo}{l2=foo} {l2=foo}{l1=foo}{l3=foo} {l2=foo}{l3=foo}{l1=foo} {l3=foo}{l1=foo}{l2=foo} {l3=foo}{l2=foo}{l1=foo}
type QueueSizer ¶
type QueueSizer interface {
// contains filtered or unexported methods
}
type Subscription ¶
type Subscription struct { // C is the channel exposed to the subscriber for polling C chan any // contains filtered or unexported fields }
func (*Subscription) AddFilter ¶
func (sub *Subscription) AddFilter(v any, labels ...Label)
func (*Subscription) DelFilter ¶
func (sub *Subscription) DelFilter(v any, labels ...Label)
func (*Subscription) Drain ¶
func (sub *Subscription) Drain()
Drain dequeues exposed channel.
Drain is automatically called during sub.Stop()
func (*Subscription) Start ¶
func (sub *Subscription) Start()
func (*Subscription) Stop ¶
func (sub *Subscription) Stop() error
Stop closes the subscription and deueues private and exposed subscription channels
func (*Subscription) String ¶
func (sub *Subscription) String() string
type SubscriptionError ¶
type SubscriptionError struct { Msg ID uuid.UUID `json:"id"` Name string `json:"name"` ErrS string `json:"error"` }
SubscriptionError is an emitted publication made when a subscriber notification exceeds its timeout
func (SubscriptionError) Kind ¶
func (m SubscriptionError) Kind() string
type SubscriptionQueueThreshold ¶
type SubscriptionQueueThreshold struct { Msg ID uuid.UUID Name string `json:"name"` // Count is the current used slots in internal subscriber queue Count uint64 `json:"count"` // From is the previous high threshold value From uint64 `json:"from"` // To is the new high threshold value To uint64 `json:"to"` // Limit is the maximum queue size Limit uint64 `json:"limit"` }
SubscriptionQueueThreshold is an emitted publication made when a subscriber queue reach/leave its current high threshold value
func (SubscriptionQueueThreshold) Kind ¶
func (m SubscriptionQueueThreshold) Kind() string
type WithQueueSize ¶
type WithQueueSize uint64