subscriber

package
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2026 License: GPL-3.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SubscriptionsPendingCount = "nats.subscriptions.pending.msgs"
	SubscriptionsPendingBytes = "nats.subscriptions.pending.bytes"
	SubscriptionsDroppedMsgs  = "nats.subscriptions.dropped.count"
	SubscriptionCountMsgs     = "nats.subscriptions.send.count"

	Bytes string = "By"

	Subject = attribute.Key("subject")
)
View Source
const (
	HeaderConsumerError = "Error"
)

Variables

View Source
var ErrHandlerNotSet = errors.New("handler is not set")

ErrHandlerNotSet reports a nil message handler.

View Source
var ErrSubjectNotSet = errors.New("subject is not set")

ErrSubjectNotSet reports a missing subscription subject.

View Source
var ErrSubscriberNotSet = errors.New("subscriber is not set")

ErrSubscriberNotSet reports a nil subscriber dependency.

Functions

This section is empty.

Types

type Client deprecated

type Client interface {
	Conn() *nats.Conn
	Context() context.Context
	Logger() client.Logger
	Config() *client.Config
	QueueSubscribeSync(subject, queue string) (*nats.Subscription, error)
	Meter() metric.Meter
	WithMeter(metric.Meter)
}

Client is the legacy NATS SDK-shaped subscriber dependency.

Deprecated: use queue/generic/subscriber/interfaces.Client for new code.

type Headers added in v1.1.0

type Headers map[string][]string

Headers is the core-owned subscriber message header shape.

type Message added in v1.1.0

type Message struct {
	Subject string
	Reply   string
	Data    []byte
	Header  Headers
}

Message is the core-owned subscriber message shape.

type MessageHandler added in v1.1.0

type MessageHandler func(ctx context.Context, msg Message) error

MessageHandler handles a message without exposing the NATS SDK message type.

type MessageSubscriber added in v1.1.0

type MessageSubscriber interface {
	SubscribeMessage(ctx context.Context, options SubscribeOptions, handler MessageHandler) (SubscribeResult, error)
	Close() error
	Wait() error
}

MessageSubscriber is the core-owned subscriber contract for new consumers.

type MsgHandler deprecated

type MsgHandler func(ctx context.Context, msg *nats.Msg) error

MsgHandler is the legacy NATS message handler shape.

Deprecated: use queue/generic/subscriber/interfaces.MsgHandler for new code.

func WithResponseOnError

func WithResponseOnError(logger client.Logger, handler MsgHandler) MsgHandler

WithResponseOnError wraps handler where if handler returns a non-nil error, the msg is then responded to with said error's string in the HeaderConsumerError header. Such message then causes nats publisher to return the error when reading response.

Note: this wrapper should NOT be used for observer-like handlers that do not send success responses. If subscribed to the same subject with an actual responder, the latter's response can potentially get snubbed.

type SubscribeOptions added in v1.1.0

type SubscribeOptions struct {
	Subject string
	Queue   string
	Buffer  int
	Timeout time.Duration
}

SubscribeOptions is the core-owned input for NATS queue subscriptions.

type SubscribeResult added in v1.1.0

type SubscribeResult struct {
	Subject string
	Queue   string
}

SubscribeResult reports the registered subscription identity.

type Subscriber

type Subscriber struct {
	Client
	*worker.Pool
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(c Client) *Subscriber

func (*Subscriber) Close

func (s *Subscriber) Close() error

func (*Subscriber) ForceClose

func (s *Subscriber) ForceClose()

func (*Subscriber) Get

func (s *Subscriber) Get(subject, queue string) *Subscription

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(subject, queue string, handler MsgHandler)

func (*Subscriber) SubscribeMessage added in v1.1.0

func (s *Subscriber) SubscribeMessage(
	ctx context.Context,
	options SubscribeOptions,
	handler MessageHandler,
) (SubscribeResult, error)

SubscribeMessage registers a message handler through core-owned inputs.

func (*Subscriber) SubscribeWithParameters

func (s *Subscriber) SubscribeWithParameters(
	buffer int, timeout time.Duration, subject, queue string, handler MsgHandler,
)

func (*Subscriber) Wait

func (s *Subscriber) Wait() error

type Subscription

type Subscription struct {
	Client
	*nats.Subscription
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(c Client, subject, queue string) (*Subscription, error)

func (*Subscription) Process

func (s *Subscription) Process(
	ctx context.Context,
	buffer int,
	timeout time.Duration,
	handler MsgHandler,
) error

func (*Subscription) Stop

func (s *Subscription) Stop() error

type SubscriptionDetails

type SubscriptionDetails struct {
	Pending      int64
	PendingBytes int64
	Dropped      int64
	Delivered    int64
}

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL