nats

package
v0.4.6
Published: Nov 28, 2023 License: Apache-2.0 Imports: 14 Imported by: 1

Documentation

Overview

Package nats provides an event bus that uses NATS to publish and subscribe to events over a network with support for both NATS Core and NATS JetStream.

Constants

This section is empty.

Variables

var (
	// ErrStreamExists is returned when the JetStream driver tries to create a
	// stream that already exists with a different configuration.
	ErrStreamExists = errors.New("stream already exists with a different configuration")

	// ErrConsumerExists is returned when the JetStream driver tries to create a
	// consumer that already exists with a different configuration.
	ErrConsumerExists = errors.New("consumer already exists with a different configuration")
)
var (
	// DefaultStream is the default JetStream stream name to use/create if no
	// explicit name is provided using the StreamName() option.
	DefaultStream = "goes"
)
var (
	// ErrPullTimeout is raised by an eventBus when a subscriber doesn't pull an
	// event from the event channel within the specified PullTimeout. In such
	// case, the event is dropped to avoid blocking the application because of a
	// slow consumer.
	ErrPullTimeout = errors.New("pull timed out. slow consumer?")
)

Functions

This section is empty.

Types

type Driver

type Driver interface {
	// contains filtered or unexported methods
}

A Driver provides the specific implementation for interacting with either NATS Core or NATS JetStream. Use the Core or JetStream functions to create a Driver.

func Core

func Core() Driver

Core returns the NATS Core Driver (which is enabled by default):

bus := NewEventBus(enc, Use(Core())) // or
bus := NewEventBus(enc)

func JetStream

func JetStream(opts ...JetStreamOption) Driver

JetStream returns the NATS JetStream Driver:

bus := NewEventBus(enc, Use(JetStream()))

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus is an event bus that uses NATS to publish and subscribe to events.

Drivers

The event bus supports both NATS Core and NATS JetStream. By default, the Core driver is used, but you can create and specify the JetStream driver with the Use option:

var enc codec.Encoding
bus := nats.NewEventBus(enc, nats.Use(nats.JetStream()))

func NewEventBus

func NewEventBus(enc codec.Encoding, opts ...EventBusOption) *EventBus

NewEventBus returns a NATS event bus.

The provided Encoding is used to encode and decode event data when publishing and subscribing to events.

If no other Driver is specified, the returned event bus uses the NATS Core Driver. To use the NATS JetStream Driver instead, explicitly set the Driver:

NewEventBus(enc, Use(JetStream()))

func (*EventBus) Connect

func (bus *EventBus) Connect(ctx context.Context) error

Connect connects to NATS.

It is not required to call Connect to use the eventBus because Connect is automatically called by Subscribe and Publish.

func (*EventBus) Connection added in v0.1.2

func (bus *EventBus) Connection() *nats.Conn

Connection returns the underlying *nats.Conn.

func (*EventBus) Disconnect

func (bus *EventBus) Disconnect(ctx context.Context) error

Disconnect closes the underlying *nats.Conn. Should ctx be canceled before the connection is closed, ctx.Err() is returned.

func (*EventBus) Publish

func (bus *EventBus) Publish(ctx context.Context, events ...event.Event) error

Publish publishes events.

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(ctx context.Context, names ...string) (<-chan event.Event, <-chan error, error)

Subscribe subscribes to events.

type EventBusOption

type EventBusOption func(*EventBus)

EventBusOption is an option for an eventBus.

func Conn

func Conn(conn *nats.Conn) EventBusOption

Conn returns an option that provides the underlying *nats.Conn to the event bus. When providing a connection, the event bus does not try to connect to NATS but uses the provided connection instead.

func EatErrors

func EatErrors() EventBusOption

EatErrors returns an option that discards any asynchronous errors of subscriptions. When subscribing to an event, you can safely ignore the returned error channel:

var bus *EventBus
events, _, err := bus.Subscribe(context.TODO(), ...)

func LoadBalancer added in v0.1.2

func LoadBalancer(serviceName string) EventBusOption

LoadBalancer returns a QueueGroup option that enables load-balancing between event buses that share the same serviceName. The option applies the QueueGroup option so that the queue group for the subscription to an event is built in the following format:

fmt.Sprintf("%s:%s", <serviceName>, <eventName>)
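
As an illustration of that format, here is a small standalone sketch. The loadBalancedQueue helper is hypothetical and only mirrors the documented naming scheme. It has the same shape as the function accepted by the QueueGroup option, but it is not the package's implementation.

```go
package main

import "fmt"

// loadBalancedQueue is a hypothetical helper that mirrors the documented
// "<serviceName>:<eventName>" queue group format.
func loadBalancedQueue(serviceName string) func(eventName string) string {
	return func(eventName string) string {
		return fmt.Sprintf("%s:%s", serviceName, eventName)
	}
}

func main() {
	queue := loadBalancedQueue("foo-service")

	// Every replica of "foo-service" computes the same group per event,
	// so each event is delivered to only one of the replicas.
	fmt.Println(queue("order.placed"))
	fmt.Println(queue("order.shipped"))
}
```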

Caution

Providing a load-balanced event bus as the underlying bus to a command bus should be avoided, and providing it to a projection schedule should be done with caution.

To create a command bus, create another instance of the event bus with load-balancing disabled, and pass that bus to cmdbus.New().

When you create a projection schedule, you have to think about what makes sense in the context of your projection, because a load-balanced event bus will cause only a single instance of a replicated service to trigger a projection. Also, each event may be received by a different instance which can make the projection jobs fragmented and less efficient. A common example is some kind of lookup table that is projected from events and that your instances keep "live" in-memory. Each instance needs the lookup table to work, so it wouldn't make sense to load-balance the projection. In most cases where projections are not kept in memory but instead fetched from a database, updated and then saved back to the database, a load-balanced projection schedule is exactly what you want, but then again, context matters.

Read more about queue groups: https://docs.nats.io/nats-concepts/core-nats/queue

func PullTimeout

func PullTimeout(d time.Duration) EventBusOption

PullTimeout returns an option that limits the Duration an event bus tries to push an event into a subscribed event channel. When the pull timeout is exceeded, the event gets dropped and a warning is logged. Default is the zero-Duration which means "no timeout".

func QueueGroup

func QueueGroup(fn func(eventName string) string) EventBusOption

QueueGroup returns an option that specifies the NATS queue group for new subscriptions. When subscribing to an event, fn(eventName) is called to determine the queue group name for that subscription. If the returned queue group is an empty string, the queue group feature will not be used for the subscription.

This option has no effect if used with the "jetstream" driver in "pull" mode (default).

Use Case

Queue groups can be used to load-balance between multiple subscribers of the same event. When multiple subscribers are subscribed to an event and use the same queue group, only one of the subscribers will receive the event. To load-balance between instances of a replicated (micro-)service, use a name (such as the service name) that is shared by all replicas as the queue group:

serviceName := "foo-service"
bus := NewEventBus(enc,
	QueueGroup(func(eventName string) string {
		return serviceName
	}),
)

A similar setup, with one queue group per event in the format "<serviceName>:<eventName>", can be written with the LoadBalancer option:

serviceName := "foo-service"
bus := NewEventBus(enc, LoadBalancer(serviceName))

Queue groups are disabled by default.

Read more about queue groups: https://docs.nats.io/nats-concepts/core-nats/queue

func SubjectFunc

func SubjectFunc(fn func(eventName string) string) EventBusOption

SubjectFunc returns an option that specifies how the NATS subjects for event names are generated. Any "." in the generated subject is replaced by "_".

By default, a subject is the event name with "." replaced by "_".
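
A standalone sketch of the documented subject rule follows. The subject helper is hypothetical and only models the behavior described above (apply the function, then replace "." with "_"). It is not the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// subject is a hypothetical helper that models the documented rule: the
// subject is fn(eventName) with every "." replaced by "_". A nil fn models
// the default, where the subject is derived from the event name itself.
func subject(fn func(eventName string) string, eventName string) string {
	if fn == nil {
		fn = func(name string) string { return name }
	}
	return strings.ReplaceAll(fn(eventName), ".", "_")
}

func main() {
	// Default behavior.
	fmt.Println(subject(nil, "order.placed"))

	// Custom function that namespaces subjects. The "." it introduces is
	// replaced as well, as the documentation states.
	shop := func(name string) string { return "shop." + name }
	fmt.Println(subject(shop, "order.placed"))
}
```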

func SubjectPrefix

func SubjectPrefix(prefix string) EventBusOption

SubjectPrefix returns an option that prefixes the NATS subjects that are generated for event names with the given prefix.

func URL

func URL(url string) EventBusOption

URL returns an option that sets the connection URL to the NATS server. If no URL is specified, the environment variable `NATS_URL` will be used as the connection URL. If that is also not set, the default NATS URL (nats.DefaultURL) is used instead.

func Use

func Use(d Driver) EventBusOption

Use returns the option to specify the Driver to use to communicate with NATS. By default, the "core" driver is used.

bus := NewEventBus(enc, Use(JetStream()))

type JetStreamOption added in v0.1.2

type JetStreamOption func(*jetStream)

JetStreamOption is an option for the JetStream driver.

func Durable

func Durable(prefix string) JetStreamOption

Durable returns an option that makes JetStream consumers / subscriptions durable (see DurableFunc). The durable name is formatted like this:

fmt.Sprintf("%s:%s:%s", prefix, queue, event)

func DurableFunc

func DurableFunc(fn func(event, queue string) string) JetStreamOption

DurableFunc returns an option that makes JetStream consumers / subscriptions durable. When creating a consumer, the provided function is called with the event name and queue group (see QueueGroup and LoadBalancer options) to generate the durable name. If the event is the wildcard "*", it is passed as "$all". Similarly, if the queue group is an empty string, it is passed as "$noqueue". Any ".", "*", or ">" characters in the returned durable name will be replaced by "_".

The JetStream driver creates one consumer / subscription per event.

Read more about durable subscriptions: https://docs.nats.io/nats-concepts/jetstream/consumers#durable-name
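
To make the naming rules for Durable and DurableFunc concrete, here is a standalone sketch. The durableName and prefixed helpers are hypothetical and only model the documented substitutions. The actual driver may differ in details.

```go
package main

import (
	"fmt"
	"strings"
)

// durableReplacer replaces the characters that the documentation says are
// not kept in a durable name.
var durableReplacer = strings.NewReplacer(".", "_", "*", "_", ">", "_")

// durableName is a hypothetical helper that models the documented rules:
// the wildcard event is passed as "$all", an empty queue group is passed as
// "$noqueue", and ".", "*" and ">" in the result are replaced by "_".
func durableName(fn func(event, queue string) string, event, queue string) string {
	if event == "*" {
		event = "$all"
	}
	if queue == "" {
		queue = "$noqueue"
	}
	return durableReplacer.Replace(fn(event, queue))
}

// prefixed models the name format documented for the Durable option.
func prefixed(prefix string) func(event, queue string) string {
	return func(event, queue string) string {
		return fmt.Sprintf("%s:%s:%s", prefix, queue, event)
	}
}

func main() {
	fn := prefixed("svc")

	fmt.Println(durableName(fn, "order.placed", "")) // no queue group
	fmt.Println(durableName(fn, "*", "workers"))     // wildcard subscription
}
```

Because the driver creates one consumer per event, including the event name in the durable name keeps the names of those consumers distinct.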

func StreamName added in v0.1.2

func StreamName(stream string) JetStreamOption

StreamName returns a JetStreamOption that specifies the stream name that is created by the JetStream driver. The default stream name is "goes".

func SubOpts

func SubOpts(opts ...nats.SubOpt) JetStreamOption

SubOpts returns an option that adds custom nats.SubOpts when creating a JetStream subscription.
