Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewJetstream ¶
func NewJetstream(config common.NATSConfig) (jetstream.JetStream, error)
Types ¶
type ManagedConsumer ¶
type ManagedConsumer struct {
// contains filtered or unexported fields
}
ManagedConsumer manages a JetStream consumer with automatic retry on transient failures.
Encapsulates consumer setup, iterator lifecycle, and cleanup. Retries iterator failures automatically while propagating handler errors immediately.
func NewManagedConsumer ¶
func NewManagedConsumer(logger *zap.Logger, pool *Pool, cfg ManagedConsumerConfig) *ManagedConsumer
func (*ManagedConsumer) ConsumeStream ¶
func (c *ManagedConsumer) ConsumeStream(ctx context.Context) error
ConsumeStream runs a long-lived JetStream consumer that processes messages from a stream.
When the pool is closed, the consumer's context is automatically canceled, causing ConsumeStream to exit and return ctx.Err().
Returns:
- ctx.Err() if context is canceled (graceful shutdown)
- handler error if handler returns an error (no retry)
- other errors for non-transient failures
Transient errors (iterator failures) are retried automatically.
NATS may redeliver messages if handlers are slow or fail to acknowledge within AckWait. Handlers should be idempotent.
type ManagedConsumerConfig ¶
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool manages a pool of NATS connections with automatic replenishment.
Connections are monitored reactively: when Put detects a permanently closed connection (via IsClosed), it triggers replenishment. Connections in RECONNECTING state remain in the pool to leverage NATS auto-reconnect.
func (*Pool) Close ¶
func (p *Pool) Close()
Close shuts down the pool and drains all connections gracefully.
func (*Pool) Done ¶
func (p *Pool) Done() <-chan struct{}
Done returns a channel that's closed when the pool is closed.
func (*Pool) Get ¶
func (p *Pool) Get(ctx context.Context) (*PooledConnection, error)
Get returns a PooledConnection, blocking until one is available or the context is done.
Use this for short-lived, bursty operations where brief waiting is acceptable and backpressure is preferred over immediate failure.
func (*Pool) Put ¶
func (p *Pool) Put(pc *PooledConnection)
Put returns a connection to the pool. If the connection is closed, triggers replenishment automatically.
Safe to call after Close, but callers should avoid races between Put and Close to prevent potential connection leaks. Coordinate shutdown using WaitGroups or context cancellation.
type StreamManager ¶
type StreamManager struct {
CurrentReplicaID int
ReplicaCount int
TotalStreamCount int
ShardMap map[int][]int // replicaID -> assigned shards
}
func NewStreamManager ¶
func NewStreamManager(replicaID int, replicaCount int, totalStreamCount int) *StreamManager
func (*StreamManager) AssignedStreams ¶
func (s *StreamManager) AssignedStreams() []int