natsrouter

package module
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2024 License: MIT Imports: 12 Imported by: 2

README

NATS Router

example workflow PkgGoDev Go Report Card

NATS Router is a simple package of utilities on top of NATS messaging system. It simplified the creation of queue groups and subjects, adds basic middleware support and adds both context and error handling to NATS subscriptions.

Usage

To use NATS Router, first create a NatsRouter object and pass in a NATS connection:

import "github.com/nats-io/nats.go"
import "github.com/example/natsrouter"

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
	// handle error
}
router := natsrouter.NewRouter(nc)

Next, register handlers for the different subjects you expect to receive:

router.Subscribe("foo", func(msg *natsrouter.NatsMsg) error {
	// Handle "foo" subject here
	return nil
})
router.Subscribe("bar", func(msg *natsrouter.NatsMsg) error {
	// Handle "bar" subject here
	return nil
})

You can use the NatsMsg object passed to the handler functions to inspect and manipulate the incoming message, including sending a response:

router.Subscribe("ping", func(msg *natsrouter.NatsMsg) {
	// Extract data from incoming message
	data := msg.Data

	// Send response
	if err := msg.Respond([]byte("pong")); err != nil {
		// handle error
	}
})

You can also subscribe using a queue group for distributed request processing:

router.QueueSubscribe("baz", func(msg *natsrouter.NatsMsg) error {
	// Handle "baz" subject here
	return nil
})
Middlewares

The primary reason for natsrouter is middleware support for incoming request. These are modeled after generic HTTP middlewares, or any other similar middleware. There are a few example middlewares in middleware.go.

Middleware can be layered on top of NatsRouter before subscribing to subjects. It is possible to build a complex tree if necessary. For example authentication can be performed with the simple AuthMiddleware example:

cb := func(token string) bool {
	// Test if token is valid
	if token == "admin" {
		return true
	}
	return false
}
auth := natsrouter.NewAuthMiddleware(cb)
router.Use(auth.Auth).Subscribe("foo", func(msg *natsrouter.NatsMsg) error {
	// The auth middleware check "authorization" header and errors out if authentication fails.
	// This handler is only reached when authentication succeeds
	return nil
})

This is of course just a simple example. A real one would use for example JWTs or something similar. It must however be noted that NATS headers are limited to 4096 bytes in total, so keeping the tokens small is recommended.

Subjects and queue groups

Instead of manually building subjects, it is possible to use Subject() to build any kind of subject chain:

router.Subject("orders").Subject("eu").Subscribe(func(msg *natsrouter.NatsMsg) error {
	// Handle "orders.eu" subject here
	return nil
})

Note that there is no subject in the Subscribe() function.

There is also the wildcard * and the any > subjects. If the "any" subject is included, it must be the last in chain.

Another helpful utility are queue groups. Both subjects and queue groups are mix-and-match, but only a single queue group will be in effect:

router.Use(auth.Auth).Queue("my-group").Subject("orders").Subject("us").Subscribe(func(msg *natsrouter.NatsMsg) error {
	// Handle "orders.us" subject here, in one of multiple instances belonging to "my-group".
	// Also protected by "authentication" middleware.
	return nil
})

Contributing

Contributions are welcome! If you find a bug or have a feature request, please open an issue. If you would like to contribute code, please fork the repository and create a pull request.

Documentation

Index

Constants

View Source
const ALL_SUBJECT = ">"

ALL_SUBJECT is a wildcard subject that can match one or more tokens in a subject string.

View Source
const ANY_SUBJECT = "*"

ANY_SUBJECT is a wildcard subject that can match any single token in a subject string.

Variables

View Source
var ErrNonLastAllSubject = errors.New("'all' subject must be last in subject chain")

This error is returned when an "all" subject is not the last one in a subject chain.

View Source
var ErrUnsupportedEncoding = errors.New("unsupported encoding")

Functions

func Version

func Version() string

Version is the current release version.

Types

type EncodedMiddleware added in v0.2.0

type EncodedMiddleware struct {
	// contains filtered or unexported fields
}

Simple middleware that simulated EncodedConn-type subscriptions

func NewEncodedMiddleware added in v0.2.0

func NewEncodedMiddleware(cb Handler, encType string) (*EncodedMiddleware, error)

Create a new EncodedConn middleware with given encoder type

func (*EncodedMiddleware) EncodedMiddleware added in v0.2.0

func (e *EncodedMiddleware) EncodedMiddleware(NatsCtxHandler) NatsCtxHandler

Actual middleware function

type ErrorConfig

type ErrorConfig struct {
	Tag    string
	Format string
}

Error configuration `Tag` is the header key and `Format` is the error encoding Defaults are "error" and "json"

type Handler added in v0.2.0

type Handler interface{}

handler := func(m *NatsMsg) handler := func(ctx context.Context, m *Msg) handler := func(ctx context.Context, p *person) handler := func(ctx context.Context, subject string, o *obj) handler := func(ctx context.Context, subject, reply string, o *obj)

type HandlerError

type HandlerError struct {
	Message string `json:"message"`
	Code    int    `json:"code"`
}

func (*HandlerError) Error

func (e *HandlerError) Error() string

type NatsCtxHandler

type NatsCtxHandler func(*NatsMsg) error

Handler function that adds a `context.Context` to a `*nats.Msg`

type NatsMiddlewareFunc

type NatsMiddlewareFunc func(NatsCtxHandler) NatsCtxHandler

Middleware function that takes a `NatsCtxHandler` and returns a new `NatsCtxHandler`

type NatsMsg

type NatsMsg struct {
	*nats.Msg
	// contains filtered or unexported fields
}

Extend nats.Msg struct to include context

func (*NatsMsg) Context

func (n *NatsMsg) Context() context.Context

Get current attached message context

func (*NatsMsg) Respond

func (n *NatsMsg) Respond(data []byte) error

Send a response back to requester

func (*NatsMsg) RespondWithHeaders

func (n *NatsMsg) RespondWithHeaders(data []byte, headers map[string]string) error

Send a response with given headers

func (*NatsMsg) RespondWithOriginalHeaders

func (n *NatsMsg) RespondWithOriginalHeaders(data []byte, headers ...string) error

Send a response and copy given original headers, or all if nothing defined

func (*NatsMsg) WithContext

func (n *NatsMsg) WithContext(ctx context.Context) *NatsMsg

Set new message context

type NatsRouter

type NatsRouter struct {
	// contains filtered or unexported fields
}

NatsRouter is a middleware-supported NATS router that provides a fluent API for subscribing to subjects and chaining middleware functions.

func Connect added in v0.1.0

func Connect(url string, options ...RouterOption) (*NatsRouter, error)

func NewRouter deprecated

func NewRouter(nc *nats.Conn, options ...RouterOption) *NatsRouter

Create a new NatsRouter with a *nats.Conn and an optional list of RouterOptions functions. It sets the default RouterOptions to use a default ErrorConfig, and then iterates through each option function, calling it with the RouterOptions struct pointer to set any additional options.

Deprecated: Use Connect instead. This does not support properly draining publications and subscriptions.

func NewRouterWithAddress deprecated

func NewRouterWithAddress(addr string, options ...RouterOption) (*NatsRouter, error)

Creates a new NatsRouter with a string address and an optional list of RouterOptions functions. It connects to the NATS server using the provided address, and then calls NewRouter to create a new NatsRouter with the resulting *nats.Conn and optional RouterOptions. If there was an error connecting to the server, it returns nil and the error.

Deprecated: Use Connect instead. This does not support properly draining publications and subscriptions.

func (*NatsRouter) ChanQueueSubscribe added in v0.0.5

func (n *NatsRouter) ChanQueueSubscribe(subject, queue string, ch chan *NatsMsg) (*nats.Subscription, error)

Same as QueueSubscribe, except uses channels. Note that error handling is available only for middleware, since the message is processed first by middleware and then inserted into the *NatsMsg channel.

func (*NatsRouter) ChanSubscribe added in v0.0.5

func (n *NatsRouter) ChanSubscribe(subject string, ch chan *NatsMsg) (*nats.Subscription, error)

Same as Subscribe, except uses channels. Note that error handling is available only for middleware, since the message is processed first by middleware and then inserted into the *NatsMsg channel.

func (*NatsRouter) Close

func (n *NatsRouter) Close()

Close connection to NATS server

func (*NatsRouter) Conn

func (n *NatsRouter) Conn() *nats.Conn

Get current underlying NATS connection

func (*NatsRouter) Drain added in v0.1.0

func (n *NatsRouter) Drain()

Drain pubs/subs and close connection to NATS server

func (*NatsRouter) Publish

func (n *NatsRouter) Publish(subject string, data []byte) error

Publish is a passthrough function for the `nats` Publish function

func (*NatsRouter) Queue

func (n *NatsRouter) Queue(queue string) *Queue

Returns a new `Queue` object with the given queue name

func (*NatsRouter) QueueSubscribe

func (n *NatsRouter) QueueSubscribe(subject, queue string, handler NatsCtxHandler) (*nats.Subscription, error)

QueueSubscribe registers a handler function for the specified subject and queue group and returns a *nats.Subscription. The handler function is wrapped with any registered middleware functions in reverse order.

func (*NatsRouter) Subject

func (n *NatsRouter) Subject(subjects ...string) *Subject

Returns a new `Subject` object with the given subject name(s)

func (*NatsRouter) Subscribe

func (n *NatsRouter) Subscribe(subject string, handler NatsCtxHandler) (*nats.Subscription, error)

Subscribe registers a handler function for the specified subject and returns a *nats.Subscription. The handler function is wrapped with any registered middleware functions in reverse order.

func (*NatsRouter) Use

func (n *NatsRouter) Use(fns ...NatsMiddlewareFunc) *NatsRouter

Alias for `WithMiddleware`

func (*NatsRouter) Wiretap

func (n *NatsRouter) Wiretap() *Subject

Returns a new `Subject` object with the wildcard ALL_SUBJECT. This is also knows as a "wiretap" mode, listening on all requests.

func (*NatsRouter) WithMiddleware

func (n *NatsRouter) WithMiddleware(fns ...NatsMiddlewareFunc) *NatsRouter

Returns a new `NatsRouter` with additional middleware functions

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue group that can be subscribed to subjects

func (*Queue) ChanSubscribe added in v0.0.5

func (q *Queue) ChanSubscribe(subject string, ch chan *NatsMsg) (*nats.Subscription, error)

Same as Subscribe, with channel support

func (*Queue) Subject

func (q *Queue) Subject(subjects ...string) *Subject

Create a new `Subject` object that is part of this `Queue` group

func (*Queue) Subscribe

func (q *Queue) Subscribe(subject string, handler NatsCtxHandler) (*nats.Subscription, error)

Subscribe to a subject as a part of this queue group with the specified handler function

func (*Queue) Use

func (q *Queue) Use(fns ...NatsMiddlewareFunc) *Queue

Alias for `WithMiddleware`

func (*Queue) WithMiddleware

func (q *Queue) WithMiddleware(fns ...NatsMiddlewareFunc) *Queue

Returns a new `Queue` object with additional middleware functions

type RouterOption

type RouterOption func(*RouterOptions)

Defines a function type that will be used to define options for the router.

func WithErrorConfig

func WithErrorConfig(ec *ErrorConfig) RouterOption

Define error config in the router options.

func WithErrorConfigString

func WithErrorConfigString(tag, format string) RouterOption

Define error config as strings in the router options.

func WithNatsOption added in v0.3.1

func WithNatsOption(options ...nats.Option) RouterOption

Apply one or more nats.Option to the config before connecting

func WithNatsOptions added in v0.1.0

func WithNatsOptions(nopts nats.Options) RouterOption

Set nats.Options for the connection before connecting

func WithRequestIdTag

func WithRequestIdTag(tag string) RouterOption

Define new request id header tag

type RouterOptions

type RouterOptions struct {
	ErrorConfig  *ErrorConfig
	RequestIdTag string
	NatsOptions  nats.Options
}

Defines a struct for the router options, which currently contains error config, default request id tag (for error reporting) and optional list of NATS connection options.

func GetDefaultRouterOptions added in v0.1.0

func GetDefaultRouterOptions() RouterOptions

Get default RouterOptions

func (RouterOptions) Connect added in v0.1.0

func (r RouterOptions) Connect() (*NatsRouter, error)

Connect will attempt to connect to a NATS server with multiple options.

type Subject

type Subject struct {
	// contains filtered or unexported fields
}

This type defines a subject in NATS messaging.

func (*Subject) All

func (s *Subject) All() *Subject

This method returns a new subject that includes the wildcard ALL_SUBJECT.

func (*Subject) Any

func (s *Subject) Any() *Subject

This method returns a new subject that includes the wildcard ANY_SUBJECT.

func (*Subject) ChanSubscribe added in v0.0.5

func (s *Subject) ChanSubscribe(ch chan *NatsMsg) (*nats.Subscription, error)

Same as Subscribe, with channel support

func (*Subject) Queue

func (s *Subject) Queue(queue string) *Subject

This method returns a new subject with a queue group that is used to load balance messages across multiple subscribers.

func (*Subject) Subject

func (s *Subject) Subject(subjects ...string) *Subject

This method returns a new subject that includes the specified subject strings. It appends the new subject strings to the existing subjects slice.

func (*Subject) Subscribe

func (s *Subject) Subscribe(handler NatsCtxHandler) (*nats.Subscription, error)

This function subscribes a NATS context handler to a subject or a queue.

Directories

Path Synopsis
extra
middleware Module
middleware module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL