Documentation
Overview
Package nats exposes an opinionated way to interact with NATS JetStream. It integrates closely with OpenTelemetry to provide distributed tracing and automatic error recording.
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Config
type Config struct {
	// Addresses are NATS addresses to connect to. A URL can contain username and
	// password, such as:
	//
	//	"nats://derek:pass@localhost:4222"
	//
	// Default:
	//
	//	[]string{"nats://localhost:4222"}
	Addresses []string `json:"addresses"`

	// TLS configures TLS to communicate with the NATS server.
	TLS integration.ConfigTLS `json:"tls"`
}
Config is used to configure the NATS integration.
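The JSON tags on Config suggest it is meant to be decoded from a configuration document. The following is a minimal sketch of that, not the package's own loader: the local Config type mirrors only the documented Addresses field (the TLS field is omitted because integration.ConfigTLS is not shown here), and loadConfig and defaultAddress are hypothetical names. The package presumably applies the documented default itself; the fallback below only illustrates it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Config is a local stand-in that mirrors only the documented Addresses
// field. The TLS field is left out because its type is not shown in this
// section.
type Config struct {
	Addresses []string `json:"addresses"`
}

// defaultAddress is the default documented on Config.Addresses.
const defaultAddress = "nats://localhost:4222"

// loadConfig is a hypothetical helper: it decodes JSON into Config and falls
// back to the documented default when no address is given.
func loadConfig(raw []byte) (Config, error) {
	var cfg Config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return Config{}, fmt.Errorf("decode nats config: %w", err)
	}
	if len(cfg.Addresses) == 0 {
		cfg.Addresses = []string{defaultAddress}
	}
	return cfg, nil
}

func main() {
	// Explicit address with credentials embedded in the URL.
	cfg, err := loadConfig([]byte(`{"addresses": ["nats://derek:pass@localhost:4222"]}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Addresses)

	// Empty document: the default address is used.
	cfg, err = loadConfig([]byte(`{}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Addresses)
}
```

Credentials embedded in a URL end up wherever the configuration is logged or printed, so a real loader would likely avoid echoing Addresses verbatim.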
type Consumer
type Consumer interface {
	Fetch(ctx context.Context, batch int, opts ...jetstream.FetchOpt) (jetstream.MessageBatch, error)
	FetchBytes(ctx context.Context, maxBytes int, opts ...jetstream.FetchOpt) (jetstream.MessageBatch, error)
	FetchNoWait(ctx context.Context, batch int) (jetstream.MessageBatch, error)
	Consume(ctx context.Context, handler MsgHandler, opts ...jetstream.PullConsumeOpt) (jetstream.ConsumeContext, error)
	Messages(ctx context.Context, opts ...jetstream.PullMessagesOpt) (MessagesContext, error)
}
Consumer exposes an opinionated way to interact with NATS JetStream consumer capabilities.
type Header
type Header = nats.Header
Header is an alias for nats.Header. It lets callers avoid importing the upstream nats package, which reduces the risk of naming conflicts.
type JetStream
type JetStream interface {
	Publish(ctx context.Context, msg *Msg, opts ...jetstream.PublishOpt) (*jetstream.PubAck, error)
	PublishAsync(ctx context.Context, msg *Msg, opts ...jetstream.PublishOpt) (jetstream.PubAckFuture, error)
	PublishAsyncPending(ctx context.Context) int
	PublishAsyncComplete(ctx context.Context) <-chan struct{}
	Stream(ctx context.Context, streamname string) (Stream, error)
	CreateOrUpdateStream(ctx context.Context, config jetstream.StreamConfig) (Stream, error)
	DeleteStream(ctx context.Context, streamname string) error
	Consumer(ctx context.Context, streamname string, consumername string) (Consumer, error)
	CreateOrUpdateConsumer(ctx context.Context, streamname string, config jetstream.ConsumerConfig) (Consumer, error)
	OrderedConsumer(ctx context.Context, streamname string, config jetstream.OrderedConsumerConfig) (Consumer, error)
	DeleteConsumer(ctx context.Context, streamname string, consumername string) error
	KeyValue(ctx context.Context, bucket string) (KeyValue, error)
	CreateOrUpdateKeyValue(ctx context.Context, config jetstream.KeyValueConfig) (KeyValue, error)
	DeleteKeyValue(ctx context.Context, bucket string) error
}
JetStream exposes an opinionated way to interact with NATS JetStream.
type KeyValue
type KeyValue interface {
	Bucket(ctx context.Context) string
	Get(ctx context.Context, key string) (jetstream.KeyValueEntry, error)
	GetRevision(ctx context.Context, key string, revision uint64) (jetstream.KeyValueEntry, error)
	Create(ctx context.Context, key string, value []byte) (uint64, error)
	Put(ctx context.Context, key string, value []byte) (uint64, error)
	Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)
	Delete(ctx context.Context, key string, opts ...jetstream.KVDeleteOpt) error
	Purge(ctx context.Context, key string, opts ...jetstream.KVDeleteOpt) error
	PurgeDeletes(ctx context.Context, opts ...jetstream.KVPurgeOpt) error
	Watch(ctx context.Context, keys string, opts ...jetstream.WatchOpt) (jetstream.KeyWatcher, error)
	WatchAll(ctx context.Context, opts ...jetstream.WatchOpt) (jetstream.KeyWatcher, error)
	Keys(ctx context.Context, opts ...jetstream.WatchOpt) ([]string, error)
	History(ctx context.Context, key string, opts ...jetstream.WatchOpt) ([]jetstream.KeyValueEntry, error)
	Status(ctx context.Context) (jetstream.KeyValueStatus, error)
}
KeyValue exposes an opinionated way to interact with a NATS JetStream key-value store. Every method takes a context because some of them use it to perform distributed tracing automatically and to record errors within the resulting traces.
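The revision argument of Update is what makes optimistic concurrency possible: a write succeeds only if the key is still at the revision the caller read. The sketch below illustrates that read-modify-write loop against an in-memory stand-in, not against this package. Everything in it is an assumption made for illustration: revisionStore is a narrowed interface whose Get returns the value and revision directly (the real Get returns a jetstream.KeyValueEntry), and memStore, increment, errRevisionMismatch and errKeyNotFound are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strconv"
)

// Hypothetical sentinel errors used only by the in-memory stand-in.
var (
	errRevisionMismatch = errors.New("revision mismatch")
	errKeyNotFound      = errors.New("key not found")
)

// revisionStore is a narrowed, hypothetical view of KeyValue. Get is
// simplified to return the value and revision directly.
type revisionStore interface {
	Get(ctx context.Context, key string) (value []byte, revision uint64, err error)
	Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error)
}

type entry struct {
	value    []byte
	revision uint64
}

// memStore is a single-goroutine stand-in. It bumps a per-key revision by one
// on each write, which is a simplification of how a real bucket numbers
// revisions.
type memStore struct {
	entries map[string]entry
}

func (m *memStore) Get(_ context.Context, key string) ([]byte, uint64, error) {
	e, ok := m.entries[key]
	if !ok {
		return nil, 0, errKeyNotFound
	}
	return e.value, e.revision, nil
}

func (m *memStore) Update(_ context.Context, key string, value []byte, revision uint64) (uint64, error) {
	if m.entries[key].revision != revision {
		return 0, errRevisionMismatch
	}
	m.entries[key] = entry{value: value, revision: revision + 1}
	return revision + 1, nil
}

// increment reads a decimal counter, adds one, and writes it back only if the
// revision is unchanged. On a mismatch it re-reads and retries.
func increment(ctx context.Context, store revisionStore, key string, maxAttempts int) (uint64, error) {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		raw, revision, err := store.Get(ctx, key)
		if err != nil {
			return 0, err
		}
		n, err := strconv.Atoi(string(raw))
		if err != nil {
			return 0, fmt.Errorf("parse counter %q: %w", key, err)
		}
		newRevision, err := store.Update(ctx, key, []byte(strconv.Itoa(n+1)), revision)
		if errors.Is(err, errRevisionMismatch) {
			continue // someone else wrote first: read again and retry
		}
		if err != nil {
			return 0, err
		}
		return newRevision, nil
	}
	return 0, fmt.Errorf("gave up after %d attempts: %w", maxAttempts, errRevisionMismatch)
}

// demo increments a counter stored at revision 1 and returns the new state.
func demo() (string, uint64, error) {
	ctx := context.Background()
	store := &memStore{entries: map[string]entry{
		"counter": {value: []byte("41"), revision: 1},
	}}
	if _, err := increment(ctx, store, "counter", 3); err != nil {
		return "", 0, err
	}
	value, revision, err := store.Get(ctx, "counter")
	return string(value), revision, err
}

func main() {
	value, revision, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(value, revision)
}
```

With the real interface, the same loop would read the revision from the returned entry and would need to recognise whichever error the underlying client reports for a stale revision.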
type MessagesContext
type MessagesContext interface {
	Next(ctx context.Context) (context.Context, jetstream.Msg, error)
	Stop(ctx context.Context)
}
MessagesContext exposes an opinionated way to interact with a NATS JetStream message iterator.
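Unlike the upstream iterator, Next here also returns a context, presumably one that carries the tracing state for the message just received. The sketch below shows the loop shape this implies, using local stand-ins rather than this package: message replaces jetstream.Msg, sliceIterator is an in-memory iterator, and errIteratorClosed, ctxKey, drain and demo are hypothetical names. The stand-in attaches the message subject to the returned context purely to make the per-message context observable.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errIteratorClosed is a hypothetical sentinel for an exhausted or stopped
// iterator.
var errIteratorClosed = errors.New("iterator closed")

// message is a stand-in for jetstream.Msg.
type message struct {
	Subject string
	Data    []byte
}

// messagesContext mirrors the documented method set with the stand-in type.
type messagesContext interface {
	Next(ctx context.Context) (context.Context, message, error)
	Stop(ctx context.Context)
}

// ctxKey is the key under which the stand-in stores per-message data.
type ctxKey struct{}

// sliceIterator serves messages from memory, for illustration only.
type sliceIterator struct {
	pending []message
	stopped bool
}

func (s *sliceIterator) Next(ctx context.Context) (context.Context, message, error) {
	if err := ctx.Err(); err != nil {
		return ctx, message{}, err
	}
	if s.stopped || len(s.pending) == 0 {
		return ctx, message{}, errIteratorClosed
	}
	msg := s.pending[0]
	s.pending = s.pending[1:]
	// Derive a per-message context, as a tracing wrapper plausibly would.
	return context.WithValue(ctx, ctxKey{}, msg.Subject), msg, nil
}

func (s *sliceIterator) Stop(context.Context) { s.stopped = true }

// drain calls handle for each message, passing the context returned by Next
// rather than the outer one, and stops the iterator when it returns.
func drain(ctx context.Context, it messagesContext, handle func(context.Context, message) error) (int, error) {
	defer it.Stop(ctx)
	handled := 0
	for {
		msgCtx, msg, err := it.Next(ctx)
		if errors.Is(err, errIteratorClosed) {
			return handled, nil
		}
		if err != nil {
			return handled, err
		}
		if err := handle(msgCtx, msg); err != nil {
			return handled, err
		}
		handled++
	}
}

// demo drains two messages and records what each per-message context carried.
func demo() ([]string, error) {
	it := &sliceIterator{pending: []message{
		{Subject: "orders.created", Data: []byte("1")},
		{Subject: "orders.paid", Data: []byte("2")},
	}}
	var seen []string
	_, err := drain(context.Background(), it, func(ctx context.Context, msg message) error {
		subject, _ := ctx.Value(ctxKey{}).(string)
		seen = append(seen, subject)
		return nil
	})
	return seen, err
}

func main() {
	seen, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(seen)
}
```

The point of the design is that work done for a message should use the context returned alongside it, so that any spans or recorded errors attach to that message rather than to the long-lived outer context.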
type Msg
type Msg = nats.Msg
Msg is an alias for nats.Msg. It lets callers avoid importing the upstream nats package, which reduces the risk of naming conflicts.
type MsgHandler
MsgHandler is like jetstream.MessageHandler but also accepts a context, which enables automatic distributed tracing with OpenTelemetry.
type Stream
type Stream interface {
	CreateOrUpdateConsumer(ctx context.Context, config jetstream.ConsumerConfig) (Consumer, error)
	OrderedConsumer(ctx context.Context, config jetstream.OrderedConsumerConfig) (Consumer, error)
	Consumer(ctx context.Context, consumername string) (Consumer, error)
	DeleteConsumer(ctx context.Context, consumername string) error
	Purge(ctx context.Context, opts ...jetstream.StreamPurgeOpt) error
	GetMsg(ctx context.Context, seq uint64, opts ...jetstream.GetMsgOpt) (*jetstream.RawStreamMsg, error)
	GetLastMsgForSubject(ctx context.Context, subject string) (*jetstream.RawStreamMsg, error)
	DeleteMsg(ctx context.Context, seq uint64) error
	SecureDeleteMsg(ctx context.Context, seq uint64) error
}
Stream exposes an opinionated way to interact with NATS JetStream stream management capabilities.
type Subscription
type Subscription = nats.Subscription
Subscription is an alias for nats.Subscription. It lets callers avoid importing the upstream nats package, which reduces the risk of naming conflicts.