pubsub

package
v0.0.0-...-ded2b46
Warning

This package is not in the latest version of its module.
Published: May 24, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextWithBus

func ContextWithBus(ctx context.Context, bus *Bus) context.Context

ContextWithBus stores the bus in the context and returns the new context.

func FilterFmt

func FilterFmt(kind string, labels ...Label) string

FilterFmt returns a string that identifies a filter.

Types

type Bus

type Bus struct {
	sync.WaitGroup
	// contains filtered or unexported fields
}

func BusFromContext

func BusFromContext(ctx context.Context) *Bus

func NewBus

func NewBus(name string) *Bus

NewBus allocates and runs a new Bus and returns a pointer to it.

func (*Bus) Name

func (b *Bus) Name() string

func (*Bus) Pub

func (b *Bus) Pub(v Messager, labels ...Label)

Pub posts a new Publication on the bus. The labels are added to the existing labels of v, so a subscriber can retrieve the publication labels from the received message.
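
As a rough illustration of how publication labels end up on the message, the sketch below merges the labels into a Msg through the Messager interface. The method bodies and the pub helper are assumptions made for this example; the real Pub also dispatches the message to subscribers, which is not modeled here.

```go
package main

import "fmt"

// Label and Labels mirror the documented types.
type Label [2]string
type Labels map[string]string

// Msg mirrors the documented struct; the method bodies are assumptions.
type Msg struct {
	Labels Labels `json:"labels"`
}

// AddLabels merges each {key, val} pair into the message labels.
func (p *Msg) AddLabels(l ...Label) {
	if p.Labels == nil {
		p.Labels = make(Labels)
	}
	for _, kv := range l {
		p.Labels[kv[0]] = kv[1]
	}
}

// GetLabels returns the labels currently attached to the message.
func (p *Msg) GetLabels() Labels { return p.Labels }

// Messager is the documented interface.
type Messager interface {
	AddLabels(...Label)
	GetLabels() Labels
}

// pub is a hypothetical stand-in for Bus.Pub: it only merges the labels
// and hands the message back instead of dispatching it.
func pub(v Messager, labels ...Label) Messager {
	v.AddLabels(labels...)
	return v
}

func main() {
	m := &Msg{Labels: Labels{"node": "n1"}}
	got := pub(m, Label{"path", "svc1"}).GetLabels()
	fmt.Println(got["node"], got["path"]) // n1 svc1
}
```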

func (*Bus) SetDefaultSubscriptionQueueSize

func (b *Bus) SetDefaultSubscriptionQueueSize(i uint64)

SetDefaultSubscriptionQueueSize overrides the default subscriber queue size for a bus that has not been started yet.

It panics if called on a started bus.

func (*Bus) SetDrainChanDuration

func (b *Bus) SetDrainChanDuration(duration time.Duration)

SetDrainChanDuration overrides defaultDrainChanDuration for a bus that has not been started yet.

It panics if called on a started bus.
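
The "configure before start, panic afterwards" contract of these setters can be sketched as follows. The started and drainChanDuration fields, the panic message, and the panics helper are hypothetical; only the contract itself comes from the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// Bus is a stand-in holding just enough state to show the guard.
type Bus struct {
	started           bool          // hypothetical field
	drainChanDuration time.Duration // hypothetical field
}

// SetDrainChanDuration refuses to change settings once the bus runs.
func (b *Bus) SetDrainChanDuration(d time.Duration) {
	if b.started {
		panic("can't set drain channel duration on a started bus")
	}
	b.drainChanDuration = d
}

// Start marks the bus as started (the real Start does much more).
func (b *Bus) Start() { b.started = true }

// panics reports whether f panicked.
func panics(f func()) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	f()
	return false
}

func main() {
	b := &Bus{}
	b.SetDrainChanDuration(50 * time.Millisecond) // allowed: not started
	b.Start()
	fmt.Println(panics(func() { b.SetDrainChanDuration(time.Second) })) // true
}
```

In practice this means all tuning calls belong between NewBus and Start.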

func (*Bus) Start

func (b *Bus) Start(ctx context.Context)

func (*Bus) Stop

func (b *Bus) Stop()

func (*Bus) Sub

func (b *Bus) Sub(name string, options ...interface{}) *Subscription

Sub requests a new Subscription on the bus.

Used options: Timeouter, QueueSizer

With a Timeouter, Sub sets the timeout the subscriber has to pull each message. A subscriber whose notification exceeds the timeout is automatically dropped, and a SubscriptionError message is sent on the bus. The default is no timeout.

With a QueueSizer, Sub sets the subscriber queue size. The default value is bus dependent (see SetDefaultSubscriptionQueueSize).
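
Because the options are passed as ...interface{}, they are presumably told apart by a type switch on the Timeouter and QueueSizer interfaces. The sketch below shows that idea only. The unexported method names (timeout, queueSize), the subConfig struct, and parseOptions are hypothetical, since the real interfaces hide their methods.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical method sets: the real interfaces have unexported methods.
type Timeouter interface{ timeout() time.Duration }
type QueueSizer interface{ queueSize() uint64 }

// Timeout and WithQueueSize mirror the documented option types.
type Timeout time.Duration

func (t Timeout) timeout() time.Duration { return time.Duration(t) }

type WithQueueSize uint64

func (q WithQueueSize) queueSize() uint64 { return uint64(q) }

// subConfig collects the settings extracted from the options.
type subConfig struct {
	timeout   time.Duration // zero means no timeout
	queueSize uint64
}

// parseOptions applies recognized options over the bus defaults and
// silently ignores anything else.
func parseOptions(defaultQueueSize uint64, options ...interface{}) subConfig {
	cfg := subConfig{queueSize: defaultQueueSize}
	for _, opt := range options {
		switch o := opt.(type) {
		case Timeouter:
			cfg.timeout = o.timeout()
		case QueueSizer:
			cfg.queueSize = o.queueSize()
		}
	}
	return cfg
}

func main() {
	cfg := parseOptions(100, Timeout(2*time.Second), WithQueueSize(500))
	fmt.Println(cfg.timeout, cfg.queueSize) // 2s 500
}
```

Under these assumptions, a call such as bus.Sub("watcher", Timeout(2*time.Second), WithQueueSize(500)) would carry both settings in a single variadic argument list.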

type ErrSubscriptionIDNotFound

type ErrSubscriptionIDNotFound struct {
	// contains filtered or unexported fields
}

func (ErrSubscriptionIDNotFound) Error

type Label

type Label [2]string

Label is a {key, val} array

type Labels

type Labels map[string]string

Labels allow messages to be routed and filtered by key/value matching.

func NewLabels

func NewLabels(l ...string) Labels

func (Labels) Is

func (t Labels) Is(labels Labels) bool

func (Labels) Key

func (t Labels) Key() string

Key returns the label map key as a string, with the label names ordered.
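
One plausible reading of Key is a canonical string built from the labels in sorted name order, so that two equal maps always yield the same key. The sketch below assumes that reading and borrows the {name=value} rendering from the Keys example; the real output format is not documented here.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Labels mirrors the documented map type.
type Labels map[string]string

// key renders the labels as {name=value} groups in sorted name order.
// Both the ordering and the format are assumptions of this sketch.
func key(t Labels) string {
	names := make([]string, 0, len(t))
	for name := range t {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random, so sort first

	var sb strings.Builder
	for _, name := range names {
		sb.WriteString("{" + name + "=" + t[name] + "}")
	}
	return sb.String()
}

func main() {
	fmt.Println(key(Labels{"l2": "bar", "l1": "foo"})) // {l1=foo}{l2=bar}
}
```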

func (Labels) Keys

func (t Labels) Keys() []string

Keys returns all the permutations, of every length, of the labels. Example:

keys of l1=foo l2=foo l3=foo:
 {l1=foo}
 {l2=foo}
 {l3=foo}
 {l1=foo}{l2=foo}
 {l1=foo}{l3=foo}
 {l2=foo}{l3=foo}
 {l2=foo}{l1=foo}
 {l3=foo}{l1=foo}
 {l3=foo}{l2=foo}
 {l1=foo}{l2=foo}{l3=foo}
 {l1=foo}{l3=foo}{l2=foo}
 {l2=foo}{l1=foo}{l3=foo}
 {l2=foo}{l3=foo}{l1=foo}
 {l3=foo}{l1=foo}{l2=foo}
 {l3=foo}{l2=foo}{l1=foo}
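
The listing above contains 3 + 6 + 6 = 15 entries: every ordered arrangement of one, two, or three distinct labels. A minimal sketch of such an enumeration follows. The keys function is a hypothetical reimplementation, and the order in which it emits entries (depth first) differs from the listing.

```go
package main

import (
	"fmt"
	"sort"
)

// Labels mirrors the documented map type.
type Labels map[string]string

// keys returns every ordered arrangement of 1..n distinct labels,
// each label rendered as {name=value}.
func keys(t Labels) []string {
	elems := make([]string, 0, len(t))
	for name, value := range t {
		elems = append(elems, "{"+name+"="+value+"}")
	}
	sort.Strings(elems) // make the output order deterministic

	var out []string
	used := make([]bool, len(elems))
	var walk func(prefix string)
	walk = func(prefix string) {
		for i, e := range elems {
			if used[i] {
				continue
			}
			used[i] = true
			out = append(out, prefix+e) // every prefix is itself a key
			walk(prefix + e)
			used[i] = false
		}
	}
	walk("")
	return out
}

func main() {
	all := keys(Labels{"l1": "foo", "l2": "foo", "l3": "foo"})
	fmt.Println(len(all)) // 15
	for _, k := range all {
		fmt.Println(k)
	}
}
```

The number of keys grows factorially with the number of labels, so this scheme is only practical for messages that carry a handful of labels.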

func (Labels) String

func (t Labels) String() string

type Messager

type Messager interface {
	AddLabels(...Label)
	GetLabels() Labels
}

type Msg

type Msg struct {
	Labels Labels `json:"labels"`
}

func (*Msg) AddLabels

func (p *Msg) AddLabels(l ...Label)

func (*Msg) GetLabels

func (p *Msg) GetLabels() Labels

type QueueSizer

type QueueSizer interface {
	// contains filtered or unexported methods
}

type Subscription

type Subscription struct {

	// C is the channel exposed to the subscriber for polling
	C chan any
	// contains filtered or unexported fields
}

func (*Subscription) AddFilter

func (sub *Subscription) AddFilter(v any, labels ...Label)

func (*Subscription) DelFilter

func (sub *Subscription) DelFilter(v any, labels ...Label)

func (*Subscription) Drain

func (sub *Subscription) Drain()

Drain dequeues the exposed channel.

Drain is automatically called during sub.Stop().
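
Draining a channel usually means discarding whatever is still queued so that a sender is not left blocked. The sketch below is a non-blocking version written for illustration: drain is a hypothetical helper, and any time bound the real package may apply (SetDrainChanDuration hints at one) is not modeled.

```go
package main

import "fmt"

// drain discards queued values until the channel is empty or closed
// and returns how many values were dropped. It never blocks.
func drain(c <-chan any) int {
	n := 0
	for {
		select {
		case _, ok := <-c:
			if !ok {
				return n // channel closed
			}
			n++
		default:
			return n // nothing left to read right now
		}
	}
}

func main() {
	c := make(chan any, 4)
	c <- 1
	c <- 2
	c <- 3
	fmt.Println(drain(c), len(c)) // 3 0
}
```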

func (*Subscription) Start

func (sub *Subscription) Start()

func (*Subscription) Stop

func (sub *Subscription) Stop() error

Stop closes the subscription and dequeues the private and exposed subscription channels.

func (*Subscription) String

func (sub *Subscription) String() string

type SubscriptionError

type SubscriptionError struct {
	Msg
	ID   uuid.UUID `json:"id"`
	Name string    `json:"name"`
	ErrS string    `json:"error"`
}

SubscriptionError is a publication emitted when a subscriber notification exceeds its timeout.
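
A notification timeout of this kind is commonly implemented as a channel send raced against a timer. The sketch below shows that technique under stated assumptions: notify and errNotifyTimeout are hypothetical names, and the follow-up steps (dropping the subscriber, publishing a SubscriptionError) are left to the caller.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotifyTimeout is a hypothetical error for a stalled subscriber.
var errNotifyTimeout = errors.New("subscriber notification timeout")

// notify delivers v on c. With a positive timeout it gives up once the
// timer fires; with no timeout it blocks until the value is taken.
func notify(c chan<- any, v any, timeout time.Duration) error {
	if timeout <= 0 {
		c <- v
		return nil
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case c <- v:
		return nil
	case <-timer.C:
		return errNotifyTimeout
	}
}

func main() {
	ready := make(chan any, 1) // has room: delivery succeeds
	fmt.Println(notify(ready, "event", 10*time.Millisecond)) // <nil>

	stalled := make(chan any) // nobody reads: delivery times out
	fmt.Println(notify(stalled, "event", 10*time.Millisecond)) // subscriber notification timeout
}
```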

func (SubscriptionError) Kind

func (m SubscriptionError) Kind() string

type SubscriptionQueueThreshold

type SubscriptionQueueThreshold struct {
	Msg
	ID   uuid.UUID
	Name string `json:"name"`

	// Count is the current used slots in internal subscriber queue
	Count uint64 `json:"count"`

	// From is the previous high threshold value
	From uint64 `json:"from"`

	// To is the new high threshold value
	To uint64 `json:"to"`

	// Limit is the maximum queue size
	Limit uint64 `json:"limit"`
}

SubscriptionQueueThreshold is a publication emitted when a subscriber queue reaches or leaves its current high threshold value.

func (SubscriptionQueueThreshold) Kind

type Timeout

type Timeout time.Duration

type Timeouter

type Timeouter interface {
	// contains filtered or unexported methods
}

type WithQueueSize

type WithQueueSize uint64
