tackle

package module
v0.0.0-...-fb4f71d Latest
Published: Feb 20, 2025 License: Apache-2.0 Imports: 8 Imported by: 5

README

Tackle

An opinionated RabbitMQ message processing and publishing library, ideal for async communication between microservices.

Installation

go get github.com/renderedtext/go-tackle

Publishing messages to a RabbitMQ exchange (simple)

To publish a message to a RabbitMQ exchange, use the tackle.PublishMessage function. For example, if you are writing a user management service and want to announce that a user was created, use the following snippet.

package main

import (
  "log"

  tackle "github.com/renderedtext/go-tackle"
)

func main() {
  publishParams := tackle.PublishParams{
    Body:       []byte(`{"user_id": "123"}`),
    RoutingKey: "user-created",
    Exchange:   "user-exchange",
    // Full AMQP URI; 5672 is RabbitMQ's default AMQP port.
    AmqpURL: "amqp://guest:guest@localhost:5672",
  }

  // PublishMessage opens a connection, publishes, and closes the connection.
  err := tackle.PublishMessage(&publishParams)
  if err != nil {
    log.Printf("something went wrong while publishing: %v", err)
  }
}

Publishing messages to a RabbitMQ exchange (advanced)

In the simple publishing mechanism, tackle opens and closes a connection every time it sends a message. This is fine for sending one or two messages. If you plan to publish large batches of messages, however, it is more efficient to create a dedicated publisher that keeps the connection open for longer.

tackle.NewPublisher creates a publisher that will lazily create the connection, re-connecting if the current connection is closed for some reason.

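package main

import (
  "context"
  "log"
  "time"

  tackle "github.com/renderedtext/go-tackle"
)

func main() {
  // NewPublisher takes the AMQP URL plus a PublisherOptions value and may
  // return an error. The option values below are illustrative.
  publisher, err := tackle.NewPublisher("amqp://guest:guest@localhost:5672", tackle.PublisherOptions{
    ConnectionName:    "user-service",
    ConnectionTimeout: 5 * time.Second,
  })
  if err != nil {
    log.Fatalf("failed to create publisher: %v", err)
  }
  defer publisher.Close()

  publishParams := tackle.PublishParams{
    Body:       []byte(`{"user_id": "123"}`),
    RoutingKey: "user-created",
    Exchange:   "user-exchange",
  }

  err = publisher.PublishWithContext(context.Background(), &publishParams)
  if err != nil {
    log.Printf("something went wrong while publishing: %v", err)
  }
}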
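
The lazy connect and re-connect behaviour described above can be illustrated without a broker. The following is a minimal, standard-library-only sketch of the pattern, not tackle's implementation: `conn`, `lazyPublisher`, and `dial` are hypothetical names introduced for the illustration, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for a broker connection.
type conn struct{ closed bool }

// send fails when the connection has been closed.
func (c *conn) send(body string) error {
	if c.closed {
		return errors.New("connection closed")
	}
	return nil
}

// lazyPublisher dials on first use and dials again when it finds the
// current connection closed. dials counts successful dial calls.
type lazyPublisher struct {
	dial  func() (*conn, error)
	conn  *conn
	dials int
}

// publish makes sure a usable connection exists, then sends the body.
func (p *lazyPublisher) publish(body string) error {
	if p.conn == nil || p.conn.closed {
		c, err := p.dial()
		if err != nil {
			return fmt.Errorf("dial: %w", err)
		}
		p.conn = c
		p.dials++
	}
	return p.conn.send(body)
}

func main() {
	p := &lazyPublisher{dial: func() (*conn, error) { return &conn{}, nil }}

	// Three publishes share one connection.
	for i := 0; i < 3; i++ {
		_ = p.publish("msg")
	}
	fmt.Println("dials after 3 publishes:", p.dials)

	// Simulate a dropped connection; the next publish re-dials.
	p.conn.closed = true
	_ = p.publish("msg")
	fmt.Println("dials after reconnect:", p.dials)
}
```

The point of the pattern is that the cost of dialing is paid once per connection rather than once per message, which is why a dedicated publisher suits large batches.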

Consuming messages from RabbitMQ

To consume messages from RabbitMQ, you need to set up a consumer. Here is an example consumer for the messages published above:

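package main

import (
  "fmt"
  "log"

  tackle "github.com/renderedtext/go-tackle"
)

func main() {
  consumer := tackle.NewConsumer()

  options := tackle.Options{
    URL:            "amqp://guest:guest@rabbitmq:5672",
    RemoteExchange: "user-exchange",
    Service:        "user-persister",
    RoutingKey:     "user-created",
  }

  // The handler must match tackle.ProcessorFunc: func(tackle.Delivery) error.
  err := consumer.Start(&options, func(delivery tackle.Delivery) error {
    fmt.Printf("Received message from the consumer: %s\n", delivery.Body())

    // Propagate any error returned by Ack.
    return delivery.Ack()
  })
  if err != nil {
    log.Fatalf("consumer failed: %v", err)
  }
}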

Let's break down what happens here: We are connecting to the remote exchange "user-exchange" and consuming those messages in our local "user-persister" queue.

Visually, it looks like this:

+--------------------+
| Publishing Service |
+--------------------+
       | 
       | Publish 
       | "user-created"
       |           
   +---|---RabbitMQServer-------------------------------------------------------+
   |   v                                                                        |
   | +---------------+                       * defined by publishing service *  |
   | | user-exchange |                                                          |
   | +---------------+                                                          |
   |   |                                                                        |
   |   | key = user-created                                                     |
   |   |                                                                        |
   |---|------------------------------------------------------------------------|
   |   |                                                                        |
   |   |                                     * defined by subscriber service *  |
   |   v                                                                        |
   |  +-------------------------+                                               |
   |  | user-persister-exchange | <-+                                           |
   |  +------*------------------+   |                                           |
   |         |                      | after N secs                              |
   |         v                      |                                           |
   |  +----------------+   +----------------------+    +---------------------+  |
   |  | user-persister |   | user-persister-delay |    | user-persister-dead |  |
   |  +------*---------+   +----------------------+    +---------------------+  |
   |         |                               ^                 ^                |
   +---------|-------------------------------|-----------------|----------------+
             |                               |                 |
             v                               |                 |
       +-------------------+ ----(on err)----+                 |
       | Consuming Service |                                   |
       +-------------------+ ------------------- (after N err)-+

License

This software is licensed under the Apache 2.0 license.

Documentation

Index

Constants

const (
	StateListening    = "listening"
	StateNotListening = "not-listening"

	ReconnectionAttempts      = 50
	ReconnectionDelayDuration = 2 * time.Second

	PrefetchCount = 1
	PrefetchSize  = 0
	Global        = true

	ConsumerName = "tackle-consumer"
	Durable      = true
	Exclusive    = false
	AutoAck      = false
	AutoDeleted  = false
	Internal     = false
	NoWait       = false
	NoLocal      = false
)
const (
	DefaultRetryLimit = 10
	DefaultRetryDelay = 10
)
const (
	DeadLetterTimeout = 604_800_000 // 1 week
)
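
The "1 week" comment on DeadLetterTimeout can be checked with a few lines of Go. The sketch assumes the value is expressed in milliseconds, the unit RabbitMQ uses for TTL arguments; the listing itself does not state the unit. `deadLetterTTL` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// DeadLetterTimeout mirrors the constant from the listing above.
const DeadLetterTimeout = 604_800_000

// deadLetterTTL converts the constant to a time.Duration.
// Assumption: the constant is a number of milliseconds.
func deadLetterTTL() time.Duration {
	return time.Duration(DeadLetterTimeout) * time.Millisecond
}

func main() {
	ttl := deadLetterTTL()
	fmt.Println(ttl == 7*24*time.Hour) // true
	fmt.Println(ttl.Hours())           // 168
}
```

Spelled out, 7 days × 24 hours × 3,600 seconds × 1,000 milliseconds is 604,800,000.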

Variables

This section is empty.

Functions

func ConfigureExchanges

func ConfigureExchanges(channel *rabbit.Channel, options *Options) error

func ConfigureQueues

func ConfigureQueues(channel *rabbit.Channel, options *Options) error

func PublishMessage

func PublishMessage(params *PublishParams) error

Types

type Consumer

type Consumer struct {
	State string
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer() *Consumer

func (*Consumer) SetLogger

func (c *Consumer) SetLogger(l Logger)

func (*Consumer) Start

func (c *Consumer) Start(options *Options, f ProcessorFunc) error

func (*Consumer) Stop

func (c *Consumer) Stop()

type Delivery

type Delivery interface {
	Ack() error
	Nack(requeue bool) error
	Body() []byte
}

func NewDelivery

func NewDelivery(d *rabbit.Delivery) Delivery

func NewFakeDelivery

func NewFakeDelivery(body []byte) Delivery
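
Because Delivery is an interface, a handler can be exercised without a broker. NewFakeDelivery presumably serves that purpose, but its behaviour is not documented here, so the sketch below uses a local copy of the interface and a hypothetical `recordingDelivery` test double instead. `handle` is an example handler, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Delivery mirrors the interface from the listing above.
type Delivery interface {
	Ack() error
	Nack(requeue bool) error
	Body() []byte
}

// recordingDelivery is a hypothetical test double that records how the
// handler settled the message.
type recordingDelivery struct {
	body   []byte
	acked  bool
	nacked bool
}

func (d *recordingDelivery) Ack() error              { d.acked = true; return nil }
func (d *recordingDelivery) Nack(requeue bool) error { d.nacked = true; return nil }
func (d *recordingDelivery) Body() []byte            { return d.body }

// handle is an example handler: it rejects empty bodies with an error
// and acknowledges everything else.
func handle(d Delivery) error {
	if len(d.Body()) == 0 {
		return errors.New("empty message body")
	}
	return d.Ack()
}

func main() {
	ok := &recordingDelivery{body: []byte(`{"user_id": "123"}`)}
	fmt.Println(handle(ok), ok.acked) // <nil> true

	empty := &recordingDelivery{}
	fmt.Println(handle(empty) != nil, empty.acked) // true false
}
```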

type Logger

type Logger interface {
	Infof(string, ...interface{})
	Errorf(string, ...interface{})
}
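
Any type with these two methods satisfies Logger. As an illustration, here is a minimal sketch of an adapter around the standard library's `log.Logger`. The interface is copied locally so the example runs on its own; `stdLogger` and `capture` are hypothetical names, and the "INFO"/"ERROR" prefixes are an arbitrary choice.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface from the listing above.
type Logger interface {
	Infof(string, ...interface{})
	Errorf(string, ...interface{})
}

// stdLogger adapts a standard library *log.Logger to the Logger interface.
type stdLogger struct{ l *log.Logger }

func (s stdLogger) Infof(format string, args ...interface{}) {
	s.l.Printf("INFO "+format, args...)
}

func (s stdLogger) Errorf(format string, args ...interface{}) {
	s.l.Printf("ERROR "+format, args...)
}

// capture runs f with a Logger backed by an in-memory buffer and returns
// everything that was written. It exists only to make the output visible.
func capture(f func(Logger)) string {
	var buf bytes.Buffer
	f(stdLogger{l: log.New(&buf, "", 0)})
	return buf.String()
}

func main() {
	out := capture(func(l Logger) {
		l.Infof("connected to %s", "user-exchange")
		l.Errorf("publish failed: %v", "timeout")
	})
	fmt.Print(out)
}
```

Since the method set matches, a value like this could be handed to SetLogger on a Consumer or Publisher.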

type Options

type Options struct {
	URL            string
	ConnectionName string
	RemoteExchange string
	Service        string
	RoutingKey     string
	RetryDelay     int32
	RetryLimit     int32
	OnDeadFunc     func(Delivery)
}

func (*Options) GetConnectionName

func (o *Options) GetConnectionName() string

func (*Options) GetDeadQueueName

func (o *Options) GetDeadQueueName() string

func (*Options) GetDelayQueueName

func (o *Options) GetDelayQueueName() string

func (*Options) GetQueueName

func (o *Options) GetQueueName() string

func (*Options) GetRetryDelay

func (o *Options) GetRetryDelay() int32

func (*Options) GetRetryLimit

func (o *Options) GetRetryLimit() int32

func (*Options) GetServiceExchangeName

func (o *Options) GetServiceExchangeName() string

type ProcessorFunc

type ProcessorFunc func(Delivery) error

type Publish

type Publish interface {
	Publish(*PublishParams) error
}
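
One plausible use of the Publish interface is to let application code depend on it rather than on a concrete publisher, so that tests can swap in an in-memory implementation. The sketch below assumes that use; it copies the interface and a subset of PublishParams locally (Headers is left out to avoid the AMQP dependency). `memoryPublisher` and `announceUserCreated` are hypothetical names.

```go
package main

import "fmt"

// PublishParams mirrors a subset of the struct documented in this listing.
type PublishParams struct {
	Body       []byte
	RoutingKey string
	Exchange   string
}

// Publish mirrors the interface from the listing above.
type Publish interface {
	Publish(*PublishParams) error
}

// memoryPublisher is a hypothetical in-memory implementation that keeps
// every message it is asked to publish.
type memoryPublisher struct{ sent []*PublishParams }

func (m *memoryPublisher) Publish(p *PublishParams) error {
	m.sent = append(m.sent, p)
	return nil
}

// announceUserCreated is example application code that depends only on
// the Publish interface.
func announceUserCreated(pub Publish, userID string) error {
	return pub.Publish(&PublishParams{
		Body:       []byte(fmt.Sprintf(`{"user_id": %q}`, userID)),
		RoutingKey: "user-created",
		Exchange:   "user-exchange",
	})
}

func main() {
	pub := &memoryPublisher{}
	if err := announceUserCreated(pub, "123"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Printf("%d message(s), body=%s\n", len(pub.sent), pub.sent[0].Body)
}
```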

type PublishParams

type PublishParams struct {
	Body    []byte
	Headers rabbit.Table

	AmqpURL    string
	RoutingKey string
	Exchange   string

	IsMandatory bool
	IsImmediate bool
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func NewPublisher

func NewPublisher(amqpURL string, options PublisherOptions) (*Publisher, error)

func (*Publisher) Close

func (p *Publisher) Close()

func (*Publisher) ExchangeDeclare

func (p *Publisher) ExchangeDeclare(exchange string) error

func (*Publisher) Publish

func (p *Publisher) Publish(params *PublishParams) error

func (*Publisher) PublishWithContext

func (p *Publisher) PublishWithContext(ctx context.Context, params *PublishParams) error

func (*Publisher) SetConnectionName

func (p *Publisher) SetConnectionName(connName string)

func (*Publisher) SetLogger

func (p *Publisher) SetLogger(l Logger)

type PublisherOptions

type PublisherOptions struct {
	ConnectionName    string
	ConnectionTimeout time.Duration
	ConnectFunc       func() (*rabbit.Connection, error)
}
