broker

package
v0.1.25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2024 License: LGPL-2.1 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrBroker broker errors.
	ErrBroker = errors.New("broker")
	// ErrUnsubscribed is an error indicating that the subscriber has been unsubscribed. It is returned by the ISyncSubscriber.Next method when the subscriber has been unsubscribed.
	ErrUnsubscribed = fmt.Errorf("%w: unsubscribed", ErrBroker)
)
View Source
var (
	Name  = plugin.Name
	Using = plugin.Using
)

Functions

func Flush

func Flush(servCtx service.Context, ctx context.Context) error

Flush will perform a round trip to the server and return when it receives the internal reply.

func GetMaxPayload

func GetMaxPayload(servCtx service.Context) int64

GetMaxPayload return max payload bytes.

func GetSeparator

func GetSeparator(servCtx service.Context) string

GetSeparator return topic path separator.

func MakeReadChan

func MakeReadChan(servCtx service.Context, ctx context.Context, pattern, queue string, size int) (<-chan []byte, error)

MakeReadChan creates a new channel for receiving data from a specific pattern.

func MakeWriteChan

func MakeWriteChan(servCtx service.Context, topic string, size int) chan<- []byte

MakeWriteChan creates a new channel for publishing data to a specific topic.

func Path

func Path(servCtx service.Context, elems ...string) string

Path return topic path.

func Publish

func Publish(servCtx service.Context, ctx context.Context, topic string, data []byte) error

Publish the data argument to the given topic. The data argument is left untouched and needs to be correctly interpreted on the receiver.

Types

type DeliveryReliability

type DeliveryReliability int32

DeliveryReliability Message delivery reliability.

const (
	AtMostOnce      DeliveryReliability = iota // At most once
	AtLeastOnce                                // At last once
	ExactlyOnce                                // Exactly once
	EffectivelyOnce                            // Effectively once
)

func GetDeliveryReliability

func GetDeliveryReliability(servCtx service.Context) DeliveryReliability

GetDeliveryReliability return message delivery reliability.

type EventHandler

type EventHandler = generic.DelegateFunc1[IEvent, error]

EventHandler is used to process messages via a subscription of a topic. The handler is passed a publication interface which contains the message and optional Ack method to acknowledge receipt of the message.

type IBroker

type IBroker interface {
	// Publish the data argument to the given topic. The data argument is left untouched and needs to be correctly interpreted on the receiver.
	Publish(ctx context.Context, topic string, data []byte) error
	// Subscribe will express interest in the given topic pattern. Use option EventHandler to handle message events.
	Subscribe(ctx context.Context, pattern string, settings ...option.Setting[SubscriberOptions]) (ISubscriber, error)
	// SubscribeSync will express interest in the given topic pattern.
	SubscribeSync(ctx context.Context, pattern string, settings ...option.Setting[SubscriberOptions]) (ISyncSubscriber, error)
	// SubscribeChan will express interest in the given topic pattern.
	SubscribeChan(ctx context.Context, pattern string, settings ...option.Setting[SubscriberOptions]) (IChanSubscriber, error)
	// Flush will perform a round trip to the server and return when it receives the internal reply.
	Flush(ctx context.Context) error
	// GetDeliveryReliability return message delivery reliability.
	GetDeliveryReliability() DeliveryReliability
	// GetMaxPayload return max payload bytes.
	GetMaxPayload() int64
	// GetSeparator return topic path separator.
	GetSeparator() string
}

IBroker is an interface used for asynchronous messaging.

type IChanSubscriber

type IChanSubscriber interface {
	ISubscriber
	// EventChan returns a channel that can be used to receive events from the subscriber.
	EventChan() (<-chan IEvent, error)
}

IChanSubscriber is a convenience return type for the IBroker.SubscribeChan method.

func SubscribeChan

func SubscribeChan(servCtx service.Context, ctx context.Context, pattern string, settings ...option.Setting[SubscriberOptions]) (IChanSubscriber, error)

SubscribeChan will express interest in the given topic pattern.

type IEvent

type IEvent interface {
	// Pattern returns the subscription pattern used to create the event subscriber.
	Pattern() string
	// Topic returns the topic the event was received on.
	Topic() string
	// Queue subscribers with the same queue name will create a shared subscription where each receives a subset of messages.
	Queue() string
	// Message returns the raw message payload of the event.
	Message() []byte
	// Ack acknowledges the successful processing of the event. It indicates that the event can be removed from the subscription queue.
	Ack(ctx context.Context) error
	// Nak negatively acknowledges a message. This tells the server to redeliver the message.
	Nak(ctx context.Context) error
}

IEvent is given to a subscription handler for processing.

type ISubscriber

type ISubscriber interface {
	context.Context
	// Pattern returns the subscription pattern used to create the subscriber.
	Pattern() string
	// Queue subscribers with the same queue name will create a shared subscription where each receives a subset of messages.
	Queue() string
	// Unsubscribe unsubscribes the subscriber from the topic.
	Unsubscribe() <-chan struct{}
}

ISubscriber is a convenience return type for the IBroker.Subscribe method.

func Subscribe

func Subscribe(servCtx service.Context, ctx context.Context, pattern string, settings ...option.Setting[SubscriberOptions]) (ISubscriber, error)

Subscribe will express interest in the given topic pattern. Use option EventHandler to handle message events.

type ISyncSubscriber

type ISyncSubscriber interface {
	ISubscriber
	// Next is a blocking call that waits for the next event to be received from the subscriber.
	Next() (IEvent, error)
}

ISyncSubscriber is a convenience return type for the IBroker.SubscribeSync method.

func SubscribeSync

func SubscribeSync(servCtx service.Context, ctx context.Context, pattern string, settings ...option.Setting[SubscriberOptions]) (ISyncSubscriber, error)

SubscribeSync will express interest in the given topic pattern.

type Option

type Option struct{}

Option is a helper struct to provide default options.

func (Option) AutoAck

AutoAck defaults to true. When a handler returns with a nil error the message is acked.

func (Option) Default

Default sets the default options for subscribe topic.

func (Option) EventChanSize

func (Option) EventChanSize(size int) option.Setting[SubscriberOptions]

EventChanSize specifies the size of the event channel used for received synchronously event.

func (Option) EventHandler

func (Option) EventHandler(handler EventHandler) option.Setting[SubscriberOptions]

EventHandler is the function that will be called to handle the received events.

func (Option) Queue

Queue subscribers with the same queue name will create a shared subscription where each receives a subset of messages.

func (Option) UnsubscribedHandler

func (Option) UnsubscribedHandler(handler UnsubscribedHandler) option.Setting[SubscriberOptions]

UnsubscribedHandler Unsubscribed callback method.

type SubscriberOptions

type SubscriberOptions struct {
	// AutoAck defaults to true. When a handler returns with a nil error the message is acked.
	AutoAck bool
	// Queue subscribers with the same queue name will create a shared subscription where each receives a subset of messages.
	Queue string
	// EventHandler is the function that will be called to handle the received events.
	EventHandler EventHandler
	// EventChanSize specifies the size of the event channel used for received synchronously event.
	EventChanSize int
	// UnsubscribedHandler Unsubscribed callback method.
	UnsubscribedHandler UnsubscribedHandler
}

SubscriberOptions represents the options for subscribe topic.

type UnsubscribedHandler

type UnsubscribedHandler = generic.DelegateAction1[ISubscriber]

UnsubscribedHandler Unsubscribed callback method.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL