bus

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrHandlerRegistrationFailed = errors.New("failed to register handler")
	ErrMessageNotRoutable        = errors.New("message is not routable")
	ErrMessageHandlerFailed      = errors.New("message could not be handled")
)
View Source
var DefaultSubscriptionOptions = SubscriptionOptions{
	FilterSubjects:   []string{},
	GuaranteeOrder:   false,
	MaxDeliveryTries: 10,
}
View Source
var (
	ErrInvalidSubjectPattern = errors.New("invalid subject pattern")
)

Functions

func MatchSubject

func MatchSubject(subject, pattern string) bool

MatchSubject checks whether a given subject matches a given pattern. The pattern can be expressed in the same syntax as in NATS. For more information about valid patterns check: https://docs.nats.io/nats-concepts/subjects#wildcards

func ValidatePattern

func ValidatePattern(pattern string) error

Types

type Bus

type Bus interface {
	// Publish sends a new message to the bus. It must be added in order to ensure consistency
	Publish(ctx context.Context, message *OutboundMessage) error
	// Subscribe retrieves messages from the bus in an ordered matter.
	Subscribe(ctx context.Context, subscriberName string, stream string, opts ...SubscribeOption) (*Subscription, error)
	// Migrate ensures that dependencies (streams, topic, consumers, etc.) are up to date and ready to be used
	Migrate(ctx context.Context) error
}

type DeliveryPolicy

type DeliveryPolicy int
const (
	DeliverAll DeliveryPolicy = iota
	DeliverNew
)

type ErrorCallbackFunc

type ErrorCallbackFunc func(err error)

type HandlerFunc

type HandlerFunc func(ctx context.Context, message InboundMessage) error

type InboundMessage

type InboundMessage struct {
	MessageCtx context.Context
	Id         string
	Subject    string
	Data       []byte
	Ack        func() error
	Nak        func(retryAfter time.Duration) error
	// contains filtered or unexported fields
}

func (*InboundMessage) Unmarshal

func (im *InboundMessage) Unmarshal(dst interface{}) error

type OutboundMessage

type OutboundMessage struct {
	Id      string
	Subject string
	Data    []byte
}

type SubscribeOption

type SubscribeOption func(*SubscriptionOptions)

func WithDeliveryPolicy

func WithDeliveryPolicy(policy DeliveryPolicy) SubscribeOption

func WithDeserializer

func WithDeserializer(deserializer serialization.Serializer) SubscribeOption

func WithDurable

func WithDurable() SubscribeOption

func WithFilterSubject

func WithFilterSubject(subject string) SubscribeOption

func WithGuaranteeOrder

func WithGuaranteeOrder() SubscribeOption

func WithMaxDeliveryTries

func WithMaxDeliveryTries(maxTries int) SubscribeOption

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(inboundMessages chan InboundMessage, deserializer serialization.Serializer, unsubscribe UnsubscribeFn) *Subscription

func (*Subscription) AddHandler

func (s *Subscription) AddHandler(pattern string, handlerFunc HandlerFunc) error

func (*Subscription) IsRunning

func (s *Subscription) IsRunning() bool

func (*Subscription) OnError

func (s *Subscription) OnError(errorFunc ErrorCallbackFunc)

func (*Subscription) RemoveHandler

func (s *Subscription) RemoveHandler(pattern string)

func (*Subscription) Start

func (s *Subscription) Start(ctx context.Context)

func (*Subscription) Stop

func (s *Subscription) Stop()

type SubscriptionOptions

type SubscriptionOptions struct {
	FilterSubjects   []string
	GuaranteeOrder   bool
	MaxDeliveryTries int
	DeliveryPolicy   DeliveryPolicy
	Durable          bool
	Deserializer     serialization.Serializer
}

type UnsubscribeFn

type UnsubscribeFn func()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL