carrot

v0.1.1

Note: this package is not in the latest version of its module.
Published: Apr 17, 2020 License: MIT Imports: 10 Imported by: 0

README

go-carrot

Declarative API for AMQP consumers in Go.


tl;dr features list
  • Topology declaration API
  • Consumer listener API
  • Message handler API
  • Consumers router
    • Middleware support
    • Common middlewares implementation
  • Graceful shutdown
  • Automatic reconnection

Description

Carrot exposes a nice API for dealing with AMQP connections, such as declaring topologies (exchanges, queues, ...) and declaring consumers on one or more queues.

Check out the examples for more information.

Architecture

Carrot uses three main components for its API:

  1. Topology declarator, to declare the AMQP topology from the application that uses it
  2. Message handlers, to define functions able to handle incoming messages from consumers
  3. Consumer listeners, to receive messages from one or more queues
Topology declarator

Carrot lets you define a topology through an expressive API backed by the topology.Declarer interface.

The currently supported topologies are exchanges and queues, provided by the exchange and queue packages.

Topology declaration is optional, and can be controlled with carrot.WithTopology:

carrot.WithTopology(topology.All(
    exchange.Declare("messages"),
    queue.Declare("consumer.message.received",
        queue.BindTo("messages", "message.published"),
    ),
    queue.Declare("consumer.message.deleted",
        queue.BindTo("messages", "message.deleted"),
    ),
)),

When specified, Carrot will open a dedicated AMQP channel to declare the topology, before listening to messages.
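To illustrate how a combinator like topology.All can group several declarations behind one Declarer, here is a minimal sketch. It does not use the real go-carrot types: Channel, Declarer, DeclarerFunc, All and named are hypothetical stand-ins defined in the snippet, and the real topology.Declarer signature may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// Channel is a hypothetical stand-in for an AMQP channel. It only records
// what was declared, so the sketch runs without a broker.
type Channel struct{ Declared []string }

// Declarer is an assumed shape for a topology declarer, not the real
// topology.Declarer signature.
type Declarer interface {
	Declare(ch *Channel) error
}

// DeclarerFunc adapts a plain function to the Declarer interface.
type DeclarerFunc func(*Channel) error

// Declare calls the wrapped function.
func (f DeclarerFunc) Declare(ch *Channel) error { return f(ch) }

// All runs every declarer in order and stops at the first error.
func All(declarers ...Declarer) Declarer {
	return DeclarerFunc(func(ch *Channel) error {
		for _, d := range declarers {
			if err := d.Declare(ch); err != nil {
				return err
			}
		}
		return nil
	})
}

// named is a hypothetical helper that "declares" an exchange or a queue
// by recording its kind and name on the channel.
func named(kind, name string) Declarer {
	return DeclarerFunc(func(ch *Channel) error {
		if name == "" {
			return errors.New(kind + ": empty name")
		}
		ch.Declared = append(ch.Declared, kind+":"+name)
		return nil
	})
}

func main() {
	ch := &Channel{}
	err := All(
		named("exchange", "messages"),
		named("queue", "consumer.message.received"),
	).Declare(ch)
	fmt.Println(ch.Declared, err)
}
```

Because the group is itself a Declarer, it can be passed anywhere a single declaration is accepted.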

Carrot can also be used exclusively for topology declaration:

conn, err := amqp.Dial("amqp://guest:guest@rabbit:5672")
if err != nil {
    panic(err)
}

// The carrot.Closer handle returned is useless if only topology declaration
// is used.
_, err = carrot.Run(conn,
    // Declare your topology here
    carrot.WithTopology(topology.All(
        // topology.All is used to declare more than one topology
        // in a single transaction.
        exchange.Declare("messages"),
        queue.Declare("consumer.message.received",
            queue.BindTo("messages", "message.published"),
        ),
        queue.Declare("consumer.message.deleted",
            queue.BindTo("messages", "message.deleted"),
        ),
    )),
)
Message handlers

Carrot defines the handler.Handler interface for handling incoming messages (amqp.Delivery):

type Handler interface {
    Handle(context.Context, amqp.Delivery) error
}

Message handlers are fallible, so they can return an error.

Error handling can be configured at the consumer listener level.

You can specify a message handler for all incoming messages by using carrot.WithHandler:

carrot.WithHandler(handler.Func(func(context.Context, amqp.Delivery) error {
    // Handle messages here!
    return nil
}))
Router

Carrot also exposes a Router interface and implementation to support:

  • Multiple listeners with their own message handlers
  • Middleware support

An example of what a Router setup might look like:

// Router implements the handler.Handler interface.
router.New().Group(func(r router.Router) {
    // This is how you set middlewares.
    r.Use(LogMessages(logger))
    r.Use(middleware.Timeout(50 * time.Millisecond))
    r.Use(SimulateWork(100*time.Millisecond, logger))

    // This is how you bind a handler function to a specific queue.
    // In order for it to work, you must register these queues
    // in the listener.
    r.Bind("consumer.message.received", handler.Func(Acknowledger))
    r.Bind("consumer.message.deleted", handler.Func(Acknowledger))

    // You can also specify additional middlewares only for one queue:
    r.With(AuthenticateUser).
        Bind("consumer.message.created", handler.Func(Acknowledger))
})
Consumer listeners

As the name says, listeners listen for incoming messages on a specific queue.

Carrot defines a listener.Listener interface to represent these components:

type Listener interface {
    Listen(Connection, Channel, handler.Handler) (Closer, error)
}

so that the listener can:

  • Start listening to incoming amqp.Delivery from a Channel
  • Serve these messages using the provided handler.Handler
  • Hand out a Closer handler to close the listener/server goroutine and/or wait for its closing

An example of how to define Listeners:

// WithListener specifies the listener.Listener to start.
carrot.WithListener(listener.Sink(
    // listener.Sink allows to listen to messages coming from one or more consumers,
    // and pilots closing the child listeners.
    consumer.Listen("consumer.message.deleted"),
    listener.UseDedicatedChannel(
        // By default, carrot uses a single amqp.Channel to establish
        // consumer listeners. But we can tell carrot to use a dedicated
        // amqp.Channel for certain consumers.
        consumer.Listen("consumer.message.received"),
    ),
))

Full example

Let's put all the pieces together now!

conn, err := amqp.Dial("amqp://guest:guest@rabbit:5672")
if err != nil {
    panic(err)
}

closer, err := carrot.Run(conn,
    // First, declare your topology...
    carrot.WithTopology(topology.All(
        exchange.Declare("messages"),
        queue.Declare("consumer.message.received",
            queue.BindTo("messages", "message.published"),
        ),
        queue.Declare("consumer.message.deleted",
            queue.BindTo("messages", "message.deleted"),
        ),
    )),
    // Second, declare the consumers to receive messages from...
    carrot.WithListener(listener.Sink(
        consumer.Listen("consumer.message.deleted"),
        listener.UseDedicatedChannel(
            consumer.Listen("consumer.message.received"),
        ),
    )),
    // Lastly, specify a handler function that will receive the messages
    // coming from the specified consumers.
    carrot.WithHandler(router.New().Group(func(r router.Router) {
        r.Use(LogMessages(logger))
        r.Use(middleware.Timeout(50 * time.Millisecond))
        r.Use(SimulateWork(100*time.Millisecond, logger))

        r.Bind("consumer.message.received", handler.Func(Acknowledger))
        r.Bind("consumer.message.deleted", handler.Func(Acknowledger))
    })),
)

if err != nil {
    panic(err)
}

// Wait on the main goroutine until the consumer has exited:
err = <-closer.Closed()
log.Fatalf("Consumers closed (error: %v)", err)

License

This project is licensed under the MIT license.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in go-carrot by you, shall be licensed as MIT, without any additional terms or conditions.

Documentation

Overview

Package carrot provides a nice API for dealing with AMQP connections.

Carrot is split into three separate components: a topology declarator, an incoming message listener and a message handler.

Index

Examples

Constants

This section is empty.

Variables

var DefaultShutdownOptions = Shutdown{
	Timeout: 1 * time.Minute,
	Signals: []os.Signal{os.Interrupt},
	OnError: func(err error) {
		panic(err)
	},
}

DefaultShutdownOptions are the default Shutdown options used in case no overriding options are specified in WithGracefulShutdown.

var ErrNoConnection = errors.New("carrot: no connection provided")

ErrNoConnection is returned by Runner.Run when no valid AMQP connection has been specified.

var ErrNoHandler = errors.New("carrot: no handler specified")

ErrNoHandler is returned by Runner.Run when no handler has been specified, so that Runner.Run can't handle any incoming messages.

var ErrNoListener = errors.New("carrot: no listener specified")

ErrNoListener is returned by Runner.Run when no delivery listener has been specified, so that Runner.Run can't receive any messages from the AMQP broker.

Functions

This section is empty.

Types

type Closer

type Closer struct {
	// contains filtered or unexported fields
}

Closer closes the provided amqp.Connection and any active Listener after Runner.Run has been called.

func Run added in v0.1.1

func Run(conn listener.Connection, options ...Option) (Closer, error)

Run is a convenience method to run a new Runner instance directly.

It is the equivalent of carrot.From(...).Run()

func (Closer) Close

func (closer Closer) Close(ctx context.Context) error

Close closes both the amqp.Connection provided and the Listener declared in the Runner.

func (Closer) Closed

func (closer Closer) Closed() <-chan error

Closed returns a channel that gets closed when the Listener gets closed.

Useful to wait for consumers completion.

type Option

type Option func(*Runner)

Option represents an additional argument for the Runner factory method.

func WithGracefulShutdown

func WithGracefulShutdown(options *Shutdown) Option

WithGracefulShutdown enables graceful shutdown after certain signals are received by the process.

Use WithGracefulShutdown(nil) to use the default options, which can be found in DefaultShutdownOptions.

func WithHandler

func WithHandler(handler handler.Handler) Option

WithHandler specifies the component in charge of handling incoming messages for the new Runner instance.

func WithListener

func WithListener(listener listener.Listener) Option

WithListener specifies the component in charge of listening for messages coming from the AMQP broker.

func WithTopology

func WithTopology(declarer topology.Declarer) Option

WithTopology adds a topology declaration step to the new Runner instance.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner instruments all the different parts of the go-carrot library, provided with a valid AMQP connection.

func From

func From(conn listener.Connection, options ...Option) Runner

From creates a new Runner instance, given an AMQP connection and options.

Required options are WithListener, to bind a channel to an amqp.Delivery sink and start receiving messages, and WithHandler, to handle all the incoming messages.

Example
From(nil,
	WithTopology(topology.All(
		exchange.Declare("orders",
			exchange.Kind(kind.Topic),
			exchange.Durable,
		),
		exchange.Declare("orders-internal",
			exchange.Kind(kind.Topic),
			exchange.Durable,
		),
		queue.Declare(
			"my-service.order.invalidate",
			queue.BindTo("orders", "*.order.changed"),
			queue.Durable,
			queue.DeadLetterWithQueue(
				"orders", "my-service.order.invalidate.dead",
				queue.Declare(
					"my-service.order.invalidate.failed",
					queue.Durable,
					queue.NoWait,
				),
			),
		),
		queue.Declare(
			"my-service.order.finalized",
			queue.BindTo("orders", "*.order.finalized"),
			queue.Durable,
			queue.DeadLetterWithQueue(
				"orders", "my-service.order.finalized.dead",
				queue.Declare("my-service.order.finalized.failed"),
			),
		),
	)),
	WithListener(consumer.Listen(
		"my-service.order.invalidate",
		consumer.Title("Invalidate Order"),
	)),
	WithHandler(router.New().Group(func(r router.Router) {
		r.Use(middleware.SessionPerRequest(nil))

		r.Bind("my-service.order.invalidate", handler.Func(func(context.Context, amqp.Delivery) error {
			return nil
		}))
	})),
).Run()

func (Runner) Run

func (runner Runner) Run() (Closer, error)

Run starts all the different parts of the Runner instrumentator, in the following order: topology declaration, delivery listener and messages listener.

Message listener uses the sink channel coming from the delivery listener, and spawns a separate worker goroutine to run the message handler specified during configuration with the new amqp.Delivery received.

An error is returned if the parameters supplied during configuration are not valid, or if an error occurs on the AMQP connection.

type Shutdown

type Shutdown struct {
	Timeout time.Duration
	Signals []os.Signal
	OnError func(err error)
}

Shutdown contains all the different options used by the graceful shutdown component.

Default values can be found in DefaultShutdownOptions.

Directories

Path            Synopsis
examples
handler         Package handler contains the reference interface for a message handler.
router          Package router contains a Router interface and Mux, message handler multiplexer, to allow for multiple message handlers to be assigned to multiple queues.
topology        Package topology contains the reference interface for declaring the topology used by the application through an AMQP connection.
exchange        Package exchange adds a topology.Declarer interface able to describe AMQP exchanges.
exchange/kind   Package kind contains all supported AMQP exchange kinds.
queue           Package queue adds a topology.Declarer interface able to describe AMQP queues.
