rabbus

package module
v3.0.0+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2019 License: MIT Imports: 8 Imported by: 24

README ¶

Rabbus 🚌 ✨

  • A tiny wrapper over amqp exchanges and queues.
  • In memory retries with exponential backoff for sending messages.
  • Protect producer calls with circuit breaker.
  • Automatic reconnect to RabbitMQ broker when connection is lost.
  • Go channel API.

Installation

go get -u github.com/rafaeljesus/rabbus

Usage

The rabbus package exposes an interface for emitting and listening RabbitMQ messages.

Emit

import (
	"context"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	timeout := time.After(time.Second * 3)
	cbStateChangeFunc := func(name, from, to string) {
		// do something when state is changed
	}
	r, err := rabbus.New(
		rabbusDsn,
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
		rabbus.Threshold(3),
		rabbus.OnStateChange(cbStateChangeFunc),
	)
	if err != nil {
		// handle error
	}

	defer func(r Rabbus) {
		if err := r.Close(); err != nil {
			// handle error
		}
	}(r)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go r.Run(ctx)

	msg := rabbus.Message{
		Exchange: "test_ex",
		Kind:     "topic",
		Key:      "test_key",
		Payload:  []byte(`foo`),
	}

	r.EmitAsync() <- msg

	for {
		select {
		case <-r.EmitOk():
			// message was sent
		case <-r.EmitErr():
			// failed to send message
		case <-timeout:
			// handle timeout error
		}
	}
}

Listen

import (
	"context"
	"encoding/json"
	"time"

	"github.com/rafaeljesus/rabbus"
)

func main() {
	timeout := time.After(time.Second * 3)
	cbStateChangeFunc := func(name, from, to string) {
		// do something when state is changed
	}
	r, err := rabbus.New(
		rabbusDsn,
		rabbus.Durable(true),
		rabbus.Attempts(5),
		rabbus.Sleep(time.Second*2),
		rabbus.Threshold(3),
		rabbus.OnStateChange(cbStateChangeFunc),
	)
	if err != nil {
		// handle error
	}

	defer func(r Rabbus) {
		if err := r.Close(); err != nil {
			// handle error
		}
	}(r)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go r.Run(ctx)

	messages, err := r.Listen(rabbus.ListenConfig{
		Exchange:    "events_ex",
		Kind:        "topic",
		Key:         "events_key",
		Queue:       "events_q",
		DeclareArgs: rabbus.NewDeclareArgs().WithMessageTTL(15 * time.Minute).With("foo", "bar"),
		BindArgs:    rabbus.NewBindArgs().With("baz", "qux"),
	})
	if err != nil {
		// handle errors during adding listener
	}
	defer close(messages)

	go func(messages chan ConsumerMessage) {
		for m := range messages {
			m.Ack(false)
		}
	}(messages)
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

Badges

Build Status Go Report Card Go Doc


GitHub @rafaeljesus  ·  Medium @_jesus_rafael  ·  Twitter @_jesus_rafael

Documentation ¶

Index ¶

Constants ¶

View Source
const (
	// Transient means higher throughput but messages will not be restored on broker restart.
	Transient uint8 = 1
	// Persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
	Persistent uint8 = 2
	// ContentTypeJSON define json content type.
	ContentTypeJSON = "application/json"
	// ContentTypePlain define plain text content type.
	ContentTypePlain = "plain/text"
	// ExchangeDirect indicates the exchange is of direct type.
	ExchangeDirect = "direct"
	// ExchangeFanout indicates the exchange is of fanout type.
	ExchangeFanout = "fanout"
	// ExchangeTopic indicates the exchange is of topic type.
	ExchangeTopic = "topic"
)

Variables ¶

View Source
var (
	// ErrMissingExchange is returned when exchange name is not passed as parameter.
	ErrMissingExchange = errors.New("Missing field exchange")
	// ErrMissingKind is returned when exchange type is not passed as parameter.
	ErrMissingKind = errors.New("Missing field kind")
	// ErrMissingQueue is returned when queue name is not passed as parameter.
	ErrMissingQueue = errors.New("Missing field queue")
	// ErrMissingHandler is returned when function handler is not passed as parameter.
	ErrMissingHandler = errors.New("Missing field handler")
	// ErrUnsupportedArguments is returned when more than the permitted arguments is passed to a function.
	ErrUnsupportedArguments = errors.New("Unsupported arguments size")
)

Functions ¶

This section is empty.

Types ¶

type AMQP ¶

type AMQP interface {
	// Publish wraps amqp.Publish method
	Publish(exchange, key string, opts amqp.Publishing) error
	// CreateConsumer creates a amqp consumer
	CreateConsumer(exchange, key, kind, queue string, durable bool, declareArgs, bindArgs amqp.Table) (<-chan amqp.Delivery, error)
	// WithExchange creates a amqp exchange
	WithExchange(exchange, kind string, durable bool) error
	// WithQos wrapper over amqp.Qos method
	WithQos(count, size int, global bool) error
	// NotifyClose wrapper over notifyClose method
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
	// Close closes the running amqp connection and channel
	Close() error
}

AMQP exposes a interface for interacting with AMQP broker

type BindArgs ¶

type BindArgs struct {
	// contains filtered or unexported fields
}

BindArgs is the wrapper for AMQP Table class to set common queue bind values

func NewBindArgs ¶

func NewBindArgs() *BindArgs

NewBindArgs creates new queue bind values builder

func (*BindArgs) With ¶

func (a *BindArgs) With(name string, value interface{}) *BindArgs

With sets the value by name

type ConsumerMessage ¶

type ConsumerMessage struct {
	ContentType     string
	ContentEncoding string
	// DeliveryMode queue implementation use, non-persistent (1) or persistent (2)
	DeliveryMode uint8
	// Priority queue implementation use, 0 to 9
	Priority uint8
	// CorrelationId application use, correlation identifier
	CorrelationId string
	// ReplyTo application use, address to to reply to (ex: RPC)
	ReplyTo string
	// Expiration implementation use, message expiration spec
	Expiration string
	// MessageId application use, message identifier
	MessageId string
	// Timestamp application use, message timestamp
	Timestamp time.Time
	// Type application use, message type name
	Type string
	// ConsumerTag valid only with Channel.Consume
	ConsumerTag string
	// MessageCount valid only with Channel.Get
	MessageCount uint32
	DeliveryTag  uint64
	Redelivered  bool
	Exchange     string
	// Headers application or header exchange table
	Headers map[string]interface{}
	// Key basic.publish routing key
	Key  string
	Body []byte
	// contains filtered or unexported fields
}

ConsumerMessage captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer.

func (ConsumerMessage) Ack ¶

func (cm ConsumerMessage) Ack(multiple bool) error

Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery. All deliveries in AMQP must be acknowledged. If you called Channel.Consume with autoAck true then the server will be automatically ack each message and this method should not be called. Otherwise, you must call Delivery.Ack after you have successfully processed this delivery. When multiple is true, this delivery and all prior unacknowledged deliveries on the same channel will be acknowledged. This is useful for batch processing of deliveries. An error will indicate that the acknowledge could not be delivered to the channel it was sent from. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.

func (ConsumerMessage) Nack ¶

func (cm ConsumerMessage) Nack(multiple, requeue bool) error

Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.

func (ConsumerMessage) Reject ¶

func (cm ConsumerMessage) Reject(requeue bool) error

Reject delegates a negatively acknowledgement through the Acknowledger interface. When requeue is true, queue this message to be delivered to a consumer on a different channel. When requeue is false or the server is unable to queue this message, it will be dropped. If you are batch processing deliveries, and your server supports it, prefer Delivery.Nack. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.

type DeclareArgs ¶

type DeclareArgs struct {
	// contains filtered or unexported fields
}

DeclareArgs is the queue declaration values builder

func NewDeclareArgs ¶

func NewDeclareArgs() *DeclareArgs

NewDeclareArgs creates new queue declaration values builder

func (*DeclareArgs) With ¶

func (a *DeclareArgs) With(name string, value interface{}) *DeclareArgs

With sets the value by name

func (*DeclareArgs) WithMessageTTL ¶

func (a *DeclareArgs) WithMessageTTL(d time.Duration) *DeclareArgs

WithMessageTTL sets Queue message TTL. See details at https://www.rabbitmq.com/ttl.html#message-ttl-using-x-args

type Delivery ¶

type Delivery struct {
	amqp.Delivery
}

Delivery wraps amqp.Delivery struct

type Emitter ¶

type Emitter interface {
	// EmitAsync emits a message to RabbitMQ, but does not wait for the response from broker.
	EmitAsync() chan<- Message
	// EmitErr returns an error if encoding payload fails, or if after circuit breaker is open or retries attempts exceed.
	EmitErr() <-chan error
	// EmitOk returns true when the message was sent.
	EmitOk() <-chan struct{}
}

Emitter exposes a interface for publishing messages to AMQP broker

type ListenConfig ¶

type ListenConfig struct {
	// Exchange the exchange name.
	Exchange string
	// Kind the exchange type.
	Kind string
	// Key the routing key name.
	Key string
	// PassiveExchange determines a passive exchange connection it uses
	// amqp's ExchangeDeclarePassive instead the default ExchangeDeclare
	PassiveExchange bool
	// Queue the queue name
	Queue string
	// DeclareArgs is a list of arguments accepted for when declaring the queue.
	// See https://www.rabbitmq.com/queues.html#optional-arguments for more info.
	DeclareArgs *DeclareArgs
	// BindArgs is a list of arguments accepted for when binding the exchange to the queue
	BindArgs *BindArgs
}

ListenConfig carries fields for listening messages.

type Message ¶

type Message struct {
	// Exchange the exchange name.
	Exchange string
	// Kind the exchange type.
	Kind string
	// Key the routing key name.
	Key string
	// Payload the message payload.
	Payload []byte
	// DeliveryMode indicates if the is Persistent or Transient.
	DeliveryMode uint8
	// ContentType the message content-type.
	ContentType string
	// Headers the message application headers
	Headers map[string]interface{}
	// ContentEncoding the message encoding.
	ContentEncoding string
}

Message carries fields for sending messages.

type OnStateChangeFunc ¶

type OnStateChangeFunc func(name, from, to string)

OnStateChangeFunc is the callback function when circuit breaker state changes.

type Option ¶

type Option func(*Rabbus) error

Option represents an option you can pass to New. See the documentation for the individual options.

func AMQPProvider ¶

func AMQPProvider(provider AMQP) Option

AMQPProvider expose a interface for interacting with amqp broker

func Attempts ¶

func Attempts(attempts int) Option

Attempts is the max number of retries on broker outages.

func BreakerInterval ¶

func BreakerInterval(interval time.Duration) Option

BreakerInterval is the cyclic period of the closed state for CircuitBreaker to clear the internal counts, If Interval is 0, CircuitBreaker doesn't clear the internal counts during the closed state.

func BreakerTimeout ¶

func BreakerTimeout(timeout time.Duration) Option

BreakerTimeout is the period of the open state, after which the state of CircuitBreaker becomes half-open. If Timeout is 0, the timeout value of CircuitBreaker is set to 60 seconds.

func Durable ¶

func Durable(durable bool) Option

Durable indicates of the queue will survive broker restarts. Default to true.

func OnStateChange ¶

func OnStateChange(fn OnStateChangeFunc) Option

OnStateChange is called whenever the state of CircuitBreaker changes.

func PassiveExchange ¶

func PassiveExchange(isExchangePassive bool) Option

PassiveExchange forces passive connection with all exchanges using amqp's ExchangeDeclarePassive instead the default ExchangeDeclare

func PrefetchCount ¶

func PrefetchCount(count int) Option

PrefetchCount limit the number of unacknowledged messages.

func PrefetchSize ¶

func PrefetchSize(size int) Option

PrefetchSize when greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers.

func QosGlobal ¶

func QosGlobal(global bool) Option

QosGlobal when global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection. When false, the Channel.Qos settings will apply to all existing and future consumers on this channel. RabbitMQ does not implement the global flag.

func Sleep ¶

func Sleep(sleep time.Duration) Option

Sleep is the sleep time of the retry mechanism.

func Threshold ¶

func Threshold(threshold uint32) Option

Threshold when a threshold of failures has been reached, future calls to the broker will not run. During this state, the circuit breaker will periodically allow the calls to run and, if it is successful, will start running the function again. Default value is 5.

type Rabbus ¶

type Rabbus struct {
	AMQP
	// contains filtered or unexported fields
}

Rabbus interpret (implement) Rabbus interface definition

func New ¶

func New(dsn string, options ...Option) (*Rabbus, error)

New returns a new Rabbus configured with the variables from the config parameter, or returning an non-nil err if an error occurred while creating connection and channel.

func (*Rabbus) Close ¶

func (r *Rabbus) Close() error

Close channels and attempt to close channel and connection.

func (*Rabbus) EmitAsync ¶

func (r *Rabbus) EmitAsync() chan<- Message

EmitAsync emits a message to RabbitMQ, but does not wait for the response from broker.

func (*Rabbus) EmitErr ¶

func (r *Rabbus) EmitErr() <-chan error

EmitErr returns an error if encoding payload fails, or if after circuit breaker is open or retries attempts exceed.

func (*Rabbus) EmitOk ¶

func (r *Rabbus) EmitOk() <-chan struct{}

EmitOk returns true when the message was sent.

func (*Rabbus) Listen ¶

func (r *Rabbus) Listen(c ListenConfig) (chan ConsumerMessage, error)

Listen to a message from RabbitMQ, returns an error if exchange, queue name and function handler not passed or if an error occurred while creating amqp consumer.

func (*Rabbus) Run ¶

func (r *Rabbus) Run(ctx context.Context) error

Run starts rabbus channels for emitting and listening for amqp connection close returns ctx error in case of any.

Directories ¶

Path Synopsis
_examples
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL