natsjs

Version: v0.2.1
Published: Apr 5, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

README

kvstore/natsjs

natsjs is a kvstore implementation for go-orb that uses nats with JetStream as the backend.

It's a port of the store/nats-js plugin for go-micro to go-orb.

This plugin supports the kvstore.Watcher interface!

Configuration

The NATS JetStream KV store can be configured with the following options:

Option          Description                                   Default
Servers         List of NATS server addresses                 ["nats://localhost:4222"]
Database        Default bucket name                           "default"
Table           Default table name                            ""
Timeout         Connection timeout                            2s
KeyEncoding     Transparent key encoding                      "base32"
BucketPerTable  Create separate bucket per table              true
JSONKeyValues   Store values as JSON                          false
Compression     Enable compression for websocket connections  false

Many more configuration options are available; see the NatsOptions struct for details.

Note: Setting BucketPerTable to true, setting JSONKeyValues to false, or enabling Compression will break compatibility with the go-micro plugin and its data layout.
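The following is a minimal sketch of how the documented JSON tags and defaults can interact when a configuration is decoded. It uses only encoding/json and a local stand-in type; sketchConfig, defaultSketchConfig, and parseSketchConfig are hypothetical names, and the plugin's real decoding path is not shown on this page and may differ. The point it illustrates is that keys absent from the input keep their defaults, so BucketPerTable stays true unless it is explicitly set to false.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sketchConfig mirrors a few documented fields and their JSON tags.
// It is a local stand-in for illustration, not the plugin's Config type.
type sketchConfig struct {
	Servers        []string `json:"servers,omitempty"`
	KeyEncoding    string   `json:"keyEncoding,omitempty"`
	BucketPerTable bool     `json:"bucketPerTable,omitempty"`
	JSONKeyValues  bool     `json:"jsonKeyValues,omitempty"`
}

// defaultSketchConfig returns the defaults listed in the table above.
func defaultSketchConfig() sketchConfig {
	return sketchConfig{
		Servers:        []string{"nats://localhost:4222"},
		KeyEncoding:    "base32",
		BucketPerTable: true,
		JSONKeyValues:  false,
	}
}

// parseSketchConfig decodes raw JSON on top of the defaults.
// encoding/json leaves fields untouched when their key is absent.
func parseSketchConfig(raw string) (sketchConfig, error) {
	cfg := defaultSketchConfig()
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return sketchConfig{}, fmt.Errorf("parse config: %w", err)
	}
	return cfg, nil
}

func main() {
	// Only the two layout flags are overridden; everything else keeps its default.
	cfg, err := parseSketchConfig(`{"bucketPerTable": false, "jsonKeyValues": true}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```

Decoding into a pre-filled struct is one common way to combine defaults with partial user input; a zero-valued struct would instead silently turn BucketPerTable off.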

Compatibility with "store/nats-js"

The plugin is compatible with the "store/nats-js" plugin for go-micro only when JSONKeyValues is enabled and BucketPerTable is disabled. Neither of these is the default, so both must be set explicitly.

Compatibility is ensured by tests in the kvstore/natsjs_micro_tests directory.

Warning

NATS doesn't support per-record TTL, so the per-record TTL option is ignored.

Previous Authors

Authors

License

This plugin is Apache 2.0 licensed.

Documentation

Overview

Package natsjs provides the nats kvstore client for go-orb.

Index

Constants

const (
	DefaultBucketDescription = "KeyValue storage administered by go-orb"
	DefaultDatabase          = "default"
	DefaultTable             = ""
	DefaultKeyEncoding       = "base32"
	DefaultBucketPerTable    = true
	DefaultJSONKeyValues     = false
)

Defaults.

const Name = "natsjs"

Name provides the name of this kvstore client.

Variables

This section is empty.

Functions

func Provide

func Provide(
	configData map[string]any,
	logger log.Logger,
	opts ...kvstore.Option,
) (kvstore.Type, error)

Provide creates a new NatsJS KVStore client.

func WithAllowReconnect

func WithAllowReconnect(allowReconnect bool) kvstore.Option

WithAllowReconnect enables reconnection logic when disconnected from the server.

func WithBucketDescription

func WithBucketDescription(description string) kvstore.Option

WithBucketDescription sets the description for the default bucket.

func WithBucketPerTable

func WithBucketPerTable(bucketPerTable bool) kvstore.Option

WithBucketPerTable configures whether a separate bucket is created for each table.

func WithCompression

func WithCompression(compression bool) kvstore.Option

WithCompression enables compression for websocket connections.

func WithDrainTimeout

func WithDrainTimeout(drainTimeout time.Duration) kvstore.Option

WithDrainTimeout sets the timeout for a Drain Operation to complete.

func WithFlusherTimeout

func WithFlusherTimeout(flusherTimeout time.Duration) kvstore.Option

WithFlusherTimeout sets the maximum time to wait for write operations to complete.

func WithIgnoreAuthErrorAbort

func WithIgnoreAuthErrorAbort(ignoreAuthErrorAbort bool) kvstore.Option

WithIgnoreAuthErrorAbort opts out of aborting reconnect attempts on repeated auth errors.

func WithInboxPrefix

func WithInboxPrefix(inboxPrefix string) kvstore.Option

WithInboxPrefix allows customizing the default _INBOX prefix.

func WithJSONKeyValues

func WithJSONKeyValues(jsonKeyValues bool) kvstore.Option

WithJSONKeyValues configures whether to store key values as JSON.

func WithKeyEncoding

func WithKeyEncoding(keyEncoding string) kvstore.Option

WithKeyEncoding sets the encoding used for keys, set to empty string for no encoding.
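As a rough illustration of what transparent key encoding means, the sketch below encodes a key with base32 before it would be stored and decodes it again on the way out. The helper names encodeKey and decodeKey are hypothetical, and the use of the standard padded base32 alphabet is an assumption; the plugin's actual encoding variant is not documented on this page.

```go
package main

import (
	"encoding/base32"
	"fmt"
)

// encodeKey returns the stored form of key for the given encoding name.
// Assumption: "base32" means the standard padded alphabet from RFC 4648.
// An empty encoding passes the key through unchanged.
func encodeKey(encoding, key string) string {
	if encoding == "base32" {
		return base32.StdEncoding.EncodeToString([]byte(key))
	}
	return key
}

// decodeKey reverses encodeKey and reports malformed stored keys.
func decodeKey(encoding, stored string) (string, error) {
	if encoding == "base32" {
		raw, err := base32.StdEncoding.DecodeString(stored)
		if err != nil {
			return "", fmt.Errorf("decode key %q: %w", stored, err)
		}
		return string(raw), nil
	}
	return stored, nil
}

func main() {
	// A key with characters that a backend might not accept verbatim.
	stored := encodeKey("base32", "user:42 profile")
	original, err := decodeKey("base32", stored)
	fmt.Println(stored, original, err)
}
```

Encoding keys this way trades readability of the stored key for freedom in the characters callers may use; passing an empty string disables the encoding, as the option's description states.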

func WithMaxPingsOut

func WithMaxPingsOut(maxPingsOut int) kvstore.Option

WithMaxPingsOut sets the maximum number of pending ping commands before raising an error.

func WithMaxReconnect

func WithMaxReconnect(maxReconnect int) kvstore.Option

WithMaxReconnect sets the number of reconnect attempts before giving up.

func WithNkey

func WithNkey(nkey string) kvstore.Option

WithNkey sets the public nkey for authentication.

func WithNoCallbacksAfterClientClose

func WithNoCallbacksAfterClientClose(noCallbacksAfterClientClose bool) kvstore.Option

WithNoCallbacksAfterClientClose prevents callbacks after Close() is called.

func WithNoRandomize

func WithNoRandomize(noRandomize bool) kvstore.Option

WithNoRandomize configures whether to randomize the server pool.

func WithPassword

func WithPassword(password string) kvstore.Option

WithPassword sets the password for authentication.

func WithPingInterval

func WithPingInterval(pingInterval time.Duration) kvstore.Option

WithPingInterval sets the period for sending ping commands to the server.

func WithProxyPath

func WithProxyPath(proxyPath string) kvstore.Option

WithProxyPath adds a path to connections URL for websocket connections.

func WithReconnectBufSize

func WithReconnectBufSize(reconnectBufSize int) kvstore.Option

WithReconnectBufSize sets the size of the backing bufio during reconnect.

func WithReconnectJitter

func WithReconnectJitter(reconnectJitter time.Duration) kvstore.Option

WithReconnectJitter sets the upper bound for random delay added to ReconnectWait.

func WithReconnectJitterTLS

func WithReconnectJitterTLS(reconnectJitterTLS time.Duration) kvstore.Option

WithReconnectJitterTLS sets the upper bound for random delay added to ReconnectWait when TLS is used.

func WithReconnectWait

func WithReconnectWait(reconnectWait time.Duration) kvstore.Option

WithReconnectWait sets the time to backoff after attempting a reconnect.
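The reconnect options above combine into a single delay: the fixed ReconnectWait plus a random amount bounded by the jitter setting. The sketch below shows that arithmetic using the documented defaults (2s wait, 100ms jitter, 1s jitter with TLS). reconnectDelay is a hypothetical helper written for illustration; it follows the option descriptions, not the client library's source.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// reconnectDelay returns wait plus a random jitter in [0, jitter).
// pick must return a value in [0, n); rand.Int63n satisfies this.
// A non-positive jitter disables the random component.
func reconnectDelay(wait, jitter time.Duration, pick func(n int64) int64) time.Duration {
	if jitter <= 0 {
		return wait
	}
	return wait + time.Duration(pick(int64(jitter)))
}

func main() {
	wait := 2 * time.Second // documented ReconnectWait default

	// Documented jitter defaults: 100ms without TLS, 1s with TLS.
	plain := reconnectDelay(wait, 100*time.Millisecond, rand.Int63n)
	tls := reconnectDelay(wait, time.Second, rand.Int63n)

	fmt.Println("plain:", plain, "tls:", tls)
}
```

Taking the random source as a parameter keeps the helper deterministic under test while still allowing rand.Int63n in normal use.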

func WithRetryOnFailedConnect

func WithRetryOnFailedConnect(retryOnFailedConnect bool) kvstore.Option

WithRetryOnFailedConnect sets the connection in reconnecting state if it can't connect initially.

func WithServers

func WithServers(servers []string) kvstore.Option

WithServers sets the list of NATS servers to connect to.

func WithSkipHostLookup

func WithSkipHostLookup(skipHostLookup bool) kvstore.Option

WithSkipHostLookup skips the DNS lookup for the server hostname.

func WithSubChanLen

func WithSubChanLen(subChanLen int) kvstore.Option

WithSubChanLen sets the size of the buffered channel used for SyncSubscriptions.

func WithTimeout

func WithTimeout(timeout time.Duration) kvstore.Option

WithTimeout sets the timeout for a Dial operation on a connection.

func WithToken

func WithToken(token string) kvstore.Option

WithToken sets the token for authentication.

func WithURL

func WithURL(url string) kvstore.Option

WithURL sets the URL of the NATS server.

func WithUseOldRequestStyle

func WithUseOldRequestStyle(useOldRequestStyle bool) kvstore.Option

WithUseOldRequestStyle forces the old method of Requests.

func WithUser

func WithUser(user string) kvstore.Option

WithUser sets the username for authentication.

Types

type Config

type Config struct {
	kvstore.Config `yaml:",inline"`

	NatsOptions `yaml:",inline"`

	// BucketDescription configures the description for each bucket.
	// Default: "KeyValue storage administered by go-orb"
	BucketDescription string `json:"bucketDescription,omitempty" yaml:"bucketDescription,omitempty"`

	// KeyEncoding configures the encoding used for keys, set it to an empty string for no encoding.
	// Default: base32
	KeyEncoding string `json:"keyEncoding,omitempty" yaml:"keyEncoding,omitempty"`

	// BucketPerTable configures whether a separate bucket is created for each table.
	// If false, all tables are stored in the same bucket.
	// Default: true
	// Deprecated: Disable this only if you need backwards compatibility.
	BucketPerTable bool `json:"bucketPerTable,omitempty" yaml:"bucketPerTable,omitempty"`

	// JSONKeyValues configures whether the key values are encoded again as JSON.
	// Default: false
	// Deprecated: Enable this only if you need backwards compatibility.
	JSONKeyValues bool `json:"jsonKeyValues,omitempty" yaml:"jsonKeyValues,omitempty"`
}

Config provides configuration for the NATS JetStream kvstore.

func NewConfig

func NewConfig(opts ...kvstore.Option) Config

NewConfig creates a new config object with default options.

func (*Config) ApplyOptions

func (c *Config) ApplyOptions(opts ...kvstore.Option)

ApplyOptions applies a set of options to the config.
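NewConfig and ApplyOptions follow the functional options pattern: start from defaults, then let each option mutate the config. The sketch below reproduces that pattern with local stand-in types so it runs without the plugin. optConfig, sketchOption, and the lower-case helpers are hypothetical; the real kvstore.Option type may be defined differently. The default values are taken from the constants listed on this page.

```go
package main

import "fmt"

// optConfig is a local stand-in holding a few of the documented settings.
type optConfig struct {
	Database       string
	Table          string
	KeyEncoding    string
	BucketPerTable bool
	JSONKeyValues  bool
}

// sketchOption mutates an optConfig; an assumption about the option shape.
type sketchOption func(*optConfig)

func withKeyEncoding(enc string) sketchOption {
	return func(c *optConfig) { c.KeyEncoding = enc }
}

func withBucketPerTable(enabled bool) sketchOption {
	return func(c *optConfig) { c.BucketPerTable = enabled }
}

func withJSONKeyValues(enabled bool) sketchOption {
	return func(c *optConfig) { c.JSONKeyValues = enabled }
}

// applyOptions applies each option in order; later options win.
func (c *optConfig) applyOptions(opts ...sketchOption) {
	for _, opt := range opts {
		opt(c)
	}
}

// newOptConfig starts from the documented defaults, then applies opts.
func newOptConfig(opts ...sketchOption) optConfig {
	cfg := optConfig{
		Database:       "default",
		Table:          "",
		KeyEncoding:    "base32",
		BucketPerTable: true,
		JSONKeyValues:  false,
	}
	cfg.applyOptions(opts...)
	return cfg
}

func main() {
	// The two flags the README names as required for go-micro compatibility.
	cfg := newOptConfig(withBucketPerTable(false), withJSONKeyValues(true))
	fmt.Printf("%+v\n", cfg)
}
```

Because options are applied in order, a caller can layer overrides on top of an existing config with a second applyOptions call.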

type NatsJS

type NatsJS struct {
	// contains filtered or unexported fields
}

NatsJS implements the kvstore.KVStore interface using NATS JetStream.

func New

func New(cfg Config, log log.Logger) (*NatsJS, error)

New creates a new NATS JetStream KVStore. This function should rarely be called manually. To create a new KVStore use Provide.

func (*NatsJS) Delete

func (n *NatsJS) Delete(key string, opts ...kvstore.DeleteOption) error

Delete removes the record with the corresponding key from the store. Deprecated: use Remove instead.

func (*NatsJS) DropDatabase

func (n *NatsJS) DropDatabase(ctx context.Context, database string) error

DropDatabase drops the database.

func (*NatsJS) DropTable

func (n *NatsJS) DropTable(ctx context.Context, database, table string) error

DropTable drops the table.

func (*NatsJS) Get

func (n *NatsJS) Get(ctx context.Context, key, database, table string, _ ...kvstore.GetOption) ([]kvstore.Record, error)

Get takes a key, database, table and optional GetOptions. It returns the matching Records or an error.

func (*NatsJS) Keys

func (n *NatsJS) Keys(ctx context.Context, database, table string, opts ...kvstore.KeysOption) ([]string, error)

Keys returns any keys that match, or an empty list with no error if none matched.

func (*NatsJS) List

func (n *NatsJS) List(opts ...kvstore.ListOption) ([]string, error)

List returns any keys that match, or an empty list with no error if none matched. Deprecated: use Keys instead.

func (*NatsJS) Purge

func (n *NatsJS) Purge(ctx context.Context, key, database, table string) error

Purge takes a key, database and table and purges it.

func (*NatsJS) Read

func (n *NatsJS) Read(key string, opts ...kvstore.ReadOption) ([]*kvstore.Record, error)

Read takes a single key and optional ReadOptions. It returns matching []*Record or an error. Deprecated: use Get instead.

func (*NatsJS) Set

func (n *NatsJS) Set(ctx context.Context, key, database, table string, data []byte, _ ...kvstore.SetOption) error

Set takes a key, database, table and data, and optional SetOptions.

func (*NatsJS) Start

func (n *NatsJS) Start(ctx context.Context) error

Start initializes the connection to NATS JetStream.

func (*NatsJS) Stop

func (n *NatsJS) Stop(_ context.Context) error

Stop closes the connection to NATS JetStream.

func (*NatsJS) String

func (n *NatsJS) String() string

String returns the plugin name.

func (*NatsJS) Type

func (n *NatsJS) Type() string

Type returns the component type.

func (*NatsJS) Watch

func (n *NatsJS) Watch(
	ctx context.Context,
	database,
	table string,
	opts ...kvstore.WatchOption,
) (<-chan kvstore.WatchEvent, func() error, error)

Watch exposes the watcher interface from the underlying JetStreamContext.
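Watch returns three values: a receive-only event channel, a function returning an error, and an error. The sketch below shows one way a caller might consume that shape, assuming the returned function stops the watcher. Everything here is a stand-in: watchEvent with its Key field, watchFunc (which omits the database, table, and option parameters), fakeWatch, and collectKeys are hypothetical names, since the fields of kvstore.WatchEvent are not shown on this page.

```go
package main

import (
	"context"
	"fmt"
)

// watchEvent is a hypothetical stand-in for kvstore.WatchEvent.
type watchEvent struct{ Key string }

// watchFunc mirrors the return shape of Watch in simplified form.
type watchFunc func(ctx context.Context) (<-chan watchEvent, func() error, error)

// collectKeys reads up to limit events, then calls the stop function.
// It also returns early when ctx is cancelled or the channel is closed.
func collectKeys(ctx context.Context, watch watchFunc, limit int) (keys []string, err error) {
	events, stop, err := watch(ctx)
	if err != nil {
		return nil, err
	}
	// Always stop the watcher; surface its error if nothing else failed.
	defer func() {
		if stopErr := stop(); err == nil {
			err = stopErr
		}
	}()

	for len(keys) < limit {
		select {
		case <-ctx.Done():
			return keys, ctx.Err()
		case ev, ok := <-events:
			if !ok {
				return keys, nil
			}
			keys = append(keys, ev.Key)
		}
	}
	return keys, nil
}

// fakeWatch emits the given keys and then closes the channel.
func fakeWatch(keys ...string) watchFunc {
	return func(context.Context) (<-chan watchEvent, func() error, error) {
		ch := make(chan watchEvent, len(keys))
		for _, k := range keys {
			ch <- watchEvent{Key: k}
		}
		close(ch)
		return ch, func() error { return nil }, nil
	}
}

// demo runs collectKeys against the fake watcher.
func demo() ([]string, error) {
	return collectKeys(context.Background(), fakeWatch("a", "b"), 10)
}

func main() {
	keys, err := demo()
	fmt.Println(keys, err)
}
```

Selecting on both the context and the event channel keeps the consumer from blocking forever if the watcher stops producing events.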

func (*NatsJS) Write

func (n *NatsJS) Write(r *kvstore.Record, opts ...kvstore.WriteOption) error

Write takes a single key and value, and optional WriteOptions. Deprecated: use Set instead.

type NatsOptions

type NatsOptions struct {
	// URL represents a single NATS server url to which the client
	// will be connecting. If the Servers option is also set, it
	// then becomes the first server in the Servers array.
	URL string `json:"url,omitempty" yaml:"url,omitempty"`

	// InProcessServer represents a NATS server running within the
	// same process. If this is set then we will attempt to connect
	// to the server directly rather than using external TCP conns.
	InProcessServer nats.InProcessConnProvider `json:"-" yaml:"-"`

	// Servers is a configured set of servers which this client
	// will use when attempting to connect.
	Servers []string `json:"servers,omitempty" yaml:"servers,omitempty"`

	// NoRandomize configures whether we will randomize the
	// server pool.
	NoRandomize bool `json:"noRandomize,omitempty" yaml:"noRandomize,omitempty"`

	// AllowReconnect enables reconnection logic to be used when we
	// encounter a disconnect from the current server.
	AllowReconnect bool `json:"allowReconnect,omitempty" yaml:"allowReconnect,omitempty"`

	// MaxReconnect sets the number of reconnect attempts that will be
	// tried before giving up. If negative, then it will never give up
	// trying to reconnect.
	// Defaults to 60.
	MaxReconnect int `json:"maxReconnect,omitempty" yaml:"maxReconnect,omitempty"`

	// ReconnectWait sets the time to backoff after attempting a reconnect
	// to a server that we were already connected to previously.
	// Defaults to 2s.
	ReconnectWait config.Duration `json:"reconnectWait,omitempty" yaml:"reconnectWait,omitempty"`

	// CustomReconnectDelayCB is invoked after the library tried every
	// URL in the server list and failed to reconnect. It passes to the
	// user the current number of attempts. This function returns the
	// amount of time the library will sleep before attempting to reconnect
	// again. It is strongly recommended that this value contains some
	// jitter to prevent all connections to attempt reconnecting at the same time.
	CustomReconnectDelayCB nats.ReconnectDelayHandler `json:"-" yaml:"-"`

	// ReconnectJitter sets the upper bound for a random delay added to
	// ReconnectWait during a reconnect when no TLS is used.
	// Defaults to 100ms.
	ReconnectJitter config.Duration `json:"reconnectJitter,omitempty" yaml:"reconnectJitter,omitempty"`

	// ReconnectJitterTLS sets the upper bound for a random delay added to
	// ReconnectWait during a reconnect when TLS is used.
	// Defaults to 1s.
	ReconnectJitterTLS config.Duration `json:"reconnectJitterTls,omitempty" yaml:"reconnectJitterTls,omitempty"`

	// Timeout sets the timeout for a Dial operation on a connection.
	// Defaults to 2s.
	Timeout config.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`

	// DrainTimeout sets the timeout for a Drain Operation to complete.
	// Defaults to 30s.
	DrainTimeout config.Duration `json:"drainTimeout,omitempty" yaml:"drainTimeout,omitempty"`

	// FlusherTimeout is the maximum time to wait for write operations
	// to the underlying connection to complete (including the flusher loop).
	// Defaults to 1m.
	FlusherTimeout config.Duration `json:"flusherTimeout,omitempty" yaml:"flusherTimeout,omitempty"`

	// PingInterval is the period at which the client will be sending ping
	// commands to the server, disabled if 0 or negative.
	// Defaults to 2m.
	PingInterval config.Duration `json:"pingInterval,omitempty" yaml:"pingInterval,omitempty"`

	// MaxPingsOut is the maximum number of pending ping commands that can
	// be awaiting a response before raising an ErrStaleConnection error.
	// Defaults to 2.
	MaxPingsOut int `json:"maxPingsOut,omitempty" yaml:"maxPingsOut,omitempty"`

	// ClosedCB sets the closed handler that is called when a client will
	// no longer be connected.
	ClosedCB nats.ConnHandler `json:"-" yaml:"-"`

	// DisconnectedErrCB sets the disconnected error handler that is called
	// whenever the connection is disconnected.
	// Disconnected error could be nil, for instance when user explicitly closes the connection.
	// DisconnectedCB will not be called if DisconnectedErrCB is set
	DisconnectedErrCB nats.ConnErrHandler `json:"-" yaml:"-"`

	// ConnectedCB sets the connected handler called when the initial connection
	// is established. It is not invoked on successful reconnects - for reconnections,
	// use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect
	// to detect whether the initial connect was successful.
	ConnectedCB nats.ConnHandler `json:"-" yaml:"-"`

	// ReconnectedCB sets the reconnected handler called whenever
	// the connection is successfully reconnected.
	ReconnectedCB nats.ConnHandler `json:"-" yaml:"-"`

	// DiscoveredServersCB sets the callback that is invoked whenever a new
	// server has joined the cluster.
	DiscoveredServersCB nats.ConnHandler `json:"-" yaml:"-"`

	// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
	AsyncErrorCB nats.ErrHandler `json:"-" yaml:"-"`

	// ReconnectBufSize is the size of the backing bufio during reconnect.
	// Once this has been exhausted publish operations will return an error.
	// Defaults to 8388608 bytes (8MB).
	ReconnectBufSize int `json:"reconnectBufSize,omitempty" yaml:"reconnectBufSize,omitempty"`

	// SubChanLen is the size of the buffered channel used between the socket
	// Go routine and the message delivery for SyncSubscriptions.
	// NOTE: This does not affect AsyncSubscriptions which are
	// dictated by PendingLimits()
	// Defaults to 65536.
	SubChanLen int `json:"subChanLen,omitempty" yaml:"subChanLen,omitempty"`

	// UserJWT sets the callback handler that will fetch a user's JWT.
	UserJWT nats.UserJWTHandler `json:"-" yaml:"-"`

	// Nkey sets the public nkey that will be used to authenticate
	// when connecting to the server. UserJWT and Nkey are mutually exclusive
	// and if defined, UserJWT will take precedence.
	Nkey string `json:"nkey,omitempty" yaml:"nkey,omitempty"`

	// SignatureCB designates the function used to sign the nonce
	// presented from the server.
	SignatureCB nats.SignatureHandler `json:"-" yaml:"-"`

	// User sets the username to be used when connecting to the server.
	User string `json:"user,omitempty" yaml:"user,omitempty"`

	// Password sets the password to be used when connecting to a server.
	Password string `json:"password,omitempty" yaml:"password,omitempty"`

	// Token sets the token to be used when connecting to a server.
	Token string `json:"token,omitempty" yaml:"token,omitempty"`

	// TokenHandler designates the function used to generate the token to be used when connecting to a server.
	TokenHandler nats.AuthTokenHandler `json:"-" yaml:"-"`

	// CustomDialer allows to specify a custom dialer (not necessarily
	// a *net.Dialer).
	CustomDialer nats.CustomDialer `json:"-" yaml:"-"`

	// UseOldRequestStyle forces the old method of Requests that utilize
	// a new Inbox and a new Subscription for each request.
	UseOldRequestStyle bool `json:"useOldRequestStyle,omitempty" yaml:"useOldRequestStyle,omitempty"`

	// NoCallbacksAfterClientClose allows preventing the invocation of
	// callbacks after Close() is called. Client won't receive notifications
	// when Close is invoked by user code. Default is to invoke the callbacks.
	NoCallbacksAfterClientClose bool `json:"noCallbacksAfterClientClose,omitempty" yaml:"noCallbacksAfterClientClose,omitempty"`

	// LameDuckModeHandler sets the callback to invoke when the server notifies
	// the connection that it entered lame duck mode, that is, going to
	// gradually disconnect all its connections before shutting down. This is
	// often used in deployments when upgrading NATS Servers.
	LameDuckModeHandler nats.ConnHandler `json:"-" yaml:"-"`

	// RetryOnFailedConnect sets the connection in reconnecting state right
	// away if it can't connect to a server in the initial set. The
	// MaxReconnect and ReconnectWait options are used for this process,
	// similarly to when an established connection is disconnected.
	// If a ReconnectHandler is set, it will be invoked on the first
	// successful reconnect attempt (if the initial connect fails),
	// and if a ClosedHandler is set, it will be invoked if
	// it fails to connect (after exhausting the MaxReconnect attempts).
	RetryOnFailedConnect bool `json:"retryOnFailedConnect,omitempty" yaml:"retryOnFailedConnect,omitempty"`

	// For websocket connections, indicates to the server that the connection
	// supports compression. If the server does too, then data will be compressed.
	Compression bool `json:"compression,omitempty" yaml:"compression,omitempty"`

	// For websocket connections, adds a path to connections url.
	// This is useful when connecting to NATS behind a proxy.
	ProxyPath string `json:"proxyPath,omitempty" yaml:"proxyPath,omitempty"`

	// InboxPrefix allows the default _INBOX prefix to be customized
	InboxPrefix string `json:"inboxPrefix,omitempty" yaml:"inboxPrefix,omitempty"`

	// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
	// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
	IgnoreAuthErrorAbort bool `json:"ignoreAuthErrorAbort,omitempty" yaml:"ignoreAuthErrorAbort,omitempty"`

	// SkipHostLookup skips the DNS lookup for the server hostname.
	SkipHostLookup bool `json:"skipHostLookup,omitempty" yaml:"skipHostLookup,omitempty"`
}

NatsOptions can be used to create a customized connection.

func (NatsOptions) ToOptions

func (o NatsOptions) ToOptions() nats.Options

ToOptions converts the NatsOptions to nats.Options.
