nats

package
v1.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2026 License: MIT Imports: 16 Imported by: 0

README

nats

Wrapper around github.com/nats-io/nats.go for Core NATS (at-most-once) and NATS JetStream (at-least-once) messaging with the core lifecycle contract: typed Config, functional Options, Shutdown(ctx), and Run(ctx).

Primary types

Type lifecycle Use for
Publisher Resource Publishing to Core NATS subjects or JetStream streams.
Subscriber[I] Runner Consuming Core NATS messages (at-most-once).
JetStreamConsumer[I] Runner Consuming a durable JetStream consumer (at-least-once).
JSONPublisher[T] (wrapper) Type-safe JSON publishing to a fixed subject.

Connections are established eagerly in the constructors — invalid URLs or unreachable brokers surface at boot, not on first publish. JetStream streams and consumers are also resolved at construct time so configuration drift fails fast.

NATS subscriptions and the JetStream Consume callback are registered in Run. Use Ready() (a <-chan struct{} closed once subscription is live) to block deterministically — useful for tests and synchronous startup.

Quickstart — Core NATS

package main

import (
    "context"
    "log"

    "github.com/sergeyslonimsky/core/app"
    corenats "github.com/sergeyslonimsky/core/nats"
)

type Event struct {
    ID string `json:"id"`
}

type eventProcessor struct{}

func (p *eventProcessor) Process(_ context.Context, msg corenats.Message[Event]) error {
    log.Printf("got %s on %s", msg.Payload.ID, msg.Subject)
    return nil
}

func main() {
    ctx := context.Background()
    cfg := corenats.Config{URL: "nats://localhost:4222"}

    publisher, err := corenats.NewPublisher(ctx, cfg)
    if err != nil { log.Fatal(err) }

    subscriber, err := corenats.NewSubscriber[Event](
        cfg, "events.core", &eventProcessor{},
        corenats.SubscriberHandlers[Event]{}, // zero value = JSON decode, log-only errors
        corenats.WithSubscriberQueueGroup("workers"),
    )
    if err != nil { log.Fatal(err) }

    a := app.New()
    a.Add(publisher)  // Resource
    a.Add(subscriber) // Runner
    log.Fatal(a.Run())
}

Quickstart — JetStream

package main

import (
    "context"
    "log"
    "time"

    "github.com/nats-io/nats.go/jetstream"
    "github.com/sergeyslonimsky/core/app"
    corenats "github.com/sergeyslonimsky/core/nats"
)

type Event struct{ ID string `json:"id"` }

type eventProcessor struct{}

func (p *eventProcessor) Process(_ context.Context, msg corenats.Message[Event]) error {
    log.Printf("got %s seq=%d delivery=%d",
        msg.Payload.ID, msg.Sequence, msg.NumDelivered)
    return nil
}

func main() {
    ctx := context.Background()
    cfg := corenats.Config{URL: "nats://localhost:4222"}

    publisher, err := corenats.NewPublisher(ctx, cfg,
        corenats.WithPublisherStream(jetstream.StreamConfig{
            Name:     "EVENTS",
            Subjects: []string{"events.>"},
        }),
    )
    if err != nil { log.Fatal(err) }

    consumer, err := corenats.NewJetStreamConsumer[Event](ctx,
        cfg, "EVENTS", "event-worker", &eventProcessor{},
        corenats.JSConsumerHandlers[Event]{}, // zero value = JSON decode, Nak on error
        corenats.WithJSSubjects("events.js"),
        corenats.WithJSConsumerConfig(func(c *jetstream.ConsumerConfig) {
            c.MaxDeliver = 5
            c.AckWait    = 30 * time.Second
        }),
    )
    if err != nil { log.Fatal(err) }

    a := app.New()
    a.Add(publisher)
    a.Add(consumer)
    log.Fatal(a.Run())
}

Configuration

Config is the declarative input — typically populated from env/yaml:

Field Default Purpose
URL — (required) Server URL.
User, Password Basic auth.
Token Token auth.
CredsFile NATS .creds file.
ConnectionName "core-nats" Reported to the server (visible in monitoring).
DialTimeout 10s Initial connect bound.
ReconnectWait 2s Delay between reconnect attempts.
MaxReconnects (*int) nil → nats.go default (60) nil = library default, IntPtr(0) = no retries, IntPtr(MaxReconnectsForever) (=-1) = retry forever, IntPtr(n) = cap at n.

For TLS, NKey, JWT, custom inbox prefixes, NoEcho, or anything else not in Config, use WithNATSOptions(...nats.Option) (see below).

Options

Three option families exist — PublisherOption, SubscriberOption, and JSConsumerOption. Any CommonOption value satisfies all three, so the common helpers can be passed to any constructor:

Option Publisher Subscriber JS Consumer
WithLogger(*slog.Logger)
WithConnection(*nats.Conn)
WithNATSOptions(...nats.Option)
WithOtel()
WithPublisherStream(jetstream.StreamConfig)
WithSubscriberQueueGroup(string)
WithSubscriberWorkerCount(int)
WithSubscriberChannelBuffer(int)
WithJSStream(jetstream.StreamConfig)
WithJSSubjects(...string)
WithJSConsumerConfig(func(*jetstream.ConsumerConfig))
WithJSAssumeExistingConsumer()
WithJSWorkerCount(int)
WithJSDeliveryBuffer(int)

The Subscriber's subject filter and the JetStreamConsumer's stream/durable are required positional arguments to the constructor — not options. Payload-typed callbacks (custom decoder, error handler) live on the SubscriberHandlers[I] / JSConsumerHandlers[I] struct, also passed positionally; pass the zero value for defaults.

Custom NATS options
publisher, err := corenats.NewPublisher(ctx, cfg,
    corenats.WithNATSOptions(
        nats.Secure(&tls.Config{InsecureSkipVerify: false}),
        nats.RootCAs("/etc/ssl/ca.pem"),
    ),
)
Custom JetStream consumer settings (mutator)

WithJSConsumerConfig receives a config pre-populated with Durable, AckPolicy=Explicit, and your WithJSSubjects filter. Mutate only what you need:

corenats.WithJSConsumerConfig(func(c *jetstream.ConsumerConfig) {
    c.MaxDeliver = 5
    c.AckWait    = 30 * time.Second
    c.BackOff    = []time.Duration{1*time.Second, 5*time.Second, 30*time.Second}
})

AckPolicy is automatically restored to AckExplicitPolicy if your mutator leaves it at the zero value (AckNonePolicy), so a forgetful mutator cannot silently downgrade at-least-once delivery. To use a different non-default ack policy, set it explicitly inside the mutator.

Externally-managed JetStream consumer

If the durable consumer is provisioned by ops tooling (Terraform, CLI, etc.) and the app must not modify it:

consumer, err := corenats.NewJetStreamConsumer[Event](ctx,
    cfg, "EVENTS", "event-worker", &processor{},
    corenats.WithJSAssumeExistingConsumer(),
)

The constructor calls js.Consumer(...) instead of CreateOrUpdateConsumer and fails if the consumer does not exist. WithJSConsumerConfig and WithJSSubjects are ignored in this mode.

Error handling

Subscriber (Core NATS)

There is no ack model — failing messages cannot be retried at the protocol level. Install a SubscriberErrorHandler[I] via SubscriberHandlers for observability or DLQ side-effects:

subscriber, _ := corenats.NewSubscriber[Event](
    cfg, "events.core", &proc{},
    corenats.SubscriberHandlers[Event]{
        ErrorHandler: func(ctx context.Context, msg corenats.Message[Event], err error) {
            metrics.Counter("nats.subscriber.error").Inc()
            _ = dlqPublisher.PublishJSON(ctx, "dlq.events", dlqWrapper{msg, err.Error()})
        },
    },
)

A non-JSON message body with a custom Content-Type header (anything other than empty or application/json) is rejected with ErrUnsupportedMessage by the default decoder — install a custom BodyMarshaller[I] via SubscriberHandlers.Marshaller to accept other payloads.

JetStreamConsumer

Processor errors flow through a JSErrorHandler[I] that returns one of:

Action Effect
JSErrorActionNak NAK without delay — broker redelivers after AckWait. Default.
JSErrorActionNakDelay NAK with the returned delay.
JSErrorActionTerm Mark as terminal — broker does NOT redeliver. Poison-pill path.
JSErrorActionAck Swallow the error (caller has already DLQ'd, advance the stream).
JSErrorActionStop Fatal — Run returns the wrapped error (only when ctx is still live; shutdown-time errors are NAK'd instead).
consumer, _ := corenats.NewJetStreamConsumer[Event](ctx,
    cfg, "EVENTS", "event-worker", &proc{},
    corenats.JSConsumerHandlers[Event]{
        ErrorHandler: func(ctx context.Context, msg corenats.Message[Event], err error) (corenats.JSErrorAction, time.Duration) {
            if errors.Is(err, ErrPoison) {
                return corenats.JSErrorActionTerm, 0
            }
            if errors.Is(err, ErrUpstreamDown) {
                return corenats.JSErrorActionStop, 0
            }
            return corenats.JSErrorActionNakDelay, 5 * time.Second
        },
    },
)

Without a handler (zero-value JSConsumerHandlers) the default is Nak (redeliver after AckWait).

Publishing

Headers, reply, and JSON helpers
publisher.PublishJSON(ctx, "events.core", evt,
    corenats.WithHeaders(nats.Header{"Trace-ID": []string{traceID}}),
    corenats.WithReply("inbox.42"),
)

PublishJSON automatically sets Content-Type: application/json if no explicit value is supplied.

JetStream — MsgID dedup, expected sequence
ack, err := publisher.PublishJSONJS(ctx, "events.js", evt,
    corenats.WithJSMsgID(evt.ID),                // server-side dedup window
    corenats.WithJSExpectedStream("EVENTS"),     // belt-and-braces
    corenats.WithJSExpectedLastSubjectSequence(prevSeq),
)
if err != nil { return err }
log.Printf("ack seq=%d duplicate=%v", ack.Sequence, ack.Duplicate)
Typed JSON publisher
type Order struct{ ID string `json:"id"` }

pub := corenats.NewJSONPublisher[Order](publisher, "events.orders.created")
_ = pub.Publish(ctx, Order{ID: "o-1"})
_, _ = pub.PublishJS(ctx, Order{ID: "o-1"})

Custom body marshaller

consumer, _ := corenats.NewJetStreamConsumer[Event](ctx,
    cfg, "EVENTS", "event-worker", &proc{},
    corenats.JSConsumerHandlers[Event]{
        Marshaller: func(body []byte, p *Event) error {
            return proto.Unmarshal(body, p)
        },
    },
)

The marshaller is type-parameterized (BodyMarshaller[I]) and supplied at construction time, eliminating the race window the previous SetMarshaller method had. When a custom marshaller is installed, the Content-Type check is skipped.

Concurrency

Both Subscriber and JetStreamConsumer accept worker-count and channel-buffer options:

corenats.WithSubscriberWorkerCount(8)
corenats.WithSubscriberChannelBuffer(1024)

corenats.WithJSWorkerCount(8)
corenats.WithJSDeliveryBuffer(1024)

Worker count default 1 preserves per-subject ordering. Channel buffer default 256 absorbs short bursts; raise it if you see SlowConsumer warnings in the async error log.

For JetStream, keep worker_count ≤ MaxAckPending (defaults to 1000) or the extra workers starve.

OpenTelemetry

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"
)

func main() {
    otel.SetTracerProvider(...)
    otel.SetTextMapPropagator(propagation.TraceContext{})

    publisher, _ := corenats.NewPublisher(ctx, cfg, corenats.WithOtel())
    subscriber, _ := corenats.NewSubscriber[Event](
        cfg, "events.core", proc, corenats.SubscriberHandlers[Event]{},
        corenats.WithOtel(),
    )
    ...
}

When WithOtel() is set:

  • Publisher opens a SpanKindProducer span per Publish*/Publish*JS call and injects the trace context into the outgoing nats.Header via the global TextMapPropagator.
  • Subscriber / JetStreamConsumer extract the trace context from incoming headers and open a SpanKindConsumer span — chained to the producer span — for every Process invocation.

Span attributes follow OpenTelemetry messaging conventions: messaging.system=nats, messaging.destination.name=<subject>, messaging.operation, plus for JetStream messaging.message.sequence and messaging.nats.delivery_attempt.

Register a non-noop TracerProvider AND TextMapPropagator before the constructors — without the propagator, trace context is never written to headers and producer↔consumer spans do not chain.

Connection sharing

publisher, _ := corenats.NewPublisher(ctx, cfg)
subscriber, _ := corenats.NewSubscriber[Event](
    cfg, "events.core", proc, corenats.SubscriberHandlers[Event]{},
    corenats.WithConnection(publisher.Conn()),
)

Components built with WithConnection do NOT close the underlying connection on Shutdown — the caller (here: the publisher) retains ownership. Note that Publisher.Shutdown will still Flush the connection even when it does not own it, so pending Core NATS sends are not silently dropped.

Shutdown ordering matters when sharing: shut down the consumers first (they only release their own runtime state), then the publisher (which closes the shared connection). The reverse order will close the connection from under a still-running consumer.

Shutdown semantics

Type Behavior
Publisher.Shutdown(ctx) Always flushes buffered Core NATS messages (bounded by ctx; falls back to 10s if no deadline), then closes the connection if owned. Idempotent — subsequent calls return the first flush error.
Subscriber.Shutdown(ctx) Cancels Run, waits for it to drain (bounded by ctx). Returns ctx.Err() on timeout but ALWAYS closes the owned connection.
JetStreamConsumer.Shutdown(ctx) Cancels Run. Run stops new deliveries, closes the internal channel, waits for in-flight workers, then NAKs anything left buffered so the broker can redeliver immediately. Returns ctx.Err() on timeout but ALWAYS closes the owned connection.

In-flight Core NATS publishes that have not yet been transmitted to the broker WILL be lost without Flush — call Publisher.Flush(ctx) before shutdown when delivery matters.

Errors

All sentinel errors are exported and can be matched with errors.Is:

ErrEmptyURL
ErrNilProcessor
ErrEmptyStream
ErrEmptyDurable
ErrEmptySubjectFilter
ErrAlreadyRunning
ErrInvalidErrorAction
ErrUnsupportedMessage

Documentation

Overview

Package nats wraps github.com/nats-io/nats.go for Core NATS and NATS JetStream messaging with the lifecycle contract used across core: typed Config, functional Options, Shutdown(ctx), and Run(ctx).

The package exposes three primary types:

  • Publisher (Resource) — publishes to Core NATS subjects (at-most-once) or JetStream streams (at-least-once).
  • Subscriber[I] (Runner) — consumes Core NATS messages with a worker pool and an optional queue group.
  • JetStreamConsumer[I] (Runner) — pull consumer over a durable JetStream consumer with ack/nak/term/stop error policy.

Connections are established eagerly in the constructors — invalid URLs or unreachable brokers surface at boot, not on first publish.

Options

Three option families are exported:

  • PublisherOption — accepted by NewPublisher.
  • SubscriberOption — accepted by NewSubscriber.
  • JSConsumerOption — accepted by NewJetStreamConsumer.

CommonOption (WithLogger, WithConnection, WithNATSOptions, WithOtel) implements all three interfaces, so any common option can be passed to any constructor without wrapping.

Error handling

For the Core NATS Subscriber, processor errors are logged and routed through an optional SubscriberErrorHandler (set via SetErrorHandler) for metrics or DLQ side-effects. There is no ack model.

For JetStreamConsumer, processor errors flow through an optional JSErrorHandler[I] (set via SetErrorHandler) returning one of:

  • JSErrorActionNak (default) — redeliver after AckWait.
  • JSErrorActionNakDelay — redeliver after an explicit delay.
  • JSErrorActionTerm — discard (poison-pill).
  • JSErrorActionAck — swallow (DLQ already handled by the caller).
  • JSErrorActionStop — fatal; Run returns the wrapped error.

OpenTelemetry

Pass WithOtel() to enable per-message spans. Uses the global TracerProvider — register a non-noop provider via otel.SetTracerProvider before constructing the publisher or consumer.

Connection sharing

Pass WithConnection(nc) to reuse an existing *nats.Conn across multiple publishers/subscribers/consumers. Components constructed with a shared connection do NOT close it on Shutdown.

Index

Constants

View Source
const MaxReconnectsForever = -1

MaxReconnectsForever is the sentinel for "retry reconnection indefinitely". Wrap with IntPtr for use in Config.MaxReconnects: nats.IntPtr(nats.MaxReconnectsForever).

Variables

View Source
var (
	// ErrEmptyURL is returned by constructors when Config.URL is empty and
	// no shared connection is provided via WithConnection.
	ErrEmptyURL = errors.New("nats: URL cannot be empty")

	// ErrNilProcessor is returned by NewSubscriber and NewJetStreamConsumer
	// when the Processor argument is nil.
	ErrNilProcessor = errors.New("nats: processor cannot be nil")

	// ErrEmptyStream is returned by NewJetStreamConsumer when the stream
	// name is empty.
	ErrEmptyStream = errors.New("nats: stream name cannot be empty")

	// ErrEmptyDurable is returned by NewJetStreamConsumer when the durable
	// consumer name is empty.
	ErrEmptyDurable = errors.New("nats: durable consumer name cannot be empty")

	// ErrEmptySubjectFilter is returned by NewSubscriber when no subject
	// filter is configured via WithSubscriberSubject.
	ErrEmptySubjectFilter = errors.New("nats: subscriber subject filter cannot be empty")

	// ErrAlreadyRunning is returned by Run when the Runner is already active.
	ErrAlreadyRunning = errors.New("nats: runner already running")

	// ErrInvalidErrorAction is returned when a JSErrorHandler returns an
	// unknown JSErrorAction value.
	ErrInvalidErrorAction = errors.New("nats: invalid JSErrorAction")

	// ErrUnsupportedMessage is returned by the default JSON BodyMarshaller
	// when a Core NATS message's Content-Type header is set to something
	// other than "application/json" or "". JetStream messages without
	// headers are treated as JSON by default. Mirrors rabbitmq behavior.
	ErrUnsupportedMessage = errors.New("nats: unsupported message content type")
)

Sentinel errors returned by nats constructors and methods. Stable identity so callers can errors.Is them.

Functions

func IntPtr

func IntPtr(v int) *int

IntPtr returns a pointer to v. Helper for filling *int fields like Config.MaxReconnects with a literal value.

Types

type BodyMarshaller

type BodyMarshaller[I any] func(body []byte, payload *I) error

BodyMarshaller decodes a raw message body into a typed payload. Install via Subscriber.SetMarshaller or JetStreamConsumer.SetMarshaller. Default behavior (no custom marshaller) is json.Unmarshal.

type CommonOption

type CommonOption interface {
	PublisherOption
	SubscriberOption
	JSConsumerOption
}

CommonOption configures any nats constructor (NewPublisher, NewSubscriber, NewJetStreamConsumer). All CommonOption values are simultaneously PublisherOption, SubscriberOption, and JSConsumerOption.

func WithConnection

func WithConnection(nc *nats.Conn) CommonOption

WithConnection injects a pre-existing *nats.Conn. When set, the constructor skips dialing — Config.URL/User/Password/Token/CredsFile and the WithNATSOptions extras are ignored — and Shutdown will NOT close the connection (the caller retains ownership).

Use to share a single connection across a Publisher + Subscriber + JetStreamConsumer in the same process.

func WithLogger

func WithLogger(l *slog.Logger) CommonOption

WithLogger attaches a *slog.Logger used for lifecycle and per-message error logging. Defaults to slog.Default() when omitted. Nil values are ignored.

func WithNATSOptions

func WithNATSOptions(opts ...nats.Option) CommonOption

WithNATSOptions appends raw nats.Option values to the connect call. Escape hatch for TLS, NKey, JWT, custom inbox prefixes, NoEcho, custom error handlers, and anything else not exposed via Config.

Applied after the options derived from Config so callers can override defaults (e.g., a different reconnect handler).

Ignored when WithConnection is set.

func WithOtel

func WithOtel() CommonOption

WithOtel enables OpenTelemetry instrumentation: one span per publish/consume operation with messaging.* attributes. Uses the global tracer provider — register a non-noop provider before constructing the publisher/consumer.

type Config

type Config struct {
	// URL is the NATS server URL, e.g. "nats://localhost:4222".
	// Required unless WithConnection is used.
	URL string

	// User is the username for basic authentication.
	User string

	// Password is the password for basic authentication.
	Password string

	// Token is the token for token-based authentication.
	Token string

	// CredsFile is the path to a NATS credentials file (.creds).
	CredsFile string

	// ConnectionName is reported to the NATS server and is visible in the
	// monitoring endpoint. Defaults to "core-nats" when empty.
	ConnectionName string

	// DialTimeout bounds the initial connect attempt. Defaults to 10s
	// when zero.
	DialTimeout time.Duration

	// ReconnectWait is the delay between reconnect attempts after a lost
	// connection. Defaults to 2s when zero.
	ReconnectWait time.Duration

	// MaxReconnects bounds the number of reconnect attempts. Pointer
	// semantics distinguish "unset" from explicit "0".
	//
	//   nil  → use the nats.go default (60 attempts).
	//   *v=0 → no reconnect attempts (give up on first failure).
	//   *v=-1 (or &MaxReconnectsForever) → retry forever.
	//   *v>0 → cap at v attempts.
	MaxReconnects *int
}

Config describes NATS connection parameters. All authentication fields are optional; supply at most one of (User+Password), Token, or CredsFile.

type JSConsumerHandlers

type JSConsumerHandlers[I any] struct {
	// Marshaller decodes raw message bytes into I. Nil = json.Unmarshal.
	Marshaller BodyMarshaller[I]

	// ErrorHandler decides what to do when Processor.Process returns a
	// non-nil error. Nil = Nak (redeliver after AckWait).
	ErrorHandler JSErrorHandler[I]
}

JSConsumerHandlers carries payload-typed callbacks for JetStreamConsumer. Pass to NewJetStreamConsumer. Zero value is valid: Marshaller=nil means default JSON decoding, ErrorHandler=nil means the default action on a processor error is JSErrorActionNak.

type JSConsumerOption

type JSConsumerOption interface {
	// contains filtered or unexported methods
}

JSConsumerOption configures NewJetStreamConsumer. Any CommonOption is also a JSConsumerOption.

Payload-typed knobs (Marshaller, ErrorHandler) live on JSConsumerHandlers, passed as a positional argument to NewJetStreamConsumer.

func WithJSAssumeExistingConsumer

func WithJSAssumeExistingConsumer() JSConsumerOption

WithJSAssumeExistingConsumer instructs NewJetStreamConsumer to look up the existing durable consumer (via js.Consumer) instead of creating or updating one. The constructor fails if the consumer does not exist.

Use to avoid silent configuration drift in environments where the JetStream consumer is managed externally (Terraform, CLI, ops tooling). When set, WithJSConsumerConfig and WithJSSubjects are ignored.

func WithJSConsumerConfig

func WithJSConsumerConfig(mutate func(*jetstream.ConsumerConfig)) JSConsumerOption

WithJSConsumerConfig installs a mutator that runs against the pre-populated jetstream.ConsumerConfig before CreateOrUpdateConsumer.

The mutator receives a config already populated with Durable, AckPolicy (AckExplicit), and the configured FilterSubject(s). Mutate only what you need:

corenats.WithJSConsumerConfig(func(cfg *jetstream.ConsumerConfig) {
    cfg.MaxDeliver = 5
    cfg.AckWait = 30 * time.Second
})

This avoids accidentally clobbering defaults (e.g., setting AckPolicy back to AckNone by passing a zero-value struct).

func WithJSDeliveryBuffer

func WithJSDeliveryBuffer(n int) JSConsumerOption

WithJSDeliveryBuffer sets the capacity of the internal channel between the JetStream Consume callback and the worker pool. Default 256. Values < 1 fall back to the default.

func WithJSStream

func WithJSStream(cfg jetstream.StreamConfig) JSConsumerOption

WithJSStream instructs NewJetStreamConsumer to verify or create/update a JetStream stream with the given configuration at construction time.

func WithJSSubjects

func WithJSSubjects(subjects ...string) JSConsumerOption

WithJSSubjects sets one or more filter subjects for the durable consumer. Single subject populates ConsumerConfig.FilterSubject; multiple subjects populate ConsumerConfig.FilterSubjects.

func WithJSWorkerCount

func WithJSWorkerCount(n int) JSConsumerOption

WithJSWorkerCount sets the number of concurrent goroutines processing JetStream deliveries. Default 1 (preserves per-subject order). Values < 1 are normalized to 1.

type JSErrorAction

type JSErrorAction int

JSErrorAction selects what JetStreamConsumer should do after a Processor returns a non-nil error. Returned by JSErrorHandler.

const (
	// JSErrorActionNak negative-acks the message with no delay. JetStream
	// will redeliver after the consumer's AckWait. Default when no
	// JSErrorHandler is configured.
	JSErrorActionNak JSErrorAction = iota

	// JSErrorActionNakDelay negative-acks with an explicit redelivery delay.
	// The delay is the second return value of JSErrorHandler.
	JSErrorActionNakDelay

	// JSErrorActionTerm marks the message as a terminal failure: JetStream
	// will NOT redeliver and the delivery count is finalized. Use for
	// poison-pill records after exhausting in-process retries.
	JSErrorActionTerm

	// JSErrorActionAck acks the failing message (swallow). Use when the
	// processor has already shipped the payload to a DLQ topic and the
	// stream should advance.
	JSErrorActionAck

	// JSErrorActionStop terminates JetStreamConsumer: Run returns the
	// original processing error wrapped. The message is NOT acked. Use for
	// fatal conditions where the service should fail loudly rather than
	// silently advance.
	JSErrorActionStop
)

type JSErrorHandler

type JSErrorHandler[I any] func(ctx context.Context, msg Message[I], err error) (JSErrorAction, time.Duration)

JSErrorHandler is invoked when JetStreamConsumer's Processor returns a non-nil error. It decides what the consumer should do with the failing message.

The returned time.Duration is used only when the action is JSErrorActionNakDelay; it is ignored otherwise.

Implementations typically:

  • Emit to a DLQ and return (JSErrorActionAck, 0).
  • Increment a metric and return (JSErrorActionNak, 0) for transient errors.
  • Return (JSErrorActionTerm, 0) for poison-pill records.
  • Return (JSErrorActionStop, 0) for fatal dependency outages.

type JSONPublisher

type JSONPublisher[T any] struct {
	// contains filtered or unexported fields
}

JSONPublisher wraps a Publisher and JSON-encodes T messages on every call. Generic over T for compile-time payload-type safety. The default publish subject is fixed at construction time.

func NewJSONPublisher

func NewJSONPublisher[T any](publisher *Publisher, subject string) *JSONPublisher[T]

NewJSONPublisher wraps the given Publisher and JSON-publishes to subject.

func (*JSONPublisher[T]) Publish

func (p *JSONPublisher[T]) Publish(ctx context.Context, message T, opts ...PublishOption) error

Publish marshals message and sends it via Core NATS to the fixed subject.

func (*JSONPublisher[T]) PublishJS

func (p *JSONPublisher[T]) PublishJS(
	ctx context.Context,
	message T,
	opts ...JSPublishOption,
) (*jetstream.PubAck, error)

PublishJS marshals message and sends it via JetStream to the fixed subject.

type JSPublishOption

type JSPublishOption func(*JSPublishOpts)

JSPublishOption configures a single PublishJS/PublishJSONJS call.

func WithJSExpectedLastMsgID

func WithJSExpectedLastMsgID(id string) JSPublishOption

WithJSExpectedLastMsgID enforces a precondition that the previous message in the stream had the given MsgID.

func WithJSExpectedLastSequence

func WithJSExpectedLastSequence(seq uint64) JSPublishOption

WithJSExpectedLastSequence enforces optimistic-concurrency control: the publish is rejected unless the stream's last sequence matches seq.

func WithJSExpectedLastSubjectSequence

func WithJSExpectedLastSubjectSequence(seq uint64) JSPublishOption

WithJSExpectedLastSubjectSequence enforces per-subject optimistic concurrency control.

func WithJSExpectedStream

func WithJSExpectedStream(name string) JSPublishOption

WithJSExpectedStream rejects the publish if the target stream does not match the given name. Useful when multiple streams could route the same subject.

func WithJSHeaders

func WithJSHeaders(h nats.Header) JSPublishOption

WithJSHeaders attaches NATS headers to the published JetStream message.

func WithJSMsgID

func WithJSMsgID(id string) JSPublishOption

WithJSMsgID sets the JetStream message ID used for server-side deduplication. Required for exactly-once-style publishing.

func WithJSRetryAttempts

func WithJSRetryAttempts(n int) JSPublishOption

WithJSRetryAttempts overrides the underlying jetstream client's retry attempt count for the publish. Values < 1 are ignored.

type JSPublishOpts

type JSPublishOpts struct {
	Headers                nats.Header
	MsgID                  string
	ExpectedStream         string
	ExpectedLastSequence   *uint64
	ExpectedLastSubjectSeq *uint64
	ExpectedLastMsgID      string
	RetryAttempts          int
	// contains filtered or unexported fields
}

JSPublishOpts captures per-publish JetStream metadata. Use the helper option functions (WithJSHeaders, WithJSMsgID, etc.) to populate it.

type JetStreamConsumer

type JetStreamConsumer[I any] struct {
	// contains filtered or unexported fields
}

JetStreamConsumer is a generic NATS JetStream pull consumer that consumes messages and dispatches them to a Processor via a worker pool.

Implements lifecycle.Runner. Construct via NewJetStreamConsumer.

The consumer is created (or looked up — see WithJSAssumeExistingConsumer) in the constructor; the pull loop starts in Run. Use Ready() to block until Run has established the pull subscription.

func NewJetStreamConsumer

func NewJetStreamConsumer[I any](
	ctx context.Context,
	cfg Config,
	stream string,
	durable string,
	processor Processor[I],
	handlers JSConsumerHandlers[I],
	opts ...JSConsumerOption,
) (*JetStreamConsumer[I], error)

NewJetStreamConsumer creates a JetStreamConsumer and eagerly:

  1. Dials NATS (or uses WithConnection's shared connection).
  2. Verifies/creates the stream if WithJSStream is provided.
  3. Creates/updates the durable consumer with the configured subject filter(s), AckPolicy=Explicit, and any user mutations from WithJSConsumerConfig — OR looks up an existing consumer when WithJSAssumeExistingConsumer is set.

Any of these steps failing returns a wrapped error and closes the owned connection (if any).

stream and durable are required (validated as non-empty).

handlers carries optional payload-typed callbacks; pass JSConsumerHandlers[I]{} for defaults (JSON decode + Nak-on-error).

func (*JetStreamConsumer[I]) Conn

func (c *JetStreamConsumer[I]) Conn() *nats.Conn

Conn returns the underlying *nats.Conn.

func (*JetStreamConsumer[I]) JetStream

func (c *JetStreamConsumer[I]) JetStream() jetstream.JetStream

JetStream returns the underlying jetstream.JetStream context.

func (*JetStreamConsumer[I]) Ready

func (c *JetStreamConsumer[I]) Ready() <-chan struct{}

Ready returns a channel that is closed once Run has established the JetStream Consume callback. Block on it to ensure tests/producers don't race the consumer's start-up.

Ready() is re-armed on every Run cycle (the channel rotates), so callers blocking on Ready() after a restart see the new cycle's completion.

func (*JetStreamConsumer[I]) Run

func (c *JetStreamConsumer[I]) Run(ctx context.Context) error

Run starts the JetStream Consume loop, dispatching messages to a worker pool, and blocks until ctx is cancelled or a fatal processor error triggers JSErrorActionStop.

Returns nil on graceful shutdown; returns a wrapped error when the processor signals JSErrorActionStop.

Implements lifecycle.Runner.

func (*JetStreamConsumer[I]) Shutdown

func (c *JetStreamConsumer[I]) Shutdown(ctx context.Context) error

Shutdown signals Run to stop and waits for it to finish, bounded by ctx. On ctx expiry returns ctx.Err() — but the owned connection is still closed so the resource does not leak.

Implements lifecycle.Resource. Idempotent.

type Message

type Message[I any] struct {
	// Payload is the decoded message body.
	Payload I

	// Subject is the NATS subject the message was published to.
	Subject string

	// Reply is the optional reply subject for request-reply patterns.
	// Empty when the publisher did not set one.
	Reply string

	// Headers carries NATS message headers (NATS 2.x+). Nil when no
	// headers were attached.
	Headers nats.Header

	// Sequence is the JetStream stream sequence number. Zero on Core NATS.
	Sequence uint64

	// NumDelivered is the JetStream delivery attempt counter (1-based).
	// Zero on Core NATS.
	NumDelivered uint64

	// Timestamp is the JetStream server-side publish time. Zero on Core NATS.
	Timestamp time.Time
}

Message is the typed payload delivered to a Processor.

Fields populated for both Core NATS and JetStream:

  • Payload, Subject, Reply, Headers

Fields populated for JetStream only (zero on Core NATS):

  • Sequence, NumDelivered, Timestamp

type Processor

type Processor[I any] interface {
	Process(ctx context.Context, msg Message[I]) error
}

Processor is what callers implement to handle messages.

For Core NATS Subscriber, the returned error is passed to the configured SubscriberErrorHandler (or logged at warn level if none is set). There is no ack model — the error is observational only.

For JetStreamConsumer, the returned error is passed to the configured JSErrorHandler[I] which decides whether the message is Acked, Nacked, Term'd, or whether the consumer should stop. Without a custom handler the default is Nak (redeliver after AckWait).

type PublishOption

type PublishOption func(*PublishOpts)

PublishOption configures a single Publish/PublishJSON call.

func WithHeaders

func WithHeaders(h nats.Header) PublishOption

WithHeaders attaches NATS headers to the published message.

func WithReply

func WithReply(subject string) PublishOption

WithReply sets the reply subject for request-reply patterns.

type PublishOpts

type PublishOpts struct {
	// Headers are NATS message headers (NATS 2.x+).
	Headers nats.Header

	// Reply is an optional reply subject for request-reply patterns.
	Reply string
	// contains filtered or unexported fields
}

PublishOpts captures per-publish Core NATS message metadata. Headers are optional; an empty Reply suppresses the reply subject.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher publishes messages to Core NATS subjects or JetStream streams. Implements lifecycle.Resource.

Construct via NewPublisher. The publisher establishes its NATS connection (and optional JetStream context + stream declaration) eagerly in the constructor.

When constructed with WithOtel(), every Publish*/Publish*JS call opens a SpanKindProducer span and injects the trace context into the outgoing nats.Header via the global TextMapPropagator. The consumer-side span (Subscriber / JetStreamConsumer) extracts the context so the producer and consumer spans chain.

func NewPublisher

func NewPublisher(ctx context.Context, cfg Config, opts ...PublisherOption) (*Publisher, error)

NewPublisher creates a Publisher. By default it dials NATS using cfg; pass WithConnection to share an existing connection.

If WithPublisherStream is provided, the stream is verified or created/updated during the constructor — a non-nil error is returned on failure.

func (*Publisher) Conn

func (p *Publisher) Conn() *nats.Conn

Conn returns the underlying *nats.Conn. Use to construct additional subscribers/consumers that share this connection via WithConnection.

func (*Publisher) Flush

func (p *Publisher) Flush(ctx context.Context) error

Flush blocks until the server has processed all buffered messages or ctx expires. Call before Shutdown when using Core NATS publishing to avoid losing in-flight messages.

If ctx has no deadline, Flush applies a default 10s timeout — the underlying NATS client requires a bounded wait.

func (*Publisher) JetStream

func (p *Publisher) JetStream() jetstream.JetStream

JetStream returns the underlying jetstream.JetStream context. Use for advanced flows (KV, ObjectStore, custom stream management).

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, subject string, data []byte, opts ...PublishOption) error

Publish sends raw bytes via Core NATS (at-most-once delivery). The message is buffered by the client and flushed asynchronously — call Flush before Shutdown if you must guarantee transmission.

ctx is only used for OTel span context; the underlying nats.Conn.PublishMsg call is non-blocking and does not honor cancellation.

func (*Publisher) PublishJS

func (p *Publisher) PublishJS(
	ctx context.Context,
	subject string,
	data []byte,
	opts ...JSPublishOption,
) (*jetstream.PubAck, error)

PublishJS sends raw bytes via JetStream (at-least-once delivery). The returned PubAck contains the assigned stream sequence and duplicate flag.

func (*Publisher) PublishJSON

func (p *Publisher) PublishJSON(ctx context.Context, subject string, message any, opts ...PublishOption) error

PublishJSON marshals message to JSON and publishes it via Core NATS. Sets the Content-Type header to application/json so the consumer-side default decoder accepts it without needing a custom marshaller.

func (*Publisher) PublishJSONJS

func (p *Publisher) PublishJSONJS(
	ctx context.Context,
	subject string,
	message any,
	opts ...JSPublishOption,
) (*jetstream.PubAck, error)

PublishJSONJS marshals message to JSON and publishes it via JetStream. Sets Content-Type=application/json on the outgoing message.

func (*Publisher) Shutdown

func (p *Publisher) Shutdown(ctx context.Context) error

Shutdown flushes pending Core NATS messages and closes the underlying connection if it is owned by the Publisher. Honors ctx for the flush step; the close itself is non-blocking.

Flush runs regardless of connection ownership so a shared-connection Publisher does not silently drop its in-flight Core NATS sends. The connection is closed only when this Publisher owns it.

Idempotent — subsequent calls return the error captured by the first invocation (so callers can call Shutdown multiple times without losing the original flush failure).

type PublisherOption

type PublisherOption interface {
	// contains filtered or unexported methods
}

PublisherOption configures NewPublisher. Any CommonOption is also a PublisherOption.

func WithPublisherStream

func WithPublisherStream(cfg jetstream.StreamConfig) PublisherOption

WithPublisherStream instructs NewPublisher to verify or create/update a JetStream stream with the given configuration at construction time. The constructor returns an error if the stream cannot be declared.

type Subscriber

type Subscriber[I any] struct {
	// contains filtered or unexported fields
}

Subscriber is a generic Core NATS subscriber (at-most-once delivery) that consumes messages from a subject or queue group and dispatches them to a Processor via a worker pool.

Implements lifecycle.Runner. Construct via NewSubscriber.

Subscription registration happens in Run (not the constructor) — matching the lifecycle model used by kafka.Consumer and rabbitmq. Use Ready() to block until the subscription is live (tests, smoke checks).

Core NATS is at-most-once: messages published to the subject before Run has registered the subscription are dropped by the broker. For at-least-once semantics use JetStreamConsumer.

func NewSubscriber

func NewSubscriber[I any](
	cfg Config,
	subject string,
	processor Processor[I],
	handlers SubscriberHandlers[I],
	opts ...SubscriberOption,
) (*Subscriber[I], error)

NewSubscriber creates a Subscriber and eagerly dials NATS (unless WithConnection is provided). Returns an error if cfg is invalid or the connection cannot be established.

subject is required (the NATS subject filter to consume from). handlers carries optional payload-typed callbacks; pass a zero-value SubscriberHandlers[I]{} for defaults.

By default the message body is JSON-unmarshalled and a Content-Type header of "application/json" (or empty) is enforced. Supply handlers.Marshaller for custom decoding and handlers.ErrorHandler to observe Processor failures.

func (*Subscriber[I]) Conn

func (s *Subscriber[I]) Conn() *nats.Conn

Conn returns the underlying *nats.Conn. Useful for sharing the connection with a Publisher.

func (*Subscriber[I]) Ready

func (s *Subscriber[I]) Ready() <-chan struct{}

Ready returns a channel that is closed once Run has registered the NATS subscription. Block on it to ensure publishes are not lost in the at-most-once gap between Run start and Subscribe completion (typical pattern in tests).

On a never-Run Subscriber, the channel never closes — pair with a timeout if you cannot guarantee Run is being called.

Ready() is re-armed on every Run cycle: after a Run returns, the next Run rotates the underlying channel so callers that block on Ready() see the new cycle's subscribe completion, not the previous one.

func (*Subscriber[I]) Run

func (s *Subscriber[I]) Run(ctx context.Context) error

Run subscribes to the configured subject (optionally in a queue group), dispatches messages to a worker pool, and blocks until ctx is cancelled.

Returns ErrAlreadyRunning if called while an earlier Run is still active. Returns nil on graceful ctx cancellation.

Implements lifecycle.Runner.

func (*Subscriber[I]) Shutdown

func (s *Subscriber[I]) Shutdown(ctx context.Context) error

Shutdown signals Run to stop and waits for it to finish, bounded by ctx. On ctx expiry returns ctx.Err() — but the owned connection is still closed so the resource does not leak.

Implements lifecycle.Resource. Idempotent.

type SubscriberErrorHandler

type SubscriberErrorHandler[I any] func(ctx context.Context, msg Message[I], err error)

SubscriberErrorHandler is invoked when Core NATS message processing fails. Use it for metrics, DLQ side-effects, or structured logging. The returned value is ignored — Core NATS has no ack model.

type SubscriberHandlers

type SubscriberHandlers[I any] struct {
	// Marshaller decodes raw message bytes into I. Nil = json.Unmarshal.
	Marshaller BodyMarshaller[I]

	// ErrorHandler is invoked when Processor.Process returns a non-nil
	// error. Nil = log only.
	ErrorHandler SubscriberErrorHandler[I]
}

SubscriberHandlers carries payload-typed callbacks for Subscriber. Pass to NewSubscriber. Zero value is valid: Marshaller=nil means default JSON decoding, ErrorHandler=nil means processor errors are only logged.

type SubscriberOption

type SubscriberOption interface {
	// contains filtered or unexported methods
}

SubscriberOption configures NewSubscriber. Any CommonOption is also a SubscriberOption.

Payload-typed knobs (Marshaller, ErrorHandler) live on SubscriberHandlers, passed as a positional argument to NewSubscriber.

func WithSubscriberChannelBuffer

func WithSubscriberChannelBuffer(n int) SubscriberOption

WithSubscriberChannelBuffer sets the capacity of the internal ChanSubscribe channel used between the NATS client reader and the worker pool. Default 256.

Tune up for high-throughput Subscribers if you see SlowConsumer warnings in the async error log (raises memory usage during bursts). Values < 1 fall back to the default.

func WithSubscriberQueueGroup

func WithSubscriberQueueGroup(group string) SubscriberOption

WithSubscriberQueueGroup configures a queue group for Core NATS subscriber load balancing. Subscribers in the same queue group share message load.

func WithSubscriberWorkerCount

func WithSubscriberWorkerCount(n int) SubscriberOption

WithSubscriberWorkerCount sets the number of concurrent goroutines processing deliveries. Default 1 (ordered processing). Values < 1 are normalized to 1.

Raising above 1 increases throughput at the cost of losing per-subject processing order.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL