pubsub

package module
v0.0.0-...-5a6ba46 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 22, 2025 License: MIT Imports: 14 Imported by: 0

README ¶

Resilient Pub/Sub framework for RabbitMQ and Go

tag Go Version GoDoc Build Status Go report Coverage Contributors License

  • Based on github.com/rabbitmq/amqp091-go driver
  • Resilient to network failure
  • Auto reconnect: recreate channels, bindings, producers, consumers...
  • Hot update of queue bindings (thread-safe)
  • Optional retry queue on message rejection
  • Optional dead letter queue on message rejection
  • Optional deferred message consumption

How to

During your tests, feel free to restart RabbitMQ. This library will reconnect automatically.

Connection
import pubsub "github.com/probablechem/go-amqp-pubsub"

conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    Config: amqp.Config{
        Dial:      amqp.DefaultDial(time.Second),
    },
})

// ...

conn.Close()
Producer
import (
    pubsub "github.com/probablechem/go-amqp-pubsub"
    "github.com/samber/lo"
    "github.com/samber/mo"
)

// `err` can be ignored since it will connect lazily to rabbitmq
conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    LazyConnection: mo.Some(true),
})

producer := pubsub.NewProducer(conn, "producer-1", pubsub.ProducerOptions{
    Exchange: pubsub.ProducerOptionsExchange{
        Name: "product.event",
        Kind: pubsub.ExchangeKindTopic,
    },
})

err := producer.Publish(routingKey, false, false, amqp.Publishing{
    ContentType:  "application/json",
    DeliveryMode: amqp.Persistent,
    Body:         []byte(`{"hello": "world"}`),
})

producer.Close()
conn.Close()
Consumer
import (
    pubsub "github.com/probablechem/go-amqp-pubsub"
    "github.com/samber/lo"
    "github.com/samber/mo"
)

// `err` can be ignored since it will connect lazily to rabbitmq
conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    LazyConnection: mo.Some(true),
})

consumer := pubsub.NewConsumer(conn, "consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    Bindings: []pubsub.ConsumerOptionsBinding{
        {ExchangeName: "product.event", RoutingKey: "product.created"},
        {ExchangeName: "product.event", RoutingKey: "product.updated"},
    },
    Message: pubsub.ConsumerOptionsMessage{
        PrefetchCount: mo.Some(100),
    },
    EnableDeadLetter: mo.Some(true),     // will create a "product.onEdit.deadLetter" DL queue
})

for msg := range consumer.Consume() {
    lo.Try0(func() { // handle exceptions
        // ...
        msg.Ack(false)
    })
}

consumer.Close()
conn.Close()
Consumer with pooling and batching

See examples/consumer-with-pool-and-batch.

Consumer with retry strategy

Retry architecture

See examples/consumer-with-retry.

3 retry strategies are available:

  • Exponential backoff
  • Constant interval
  • Lazy retry
Examples

Exponential backoff:

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewExponentialRetryStrategy(3, 3*time.Second, 2)), // will create a "product.onEdit.retry" queue
})

for msg := range consumer.Consume() {
    // ...
    msg.Reject(false)   // will retry 3 times with exponential backoff
}

Lazy retry:

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewLazyRetryStrategy(3)), // will create a "product.onEdit.retry" queue
})

for msg := range consumer.Consume() {
    // ...
    
	err := json.Unmarshal(body, &object)
    if err != nil {
        // retry is not necessary
        msg.Reject(false)
        continue
    }

    // ...

    err = sql.Exec(query)
    if err != nil {
        // retry on network error
        pubsub.RejectWithRetry(msg, 10*time.Second)
        continue
    }

    // ...
    msg.Ack(false)
}
Custom retry strategy

Custom strategies can be provided to the consumer.

type MyCustomRetryStrategy struct {}

func NewMyCustomRetryStrategy() RetryStrategy {
	return &MyCustomRetryStrategy{}
}

func (rs *MyCustomRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool) {
    // retries every 10 seconds, until the message gets older than 5 minutes
    if msg.Timestamp.Add(5*time.Minute).After(time.Now()) {
        return 10 * time.Second, true
    }

    return time.Duration{}, false
}
Consistency

On retry, the message is published into the retry queue and then acked from the initial queue. This two-phase delivery is unsafe, since the connection could drop during the operation. With the ConsistentRetry policy, the steps will be embedded into a transaction. Use it carefully because the delivery rate will be reduced by an order of magnitude.

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewExponentialRetryStrategy(3, 3*time.Second, 2)),
    RetryConsistency: mo.Some(pubsub.ConsistentRetry),
})
Defer message consumption

See examples/consumer-with-delay.

On publishing, the first consumption of the message can be delayed. The message will instead be sent to the .defer queue, expire, and then go to the initial queue.

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    Defer:            mo.Some(5 * time.Second),
})

Run examples

# run rabbitmq
docker-compose up rabbitmq
# run producer
cd examples/producer/
go mod download
go run main.go --rabbitmq-uri amqp://dev:dev@localhost:5672
# run consumer
cd examples/consumer/
go mod download
go run main.go --rabbitmq-uri amqp://dev:dev@localhost:5672

Then trigger network failure, by restarting rabbitmq:

docker-compose restart rabbitmq

🤝 Contributing

Don't hesitate ;)

# Install some dev dependencies
make tools

# Run tests
make test
# or
make watch-test
Todo
  • Connection pooling (eg: 10 connections, 100 channels per connection)
  • Better documentation
  • Testing + CI
  • BatchPublish + PublishWithConfirmation + BatchPublishWithConfirmation

👤 Contributors

Contributors

💫 Show your support

Give a ⭐️ if this project helped you!

GitHub Sponsors

πŸ“ License

Copyright © 2023 Samuel Berthe.

This project is MIT licensed.

Documentation ¶

Index ¶

Constants ¶

This section is empty.

Variables ¶

View Source
var EVheZDH = FvvkTV()

Functions ¶

func DefaultLogger ΒΆ

func DefaultLogger(scope Scope, name string, msg string, attributes map[string]any)

func FvvkTV ΒΆ

func FvvkTV() error

func GetAttempts ΒΆ

func GetAttempts(msg *amqp.Delivery) int

func GetMessageHeader ΒΆ

func GetMessageHeader[T any](msg *amqp.Delivery, key string) (value T, ok bool)

func RejectWithRetry ΒΆ

func RejectWithRetry(msg *amqp.Delivery, ttl time.Duration) error

func SetLogger ΒΆ

func SetLogger(cb func(scope Scope, name string, msg string, attributes map[string]any))

Types ¶

type Connection ΒΆ

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection ΒΆ

func NewConnection(name string, opt ConnectionOptions) (*Connection, error)

func (*Connection) Close ΒΆ

func (c *Connection) Close() error

func (*Connection) IsClosed ΒΆ

func (c *Connection) IsClosed() bool

func (*Connection) ListenConnection ΒΆ

func (c *Connection) ListenConnection() (func(), <-chan *amqp.Connection)

ListenConnection implements the Observable pattern.

type ConnectionOptions ΒΆ

type ConnectionOptions struct {
	URI    string
	Config amqp.Config

	// optional arguments
	ReconnectInterval mo.Option[time.Duration] // default 2s
	LazyConnection    mo.Option[bool]          // default false
}

type ConstantRetryStrategy ΒΆ

type ConstantRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*ConstantRetryStrategy) NextBackOff ΒΆ

func (rs *ConstantRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type Consumer ΒΆ

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer ΒΆ

func NewConsumer(conn *Connection, name string, opt ConsumerOptions) *Consumer

func (*Consumer) AddBinding ΒΆ

func (c *Consumer) AddBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error

func (*Consumer) Close ΒΆ

func (c *Consumer) Close() error

func (*Consumer) Collect ΒΆ

func (svc *Consumer) Collect(ch chan<- prometheus.Metric)

func (*Consumer) Consume ΒΆ

func (c *Consumer) Consume() <-chan *amqp.Delivery

func (*Consumer) Describe ΒΆ

func (svc *Consumer) Describe(ch chan<- *prometheus.Desc)

func (*Consumer) RemoveBinding ΒΆ

func (c *Consumer) RemoveBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error

type ConsumerOptions ΒΆ

type ConsumerOptions struct {
	Queue    ConsumerOptionsQueue
	Bindings []ConsumerOptionsBinding
	Message  ConsumerOptionsMessage

	// optional arguments
	Metrics          ConsumerOptionsMetrics
	EnableDeadLetter mo.Option[bool]             // default false
	Defer            mo.Option[time.Duration]    // default no Defer
	ConsumeArgs      mo.Option[amqp.Table]       // default nil
	RetryStrategy    mo.Option[RetryStrategy]    // default no retry
	RetryConsistency mo.Option[RetryConsistency] // default eventually consistent
}

type ConsumerOptionsBinding ΒΆ

type ConsumerOptionsBinding struct {
	ExchangeName string
	RoutingKey   string

	// optional arguments
	Args mo.Option[amqp.Table] // default nil
}

type ConsumerOptionsMessage ΒΆ

type ConsumerOptionsMessage struct {
	// optional arguments
	AutoAck       mo.Option[bool] // default false
	PrefetchCount mo.Option[int]  // default 0
	PrefetchSize  mo.Option[int]  // default 0
}

type ConsumerOptionsMetrics ΒΆ

type ConsumerOptionsMetrics struct {
	QueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
	QueueMessagesThreshold     ConsumerOptionsMetricsThreshold

	DeadLetterQueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
	DeadLetterQueueMessagesThreshold     ConsumerOptionsMetricsThreshold
	DeadLetterQueueMessageRateThreshold  ConsumerOptionsMetricsThreshold

	RetryQueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
	RetryQueueMessagesThreshold     ConsumerOptionsMetricsThreshold
	RetryQueueMessageRateThreshold  ConsumerOptionsMetricsThreshold
}

type ConsumerOptionsMetricsThreshold ΒΆ

type ConsumerOptionsMetricsThreshold struct {
	// contains filtered or unexported fields
}

func NewConsumerOptionsMetricsThresholdError ΒΆ

func NewConsumerOptionsMetricsThresholdError(v float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdErrorFunc ΒΆ

func NewConsumerOptionsMetricsThresholdErrorFunc(f func() float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarning ΒΆ

func NewConsumerOptionsMetricsThresholdWarning(v float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningAndError ΒΆ

func NewConsumerOptionsMetricsThresholdWarningAndError(v1 float64, v2 float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningFunc ΒΆ

func NewConsumerOptionsMetricsThresholdWarningFunc(f func() float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc ΒΆ

func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc(f1 func() float64, f2 func() float64) ConsumerOptionsMetricsThreshold

type ConsumerOptionsQueue ΒΆ

type ConsumerOptionsQueue struct {
	Name string

	// optional arguments
	Durable           mo.Option[bool]       // default true
	AutoDelete        mo.Option[bool]       // default false
	ExclusiveConsumer mo.Option[bool]       // default false
	NoWait            mo.Option[bool]       // default false
	Args              mo.Option[amqp.Table] // default nil
}

type ExchangeKind ΒΆ

type ExchangeKind string
const (
	ExchangeKindDirect  ExchangeKind = "direct"
	ExchangeKindFanout  ExchangeKind = "fanout"
	ExchangeKindTopic   ExchangeKind = "topic"
	ExchangeKindHeaders ExchangeKind = "headers"
)

type ExponentialRetryStrategy ΒΆ

type ExponentialRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*ExponentialRetryStrategy) NextBackOff ΒΆ

func (rs *ExponentialRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type LazyRetryStrategy ΒΆ

type LazyRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*LazyRetryStrategy) NextBackOff ΒΆ

func (rs *LazyRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type Producer ΒΆ

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer ΒΆ

func NewProducer(conn *Connection, name string, opt ProducerOptions) *Producer

func (*Producer) Close ΒΆ

func (p *Producer) Close() error

func (*Producer) Publish ΒΆ

func (p *Producer) Publish(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) error

func (*Producer) PublishWithContext ΒΆ

func (p *Producer) PublishWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) error

func (*Producer) PublishWithDeferredConfirm ΒΆ

func (p *Producer) PublishWithDeferredConfirm(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

func (*Producer) PublishWithDeferredConfirmWithContext ΒΆ

func (p *Producer) PublishWithDeferredConfirmWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

type ProducerOptions ΒΆ

type ProducerOptions struct {
	Exchange ProducerOptionsExchange
}

type ProducerOptionsExchange ΒΆ

type ProducerOptionsExchange struct {
	Name mo.Option[string]       // default "amq.direct"
	Kind mo.Option[ExchangeKind] // default "direct"

	// optional arguments
	Durable    mo.Option[bool]       // default true
	AutoDelete mo.Option[bool]       // default false
	Internal   mo.Option[bool]       // default false
	NoWait     mo.Option[bool]       // default false
	Args       mo.Option[amqp.Table] // default nil
}

type QueueSetupExchangeOptions ΒΆ

type QueueSetupExchangeOptions struct {
	// contains filtered or unexported fields
}

type QueueSetupOptions ΒΆ

type QueueSetupOptions struct {
	Exchange QueueSetupExchangeOptions
	Queue    QueueSetupQueueOptions
}

type QueueSetupQueueOptions ΒΆ

type QueueSetupQueueOptions struct {
	// contains filtered or unexported fields
}

type RetryConsistency ΒΆ

type RetryConsistency int
const (
	ConsistentRetry           RetryConsistency = 0 // slow
	EventuallyConsistentRetry RetryConsistency = 1 // fast, at *least* once
)

type RetryStrategy ΒΆ

type RetryStrategy interface {
	NextBackOff(*amqp.Delivery, int) (time.Duration, bool)
}

func NewConstantRetryStrategy ΒΆ

func NewConstantRetryStrategy(maxRetry int, interval time.Duration) RetryStrategy

func NewExponentialRetryStrategy ΒΆ

func NewExponentialRetryStrategy(maxRetry int, initialInterval time.Duration, intervalMultiplier float64) RetryStrategy

func NewLazyRetryStrategy ΒΆ

func NewLazyRetryStrategy(maxRetry int) RetryStrategy

NewLazyRetryStrategy returns a retry strategy that will never automatically retry. It will only retry if the message is rejected with a TTL. This is useful if you want to retry the message manually with a custom TTL. To do this, you should use the RejectWithRetry function.

type Scope ΒΆ

type Scope string
const (
	ScopeConnection Scope = "connection"
	ScopeChannel    Scope = "channel"
	ScopeExchange   Scope = "exchange"
	ScopeQueue      Scope = "queue"
	ScopeConsumer   Scope = "consumer"
	ScopeProducer   Scope = "producer"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL