nats

package
v0.0.0-...-223355f
Warning

This package is not in the latest version of its module.

Published: Jun 16, 2026 License: Apache-2.0, MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewJetstream

func NewJetstream(config common.NATSConfig) (jetstream.JetStream, error)

func NewLocalServer

func NewLocalServer(opts *server.Options, timeout time.Duration) (*server.Server, error)

Types

type ManagedConsumer

type ManagedConsumer struct {
	// contains filtered or unexported fields
}

ManagedConsumer manages a JetStream consumer with automatic retry on transient failures.

It encapsulates consumer setup, iterator lifecycle, and cleanup. Iterator failures are retried automatically, while handler errors are propagated to the caller immediately.

func NewManagedConsumer

func NewManagedConsumer(logger *zap.Logger, pool *Pool, cfg ManagedConsumerConfig) *ManagedConsumer

func (*ManagedConsumer) ConsumeStream

func (c *ManagedConsumer) ConsumeStream(ctx context.Context) error

ConsumeStream runs a long-lived JetStream consumer that processes messages from a stream.

When the pool is closed, the consumer's context is automatically canceled, causing ConsumeStream to exit and return ctx.Err().

Returns:

  • ctx.Err() if context is canceled (graceful shutdown)
  • handler error if handler returns an error (no retry)
  • other errors for non-transient failures

Transient errors (iterator failures) are retried automatically.

NATS may redeliver messages if handlers are slow or fail to acknowledge within AckWait. Handlers should be idempotent.
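The return rules above can be illustrated with a standard-library-only sketch. It is not this package's implementation: handlerError, consumeWithRetry, and runDemo are hypothetical names, and treating every non-handler error as transient is an assumption made to keep the example short.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// handlerError is a hypothetical wrapper marking errors returned by the handler.
type handlerError struct{ err error }

func (e *handlerError) Error() string { return "handler: " + e.err.Error() }
func (e *handlerError) Unwrap() error { return e.err }

// consumeWithRetry runs iterate until it succeeds, the context ends, or the
// handler fails. Any other error is assumed to be transient and is retried.
func consumeWithRetry(ctx context.Context, retryDelay time.Duration, iterate func(context.Context) error) error {
	for {
		err := iterate(ctx)
		if err == nil {
			return nil
		}
		if ctx.Err() != nil {
			return ctx.Err() // graceful shutdown
		}
		var hErr *handlerError
		if errors.As(err, &hErr) {
			return err // handler errors are never retried
		}
		// Transient failure: wait, then try again unless canceled first.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryDelay):
		}
	}
}

// runDemo fails twice with a simulated iterator error, then returns a handler error.
func runDemo() (attempts int, err error) {
	err = consumeWithRetry(context.Background(), time.Millisecond, func(context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("iterator closed")
		}
		return &handlerError{err: errors.New("bad payload")}
	})
	return attempts, err
}

func main() {
	attempts, err := runDemo()
	fmt.Println(attempts, err)
}
```

In this sketch the delay between attempts plays the role that RetryDelay presumably plays in ManagedConsumerConfig.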

type ManagedConsumerConfig

type ManagedConsumerConfig struct {
	StreamName        string
	ConsumerConfig    jetstream.ConsumerConfig
	Handler           func(context.Context, jetstream.Msg) error // Handlers should be idempotent.
	RetryDelay        time.Duration
	PullMaxMessages   int
	ConnectionTimeout time.Duration
}
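
Because messages may be redelivered, a Handler should tolerate seeing the same message twice. The following is a minimal in-memory sketch of one way to do that, using only the standard library. The message type is a hypothetical stand-in for jetstream.Msg, and deduper and deliverTwice are illustrative names. A real handler would need a durable store and a key taken from the message itself.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for jetstream.Msg with a unique ID.
type message struct {
	ID   string
	Data string
}

// deduper remembers IDs that were handled successfully.
type deduper struct {
	mu   sync.Mutex
	seen map[string]bool
}

// wrap returns a handler that skips IDs already processed. The lock is held
// while next runs, which keeps the sketch simple but serializes handling.
func (d *deduper) wrap(next func(context.Context, message) error) func(context.Context, message) error {
	return func(ctx context.Context, m message) error {
		d.mu.Lock()
		defer d.mu.Unlock()
		if d.seen[m.ID] {
			return nil // redelivery of a message that already succeeded
		}
		if err := next(ctx, m); err != nil {
			return err // not recorded, so a redelivery will retry it
		}
		d.seen[m.ID] = true
		return nil
	}
}

// deliverTwice simulates a redelivery and reports how often the work ran.
func deliverTwice() int {
	runs := 0
	d := &deduper{seen: make(map[string]bool)}
	h := d.wrap(func(context.Context, message) error {
		runs++
		return nil
	})
	m := message{ID: "order-42", Data: "payload"}
	_ = h(context.Background(), m)
	_ = h(context.Background(), m) // simulated redelivery
	return runs
}

func main() {
	fmt.Println(deliverTwice())
}
```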

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool manages a pool of NATS connections with automatic replenishment.

Connections are monitored reactively: when Put detects a permanently closed connection (via IsClosed), it triggers replenishment. Connections in RECONNECTING state remain in the pool to leverage NATS auto-reconnect.
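
The reactive replenishment described above might look roughly like the following standard-library sketch. It is an illustration under assumptions, not the package's code: conn, pool, and replenishPending are hypothetical, a boolean field stands in for the IsClosed check, and allocating a struct stands in for dialing a new connection.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for a pooled NATS connection.
type conn struct{ closed bool }

type pool struct {
	idle      chan *conn
	replenish chan struct{} // one token per connection that must be replaced
}

func newPool(size int) *pool {
	p := &pool{
		idle:      make(chan *conn, size),
		replenish: make(chan struct{}, size),
	}
	for i := 0; i < size; i++ {
		p.idle <- &conn{}
	}
	return p
}

// put keeps healthy connections and requests a replacement for closed ones.
func (p *pool) put(c *conn) {
	if c.closed {
		p.replenish <- struct{}{} // buffered, so this does not block here
		return
	}
	p.idle <- c
}

// replenishPending adds one fresh connection per pending token and reports
// how many were added.
func (p *pool) replenishPending() int {
	added := 0
	for {
		select {
		case <-p.replenish:
			p.idle <- &conn{}
			added++
		default:
			return added
		}
	}
}

// demo returns the idle count before and after replenishment.
func demo() (before, after int) {
	p := newPool(2)
	a, b := <-p.idle, <-p.idle
	a.closed = true // simulate a permanently closed connection
	p.put(a)
	p.put(b)
	before = len(p.idle)
	p.replenishPending()
	return before, len(p.idle)
}

func main() {
	fmt.Println(demo())
}
```

In the real Pool, replenishment presumably runs in the background at replenishInterval rather than on demand as it does here.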

func NewPool

func NewPool(
	logger *zap.Logger,
	size int,
	addr string,
	replenishInterval time.Duration,
	opts ...nats.Option,
) (*Pool, error)

func (*Pool) Close

func (p *Pool) Close()

Close shuts down the pool and drains all connections gracefully.

func (*Pool) Done

func (p *Pool) Done() <-chan struct{}

Done returns a channel that's closed when the pool is closed.

func (*Pool) Get

func (p *Pool) Get(ctx context.Context) (*PooledConnection, error)

Get returns a PooledConnection, blocking until one is available or the context is done.

Use this for short-lived, bursty operations where brief waiting is acceptable and backpressure is preferred over immediate failure.
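
The blocking behavior of Get can be pictured with a buffered channel and a select on the context. This is a sketch using only the standard library. The conn and pool types are hypothetical stand-ins, and the real Pool may be implemented differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// conn is a hypothetical stand-in for PooledConnection.
type conn struct{ id int }

type pool struct{ idle chan *conn }

func newPool(size int) *pool {
	p := &pool{idle: make(chan *conn, size)}
	for i := 0; i < size; i++ {
		p.idle <- &conn{id: i}
	}
	return p
}

// get blocks until a connection is free or ctx is done.
func (p *pool) get(ctx context.Context) (*conn, error) {
	select {
	case c := <-p.idle:
		return c, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func (p *pool) put(c *conn) { p.idle <- c }

// demoGet shows backpressure: the second get waits and times out because the
// only connection is checked out, then succeeds once it has been returned.
func demoGet() (timedOut, gotAfterPut bool) {
	p := newPool(1)
	first, _ := p.get(context.Background())

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	_, err := p.get(ctx)
	timedOut = errors.Is(err, context.DeadlineExceeded)

	p.put(first)
	c, err := p.get(context.Background())
	gotAfterPut = err == nil && c == first
	return timedOut, gotAfterPut
}

func main() {
	fmt.Println(demoGet())
}
```

Passing a context with a deadline bounds how long a caller is willing to wait, which is what turns an exhausted pool into backpressure instead of an immediate error.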

func (*Pool) Put

func (p *Pool) Put(pc *PooledConnection)

Put returns a connection to the pool. If the connection is closed, Put triggers replenishment automatically.

Safe to call after Close, but callers should avoid races between Put and Close to prevent potential connection leaks. Coordinate shutdown using WaitGroups or context cancellation.
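
One way to coordinate shutdown so that no Put races with Close is to cancel the workers, wait for them with a WaitGroup, and only then close the pool. The sketch below uses only the standard library. Its pool type is a hypothetical stand-in that merely counts calls, so it shows the ordering and not the package's internals.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in that only counts returned connections.
type pool struct {
	mu          sync.Mutex
	closed      bool
	returned    int
	lateReturns int // put calls that arrived after close
}

func (p *pool) put() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		p.lateReturns++
		return
	}
	p.returned++
}

func (p *pool) close() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
}

// shutdown starts workers, then stops them in an order that guarantees every
// put happens before close.
func shutdown(workers int) (returned, lateReturns int) {
	p := &pool{}
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer p.put() // runs before wg.Done, so the return is always counted
			<-ctx.Done()  // stands in for work that stops on cancellation
		}()
	}
	cancel()  // 1. tell workers to stop
	wg.Wait() // 2. wait until every worker has returned its connection
	p.close() // 3. only now close the pool
	return p.returned, p.lateReturns
}

func main() {
	fmt.Println(shutdown(4))
}
```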

type PooledConnection

type PooledConnection struct {
	Conn *nats.Conn
	JS   jetstream.JetStream
}

type StreamManager

type StreamManager struct {
	CurrentReplicaID int
	ReplicaCount     int
	TotalStreamCount int
	ShardMap         map[int][]int // replicaID -> assigned shards
}

func NewStreamManager

func NewStreamManager(replicaID int, replicaCount int, totalStreamCount int) *StreamManager

func (*StreamManager) AssignedStreams

func (s *StreamManager) AssignedStreams() []int
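
The page does not say how streams are distributed across replicas. As an illustration only, the sketch below assumes zero-based IDs and a round-robin rule (stream i goes to replica i modulo the replica count). The function buildShardMap is hypothetical, and the real StreamManager may use a different scheme.

```go
package main

import "fmt"

// buildShardMap assigns stream IDs 0..totalStreamCount-1 to replicas in
// round-robin order. Both the zero-based IDs and the modulo rule are
// assumptions made for this sketch.
func buildShardMap(replicaCount, totalStreamCount int) map[int][]int {
	shards := make(map[int][]int)
	if replicaCount <= 0 {
		return shards // nothing can be assigned without replicas
	}
	for stream := 0; stream < totalStreamCount; stream++ {
		replica := stream % replicaCount
		shards[replica] = append(shards[replica], stream)
	}
	return shards
}

func main() {
	shards := buildShardMap(3, 8)
	fmt.Println(shards[1]) // streams assigned to replica 1: [1 4 7]
}
```

Under this assumed scheme, the value returned by AssignedStreams would correspond to the ShardMap entry for CurrentReplicaID.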
