Documentation
¶
Overview ¶
Package natsjs provides the nats kvstore client for go-orb.
Index ¶
- Constants
- func Provide(configData map[string]any, logger log.Logger, opts ...kvstore.Option) (kvstore.Type, error)
- func WithAllowReconnect(allowReconnect bool) kvstore.Option
- func WithBucketDescription(description string) kvstore.Option
- func WithBucketPerTable(bucketPerTable bool) kvstore.Option
- func WithCompression(compression bool) kvstore.Option
- func WithDrainTimeout(drainTimeout time.Duration) kvstore.Option
- func WithFlusherTimeout(flusherTimeout time.Duration) kvstore.Option
- func WithIgnoreAuthErrorAbort(ignoreAuthErrorAbort bool) kvstore.Option
- func WithInboxPrefix(inboxPrefix string) kvstore.Option
- func WithJSONKeyValues(jsonKeyValues bool) kvstore.Option
- func WithKeyEncoding(keyEncoding string) kvstore.Option
- func WithMaxPingsOut(maxPingsOut int) kvstore.Option
- func WithMaxReconnect(maxReconnect int) kvstore.Option
- func WithNkey(nkey string) kvstore.Option
- func WithNoCallbacksAfterClientClose(noCallbacksAfterClientClose bool) kvstore.Option
- func WithNoRandomize(noRandomize bool) kvstore.Option
- func WithPassword(password string) kvstore.Option
- func WithPingInterval(pingInterval time.Duration) kvstore.Option
- func WithProxyPath(proxyPath string) kvstore.Option
- func WithReconnectBufSize(reconnectBufSize int) kvstore.Option
- func WithReconnectJitter(reconnectJitter time.Duration) kvstore.Option
- func WithReconnectJitterTLS(reconnectJitterTLS time.Duration) kvstore.Option
- func WithReconnectWait(reconnectWait time.Duration) kvstore.Option
- func WithRetryOnFailedConnect(retryOnFailedConnect bool) kvstore.Option
- func WithServers(servers []string) kvstore.Option
- func WithSkipHostLookup(skipHostLookup bool) kvstore.Option
- func WithSubChanLen(subChanLen int) kvstore.Option
- func WithTimeout(timeout time.Duration) kvstore.Option
- func WithToken(token string) kvstore.Option
- func WithURL(url string) kvstore.Option
- func WithUseOldRequestStyle(useOldRequestStyle bool) kvstore.Option
- func WithUser(user string) kvstore.Option
- type Config
- type NatsJS
- func (n *NatsJS) Delete(key string, opts ...kvstore.DeleteOption) error
- func (n *NatsJS) DropDatabase(ctx context.Context, database string) error
- func (n *NatsJS) DropTable(ctx context.Context, database, table string) error
- func (n *NatsJS) Get(ctx context.Context, key, database, table string, _ ...kvstore.GetOption) ([]kvstore.Record, error)
- func (n *NatsJS) Keys(ctx context.Context, database, table string, opts ...kvstore.KeysOption) ([]string, error)
- func (n *NatsJS) List(opts ...kvstore.ListOption) ([]string, error)
- func (n *NatsJS) Purge(ctx context.Context, key, database, table string) error
- func (n *NatsJS) Read(key string, opts ...kvstore.ReadOption) ([]*kvstore.Record, error)
- func (n *NatsJS) Set(ctx context.Context, key, database, table string, data []byte, ...) error
- func (n *NatsJS) Start(ctx context.Context) error
- func (n *NatsJS) Stop(_ context.Context) error
- func (n *NatsJS) String() string
- func (n *NatsJS) Type() string
- func (n *NatsJS) Watch(ctx context.Context, database, table string, opts ...kvstore.WatchOption) (<-chan kvstore.WatchEvent, func() error, error)
- func (n *NatsJS) Write(r *kvstore.Record, opts ...kvstore.WriteOption) error
- type NatsOptions
Constants ¶
const ( DefaultBucketDescription = "KeyValue storage administered by go-orb" DefaultDatabase = "default" DefaultTable = "" DefaultKeyEncoding = "base32" DefaultBucketPerTable = true DefaultJSONKeyValues = false )
Defaults.
const Name = "natsjs"
Name provides the name of this kvstore client.
Variables ¶
This section is empty.
Functions ¶
func Provide ¶
func Provide( configData map[string]any, logger log.Logger, opts ...kvstore.Option, ) (kvstore.Type, error)
Provide creates a new NatsJS KVStore client.
func WithAllowReconnect ¶
WithAllowReconnect enables reconnection logic when disconnected from the server.
func WithBucketDescription ¶
WithBucketDescription sets the description for the default bucket.
func WithBucketPerTable ¶
WithBucketPerTable configures whether a separate bucket is created for each table.
func WithCompression ¶
WithCompression enables compression for websocket connections.
func WithDrainTimeout ¶
WithDrainTimeout sets the timeout for a Drain Operation to complete.
func WithFlusherTimeout ¶
WithFlusherTimeout sets the maximum time to wait for write operations to complete.
func WithIgnoreAuthErrorAbort ¶
WithIgnoreAuthErrorAbort opts out of aborting reconnect attempts on repeated auth errors.
func WithInboxPrefix ¶
WithInboxPrefix allows customizing the default _INBOX prefix.
func WithJSONKeyValues ¶
WithJSONKeyValues configures whether to store key values as JSON.
func WithKeyEncoding ¶
WithKeyEncoding sets the encoding used for keys, set to empty string for no encoding.
func WithMaxPingsOut ¶
WithMaxPingsOut sets the maximum number of pending ping commands before raising an error.
func WithMaxReconnect ¶
WithMaxReconnect sets the number of reconnect attempts before giving up.
func WithNoCallbacksAfterClientClose ¶
WithNoCallbacksAfterClientClose prevents callbacks after Close() is called.
func WithNoRandomize ¶
WithNoRandomize configures whether to randomize the server pool.
func WithPassword ¶
WithPassword sets the password for authentication.
func WithPingInterval ¶
WithPingInterval sets the period for sending ping commands to the server.
func WithProxyPath ¶
WithProxyPath adds a path to connections URL for websocket connections.
func WithReconnectBufSize ¶
WithReconnectBufSize sets the size of the backing bufio during reconnect.
func WithReconnectJitter ¶
WithReconnectJitter sets the upper bound for random delay added to ReconnectWait.
func WithReconnectJitterTLS ¶
WithReconnectJitterTLS sets the upper bound for random delay added to ReconnectWait when TLS is used.
func WithReconnectWait ¶
WithReconnectWait sets the time to backoff after attempting a reconnect.
func WithRetryOnFailedConnect ¶
WithRetryOnFailedConnect sets the connection in reconnecting state if it can't connect initially.
func WithServers ¶
WithServers sets the list of NATS servers to connect to.
func WithSkipHostLookup ¶
WithSkipHostLookup skips the DNS lookup for the server hostname.
func WithSubChanLen ¶
WithSubChanLen sets the size of the buffered channel used for SyncSubscriptions.
func WithTimeout ¶
WithTimeout sets the timeout for a Dial operation on a connection.
func WithUseOldRequestStyle ¶
WithUseOldRequestStyle forces the old method of Requests.
Types ¶
type Config ¶
type Config struct { kvstore.Config `yaml:",inline"` NatsOptions `yaml:",inline"` // BucketDescription configures the description for the each bucket. // Default: "KeyValue storage administered by go-orb" BucketDescription string `json:"bucketDescription,omitempty" yaml:"bucketDescription,omitempty"` // KeyEncoding configures the encoding used for keys, set it to an empty string for no encoding. // Default: base32 KeyEncoding string `json:"keyEncoding,omitempty" yaml:"keyEncoding,omitempty"` // BucketPerTable configures whether a separate bucket is created for each table. // If false, all tables are stored in the same bucket. // Default: true // Deprecated: Disable this only if you need backwards compatibility. BucketPerTable bool `json:"bucketPerTable,omitempty" yaml:"bucketPerTable,omitempty"` // JSONKeyValues configures whether the key values are encoded again as JSON. // Default: false // Deprecated: Enable this only if you need backwards compatibility. JSONKeyValues bool `json:"jsonKeyValues,omitempty" yaml:"jsonKeyValues,omitempty"` }
Config provides configuration for the NATS registry.
func (*Config) ApplyOptions ¶
ApplyOptions applies a set of options to the config.
type NatsJS ¶
type NatsJS struct {
// contains filtered or unexported fields
}
NatsJS implements the kvstore.KVStore interface using NATS JetStream.
func New ¶
New creates a new NATS JetStream KVStore. This function should rarely be called manually. To create a new KVStore use Provide.
func (*NatsJS) Delete ¶
func (n *NatsJS) Delete(key string, opts ...kvstore.DeleteOption) error
Delete removes the record with the corresponding key from the store. Deprecated: use Remove instead.
func (*NatsJS) DropDatabase ¶
DropDatabase drops the database.
func (*NatsJS) Get ¶
func (n *NatsJS) Get(ctx context.Context, key, database, table string, _ ...kvstore.GetOption) ([]kvstore.Record, error)
Get takes a key, database, table and optional GetOptions. It returns the Record or an error.
func (*NatsJS) Keys ¶
func (n *NatsJS) Keys(ctx context.Context, database, table string, opts ...kvstore.KeysOption) ([]string, error)
Keys returns any keys that match, or an empty list with no error if none matched.
func (*NatsJS) List ¶
func (n *NatsJS) List(opts ...kvstore.ListOption) ([]string, error)
List returns any keys that match, or an empty list with no error if none matched. Deprecated: use Keys instead.
func (*NatsJS) Read ¶
Read takes a single key and optional ReadOptions. It returns matching []*Record or an error. Deprecated: use Get instead.
func (*NatsJS) Set ¶
func (n *NatsJS) Set(ctx context.Context, key, database, table string, data []byte, _ ...kvstore.SetOption) error
Set takes a key, database, table and data, and optional SetOptions.
type NatsOptions ¶
type NatsOptions struct { // URL represents a single NATS server url to which the client // will be connecting. If the Servers option is also set, it // then becomes the first server in the Servers array. URL string `json:"url,omitempty" yaml:"url,omitempty"` // InProcessServer represents a NATS server running within the // same process. If this is set then we will attempt to connect // to the server directly rather than using external TCP conns. InProcessServer nats.InProcessConnProvider `json:"-" yaml:"-"` // Servers is a configured set of servers which this client // will use when attempting to connect. Servers []string `json:"servers,omitempty" yaml:"servers,omitempty"` // NoRandomize configures whether we will randomize the // server pool. NoRandomize bool `json:"noRandomize,omitempty" yaml:"noRandomize,omitempty"` // AllowReconnect enables reconnection logic to be used when we // encounter a disconnect from the current server. AllowReconnect bool `json:"allowReconnect,omitempty" yaml:"allowReconnect,omitempty"` // MaxReconnect sets the number of reconnect attempts that will be // tried before giving up. If negative, then it will never give up // trying to reconnect. // Defaults to 60. MaxReconnect int `json:"maxReconnect,omitempty" yaml:"maxReconnect,omitempty"` // ReconnectWait sets the time to backoff after attempting a reconnect // to a server that we were already connected to previously. // Defaults to 2s. ReconnectWait config.Duration `json:"reconnectWait,omitempty" yaml:"reconnectWait,omitempty"` // CustomReconnectDelayCB is invoked after the library tried every // URL in the server list and failed to reconnect. It passes to the // user the current number of attempts. This function returns the // amount of time the library will sleep before attempting to reconnect // again. It is strongly recommended that this value contains some // jitter to prevent all connections to attempt reconnecting at the same time. CustomReconnectDelayCB nats.ReconnectDelayHandler `json:"-" yaml:"-"` // ReconnectJitter sets the upper bound for a random delay added to // ReconnectWait during a reconnect when no TLS is used. // Defaults to 100ms. ReconnectJitter config.Duration `json:"reconnectJitter,omitempty" yaml:"reconnectJitter,omitempty"` // ReconnectJitterTLS sets the upper bound for a random delay added to // ReconnectWait during a reconnect when TLS is used. // Defaults to 1s. ReconnectJitterTLS config.Duration `json:"reconnectJitterTls,omitempty" yaml:"reconnectJitterTls,omitempty"` // Timeout sets the timeout for a Dial operation on a connection. // Defaults to 2s. Timeout config.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` // DrainTimeout sets the timeout for a Drain Operation to complete. // Defaults to 30s. DrainTimeout config.Duration `json:"drainTimeout,omitempty" yaml:"drainTimeout,omitempty"` // FlusherTimeout is the maximum time to wait for write operations // to the underlying connection to complete (including the flusher loop). // Defaults to 1m. FlusherTimeout config.Duration `json:"flusherTimeout,omitempty" yaml:"flusherTimeout,omitempty"` // PingInterval is the period at which the client will be sending ping // commands to the server, disabled if 0 or negative. // Defaults to 2m. PingInterval config.Duration `json:"pingInterval,omitempty" yaml:"pingInterval,omitempty"` // MaxPingsOut is the maximum number of pending ping commands that can // be awaiting a response before raising an ErrStaleConnection error. // Defaults to 2. MaxPingsOut int `json:"maxPingsOut,omitempty" yaml:"maxPingsOut,omitempty"` // ClosedCB sets the closed handler that is called when a client will // no longer be connected. ClosedCB nats.ConnHandler `json:"-" yaml:"-"` // DisconnectedErrCB sets the disconnected error handler that is called // whenever the connection is disconnected. // Disconnected error could be nil, for instance when user explicitly closes the connection. // DisconnectedCB will not be called if DisconnectedErrCB is set DisconnectedErrCB nats.ConnErrHandler `json:"-" yaml:"-"` // ConnectedCB sets the connected handler called when the initial connection // is established. It is not invoked on successful reconnects - for reconnections, // use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect // to detect whether the initial connect was successful. ConnectedCB nats.ConnHandler `json:"-" yaml:"-"` // ReconnectedCB sets the reconnected handler called whenever // the connection is successfully reconnected. ReconnectedCB nats.ConnHandler `json:"-" yaml:"-"` // DiscoveredServersCB sets the callback that is invoked whenever a new // server has joined the cluster. DiscoveredServersCB nats.ConnHandler `json:"-" yaml:"-"` // AsyncErrorCB sets the async error handler (e.g. slow consumer errors) AsyncErrorCB nats.ErrHandler `json:"-" yaml:"-"` // ReconnectBufSize is the size of the backing bufio during reconnect. // Once this has been exhausted publish operations will return an error. // Defaults to 8388608 bytes (8MB). ReconnectBufSize int `json:"reconnectBufSize,omitempty" yaml:"reconnectBufSize,omitempty"` // SubChanLen is the size of the buffered channel used between the socket // Go routine and the message delivery for SyncSubscriptions. // NOTE: This does not affect AsyncSubscriptions which are // dictated by PendingLimits() // Defaults to 65536. SubChanLen int `json:"subChanLen,omitempty" yaml:"subChanLen,omitempty"` // UserJWT sets the callback handler that will fetch a user's JWT. UserJWT nats.UserJWTHandler `json:"-" yaml:"-"` // Nkey sets the public nkey that will be used to authenticate // when connecting to the server. UserJWT and Nkey are mutually exclusive // and if defined, UserJWT will take precedence. Nkey string `json:"nkey,omitempty" yaml:"nkey,omitempty"` // SignatureCB designates the function used to sign the nonce // presented from the server. SignatureCB nats.SignatureHandler `json:"-" yaml:"-"` // User sets the username to be used when connecting to the server. User string `json:"user,omitempty" yaml:"user,omitempty"` // Password sets the password to be used when connecting to a server. Password string `json:"password,omitempty" yaml:"password,omitempty"` // Token sets the token to be used when connecting to a server. Token string `json:"token,omitempty" yaml:"token,omitempty"` // TokenHandler designates the function used to generate the token to be used when connecting to a server. TokenHandler nats.AuthTokenHandler `json:"-" yaml:"-"` // CustomDialer allows to specify a custom dialer (not necessarily // a *net.Dialer). CustomDialer nats.CustomDialer `json:"-" yaml:"-"` // UseOldRequestStyle forces the old method of Requests that utilize // a new Inbox and a new Subscription for each request. UseOldRequestStyle bool `json:"useOldRequestStyle,omitempty" yaml:"useOldRequestStyle,omitempty"` // NoCallbacksAfterClientClose allows preventing the invocation of // callbacks after Close() is called. Client won't receive notifications // when Close is invoked by user code. Default is to invoke the callbacks. NoCallbacksAfterClientClose bool `json:"noCallbacksAfterClientClose,omitempty" yaml:"noCallbacksAfterClientClose,omitempty"` // LameDuckModeHandler sets the callback to invoke when the server notifies // the connection that it entered lame duck mode, that is, going to // gradually disconnect all its connections before shutting down. This is // often used in deployments when upgrading NATS Servers. LameDuckModeHandler nats.ConnHandler `json:"-" yaml:"-"` // RetryOnFailedConnect sets the connection in reconnecting state right // away if it can't connect to a server in the initial set. The // MaxReconnect and ReconnectWait options are used for this process, // similarly to when an established connection is disconnected. // If a ReconnectHandler is set, it will be invoked on the first // successful reconnect attempt (if the initial connect fails), // and if a ClosedHandler is set, it will be invoked if // it fails to connect (after exhausting the MaxReconnect attempts). RetryOnFailedConnect bool `json:"retryOnFailedConnect,omitempty" yaml:"retryOnFailedConnect,omitempty"` // For websocket connections, indicates to the server that the connection // supports compression. If the server does too, then data will be compressed. Compression bool `json:"compression,omitempty" yaml:"compression,omitempty"` // For websocket connections, adds a path to connections url. // This is useful when connecting to NATS behind a proxy. ProxyPath string `json:"proxyPath,omitempty" yaml:"proxyPath,omitempty"` // InboxPrefix allows the default _INBOX prefix to be customized InboxPrefix string `json:"inboxPrefix,omitempty" yaml:"inboxPrefix,omitempty"` // IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting // subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy). IgnoreAuthErrorAbort bool `json:"ignoreAuthErrorAbort,omitempty" yaml:"ignoreAuthErrorAbort,omitempty"` // SkipHostLookup skips the DNS lookup for the server hostname. SkipHostLookup bool `json:"skipHostLookup,omitempty" yaml:"skipHostLookup,omitempty"` }
NatsOptions can be used to create a customized connection.
func (NatsOptions) ToOptions ¶
func (o NatsOptions) ToOptions() nats.Options
ToOptions converts the NatsOptions to nats.Options.