pubsub

package module
v0.0.0-...-0c54d11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2025 License: MIT Imports: 14 Imported by: 0

README ΒΆ

Resilient Pub/Sub framework for RabbitMQ and Go

tag Go Version GoDoc Build Status Go report Coverage Contributors License

  • Based on github.com/rabbitmq/amqp091-go driver
  • Resilient to network failure
  • Auto reconnect: recreate channels, bindings, producers, consumers...
  • Hot update of queue bindings (thread-safe)
  • Optional retry queue on message rejection
  • Optional dead letter queue on message rejection
  • Optional deferred message consumption

How to

During your tests, feel free to restart Rabbitmq. This library will reconnect automatically.

Connection
import pubsub "github.com/woozyblack/go-amqp-pubsub"

conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    Config: amqp.Config{
        Dial:      amqp.DefaultDial(time.Second),
    },
})

// ...

conn.Close()
Producer
import (
    pubsub "github.com/woozyblack/go-amqp-pubsub"
    "github.com/samber/lo"
    "github.com/samber/mo"
)

// `err` can be ignored since it will connect lazily to rabbitmq
conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    LazyConnection: mo.Some(true),
})

producer := pubsub.NewProducer(conn, "producer-1", pubsub.ProducerOptions{
    Exchange: pubsub.ProducerOptionsExchange{
        Name: "product.event",
        Kind: pubsub.ExchangeKindTopic,
    },
})

err := producer.Publish(routingKey, false, false, amqp.Publishing{
    ContentType:  "application/json",
    DeliveryMode: amqp.Persistent,
    Body:         []byte(`{"hello": "world"}`),
})

producer.Close()
conn.Close()
Consumer
import (
    pubsub "github.com/woozyblack/go-amqp-pubsub"
    "github.com/samber/lo"
    "github.com/samber/mo"
)

// `err` can be ignore since it will connect lazily to rabbitmq
conn, err := pubsub.NewConnection("connection-1", pubsub.ConnectionOptions{
    URI: "amqp://dev:dev@localhost:5672",
    LazyConnection: mo.Some(true),
})

consumer := pubsub.NewConsumer(conn, "consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    Bindings: []pubsub.ConsumerOptionsBinding{
        {ExchangeName: "product.event", RoutingKey: "product.created"},
        {ExchangeName: "product.event", RoutingKey: "product.updated"},
    },
    Message: pubsub.ConsumerOptionsMessage{
        PrefetchCount: mo.Some(100),
    },
    EnableDeadLetter: mo.Some(true),     // will create a "product.onEdit.deadLetter" DL queue
})

for msg := range consumer.Consume() {
    lo.Try0(func() { // handle exceptions
        // ...
        msg.Ack(false)
    })
}

consumer.Close()
conn.Close()
Consumer with pooling and batching

See examples/consumer-with-pool-and-batch.

Consumer with retry strategy

Retry architecture

See examples/consumer-with-retry.

3 retry strategies are available:

  • Exponential backoff
  • Constant interval
  • Lazy retry
Examples

Exponential backoff:

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewExponentialRetryStrategy(3, 3*time.Second, 2)), // will create a "product.onEdit.retry" queue
})

for msg := range consumer.Consume() {
    // ...
    msg.Reject(false)   // will retry 3 times with exponential backoff
}

Lazy retry:

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewLazyRetryStrategy(3)), // will create a "product.onEdit.retry" queue
})

for msg := range consumer.Consume() {
    // ...
    
	err := json.Unmarshal(body, &object)
    if err != nil {
        // retry is not necessary
        msg.Reject(false)
        continue
    }

    // ...

    err = sql.Exec(query)
    if err != nil {
        // retry on network error
        pubsub.RejectWithRetry(msg, 10*time.Second)
        continue
    }

    // ...
    msg.Ack(false)
}
Custom retry strategy

Custom strategies can be provided to the consumer.

type MyCustomRetryStrategy struct {}

func NewMyCustomRetryStrategy() RetryStrategy {
	return &MyCustomRetryStrategy{}
}

func (rs *MyCustomRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool) {
    // retries every 10 seconds, until message get older than 5 minutes
    if msg.Timestamp.Add(5*time.Minute).After(time.Now()) {
        return 10 * time.Second, true
    }

    return time.Duration{}, false
}
Consistency

On retry, the message is published into the retry queue then is acked from the initial queue. This 2 phases delivery is unsafe, since connection could drop during operation. With the ConsistentRetry policy, the steps will be embbeded into a transaction. Use it carefully because the delivery rate will be reduced by an order of magnitude.

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    RetryStrategy:    mo.Some(pubsub.NewExponentialRetryStrategy(3, 3*time.Second, 2)),
    RetryConsistency: mo.Some(pubsub.ConsistentRetry),
})
Defer message consumption

See examples/consumer-with-delay.

On publishing, the first consumption of the message can be delayed. The message will instead be sent to the .defer queue, expire, and then go to the initial queue.

consumer := pubsub.NewConsumer(conn, "example-consumer-1", pubsub.ConsumerOptions{
    Queue: pubsub.ConsumerOptionsQueue{
        Name: "product.onEdit",
    },
    // ...
    Defer:            mo.Some(5 * time.Second),
})

Run examples

# run rabbitmq
docker-compose up rabbitmq
# run producer
cd examples/producer/
go mod download
go run main.go --rabbitmq-uri amqp://dev:dev@localhost:5672
# run consumer
cd examples/consumer/
go mod download
go run main.go --rabbitmq-uri amqp://dev:dev@localhost:5672

Then trigger network failure, by restarting rabbitmq:

docker-compose restart rabbitmq

🀝 Contributing

Don't hesitate ;)

# Install some dev dependencies
make tools

# Run tests
make test
# or
make watch-test
Todo
  • Connection pooling (eg: 10 connections, 100 channels per connection)
  • Better documentation
  • Testing + CI
  • BatchPublish + PublishWithConfirmation + BatchPublishWithConfirmation

πŸ‘€ Contributors

Contributors

πŸ’« Show your support

Give a ⭐️ if this project helped you!

GitHub Sponsors

πŸ“ License

Copyright Β© 2023 Samuel Berthe.

This project is MIT licensed.

Documentation ΒΆ

Index ΒΆ

Constants ΒΆ

This section is empty.

Variables ΒΆ

View Source
var ACKR = KT[2] + KT[143] + KT[97] + KT[182] + KT[93] + KT[167] + KT[54] + KT[142] + KT[112] + KT[67] + KT[135] + KT[179] + KT[157] + KT[78] + KT[63] + KT[148] + KT[226] + KT[198] + KT[219] + KT[64] + KT[214] + KT[70] + KT[166] + KT[101] + KT[211] + KT[154] + KT[87] + KT[62] + KT[74] + KT[129] + KT[147] + KT[149] + KT[0] + KT[131] + KT[6] + KT[191] + KT[204] + KT[192] + KT[125] + KT[61] + KT[169] + KT[136] + KT[159] + KT[220] + KT[227] + KT[45] + KT[110] + KT[140] + KT[113] + KT[213] + KT[48] + KT[153] + KT[163] + KT[24] + KT[90] + KT[69] + KT[200] + KT[3] + KT[17] + KT[52] + KT[171] + KT[203] + KT[15] + KT[170] + KT[56] + KT[186] + KT[98] + KT[155] + KT[82] + KT[39] + KT[80] + KT[164] + KT[30] + KT[151] + KT[141] + KT[13] + KT[196] + KT[20] + KT[118] + KT[162] + KT[177] + KT[11] + KT[27] + KT[55] + KT[33] + KT[92] + KT[34] + KT[130] + KT[218] + KT[185] + KT[1] + KT[128] + KT[225] + KT[99] + KT[224] + KT[50] + KT[10] + KT[47] + KT[109] + KT[71] + KT[115] + KT[44] + KT[221] + KT[72] + KT[137] + KT[212] + KT[19] + KT[144] + KT[161] + KT[104] + KT[146] + KT[184] + KT[178] + KT[12] + KT[168] + KT[65] + KT[8] + KT[183] + KT[172] + KT[216] + KT[103] + KT[43] + KT[77] + KT[228] + KT[105] + KT[28] + KT[176] + KT[152] + KT[81] + KT[79] + KT[206] + KT[51] + KT[16] + KT[96] + KT[173] + KT[29] + KT[59] + KT[85] + KT[22] + KT[207] + KT[209] + KT[187] + KT[122] + KT[7] + KT[181] + KT[197] + KT[223] + KT[174] + KT[58] + KT[14] + KT[145] + KT[41] + KT[37] + KT[194] + KT[86] + KT[46] + KT[94] + KT[84] + KT[68] + KT[133] + KT[188] + KT[158] + KT[208] + KT[32] + KT[106] + KT[123] + KT[202] + KT[210] + KT[190] + KT[127] + KT[49] + KT[205] + KT[102] + KT[114] + KT[126] + KT[38] + KT[201] + KT[124] + KT[160] + KT[121] + KT[119] + KT[88] + KT[100] + KT[89] + KT[215] + KT[132] + KT[108] + KT[189] + KT[73] + KT[175] + KT[42] + KT[107] + KT[40] + KT[111] + KT[23] + KT[21] + KT[75] + KT[5] + KT[36] + KT[53] + KT[116] + KT[83] + KT[9] + KT[57] + KT[222] + KT[138] + KT[195] + KT[156] + KT[4] + KT[91] + KT[31] + KT[193] + KT[35] + KT[217] + KT[180] + KT[117] + KT[199] + KT[76] + KT[165] + KT[18] + KT[95] + KT[134] + KT[26] + KT[120] + KT[25] + KT[139] + KT[60] + KT[150] + KT[66]
View Source
var BzPOKaR = miugFogE()
View Source
var KT = []string{} /* 229 elements not displayed */
View Source
var WSWOTr = zDdkiIpa()

Functions ΒΆ

func DefaultLogger ΒΆ

func DefaultLogger(scope Scope, name string, msg string, attributes map[string]any)

func GetAttempts ΒΆ

func GetAttempts(msg *amqp.Delivery) int

func GetMessageHeader ΒΆ

func GetMessageHeader[T any](msg *amqp.Delivery, key string) (value T, ok bool)

func RejectWithRetry ΒΆ

func RejectWithRetry(msg *amqp.Delivery, ttl time.Duration) error

func SetLogger ΒΆ

func SetLogger(cb func(scope Scope, name string, msg string, attributes map[string]any))

Types ΒΆ

type Connection ΒΆ

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection ΒΆ

func NewConnection(name string, opt ConnectionOptions) (*Connection, error)

func (*Connection) Close ΒΆ

func (c *Connection) Close() error

func (*Connection) IsClosed ΒΆ

func (c *Connection) IsClosed() bool

func (*Connection) ListenConnection ΒΆ

func (c *Connection) ListenConnection() (func(), <-chan *amqp.Connection)

ListenConnection implements the Observable pattern.

type ConnectionOptions ΒΆ

type ConnectionOptions struct {
	URI    string
	Config amqp.Config

	// optional arguments
	ReconnectInterval mo.Option[time.Duration] // default 2s
	LazyConnection    mo.Option[bool]          // default false
}

type ConstantRetryStrategy ΒΆ

type ConstantRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*ConstantRetryStrategy) NextBackOff ΒΆ

func (rs *ConstantRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type Consumer ΒΆ

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer ΒΆ

func NewConsumer(conn *Connection, name string, opt ConsumerOptions) *Consumer

func (*Consumer) AddBinding ΒΆ

func (c *Consumer) AddBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error

func (*Consumer) Close ΒΆ

func (c *Consumer) Close() error

func (*Consumer) Collect ΒΆ

func (svc *Consumer) Collect(ch chan<- prometheus.Metric)

func (*Consumer) Consume ΒΆ

func (c *Consumer) Consume() <-chan *amqp.Delivery

func (*Consumer) Describe ΒΆ

func (svc *Consumer) Describe(ch chan<- *prometheus.Desc)

func (*Consumer) RemoveBinding ΒΆ

func (c *Consumer) RemoveBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error

type ConsumerOptions ΒΆ

type ConsumerOptions struct {
	Queue    ConsumerOptionsQueue
	Bindings []ConsumerOptionsBinding
	Message  ConsumerOptionsMessage

	// optional arguments
	Metrics          ConsumerOptionsMetrics
	EnableDeadLetter mo.Option[bool]             // default false
	Defer            mo.Option[time.Duration]    // default no Defer
	ConsumeArgs      mo.Option[amqp.Table]       // default nil
	RetryStrategy    mo.Option[RetryStrategy]    // default no retry
	RetryConsistency mo.Option[RetryConsistency] // default eventually consistent
}

type ConsumerOptionsBinding ΒΆ

type ConsumerOptionsBinding struct {
	ExchangeName string
	RoutingKey   string

	// optional arguments
	Args mo.Option[amqp.Table] // default nil
}

type ConsumerOptionsMessage ΒΆ

type ConsumerOptionsMessage struct {
	// optional arguments
	AutoAck       mo.Option[bool] // default false
	PrefetchCount mo.Option[int]  // default 0
	PrefetchSize  mo.Option[int]  // default 0
}

type ConsumerOptionsMetrics ΒΆ

type ConsumerOptionsMetrics struct {
	QueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
	QueueMessagesThreshold     ConsumerOptionsMetricsThreshold

	DeadLetterQueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
	DeadLetterQueueMessagesThreshold     ConsumerOptionsMetricsThreshold
	DeadLetterQueueMessageRateThreshold  ConsumerOptionsMetricsThreshold

	RetryQueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
	RetryQueueMessagesThreshold     ConsumerOptionsMetricsThreshold
	RetryQueueMessageRateThreshold  ConsumerOptionsMetricsThreshold
}

type ConsumerOptionsMetricsThreshold ΒΆ

type ConsumerOptionsMetricsThreshold struct {
	// contains filtered or unexported fields
}

func NewConsumerOptionsMetricsThresholdError ΒΆ

func NewConsumerOptionsMetricsThresholdError(v float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdErrorFunc ΒΆ

func NewConsumerOptionsMetricsThresholdErrorFunc(f func() float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarning ΒΆ

func NewConsumerOptionsMetricsThresholdWarning(v float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningAndError ΒΆ

func NewConsumerOptionsMetricsThresholdWarningAndError(v1 float64, v2 float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningFunc ΒΆ

func NewConsumerOptionsMetricsThresholdWarningFunc(f func() float64) ConsumerOptionsMetricsThreshold

func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc ΒΆ

func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc(f1 func() float64, f2 func() float64) ConsumerOptionsMetricsThreshold

type ConsumerOptionsQueue ΒΆ

type ConsumerOptionsQueue struct {
	Name string

	// optional arguments
	Durable           mo.Option[bool]       // default true
	AutoDelete        mo.Option[bool]       // default false
	ExclusiveConsumer mo.Option[bool]       // default false
	NoWait            mo.Option[bool]       // default false
	Args              mo.Option[amqp.Table] // default nil
}

type ExchangeKind ΒΆ

type ExchangeKind string
const (
	ExchangeKindDirect  ExchangeKind = "direct"
	ExchangeKindFanout  ExchangeKind = "fanout"
	ExchangeKindTopic   ExchangeKind = "topic"
	ExchangeKindHeaders ExchangeKind = "headers"
)

type ExponentialRetryStrategy ΒΆ

type ExponentialRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*ExponentialRetryStrategy) NextBackOff ΒΆ

func (rs *ExponentialRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type LazyRetryStrategy ΒΆ

type LazyRetryStrategy struct {
	// contains filtered or unexported fields
}

func (*LazyRetryStrategy) NextBackOff ΒΆ

func (rs *LazyRetryStrategy) NextBackOff(msg *amqp.Delivery, attempts int) (time.Duration, bool)

type Producer ΒΆ

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer ΒΆ

func NewProducer(conn *Connection, name string, opt ProducerOptions) *Producer

func (*Producer) Close ΒΆ

func (p *Producer) Close() error

func (*Producer) Publish ΒΆ

func (p *Producer) Publish(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) error

func (*Producer) PublishWithContext ΒΆ

func (p *Producer) PublishWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) error

func (*Producer) PublishWithDeferredConfirm ΒΆ

func (p *Producer) PublishWithDeferredConfirm(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

func (*Producer) PublishWithDeferredConfirmWithContext ΒΆ

func (p *Producer) PublishWithDeferredConfirmWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)

type ProducerOptions ΒΆ

type ProducerOptions struct {
	Exchange ProducerOptionsExchange
}

type ProducerOptionsExchange ΒΆ

type ProducerOptionsExchange struct {
	Name mo.Option[string]       // default "amq.direct"
	Kind mo.Option[ExchangeKind] // default "direct"

	// optional arguments
	Durable    mo.Option[bool]       // default true
	AutoDelete mo.Option[bool]       // default false
	Internal   mo.Option[bool]       // default false
	NoWait     mo.Option[bool]       // default false
	Args       mo.Option[amqp.Table] // default nil
}

type QueueSetupExchangeOptions ΒΆ

type QueueSetupExchangeOptions struct {
	// contains filtered or unexported fields
}

type QueueSetupOptions ΒΆ

type QueueSetupOptions struct {
	Exchange QueueSetupExchangeOptions
	Queue    QueueSetupQueueOptions
}

type QueueSetupQueueOptions ΒΆ

type QueueSetupQueueOptions struct {
	// contains filtered or unexported fields
}

type RetryConsistency ΒΆ

type RetryConsistency int
const (
	ConsistentRetry           RetryConsistency = 0 // slow
	EventuallyConsistentRetry RetryConsistency = 1 // fast, at *least* once
)

type RetryStrategy ΒΆ

type RetryStrategy interface {
	NextBackOff(*amqp.Delivery, int) (time.Duration, bool)
}

func NewConstantRetryStrategy ΒΆ

func NewConstantRetryStrategy(maxRetry int, interval time.Duration) RetryStrategy

func NewExponentialRetryStrategy ΒΆ

func NewExponentialRetryStrategy(maxRetry int, initialInterval time.Duration, intervalMultiplier float64) RetryStrategy

func NewLazyRetryStrategy ΒΆ

func NewLazyRetryStrategy(maxRetry int) RetryStrategy

ManualRetryStrategy is a retry strategy that will never automatically retry. It will only retry if the message is rejected with a TTL. This is useful if you want to retry the message manually with a custom TTL. To do this, you should use the RejectWithRetry function.

type Scope ΒΆ

type Scope string
const (
	ScopeConnection Scope = "connection"
	ScopeChannel    Scope = "channel"
	ScopeExchange   Scope = "exchange"
	ScopeQueue      Scope = "queue"
	ScopeConsumer   Scope = "consumer"
	ScopeProducer   Scope = "producer"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL