natsjs

Version: v0.2.0
Note: this package is not in the latest version of its module.
Published: Mar 26, 2025 License: Apache-2.0 Imports: 18 Imported by: 4

Documentation

Overview

Package natsjs provides the NATS JetStream event client for go-orb.

Constants

const Name = "natsjs"

Name provides the name of this event client.

Variables

var (
	// DefaultMaxConcurrent is the default number of concurrent workers.
	DefaultMaxConcurrent = 256

	// DefaultSyncPublish allows using a synchronous publishing instead of the default asynchronous.
	DefaultSyncPublish = false

	// DefaultDisableDurableStreams configures whether to disable durable streams.
	DefaultDisableDurableStreams = false
)

Defaults.

Functions

func Provide

func Provide(
	configData map[string]any,
	logger log.Logger,
	opts ...event.Option,
) (event.Type, error)

Provide creates a new NatsJS event client.

func WithAllowReconnect added in v0.2.0

func WithAllowReconnect(allowReconnect bool) event.Option

WithAllowReconnect enables reconnection logic when disconnected from the server.

func WithCompression added in v0.2.0

func WithCompression(compression bool) event.Option

WithCompression enables compression for websocket connections.

func WithDisableDurableStreams added in v0.2.0

func WithDisableDurableStreams(disableDurableStreams bool) event.Option

WithDisableDurableStreams configures whether to disable durable streams.

func WithDrainTimeout added in v0.2.0

func WithDrainTimeout(drainTimeout time.Duration) event.Option

WithDrainTimeout sets the timeout for a Drain Operation to complete.

func WithFlusherTimeout added in v0.2.0

func WithFlusherTimeout(flusherTimeout time.Duration) event.Option

WithFlusherTimeout sets the maximum time to wait for write operations to complete.

func WithIgnoreAuthErrorAbort added in v0.2.0

func WithIgnoreAuthErrorAbort(ignoreAuthErrorAbort bool) event.Option

WithIgnoreAuthErrorAbort opts out of aborting reconnect attempts on repeated auth errors.

func WithInboxPrefix added in v0.2.0

func WithInboxPrefix(inboxPrefix string) event.Option

WithInboxPrefix allows customizing the default _INBOX prefix.
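To illustrate what an inbox prefix is for: request/reply over NATS uses a unique reply subject per client or request, and the prefix is the leading part of that subject. The sketch below is a simplified stand-in, not this package's code. The helper name newInbox and the hex-encoded random suffix are assumptions made for illustration; the real client generates its own unique IDs.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newInbox returns a unique reply subject under the given prefix.
// Hypothetical helper: the random hex suffix only stands in for the
// unique IDs the real client produces.
func newInbox(prefix string) (string, error) {
	buf := make([]byte, 11) // 11 random bytes become 22 hex characters
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return prefix + "." + hex.EncodeToString(buf), nil
}

func main() {
	inbox, err := newInbox("_INBOX")
	if err != nil {
		panic(err)
	}
	fmt.Println(inbox) // the default-style prefix followed by a random suffix
}
```

A custom prefix is typically useful when subject permissions on the server are scoped per application, so that replies land under a subject tree the application is allowed to subscribe to.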

func WithMaxConcurrent

func WithMaxConcurrent(n int) event.Option

WithMaxConcurrent sets the number of concurrent workers.

func WithMaxPingsOut added in v0.2.0

func WithMaxPingsOut(maxPingsOut int) event.Option

WithMaxPingsOut sets the maximum number of pending ping commands before raising an error.

func WithMaxReconnect added in v0.2.0

func WithMaxReconnect(maxReconnect int) event.Option

WithMaxReconnect sets the number of reconnect attempts before giving up.

func WithNkey added in v0.2.0

func WithNkey(nkey string) event.Option

WithNkey sets the public nkey for authentication.

func WithNoCallbacksAfterClientClose added in v0.2.0

func WithNoCallbacksAfterClientClose(noCallbacksAfterClientClose bool) event.Option

WithNoCallbacksAfterClientClose prevents callbacks after Close() is called.

func WithNoRandomize added in v0.2.0

func WithNoRandomize(noRandomize bool) event.Option

WithNoRandomize disables randomization of the server pool when set to true.

func WithPassword added in v0.2.0

func WithPassword(password string) event.Option

WithPassword sets the password for authentication.

func WithPingInterval added in v0.2.0

func WithPingInterval(pingInterval time.Duration) event.Option

WithPingInterval sets the period for sending ping commands to the server.
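PingInterval and MaxPingsOut work together: a ping is sent every interval, and the connection is treated as stale once more than MaxPingsOut pings are awaiting a response. A minimal sketch of that bookkeeping follows. The pingTracker type is hypothetical and only models the counter, not the timers or the network I/O of the real client.

```go
package main

import "fmt"

// pingTracker counts pings that have not been answered yet.
// Hypothetical type for illustration only.
type pingTracker struct {
	maxPingsOut int
	pending     int
}

// sendPing records an outgoing ping and reports whether the connection
// should now be considered stale.
func (p *pingTracker) sendPing() (stale bool) {
	p.pending++
	return p.pending > p.maxPingsOut
}

// gotPong resets the counter when the server answers.
func (p *pingTracker) gotPong() { p.pending = 0 }

func main() {
	tracker := &pingTracker{maxPingsOut: 2} // 2 is the documented default
	for i := 1; i <= 3; i++ {
		fmt.Printf("ping %d stale=%v\n", i, tracker.sendPing())
	}
}
```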

func WithProxyPath added in v0.2.0

func WithProxyPath(proxyPath string) event.Option

WithProxyPath adds a path to connections URL for websocket connections.

func WithPublishCodec added in v0.2.0

func WithPublishCodec(n string) event.Option

WithPublishCodec sets the codec used for publishing (Config.PublishCodec).

func WithReconnectBufSize added in v0.2.0

func WithReconnectBufSize(reconnectBufSize int) event.Option

WithReconnectBufSize sets the size of the backing bufio during reconnect.

func WithReconnectJitter added in v0.2.0

func WithReconnectJitter(reconnectJitter time.Duration) event.Option

WithReconnectJitter sets the upper bound for random delay added to ReconnectWait.

func WithReconnectJitterTLS added in v0.2.0

func WithReconnectJitterTLS(reconnectJitterTLS time.Duration) event.Option

WithReconnectJitterTLS sets the upper bound for random delay added to ReconnectWait when TLS is used.

func WithReconnectWait added in v0.2.0

func WithReconnectWait(reconnectWait time.Duration) event.Option

WithReconnectWait sets the time to backoff after attempting a reconnect.

func WithRequestCodec added in v0.2.0

func WithRequestCodec(n string) event.Option

WithRequestCodec sets the codec used for encoding and decoding requests (Config.RequestCodec).

func WithRetryOnFailedConnect added in v0.2.0

func WithRetryOnFailedConnect(retryOnFailedConnect bool) event.Option

WithRetryOnFailedConnect sets the connection in reconnecting state if it can't connect initially.

func WithServers added in v0.2.0

func WithServers(servers []string) event.Option

WithServers sets the list of NATS servers to connect to.

func WithSkipHostLookup added in v0.2.0

func WithSkipHostLookup(skipHostLookup bool) event.Option

WithSkipHostLookup skips the DNS lookup for the server hostname.

func WithSubChanLen added in v0.2.0

func WithSubChanLen(subChanLen int) event.Option

WithSubChanLen sets the size of the buffered channel used for SyncSubscriptions.

func WithSyncPublish added in v0.2.0

func WithSyncPublish(syncPublish bool) event.Option

WithSyncPublish configures whether to publish synchronously.

func WithTimeout added in v0.2.0

func WithTimeout(timeout time.Duration) event.Option

WithTimeout sets the timeout for a Dial operation on a connection.

func WithToken added in v0.2.0

func WithToken(token string) event.Option

WithToken sets the token for authentication.

func WithURL added in v0.2.0

func WithURL(url string) event.Option

WithURL sets the URL of the NATS server.
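According to the NatsOptions field comments, when both URL and Servers are set, URL becomes the first entry of the server list, and NoRandomize controls whether the pool is shuffled. A minimal sketch of that ordering rule follows. serverPool is a hypothetical helper; the actual pool construction (deduplication, URL parsing) may differ.

```go
package main

import (
	"fmt"
	"math/rand"
)

// serverPool builds the list of servers to try, in order.
// Hypothetical helper for illustration only.
func serverPool(url string, servers []string, noRandomize bool) []string {
	pool := append([]string(nil), servers...) // copy so the caller's slice is untouched
	if !noRandomize {
		rand.Shuffle(len(pool), func(i, j int) { pool[i], pool[j] = pool[j], pool[i] })
	}
	if url != "" {
		pool = append([]string{url}, pool...) // URL always goes first
	}
	return pool
}

func main() {
	pool := serverPool(
		"nats://primary:4222",
		[]string{"nats://b:4222", "nats://c:4222"},
		true, // keep the configured order
	)
	fmt.Println(pool)
}
```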

func WithUseOldRequestStyle added in v0.2.0

func WithUseOldRequestStyle(useOldRequestStyle bool) event.Option

WithUseOldRequestStyle forces the old method of Requests.

func WithUser added in v0.2.0

func WithUser(user string) event.Option

WithUser sets the username for authentication.

Types

type Config

type Config struct {
	event.Config `yaml:",inline"`

	NatsOptions `yaml:",inline"`

	// RequestCodec configures the codec to use for encoding and decoding.
	// Default: application/x-protobuf
	RequestCodec string `json:"requestCodec,omitempty" yaml:"requestCodec,omitempty"`

	// PublishCodec configures the codec to use for publishing.
	// Default: application/json
	PublishCodec string `json:"publishCodec,omitempty" yaml:"publishCodec,omitempty"`

	// MaxConcurrent configures the maximum number of concurrent workers.
	// Default: 256 (DefaultMaxConcurrent)
	MaxConcurrent int `json:"maxConcurrent,omitempty" yaml:"maxConcurrent,omitempty"`

	// SyncPublish configures whether to publish synchronously.
	// Default: false
	SyncPublish bool `json:"syncPublish,omitempty" yaml:"syncPublish,omitempty"`

	// DisableDurableStreams configures whether to disable durable streams.
	// Default: false
	DisableDurableStreams bool `json:"disableDurableStreams,omitempty" yaml:"disableDurableStreams,omitempty"`
}

Config provides configuration for the NATS event client.
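The struct tags determine the key names used in JSON or YAML configuration. The sketch below decodes a JSON document into a reduced stand-in for Config that copies five fields and their json tags from the struct listing and omits the embedded event.Config and NatsOptions. clientConfig and decodeConfig are hypothetical names, and the sketch does not apply the package defaults to missing keys.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// clientConfig is a reduced stand-in for Config (assumption: the embedded
// structs are left out so the example compiles with the standard library only).
type clientConfig struct {
	RequestCodec          string `json:"requestCodec,omitempty"`
	PublishCodec          string `json:"publishCodec,omitempty"`
	MaxConcurrent         int    `json:"maxConcurrent,omitempty"`
	SyncPublish           bool   `json:"syncPublish,omitempty"`
	DisableDurableStreams bool   `json:"disableDurableStreams,omitempty"`
}

// decodeConfig parses JSON into the stand-in struct.
func decodeConfig(data []byte) (clientConfig, error) {
	var cfg clientConfig
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	raw := []byte(`{"publishCodec":"application/json","maxConcurrent":8,"syncPublish":true}`)
	cfg, err := decodeConfig(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```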

func NewConfig

func NewConfig(opts ...event.Option) (Config, error)

NewConfig creates a new config object.

func (*Config) ApplyOptions

func (c *Config) ApplyOptions(opts ...event.Option)

ApplyOptions applies a set of options to the config.
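NewConfig and ApplyOptions follow the shape of the functional options pattern: start from defaults, then let each option mutate the config in order. The definition of event.Option is not shown on this page, so the sketch below uses simplified stand-in types (config, option) and lowercase names to make clear it is not the package's code.

```go
package main

import "fmt"

// config is a two-field stand-in for the package's Config.
type config struct {
	MaxConcurrent int
	SyncPublish   bool
}

// option mutates a config; an assumption about how event.Option behaves.
type option func(*config)

func withMaxConcurrent(n int) option { return func(c *config) { c.MaxConcurrent = n } }
func withSyncPublish(b bool) option  { return func(c *config) { c.SyncPublish = b } }

// applyOptions runs each option in order, so later options win.
func (c *config) applyOptions(opts ...option) {
	for _, o := range opts {
		o(c)
	}
}

// newConfig starts from the documented defaults
// (DefaultMaxConcurrent = 256, DefaultSyncPublish = false).
func newConfig(opts ...option) config {
	cfg := config{MaxConcurrent: 256, SyncPublish: false}
	cfg.applyOptions(opts...)
	return cfg
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withMaxConcurrent(8), withSyncPublish(true)))
}
```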

type NatsJS

type NatsJS struct {
	// contains filtered or unexported fields
}

NatsJS is the NATS JetStream event client for go-orb.

func New

func New(cfg Config, log log.Logger) (*NatsJS, error)

New creates a new NatsJS event client. This function should rarely be called manually; to create a new client, use Provide.

func (*NatsJS) Clone

func (n *NatsJS) Clone() event.Type

Clone creates a clone of the handler, which is useful for parallel requests.

func (*NatsJS) Consume added in v0.2.0

func (n *NatsJS) Consume(topic string, opts ...event.ConsumeOption) (<-chan event.Event, error)

Consume from a topic.

func (*NatsJS) GetPublishCodec added in v0.2.0

func (n *NatsJS) GetPublishCodec() codecs.Marshaler

GetPublishCodec returns the codec used by the handler for publish.

func (*NatsJS) HandleRequest

func (n *NatsJS) HandleRequest(
	ctx context.Context,
	topic string,
	callbackHandler func(context.Context, *event.Req[[]byte, []byte]),
)

HandleRequest subscribes to the given topic and handles the requests.

func (*NatsJS) Publish added in v0.2.0

func (n *NatsJS) Publish(ctx context.Context, topic string, msg any, opts ...event.PublishOption) error

Publish a message to a topic.

func (*NatsJS) Request

func (n *NatsJS) Request(
	ctx context.Context,
	req *event.Req[[]byte, any],
	opts ...event.RequestOption,
) ([]byte, error)

Request runs a REST-like call on the given topic.

func (*NatsJS) Start

func (n *NatsJS) Start(_ context.Context) error

Start events.

func (*NatsJS) Stop

func (n *NatsJS) Stop(_ context.Context) error

Stop events.

func (*NatsJS) String

func (n *NatsJS) String() string

String returns the plugin name.

func (*NatsJS) Type

func (n *NatsJS) Type() string

Type returns the component type.

type NatsOptions added in v0.2.0

type NatsOptions struct {
	// URL represents a single NATS server url to which the client
	// will be connecting. If the Servers option is also set, it
	// then becomes the first server in the Servers array.
	URL string `json:"url,omitempty" yaml:"url,omitempty"`

	// InProcessServer represents a NATS server running within the
	// same process. If this is set then we will attempt to connect
	// to the server directly rather than using external TCP conns.
	InProcessServer nats.InProcessConnProvider `json:"-" yaml:"-"`

	// Servers is a configured set of servers which this client
	// will use when attempting to connect.
	Servers []string `json:"servers,omitempty" yaml:"servers,omitempty"`

	// NoRandomize configures whether we will randomize the
	// server pool.
	NoRandomize bool `json:"noRandomize,omitempty" yaml:"noRandomize,omitempty"`

	// AllowReconnect enables reconnection logic to be used when we
	// encounter a disconnect from the current server.
	AllowReconnect bool `json:"allowReconnect,omitempty" yaml:"allowReconnect,omitempty"`

	// MaxReconnect sets the number of reconnect attempts that will be
	// tried before giving up. If negative, then it will never give up
	// trying to reconnect.
	// Defaults to 60.
	MaxReconnect int `json:"maxReconnect,omitempty" yaml:"maxReconnect,omitempty"`

	// ReconnectWait sets the time to backoff after attempting a reconnect
	// to a server that we were already connected to previously.
	// Defaults to 2s.
	ReconnectWait config.Duration `json:"reconnectWait,omitempty" yaml:"reconnectWait,omitempty"`

	// CustomReconnectDelayCB is invoked after the library tried every
	// URL in the server list and failed to reconnect. It passes to the
	// user the current number of attempts. This function returns the
	// amount of time the library will sleep before attempting to reconnect
	// again. It is strongly recommended that this value contains some
	// jitter to prevent all connections to attempt reconnecting at the same time.
	CustomReconnectDelayCB nats.ReconnectDelayHandler `json:"-" yaml:"-"`

	// ReconnectJitter sets the upper bound for a random delay added to
	// ReconnectWait during a reconnect when no TLS is used.
	// Defaults to 100ms.
	ReconnectJitter config.Duration `json:"reconnectJitter,omitempty" yaml:"reconnectJitter,omitempty"`

	// ReconnectJitterTLS sets the upper bound for a random delay added to
	// ReconnectWait during a reconnect when TLS is used.
	// Defaults to 1s.
	ReconnectJitterTLS config.Duration `json:"reconnectJitterTls,omitempty" yaml:"reconnectJitterTls,omitempty"`

	// Timeout sets the timeout for a Dial operation on a connection.
	// Defaults to 2s.
	Timeout config.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`

	// DrainTimeout sets the timeout for a Drain Operation to complete.
	// Defaults to 30s.
	DrainTimeout config.Duration `json:"drainTimeout,omitempty" yaml:"drainTimeout,omitempty"`

	// FlusherTimeout is the maximum time to wait for write operations
	// to the underlying connection to complete (including the flusher loop).
	// Defaults to 1m.
	FlusherTimeout config.Duration `json:"flusherTimeout,omitempty" yaml:"flusherTimeout,omitempty"`

	// PingInterval is the period at which the client will be sending ping
	// commands to the server, disabled if 0 or negative.
	// Defaults to 2m.
	PingInterval config.Duration `json:"pingInterval,omitempty" yaml:"pingInterval,omitempty"`

	// MaxPingsOut is the maximum number of pending ping commands that can
	// be awaiting a response before raising an ErrStaleConnection error.
	// Defaults to 2.
	MaxPingsOut int `json:"maxPingsOut,omitempty" yaml:"maxPingsOut,omitempty"`

	// ClosedCB sets the closed handler that is called when a client will
	// no longer be connected.
	ClosedCB nats.ConnHandler `json:"-" yaml:"-"`

	// DisconnectedErrCB sets the disconnected error handler that is called
	// whenever the connection is disconnected.
	// Disconnected error could be nil, for instance when user explicitly closes the connection.
	// DisconnectedCB will not be called if DisconnectedErrCB is set
	DisconnectedErrCB nats.ConnErrHandler `json:"-" yaml:"-"`

	// ConnectedCB sets the connected handler called when the initial connection
	// is established. It is not invoked on successful reconnects - for reconnections,
	// use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect
	// to detect whether the initial connect was successful.
	ConnectedCB nats.ConnHandler `json:"-" yaml:"-"`

	// ReconnectedCB sets the reconnected handler called whenever
	// the connection is successfully reconnected.
	ReconnectedCB nats.ConnHandler `json:"-" yaml:"-"`

	// DiscoveredServersCB sets the callback that is invoked whenever a new
	// server has joined the cluster.
	DiscoveredServersCB nats.ConnHandler `json:"-" yaml:"-"`

	// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
	AsyncErrorCB nats.ErrHandler `json:"-" yaml:"-"`

	// ReconnectBufSize is the size of the backing bufio during reconnect.
	// Once this has been exhausted publish operations will return an error.
	// Defaults to 8388608 bytes (8MB).
	ReconnectBufSize int `json:"reconnectBufSize,omitempty" yaml:"reconnectBufSize,omitempty"`

	// SubChanLen is the size of the buffered channel used between the socket
	// Go routine and the message delivery for SyncSubscriptions.
	// NOTE: This does not affect AsyncSubscriptions which are
	// dictated by PendingLimits()
	// Defaults to 65536.
	SubChanLen int `json:"subChanLen,omitempty" yaml:"subChanLen,omitempty"`

	// UserJWT sets the callback handler that will fetch a user's JWT.
	UserJWT nats.UserJWTHandler `json:"-" yaml:"-"`

	// Nkey sets the public nkey that will be used to authenticate
	// when connecting to the server. UserJWT and Nkey are mutually exclusive
	// and if defined, UserJWT will take precedence.
	Nkey string `json:"nkey,omitempty" yaml:"nkey,omitempty"`

	// SignatureCB designates the function used to sign the nonce
	// presented from the server.
	SignatureCB nats.SignatureHandler `json:"-" yaml:"-"`

	// User sets the username to be used when connecting to the server.
	User string `json:"user,omitempty" yaml:"user,omitempty"`

	// Password sets the password to be used when connecting to a server.
	Password string `json:"password,omitempty" yaml:"password,omitempty"`

	// Token sets the token to be used when connecting to a server.
	Token string `json:"token,omitempty" yaml:"token,omitempty"`

	// TokenHandler designates the function used to generate the token to be used when connecting to a server.
	TokenHandler nats.AuthTokenHandler `json:"-" yaml:"-"`

	// CustomDialer allows to specify a custom dialer (not necessarily
	// a *net.Dialer).
	CustomDialer nats.CustomDialer `json:"-" yaml:"-"`

	// UseOldRequestStyle forces the old method of Requests that utilize
	// a new Inbox and a new Subscription for each request.
	UseOldRequestStyle bool `json:"useOldRequestStyle,omitempty" yaml:"useOldRequestStyle,omitempty"`

	// NoCallbacksAfterClientClose allows preventing the invocation of
	// callbacks after Close() is called. Client won't receive notifications
	// when Close is invoked by user code. Default is to invoke the callbacks.
	NoCallbacksAfterClientClose bool `json:"noCallbacksAfterClientClose,omitempty" yaml:"noCallbacksAfterClientClose,omitempty"`

	// LameDuckModeHandler sets the callback to invoke when the server notifies
	// the connection that it entered lame duck mode, that is, going to
	// gradually disconnect all its connections before shutting down. This is
	// often used in deployments when upgrading NATS Servers.
	LameDuckModeHandler nats.ConnHandler `json:"-" yaml:"-"`

	// RetryOnFailedConnect sets the connection in reconnecting state right
	// away if it can't connect to a server in the initial set. The
	// MaxReconnect and ReconnectWait options are used for this process,
	// similarly to when an established connection is disconnected.
	// If a ReconnectHandler is set, it will be invoked on the first
	// successful reconnect attempt (if the initial connect fails),
	// and if a ClosedHandler is set, it will be invoked if
	// it fails to connect (after exhausting the MaxReconnect attempts).
	RetryOnFailedConnect bool `json:"retryOnFailedConnect,omitempty" yaml:"retryOnFailedConnect,omitempty"`

	// For websocket connections, indicates to the server that the connection
	// supports compression. If the server does too, then data will be compressed.
	Compression bool `json:"compression,omitempty" yaml:"compression,omitempty"`

	// For websocket connections, adds a path to connections url.
	// This is useful when connecting to NATS behind a proxy.
	ProxyPath string `json:"proxyPath,omitempty" yaml:"proxyPath,omitempty"`

	// InboxPrefix allows the default _INBOX prefix to be customized
	InboxPrefix string `json:"inboxPrefix,omitempty" yaml:"inboxPrefix,omitempty"`

	// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
	// subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy).
	IgnoreAuthErrorAbort bool `json:"ignoreAuthErrorAbort,omitempty" yaml:"ignoreAuthErrorAbort,omitempty"`

	// SkipHostLookup skips the DNS lookup for the server hostname.
	SkipHostLookup bool `json:"skipHostLookup,omitempty" yaml:"skipHostLookup,omitempty"`
}

NatsOptions can be used to create a customized connection.

func (NatsOptions) ToOptions added in v0.2.0

func (o NatsOptions) ToOptions() nats.Options

ToOptions converts the NatsOptions to nats.Options.
