nats

package module
v0.17.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2024 License: MIT Imports: 14 Imported by: 0

README

helix.go - NATS JetStream integration

Website Go API reference Go Report Card GitHub Release

The NATS JetStream integration provides an opinionated way to interact with NATS JetStream for helix services. It uses JetStream only for distributed key-value store and higher Quality of Service (QoS).

Documentation

Overview

Package nats exposes an opinionated way to interact with NATS JetStream. It comes with a strong association with OpenTelemetry for distributed tracing and automatic error recording.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {

	// Addresses are NATS addresses to connect to. A URL can contain username and
	// password, such as:
	//
	//   "nats://derek:pass@localhost:4222"
	//
	// Default:
	//
	//   []string{"nats://localhost:4222"}
	Addresses []string `json:"addresses"`

	// TLSConfig configures TLS to communicate with the NATS server.
	TLS integration.ConfigTLS `json:"tls"`
}

Config is used to configure the NATS integration.

type Consumer

type Consumer interface {
	Fetch(ctx context.Context, batch int, opts ...jetstream.FetchOpt) (jetstream.MessageBatch, error)
	FetchBytes(ctx context.Context, maxBytes int, opts ...jetstream.FetchOpt) (jetstream.MessageBatch, error)
	FetchNoWait(ctx context.Context, batch int) (jetstream.MessageBatch, error)
	Consume(ctx context.Context, handler MsgHandler, opts ...jetstream.PullConsumeOpt) (jetstream.ConsumeContext, error)
	Messages(ctx context.Context, opts ...jetstream.PullMessagesOpt) (MessagesContext, error)
}

Consumer exposes an opinionated way to interact with NATS JetStream consumer capabilities.

type Header = nats.Header

Header is an alias to nats.Header. This avoids to import the nats package and therefore avoids naming conflict as much as possible.

type JetStream

type JetStream interface {
	Publish(ctx context.Context, msg *Msg, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error)
	PublishAsync(ctx context.Context, msg *Msg, opts ...jetstream.PublishOpt) (jetstream.PubAckFuture, error)
	PublishAsyncPending(ctx context.Context) int
	PublishAsyncComplete(ctx context.Context) <-chan struct{}

	Stream(ctx context.Context, streamname string) (Stream, error)
	CreateOrUpdateStream(ctx context.Context, config jetstream.StreamConfig) (Stream, error)
	DeleteStream(ctx context.Context, streamname string) error

	Consumer(ctx context.Context, streamname string, consumername string) (Consumer, error)
	CreateOrUpdateConsumer(ctx context.Context, streamname string, config jetstream.ConsumerConfig) (Consumer, error)
	OrderedConsumer(ctx context.Context, streamname string, config jetstream.OrderedConsumerConfig) (Consumer, error)
	DeleteConsumer(ctx context.Context, streamname string, consumername string) error

	KeyValue(ctx context.Context, bucket string) (KeyValue, error)
	CreateOrUpdateKeyValue(ctx context.Context, config jetstream.KeyValueConfig) (KeyValue, error)
	DeleteKeyValue(ctx context.Context, bucket string) error
}

JetStream exposes an opinionated way to interact with NATS JetStream.

func Connect

func Connect(cfg Config) (JetStream, error)

Connect tries to connect to the NATS server given the Config. Returns an error if Config is not valid or if the connection failed.

type KeyValue

type KeyValue interface {
	Bucket(ctx context.Context) string
	Get(ctx context.Context, key string) (jetstream.KeyValueEntry, error)
	GetRevision(ctx context.Context, key string, revision uint64) (jetstream.KeyValueEntry, error)
	Create(ctx context.Context, key string, value []byte) (uint64, error)
	Put(ctx context.Context, key string, value []byte) (uint64, error)
	Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)
	Delete(ctx context.Context, key string, opts ...jetstream.KVDeleteOpt) error
	Purge(ctx context.Context, key string, opts ...jetstream.KVDeleteOpt) error
	PurgeDeletes(ctx context.Context, opts ...jetstream.KVPurgeOpt) error
	Watch(ctx context.Context, keys string, opts ...jetstream.WatchOpt) (jetstream.KeyWatcher, error)
	WatchAll(ctx context.Context, opts ...jetstream.WatchOpt) (jetstream.KeyWatcher, error)
	Keys(ctx context.Context, opts ...jetstream.WatchOpt) ([]string, error)
	History(ctx context.Context, key string, opts ...jetstream.WatchOpt) ([]jetstream.KeyValueEntry, error)
	Status(ctx context.Context) (jetstream.KeyValueStatus, error)
}

KeyValue exposes an opinionated way to interact with a NATS JetStream key-value store. All functions are wrapped with a context because some of them automatically do distributed tracing (by using the said context) as well as error recording within traces.

type MessagesContext

type MessagesContext interface {
	Next(ctx context.Context) (context.Context, jetstream.Msg, error)
	Stop(ctx context.Context)
}

MessagesContext exposes an opinionated way to interact with a NATS JetStream messages' iterator.

type Msg

type Msg = nats.Msg

Msg is an alias to nats.Msg. This avoids to import the nats package and therefore avoids naming conflict as much as possible.

type MsgHandler

type MsgHandler func(ctx context.Context, msg jetstream.Msg)

MsgHandler is like jetstream.MessageHandler but allows to pass a context for leveraging automatic and distributed tracing with OpenTelemetry.

type Stream

type Stream interface {
	CreateOrUpdateConsumer(ctx context.Context, config jetstream.ConsumerConfig) (Consumer, error)
	OrderedConsumer(ctx context.Context, config jetstream.OrderedConsumerConfig) (Consumer, error)
	Consumer(ctx context.Context, consumername string) (Consumer, error)
	DeleteConsumer(ctx context.Context, consumername string) error
	Purge(ctx context.Context, opts ...jetstream.StreamPurgeOpt) error
	GetMsg(ctx context.Context, seq uint64, opts ...jetstream.GetMsgOpt) (*jetstream.RawStreamMsg, error)
	GetLastMsgForSubject(ctx context.Context, subject string) (*jetstream.RawStreamMsg, error)
	DeleteMsg(ctx context.Context, seq uint64) error
	SecureDeleteMsg(ctx context.Context, seq uint64) error
}

Stream exposes an opinionated way to interact with NATS JetStream stream management capabilities.

type Subscription

type Subscription = nats.Subscription

Subscription is an alias to nats.Subscription. This avoids to import the nats package and therefore avoids naming conflict as much as possible.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL