Documentation
¶
Overview ¶
Package nats wraps github.com/nats-io/nats.go for Core NATS and NATS JetStream messaging with the lifecycle contract used across core: typed Config, functional Options, Shutdown(ctx), and Run(ctx).
The package exposes three primary types:
- Publisher (Resource) — publishes to Core NATS subjects (at-most-once) or JetStream streams (at-least-once).
- Subscriber[I] (Runner) — consumes Core NATS messages with a worker pool and an optional queue group.
- JetStreamConsumer[I] (Runner) — pull consumer over a durable JetStream consumer with ack/nak/term/stop error policy.
Connections are established eagerly in the constructors — invalid URLs or unreachable brokers surface at boot, not on first publish.
Options ¶
Three option families are exported:
- PublisherOption — accepted by NewPublisher.
- SubscriberOption — accepted by NewSubscriber.
- JSConsumerOption — accepted by NewJetStreamConsumer.
CommonOption (WithLogger, WithConnection, WithNATSOptions, WithOtel) implements all three interfaces, so any common option can be passed to any constructor without wrapping.
Error handling ¶
For the Core NATS Subscriber, processor errors are logged and routed through an optional SubscriberErrorHandler (set via SetErrorHandler) for metrics or DLQ side-effects. There is no ack model.
For JetStreamConsumer, processor errors flow through an optional JSErrorHandler[I] (set via SetErrorHandler) returning one of:
- JSErrorActionNak (default) — redeliver after AckWait.
- JSErrorActionNakDelay — redeliver after an explicit delay.
- JSErrorActionTerm — discard (poison-pill).
- JSErrorActionAck — swallow (DLQ already handled by the caller).
- JSErrorActionStop — fatal; Run returns the wrapped error.
OpenTelemetry ¶
Pass WithOtel() to enable per-message spans. Uses the global TracerProvider — register a non-noop provider via otel.SetTracerProvider before constructing the publisher or consumer.
Connection sharing ¶
Pass WithConnection(nc) to reuse an existing *nats.Conn across multiple publishers/subscribers/consumers. Components constructed with a shared connection do NOT close it on Shutdown.
Index ¶
- Constants
- Variables
- func IntPtr(v int) *int
- type BodyMarshaller
- type CommonOption
- type Config
- type JSConsumerHandlers
- type JSConsumerOption
- func WithJSAssumeExistingConsumer() JSConsumerOption
- func WithJSConsumerConfig(mutate func(*jetstream.ConsumerConfig)) JSConsumerOption
- func WithJSDeliveryBuffer(n int) JSConsumerOption
- func WithJSStream(cfg jetstream.StreamConfig) JSConsumerOption
- func WithJSSubjects(subjects ...string) JSConsumerOption
- func WithJSWorkerCount(n int) JSConsumerOption
- type JSErrorAction
- type JSErrorHandler
- type JSONPublisher
- type JSPublishOption
- func WithJSExpectedLastMsgID(id string) JSPublishOption
- func WithJSExpectedLastSequence(seq uint64) JSPublishOption
- func WithJSExpectedLastSubjectSequence(seq uint64) JSPublishOption
- func WithJSExpectedStream(name string) JSPublishOption
- func WithJSHeaders(h nats.Header) JSPublishOption
- func WithJSMsgID(id string) JSPublishOption
- func WithJSRetryAttempts(n int) JSPublishOption
- type JSPublishOpts
- type JetStreamConsumer
- type Message
- type Processor
- type PublishOption
- type PublishOpts
- type Publisher
- func (p *Publisher) Conn() *nats.Conn
- func (p *Publisher) Flush(ctx context.Context) error
- func (p *Publisher) JetStream() jetstream.JetStream
- func (p *Publisher) Publish(ctx context.Context, subject string, data []byte, opts ...PublishOption) error
- func (p *Publisher) PublishJS(ctx context.Context, subject string, data []byte, opts ...JSPublishOption) (*jetstream.PubAck, error)
- func (p *Publisher) PublishJSON(ctx context.Context, subject string, message any, opts ...PublishOption) error
- func (p *Publisher) PublishJSONJS(ctx context.Context, subject string, message any, opts ...JSPublishOption) (*jetstream.PubAck, error)
- func (p *Publisher) Shutdown(ctx context.Context) error
- type PublisherOption
- type Subscriber
- type SubscriberErrorHandler
- type SubscriberHandlers
- type SubscriberOption
Constants ¶
const MaxReconnectsForever = -1
MaxReconnectsForever is the sentinel for "retry reconnection indefinitely". Wrap with IntPtr for use in Config.MaxReconnects: nats.IntPtr(nats.MaxReconnectsForever).
Variables ¶
var ( // ErrEmptyURL is returned by constructors when Config.URL is empty and // no shared connection is provided via WithConnection. ErrEmptyURL = errors.New("nats: URL cannot be empty") // ErrNilProcessor is returned by NewSubscriber and NewJetStreamConsumer // when the Processor argument is nil. ErrNilProcessor = errors.New("nats: processor cannot be nil") // ErrEmptyStream is returned by NewJetStreamConsumer when the stream // name is empty. ErrEmptyStream = errors.New("nats: stream name cannot be empty") // ErrEmptyDurable is returned by NewJetStreamConsumer when the durable // consumer name is empty. ErrEmptyDurable = errors.New("nats: durable consumer name cannot be empty") // ErrEmptySubjectFilter is returned by NewSubscriber when no subject // filter is configured via WithSubscriberSubject. ErrEmptySubjectFilter = errors.New("nats: subscriber subject filter cannot be empty") // ErrAlreadyRunning is returned by Run when the Runner is already active. ErrAlreadyRunning = errors.New("nats: runner already running") // ErrInvalidErrorAction is returned when a JSErrorHandler returns an // unknown JSErrorAction value. ErrInvalidErrorAction = errors.New("nats: invalid JSErrorAction") // ErrUnsupportedMessage is returned by the default JSON BodyMarshaller // when a Core NATS message's Content-Type header is set to something // other than "application/json" or "". JetStream messages without // headers are treated as JSON by default. Mirrors rabbitmq behavior. ErrUnsupportedMessage = errors.New("nats: unsupported message content type") )
Sentinel errors returned by nats constructors and methods. Stable identity so callers can errors.Is them.
Functions ¶
Types ¶
type BodyMarshaller ¶
BodyMarshaller decodes a raw message body into a typed payload. Install via Subscriber.SetMarshaller or JetStreamConsumer.SetMarshaller. Default behavior (no custom marshaller) is json.Unmarshal.
type CommonOption ¶
type CommonOption interface {
PublisherOption
SubscriberOption
JSConsumerOption
}
CommonOption configures any nats constructor (NewPublisher, NewSubscriber, NewJetStreamConsumer). All CommonOption values are simultaneously PublisherOption, SubscriberOption, and JSConsumerOption.
func WithConnection ¶
func WithConnection(nc *nats.Conn) CommonOption
WithConnection injects a pre-existing *nats.Conn. When set, the constructor skips dialing — Config.URL/User/Password/Token/CredsFile and the WithNATSOptions extras are ignored — and Shutdown will NOT close the connection (the caller retains ownership).
Use to share a single connection across a Publisher + Subscriber + JetStreamConsumer in the same process.
func WithLogger ¶
func WithLogger(l *slog.Logger) CommonOption
WithLogger attaches a *slog.Logger used for lifecycle and per-message error logging. Defaults to slog.Default() when omitted. Nil values are ignored.
func WithNATSOptions ¶
func WithNATSOptions(opts ...nats.Option) CommonOption
WithNATSOptions appends raw nats.Option values to the connect call. Escape hatch for TLS, NKey, JWT, custom inbox prefixes, NoEcho, custom error handlers, and anything else not exposed via Config.
Applied after the options derived from Config so callers can override defaults (e.g., a different reconnect handler).
Ignored when WithConnection is set.
func WithOtel ¶
func WithOtel() CommonOption
WithOtel enables OpenTelemetry instrumentation: one span per publish/consume operation with messaging.* attributes. Uses the global tracer provider — register a non-noop provider before constructing the publisher/consumer.
type Config ¶
type Config struct {
// URL is the NATS server URL, e.g. "nats://localhost:4222".
// Required unless WithConnection is used.
URL string
// User is the username for basic authentication.
User string
// Password is the password for basic authentication.
Password string
// Token is the token for token-based authentication.
Token string
// CredsFile is the path to a NATS credentials file (.creds).
CredsFile string
// ConnectionName is reported to the NATS server and is visible in the
// monitoring endpoint. Defaults to "core-nats" when empty.
ConnectionName string
// DialTimeout bounds the initial connect attempt. Defaults to 10s
// when zero.
DialTimeout time.Duration
// ReconnectWait is the delay between reconnect attempts after a lost
// connection. Defaults to 2s when zero.
ReconnectWait time.Duration
// MaxReconnects bounds the number of reconnect attempts. Pointer
// semantics distinguish "unset" from explicit "0".
//
// nil → use the nats.go default (60 attempts).
// *v=0 → no reconnect attempts (give up on first failure).
// *v=-1 (or &MaxReconnectsForever) → retry forever.
// *v>0 → cap at v attempts.
MaxReconnects *int
}
Config describes NATS connection parameters. All authentication fields are optional; supply at most one of (User+Password), Token, or CredsFile.
type JSConsumerHandlers ¶
type JSConsumerHandlers[I any] struct { // Marshaller decodes raw message bytes into I. Nil = json.Unmarshal. Marshaller BodyMarshaller[I] // ErrorHandler decides what to do when Processor.Process returns a // non-nil error. Nil = Nak (redeliver after AckWait). ErrorHandler JSErrorHandler[I] }
JSConsumerHandlers carries payload-typed callbacks for JetStreamConsumer. Pass to NewJetStreamConsumer. Zero value is valid: Marshaller=nil means default JSON decoding, ErrorHandler=nil means the default action on a processor error is JSErrorActionNak.
type JSConsumerOption ¶
type JSConsumerOption interface {
// contains filtered or unexported methods
}
JSConsumerOption configures NewJetStreamConsumer. Any CommonOption is also a JSConsumerOption.
Payload-typed knobs (Marshaller, ErrorHandler) live on JSConsumerHandlers, passed as a positional argument to NewJetStreamConsumer.
func WithJSAssumeExistingConsumer ¶
func WithJSAssumeExistingConsumer() JSConsumerOption
WithJSAssumeExistingConsumer instructs NewJetStreamConsumer to look up the existing durable consumer (via js.Consumer) instead of creating or updating one. The constructor fails if the consumer does not exist.
Use to avoid silent configuration drift in environments where the JetStream consumer is managed externally (Terraform, CLI, ops tooling). When set, WithJSConsumerConfig and WithJSSubjects are ignored.
func WithJSConsumerConfig ¶
func WithJSConsumerConfig(mutate func(*jetstream.ConsumerConfig)) JSConsumerOption
WithJSConsumerConfig installs a mutator that runs against the pre-populated jetstream.ConsumerConfig before CreateOrUpdateConsumer.
The mutator receives a config already populated with Durable, AckPolicy (AckExplicit), and the configured FilterSubject(s). Mutate only what you need:
corenats.WithJSConsumerConfig(func(cfg *jetstream.ConsumerConfig) {
cfg.MaxDeliver = 5
cfg.AckWait = 30 * time.Second
})
This avoids accidentally clobbering defaults (e.g., setting AckPolicy back to AckNone by passing a zero-value struct).
func WithJSDeliveryBuffer ¶
func WithJSDeliveryBuffer(n int) JSConsumerOption
WithJSDeliveryBuffer sets the capacity of the internal channel between the JetStream Consume callback and the worker pool. Default 256. Values < 1 fall back to the default.
func WithJSStream ¶
func WithJSStream(cfg jetstream.StreamConfig) JSConsumerOption
WithJSStream instructs NewJetStreamConsumer to verify or create/update a JetStream stream with the given configuration at construction time.
func WithJSSubjects ¶
func WithJSSubjects(subjects ...string) JSConsumerOption
WithJSSubjects sets one or more filter subjects for the durable consumer. Single subject populates ConsumerConfig.FilterSubject; multiple subjects populate ConsumerConfig.FilterSubjects.
func WithJSWorkerCount ¶
func WithJSWorkerCount(n int) JSConsumerOption
WithJSWorkerCount sets the number of concurrent goroutines processing JetStream deliveries. Default 1 (preserves per-subject order). Values < 1 are normalized to 1.
type JSErrorAction ¶
type JSErrorAction int
JSErrorAction selects what JetStreamConsumer should do after a Processor returns a non-nil error. Returned by JSErrorHandler.
const ( // JSErrorActionNak negative-acks the message with no delay. JetStream // will redeliver after the consumer's AckWait. Default when no // JSErrorHandler is configured. JSErrorActionNak JSErrorAction = iota // JSErrorActionNakDelay negative-acks with an explicit redelivery delay. // The delay is the second return value of JSErrorHandler. JSErrorActionNakDelay // JSErrorActionTerm marks the message as a terminal failure: JetStream // will NOT redeliver and the delivery count is finalized. Use for // poison-pill records after exhausting in-process retries. JSErrorActionTerm // JSErrorActionAck acks the failing message (swallow). Use when the // processor has already shipped the payload to a DLQ topic and the // stream should advance. JSErrorActionAck // JSErrorActionStop terminates JetStreamConsumer: Run returns the // original processing error wrapped. The message is NOT acked. Use for // fatal conditions where the service should fail loudly rather than // silently advance. JSErrorActionStop )
type JSErrorHandler ¶
type JSErrorHandler[I any] func(ctx context.Context, msg Message[I], err error) (JSErrorAction, time.Duration)
JSErrorHandler is invoked when JetStreamConsumer's Processor returns a non-nil error. It decides what the consumer should do with the failing message.
The returned time.Duration is used only when the action is JSErrorActionNakDelay; it is ignored otherwise.
Implementations typically:
- Emit to a DLQ and return (JSErrorActionAck, 0).
- Increment a metric and return (JSErrorActionNak, 0) for transient errors.
- Return (JSErrorActionTerm, 0) for poison-pill records.
- Return (JSErrorActionStop, 0) for fatal dependency outages.
type JSONPublisher ¶
type JSONPublisher[T any] struct { // contains filtered or unexported fields }
JSONPublisher wraps a Publisher and JSON-encodes T messages on every call. Generic over T for compile-time payload-type safety. The default publish subject is fixed at construction time.
func NewJSONPublisher ¶
func NewJSONPublisher[T any](publisher *Publisher, subject string) *JSONPublisher[T]
NewJSONPublisher wraps the given Publisher and JSON-publishes to subject.
func (*JSONPublisher[T]) Publish ¶
func (p *JSONPublisher[T]) Publish(ctx context.Context, message T, opts ...PublishOption) error
Publish marshals message and sends it via Core NATS to the fixed subject.
func (*JSONPublisher[T]) PublishJS ¶
func (p *JSONPublisher[T]) PublishJS( ctx context.Context, message T, opts ...JSPublishOption, ) (*jetstream.PubAck, error)
PublishJS marshals message and sends it via JetStream to the fixed subject.
type JSPublishOption ¶
type JSPublishOption func(*JSPublishOpts)
JSPublishOption configures a single PublishJS/PublishJSONJS call.
func WithJSExpectedLastMsgID ¶
func WithJSExpectedLastMsgID(id string) JSPublishOption
WithJSExpectedLastMsgID enforces a precondition that the previous message in the stream had the given MsgID.
func WithJSExpectedLastSequence ¶
func WithJSExpectedLastSequence(seq uint64) JSPublishOption
WithJSExpectedLastSequence enforces optimistic-concurrency control: the publish is rejected unless the stream's last sequence matches seq.
func WithJSExpectedLastSubjectSequence ¶
func WithJSExpectedLastSubjectSequence(seq uint64) JSPublishOption
WithJSExpectedLastSubjectSequence enforces per-subject optimistic concurrency control.
func WithJSExpectedStream ¶
func WithJSExpectedStream(name string) JSPublishOption
WithJSExpectedStream rejects the publish if the target stream does not match the given name. Useful when multiple streams could route the same subject.
func WithJSHeaders ¶
func WithJSHeaders(h nats.Header) JSPublishOption
WithJSHeaders attaches NATS headers to the published JetStream message.
func WithJSMsgID ¶
func WithJSMsgID(id string) JSPublishOption
WithJSMsgID sets the JetStream message ID used for server-side deduplication. Required for exactly-once-style publishing.
func WithJSRetryAttempts ¶
func WithJSRetryAttempts(n int) JSPublishOption
WithJSRetryAttempts overrides the underlying jetstream client's retry attempt count for the publish. Values < 1 are ignored.
type JSPublishOpts ¶
type JSPublishOpts struct {
Headers nats.Header
MsgID string
ExpectedStream string
ExpectedLastSequence *uint64
ExpectedLastSubjectSeq *uint64
ExpectedLastMsgID string
RetryAttempts int
// contains filtered or unexported fields
}
JSPublishOpts captures per-publish JetStream metadata. Use the helper option functions (WithJSHeaders, WithJSMsgID, etc.) to populate it.
type JetStreamConsumer ¶
type JetStreamConsumer[I any] struct { // contains filtered or unexported fields }
JetStreamConsumer is a generic NATS JetStream pull consumer that consumes messages and dispatches them to a Processor via a worker pool.
Implements lifecycle.Runner. Construct via NewJetStreamConsumer.
The consumer is created (or looked up — see WithJSAssumeExistingConsumer) in the constructor; the pull loop starts in Run. Use Ready() to block until Run has established the pull subscription.
func NewJetStreamConsumer ¶
func NewJetStreamConsumer[I any]( ctx context.Context, cfg Config, stream string, durable string, processor Processor[I], handlers JSConsumerHandlers[I], opts ...JSConsumerOption, ) (*JetStreamConsumer[I], error)
NewJetStreamConsumer creates a JetStreamConsumer and eagerly:
- Dials NATS (or uses WithConnection's shared connection).
- Verifies/creates the stream if WithJSStream is provided.
- Creates/updates the durable consumer with the configured subject filter(s), AckPolicy=Explicit, and any user mutations from WithJSConsumerConfig — OR looks up an existing consumer when WithJSAssumeExistingConsumer is set.
Any of these steps failing returns a wrapped error and closes the owned connection (if any).
stream and durable are required (validated as non-empty).
handlers carries optional payload-typed callbacks; pass JSConsumerHandlers[I]{} for defaults (JSON decode + Nak-on-error).
func (*JetStreamConsumer[I]) Conn ¶
func (c *JetStreamConsumer[I]) Conn() *nats.Conn
Conn returns the underlying *nats.Conn.
func (*JetStreamConsumer[I]) JetStream ¶
func (c *JetStreamConsumer[I]) JetStream() jetstream.JetStream
JetStream returns the underlying jetstream.JetStream context.
func (*JetStreamConsumer[I]) Ready ¶
func (c *JetStreamConsumer[I]) Ready() <-chan struct{}
Ready returns a channel that is closed once Run has established the JetStream Consume callback. Block on it to ensure tests/producers don't race the consumer's start-up.
Ready() is re-armed on every Run cycle (the channel rotates), so callers blocking on Ready() after a restart see the new cycle's completion.
func (*JetStreamConsumer[I]) Run ¶
func (c *JetStreamConsumer[I]) Run(ctx context.Context) error
Run starts the JetStream Consume loop, dispatching messages to a worker pool, and blocks until ctx is cancelled or a fatal processor error triggers JSErrorActionStop.
Returns nil on graceful shutdown; returns a wrapped error when the processor signals JSErrorActionStop.
Implements lifecycle.Runner.
func (*JetStreamConsumer[I]) Shutdown ¶
func (c *JetStreamConsumer[I]) Shutdown(ctx context.Context) error
Shutdown signals Run to stop and waits for it to finish, bounded by ctx. On ctx expiry returns ctx.Err() — but the owned connection is still closed so the resource does not leak.
Implements lifecycle.Resource. Idempotent.
type Message ¶
type Message[I any] struct { // Payload is the decoded message body. Payload I // Subject is the NATS subject the message was published to. Subject string // Reply is the optional reply subject for request-reply patterns. // Empty when the publisher did not set one. Reply string // Headers carries NATS message headers (NATS 2.x+). Nil when no // headers were attached. Headers nats.Header // Sequence is the JetStream stream sequence number. Zero on Core NATS. Sequence uint64 // NumDelivered is the JetStream delivery attempt counter (1-based). // Zero on Core NATS. NumDelivered uint64 // Timestamp is the JetStream server-side publish time. Zero on Core NATS. Timestamp time.Time }
Message is the typed payload delivered to a Processor.
Fields populated for both Core NATS and JetStream:
- Payload, Subject, Reply, Headers
Fields populated for JetStream only (zero on Core NATS):
- Sequence, NumDelivered, Timestamp
type Processor ¶
Processor is what callers implement to handle messages.
For Core NATS Subscriber, the returned error is passed to the configured SubscriberErrorHandler (or logged at warn level if none is set). There is no ack model — the error is observational only.
For JetStreamConsumer, the returned error is passed to the configured JSErrorHandler[I] which decides whether the message is Acked, Nacked, Term'd, or whether the consumer should stop. Without a custom handler the default is Nak (redeliver after AckWait).
type PublishOption ¶
type PublishOption func(*PublishOpts)
PublishOption configures a single Publish/PublishJSON call.
func WithHeaders ¶
func WithHeaders(h nats.Header) PublishOption
WithHeaders attaches NATS headers to the published message.
func WithReply ¶
func WithReply(subject string) PublishOption
WithReply sets the reply subject for request-reply patterns.
type PublishOpts ¶
type PublishOpts struct {
// Headers are NATS message headers (NATS 2.x+).
Headers nats.Header
// Reply is an optional reply subject for request-reply patterns.
Reply string
// contains filtered or unexported fields
}
PublishOpts captures per-publish Core NATS message metadata. Headers are optional; an empty Reply suppresses the reply subject.
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher publishes messages to Core NATS subjects or JetStream streams. Implements lifecycle.Resource.
Construct via NewPublisher. The publisher establishes its NATS connection (and optional JetStream context + stream declaration) eagerly in the constructor.
When constructed with WithOtel(), every Publish*/Publish*JS call opens a SpanKindProducer span and injects the trace context into the outgoing nats.Header via the global TextMapPropagator. The consumer-side span (Subscriber / JetStreamConsumer) extracts the context so the producer and consumer spans chain.
func NewPublisher ¶
NewPublisher creates a Publisher. By default it dials NATS using cfg; pass WithConnection to share an existing connection.
If WithPublisherStream is provided, the stream is verified or created/updated during the constructor — a non-nil error is returned on failure.
func (*Publisher) Conn ¶
Conn returns the underlying *nats.Conn. Use to construct additional subscribers/consumers that share this connection via WithConnection.
func (*Publisher) Flush ¶
Flush blocks until the server has processed all buffered messages or ctx expires. Call before Shutdown when using Core NATS publishing to avoid losing in-flight messages.
If ctx has no deadline, Flush applies a default 10s timeout — the underlying NATS client requires a bounded wait.
func (*Publisher) JetStream ¶
JetStream returns the underlying jetstream.JetStream context. Use for advanced flows (KV, ObjectStore, custom stream management).
func (*Publisher) Publish ¶
func (p *Publisher) Publish(ctx context.Context, subject string, data []byte, opts ...PublishOption) error
Publish sends raw bytes via Core NATS (at-most-once delivery). The message is buffered by the client and flushed asynchronously — call Flush before Shutdown if you must guarantee transmission.
ctx is only used for OTel span context; the underlying nats.Conn.PublishMsg call is non-blocking and does not honor cancellation.
func (*Publisher) PublishJS ¶
func (p *Publisher) PublishJS( ctx context.Context, subject string, data []byte, opts ...JSPublishOption, ) (*jetstream.PubAck, error)
PublishJS sends raw bytes via JetStream (at-least-once delivery). The returned PubAck contains the assigned stream sequence and duplicate flag.
func (*Publisher) PublishJSON ¶
func (p *Publisher) PublishJSON(ctx context.Context, subject string, message any, opts ...PublishOption) error
PublishJSON marshals message to JSON and publishes it via Core NATS. Sets the Content-Type header to application/json so the consumer-side default decoder accepts it without needing a custom marshaller.
func (*Publisher) PublishJSONJS ¶
func (p *Publisher) PublishJSONJS( ctx context.Context, subject string, message any, opts ...JSPublishOption, ) (*jetstream.PubAck, error)
PublishJSONJS marshals message to JSON and publishes it via JetStream. Sets Content-Type=application/json on the outgoing message.
func (*Publisher) Shutdown ¶
Shutdown flushes pending Core NATS messages and closes the underlying connection if it is owned by the Publisher. Honors ctx for the flush step; the close itself is non-blocking.
Flush runs regardless of connection ownership so a shared-connection Publisher does not silently drop its in-flight Core NATS sends. The connection is closed only when this Publisher owns it.
Idempotent — subsequent calls return the error captured by the first invocation (so callers can call Shutdown multiple times without losing the original flush failure).
type PublisherOption ¶
type PublisherOption interface {
// contains filtered or unexported methods
}
PublisherOption configures NewPublisher. Any CommonOption is also a PublisherOption.
func WithPublisherStream ¶
func WithPublisherStream(cfg jetstream.StreamConfig) PublisherOption
WithPublisherStream instructs NewPublisher to verify or create/update a JetStream stream with the given configuration at construction time. The constructor returns an error if the stream cannot be declared.
type Subscriber ¶
type Subscriber[I any] struct { // contains filtered or unexported fields }
Subscriber is a generic Core NATS subscriber (at-most-once delivery) that consumes messages from a subject or queue group and dispatches them to a Processor via a worker pool.
Implements lifecycle.Runner. Construct via NewSubscriber.
Subscription registration happens in Run (not the constructor) — matching the lifecycle model used by kafka.Consumer and rabbitmq. Use Ready() to block until the subscription is live (tests, smoke checks).
Core NATS is at-most-once: messages published to the subject before Run has registered the subscription are dropped by the broker. For at-least-once semantics use JetStreamConsumer.
func NewSubscriber ¶
func NewSubscriber[I any]( cfg Config, subject string, processor Processor[I], handlers SubscriberHandlers[I], opts ...SubscriberOption, ) (*Subscriber[I], error)
NewSubscriber creates a Subscriber and eagerly dials NATS (unless WithConnection is provided). Returns an error if cfg is invalid or the connection cannot be established.
subject is required (the NATS subject filter to consume from). handlers carries optional payload-typed callbacks; pass a zero-value SubscriberHandlers[I]{} for defaults.
By default the message body is JSON-unmarshalled and a Content-Type header of "application/json" (or empty) is enforced. Supply handlers.Marshaller for custom decoding and handlers.ErrorHandler to observe Processor failures.
func (*Subscriber[I]) Conn ¶
func (s *Subscriber[I]) Conn() *nats.Conn
Conn returns the underlying *nats.Conn. Useful for sharing the connection with a Publisher.
func (*Subscriber[I]) Ready ¶
func (s *Subscriber[I]) Ready() <-chan struct{}
Ready returns a channel that is closed once Run has registered the NATS subscription. Block on it to ensure publishes are not lost in the at-most-once gap between Run start and Subscribe completion (typical pattern in tests).
On a never-Run Subscriber, the channel never closes — pair with a timeout if you cannot guarantee Run is being called.
Ready() is re-armed on every Run cycle: after a Run returns, the next Run rotates the underlying channel so callers that block on Ready() see the new cycle's subscribe completion, not the previous one.
func (*Subscriber[I]) Run ¶
func (s *Subscriber[I]) Run(ctx context.Context) error
Run subscribes to the configured subject (optionally in a queue group), dispatches messages to a worker pool, and blocks until ctx is cancelled.
Returns ErrAlreadyRunning if called while an earlier Run is still active. Returns nil on graceful ctx cancellation.
Implements lifecycle.Runner.
func (*Subscriber[I]) Shutdown ¶
func (s *Subscriber[I]) Shutdown(ctx context.Context) error
Shutdown signals Run to stop and waits for it to finish, bounded by ctx. On ctx expiry returns ctx.Err() — but the owned connection is still closed so the resource does not leak.
Implements lifecycle.Resource. Idempotent.
type SubscriberErrorHandler ¶
SubscriberErrorHandler is invoked when Core NATS message processing fails. Use it for metrics, DLQ side-effects, or structured logging. The returned value is ignored — Core NATS has no ack model.
type SubscriberHandlers ¶
type SubscriberHandlers[I any] struct { // Marshaller decodes raw message bytes into I. Nil = json.Unmarshal. Marshaller BodyMarshaller[I] // ErrorHandler is invoked when Processor.Process returns a non-nil // error. Nil = log only. ErrorHandler SubscriberErrorHandler[I] }
SubscriberHandlers carries payload-typed callbacks for Subscriber. Pass to NewSubscriber. Zero value is valid: Marshaller=nil means default JSON decoding, ErrorHandler=nil means processor errors are only logged.
type SubscriberOption ¶
type SubscriberOption interface {
// contains filtered or unexported methods
}
SubscriberOption configures NewSubscriber. Any CommonOption is also a SubscriberOption.
Payload-typed knobs (Marshaller, ErrorHandler) live on SubscriberHandlers, passed as a positional argument to NewSubscriber.
func WithSubscriberChannelBuffer ¶
func WithSubscriberChannelBuffer(n int) SubscriberOption
WithSubscriberChannelBuffer sets the capacity of the internal ChanSubscribe channel used between the NATS client reader and the worker pool. Default 256.
Tune up for high-throughput Subscribers if you see SlowConsumer warnings in the async error log (raises memory usage during bursts). Values < 1 fall back to the default.
func WithSubscriberQueueGroup ¶
func WithSubscriberQueueGroup(group string) SubscriberOption
WithSubscriberQueueGroup configures a queue group for Core NATS subscriber load balancing. Subscribers in the same queue group share message load.
func WithSubscriberWorkerCount ¶
func WithSubscriberWorkerCount(n int) SubscriberOption
WithSubscriberWorkerCount sets the number of concurrent goroutines processing deliveries. Default 1 (ordered processing). Values < 1 are normalized to 1.
Raising above 1 increases throughput at the cost of losing per-subject processing order.