github.com/rafaeljesus/rabbus

package rabbus

v3.0.0+incompatible
Published: Jul 23, 2019 | License: MIT | Module: github.com/rafaeljesus/rabbus

Constants

const (
	// Transient means higher throughput but messages will not be restored on broker restart.
	Transient uint8 = 1
	// Persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
	Persistent uint8 = 2
	// ContentTypeJSON defines the JSON content type.
	ContentTypeJSON = "application/json"
	// ContentTypePlain defines the plain text content type.
	ContentTypePlain = "plain/text"
	// ExchangeDirect indicates the exchange is of direct type.
	ExchangeDirect = "direct"
	// ExchangeFanout indicates the exchange is of fanout type.
	ExchangeFanout = "fanout"
	// ExchangeTopic indicates the exchange is of topic type.
	ExchangeTopic = "topic"
)

Variables

var (
	// ErrMissingExchange is returned when exchange name is not passed as parameter.
	ErrMissingExchange = errors.New("Missing field exchange")
	// ErrMissingKind is returned when exchange type is not passed as parameter.
	ErrMissingKind = errors.New("Missing field kind")
	// ErrMissingQueue is returned when queue name is not passed as parameter.
	ErrMissingQueue = errors.New("Missing field queue")
	// ErrMissingHandler is returned when function handler is not passed as parameter.
	ErrMissingHandler = errors.New("Missing field handler")
	// ErrUnsupportedArguments is returned when more arguments than permitted are passed to a function.
	ErrUnsupportedArguments = errors.New("Unsupported arguments size")
)

type AMQP

type AMQP interface {
	// Publish wraps the amqp.Publish method.
	Publish(exchange, key string, opts amqp.Publishing) error
	// CreateConsumer creates an AMQP consumer.
	CreateConsumer(exchange, key, kind, queue string, durable bool, declareArgs, bindArgs amqp.Table) (<-chan amqp.Delivery, error)
	// WithExchange creates an AMQP exchange.
	WithExchange(exchange, kind string, durable bool) error
	// WithQos wraps the amqp.Qos method.
	WithQos(count, size int, global bool) error
	// NotifyClose wraps the NotifyClose method.
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
	// Close closes the running AMQP connection and channel.
	Close() error
}

AMQP exposes an interface for interacting with an AMQP broker.

type BindArgs

type BindArgs struct {
	// contains filtered or unexported fields
}

BindArgs wraps an AMQP Table to set common queue bind values.

func NewBindArgs

func NewBindArgs() *BindArgs

NewBindArgs creates a new builder for queue bind values.

func (*BindArgs) With

func (a *BindArgs) With(name string, value interface{}) *BindArgs

With sets the value by name

type ConsumerMessage

type ConsumerMessage struct {
	ContentType     string
	ContentEncoding string
	// DeliveryMode queue implementation use, non-persistent (1) or persistent (2)
	DeliveryMode uint8
	// Priority queue implementation use, 0 to 9
	Priority uint8
	// CorrelationId application use, correlation identifier
	CorrelationId string
	// ReplyTo application use, address to reply to (ex: RPC)
	ReplyTo string
	// Expiration implementation use, message expiration spec
	Expiration string
	// MessageId application use, message identifier
	MessageId string
	// Timestamp application use, message timestamp
	Timestamp time.Time
	// Type application use, message type name
	Type string
	// ConsumerTag valid only with Channel.Consume
	ConsumerTag string
	// MessageCount valid only with Channel.Get
	MessageCount uint32
	DeliveryTag  uint64
	Redelivered  bool
	Exchange     string
	// Headers application or header exchange table
	Headers map[string]interface{}
	// Key basic.publish routing key
	Key  string
	Body []byte
	// contains filtered or unexported fields
}

ConsumerMessage captures the fields for a previously delivered message resident in a queue to be delivered by the server to a consumer.

func (ConsumerMessage) Ack

func (cm ConsumerMessage) Ack(multiple bool) error

Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery. All deliveries in AMQP must be acknowledged. If you called Channel.Consume with autoAck true, the server will automatically ack each message and this method should not be called. Otherwise, you must call Delivery.Ack after you have successfully processed this delivery. When multiple is true, this delivery and all prior unacknowledged deliveries on the same channel will be acknowledged. This is useful for batch processing of deliveries. An error indicates that the acknowledgement could not be delivered to the channel it was sent from. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.

func (ConsumerMessage) Nack

func (cm ConsumerMessage) Nack(multiple, requeue bool) error

Nack negatively acknowledges the delivery of message(s) identified by the delivery tag from either the client or server. When multiple is true, nack all messages delivered on the same channel up to and including this delivery tag. When requeue is true, request the server to deliver this message to a different consumer. If that is not possible or requeue is false, the message will be dropped or delivered to a server-configured dead-letter queue. This method must not be used to select or requeue messages the client wishes not to handle; rather, it informs the server that the client is incapable of handling this message at this time. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.

func (ConsumerMessage) Reject

func (cm ConsumerMessage) Reject(requeue bool) error

Reject delegates a negative acknowledgement through the Acknowledger interface. When requeue is true, queue this message to be delivered to a consumer on a different channel. When requeue is false or the server is unable to queue this message, it will be dropped. If you are batch processing deliveries, and your server supports it, prefer Delivery.Nack. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every delivery that is not automatically acknowledged.
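The rule that every delivery must be settled exactly once can be sketched without a broker. The example below is a minimal illustration, not rabbus code: the acker interface, fakeMessage, errTemporary, errPermanent and settle are all hypothetical names, and the policy of requeueing only temporary failures is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// acker is a hypothetical interface covering two of the acknowledgement
// methods documented on ConsumerMessage.
type acker interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// fakeMessage records which acknowledgement was sent, so the sketch runs
// without a broker.
type fakeMessage struct {
	result string
}

func (m *fakeMessage) Ack(multiple bool) error {
	m.result = "ack"
	return nil
}

func (m *fakeMessage) Nack(multiple, requeue bool) error {
	if requeue {
		m.result = "nack-requeue"
	} else {
		m.result = "nack-drop"
	}
	return nil
}

// Hypothetical error values used to classify handler failures.
var (
	errTemporary = errors.New("temporary failure")
	errPermanent = errors.New("bad payload")
)

// settle acknowledges a delivery exactly once: Ack on success, Nack with
// requeue for temporary failures, Nack without requeue otherwise.
func settle(m acker, handleErr error) error {
	switch {
	case handleErr == nil:
		return m.Ack(false)
	case errors.Is(handleErr, errTemporary):
		return m.Nack(false, true)
	default:
		return m.Nack(false, false)
	}
}

func main() {
	for _, handleErr := range []error{nil, errTemporary, errPermanent} {
		m := &fakeMessage{}
		if err := settle(m, handleErr); err != nil {
			fmt.Println("settle failed:", err)
			continue
		}
		fmt.Println(m.result)
	}
}
```

With a real consumer, the same decision would be made on each ConsumerMessage received from the channel that Listen returns.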

type DeclareArgs

type DeclareArgs struct {
	// contains filtered or unexported fields
}

DeclareArgs is the queue declaration values builder

func NewDeclareArgs

func NewDeclareArgs() *DeclareArgs

NewDeclareArgs creates a new builder for queue declaration values.

func (*DeclareArgs) With

func (a *DeclareArgs) With(name string, value interface{}) *DeclareArgs

With sets the value by name

func (*DeclareArgs) WithMessageTTL

func (a *DeclareArgs) WithMessageTTL(d time.Duration) *DeclareArgs

WithMessageTTL sets Queue message TTL. See details at https://www.rabbitmq.com/ttl.html#message-ttl-using-x-args
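The chained-builder style of With and WithMessageTTL can be sketched as follows. This is a local stand-in, not the library's implementation: the fields of DeclareArgs are unexported, so the map layout, the lowercase names, and the ttlMillis and exampleArgs helpers are assumptions. What is known from RabbitMQ is that the queue argument is named x-message-ttl and is expressed in milliseconds.

```go
package main

import (
	"fmt"
	"time"
)

// declareArgs is a hypothetical stand-in for rabbus.DeclareArgs; the real
// type keeps its fields unexported, so this layout is an assumption.
type declareArgs struct {
	args map[string]interface{}
}

func newDeclareArgs() *declareArgs {
	return &declareArgs{args: make(map[string]interface{})}
}

// with stores an arbitrary argument and returns the builder for chaining.
func (a *declareArgs) with(name string, value interface{}) *declareArgs {
	a.args[name] = value
	return a
}

// withMessageTTL stores the TTL under RabbitMQ's "x-message-ttl" queue
// argument, converted to milliseconds.
func (a *declareArgs) withMessageTTL(d time.Duration) *declareArgs {
	return a.with("x-message-ttl", int64(d/time.Millisecond))
}

// ttlMillis reads the stored TTL back; ok is false when none was set.
func (a *declareArgs) ttlMillis() (ttl int64, ok bool) {
	ttl, ok = a.args["x-message-ttl"].(int64)
	return ttl, ok
}

// exampleArgs builds a 30 second TTL plus a queue length limit.
func exampleArgs() *declareArgs {
	return newDeclareArgs().
		withMessageTTL(30 * time.Second).
		with("x-max-length", 1000)
}

func main() {
	args := exampleArgs()
	ttl, _ := args.ttlMillis()
	fmt.Println(ttl, args.args["x-max-length"])
}
```

Returning the receiver from each method is what allows the calls to be chained into a single expression when filling ListenConfig.DeclareArgs.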

type Delivery

type Delivery struct {
	amqp.Delivery
}

Delivery wraps amqp.Delivery struct

type Emitter

type Emitter interface {
	// EmitAsync emits a message to RabbitMQ, but does not wait for the response from broker.
	EmitAsync() chan<- Message
	// EmitErr returns a channel that receives an error if encoding the payload fails,
	// if the circuit breaker is open, or if the retry attempts are exhausted.
	EmitErr() <-chan error
	// EmitOk returns a channel that receives a value when the message was sent.
	EmitOk() <-chan struct{}
}

Emitter exposes an interface for publishing messages to an AMQP broker.
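The three channels are meant to be used together: send on EmitAsync, then wait on EmitOk or EmitErr for the outcome. The sketch below mirrors the interface with local types so it runs without a broker. The message, emitter, fakeEmitter and publish names are hypothetical, and the fake's rule of rejecting empty payloads is only there to exercise both outcomes.

```go
package main

import (
	"errors"
	"fmt"
)

// message mirrors a subset of the documented Message struct.
type message struct {
	Exchange string
	Kind     string
	Key      string
	Payload  []byte
}

// emitter mirrors the documented Emitter interface using the local type.
type emitter interface {
	EmitAsync() chan<- message
	EmitErr() <-chan error
	EmitOk() <-chan struct{}
}

// fakeEmitter reports success for non-empty payloads and an error otherwise.
type fakeEmitter struct {
	in  chan message
	ok  chan struct{}
	err chan error
}

func newFakeEmitter() *fakeEmitter {
	e := &fakeEmitter{
		in:  make(chan message),
		ok:  make(chan struct{}),
		err: make(chan error),
	}
	go func() {
		for m := range e.in {
			if len(m.Payload) == 0 {
				e.err <- errors.New("empty payload")
				continue
			}
			e.ok <- struct{}{}
		}
	}()
	return e
}

func (e *fakeEmitter) EmitAsync() chan<- message { return e.in }
func (e *fakeEmitter) EmitErr() <-chan error     { return e.err }
func (e *fakeEmitter) EmitOk() <-chan struct{}   { return e.ok }

// publish hands the message to the emitter, then waits for either outcome.
func publish(e emitter, m message) error {
	e.EmitAsync() <- m
	select {
	case <-e.EmitOk():
		return nil
	case err := <-e.EmitErr():
		return err
	}
}

func main() {
	e := newFakeEmitter()
	m := message{Exchange: "events", Kind: "topic", Key: "user.created"}
	fmt.Println("empty payload:", publish(e, m))
	m.Payload = []byte(`{"id":1}`)
	fmt.Println("with payload:", publish(e, m))
}
```

Waiting on both result channels in one select avoids blocking forever on EmitOk when the publish fails.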

type ListenConfig

type ListenConfig struct {
	// Exchange the exchange name.
	Exchange string
	// Kind the exchange type.
	Kind string
	// Key the routing key name.
	Key string
	// PassiveExchange makes the exchange declaration passive: amqp's
	// ExchangeDeclarePassive is used instead of the default ExchangeDeclare.
	PassiveExchange bool
	// Queue the queue name
	Queue string
	// DeclareArgs holds the arguments accepted when declaring the queue.
	// See https://www.rabbitmq.com/queues.html#optional-arguments for more info.
	DeclareArgs *DeclareArgs
	// BindArgs holds the arguments accepted when binding the exchange to the queue.
	BindArgs *BindArgs
}

ListenConfig carries fields for listening for messages.

type Message

type Message struct {
	// Exchange the exchange name.
	Exchange string
	// Kind the exchange type.
	Kind string
	// Key the routing key name.
	Key string
	// Payload the message payload.
	Payload []byte
	// DeliveryMode indicates whether the message is Persistent or Transient.
	DeliveryMode uint8
	// ContentType the message content-type.
	ContentType string
	// Headers the message application headers
	Headers map[string]interface{}
	// ContentEncoding the message encoding.
	ContentEncoding string
}

Message carries fields for sending messages.

type OnStateChangeFunc

type OnStateChangeFunc func(name, from, to string)

OnStateChangeFunc is the callback function when circuit breaker state changes.

type Option

type Option func(*Rabbus) error

Option represents an option you can pass to New. See the documentation for the individual options.
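Option follows the functional options pattern: each option is a function that mutates the value under construction and may return an error. The sketch below shows the pattern with a local config type. The lowercase names, the config fields, and the validation inside attempts are assumptions for illustration; only the default of true for durable is taken from the documentation of Durable.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the unexported state of Rabbus.
type config struct {
	attempts int
	sleep    time.Duration
	durable  bool
}

// option has the same shape as rabbus.Option, applied to the local config.
type option func(*config) error

// attempts sets the retry count; rejecting negatives is an assumption.
func attempts(n int) option {
	return func(c *config) error {
		if n < 0 {
			return errors.New("attempts must not be negative")
		}
		c.attempts = n
		return nil
	}
}

// sleep sets the pause between retries.
func sleep(d time.Duration) option {
	return func(c *config) error {
		c.sleep = d
		return nil
	}
}

// durable sets whether queues should survive broker restarts.
func durable(b bool) option {
	return func(c *config) error {
		c.durable = b
		return nil
	}
}

// newConfig starts from defaults and applies the options in order,
// stopping at the first error.
func newConfig(opts ...option) (*config, error) {
	c := &config{durable: true}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := newConfig(attempts(3), sleep(2*time.Second), durable(false))
	fmt.Println(c.attempts, c.sleep, c.durable, err)
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field.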

func AMQPProvider

func AMQPProvider(provider AMQP) Option

AMQPProvider sets the AMQP implementation used for interacting with the broker.

func Attempts

func Attempts(attempts int) Option

Attempts is the max number of retries on broker outages.

func BreakerInterval

func BreakerInterval(interval time.Duration) Option

BreakerInterval is the cyclic period of the closed state for CircuitBreaker to clear the internal counts. If Interval is 0, CircuitBreaker doesn't clear the internal counts during the closed state.

func BreakerTimeout

func BreakerTimeout(timeout time.Duration) Option

BreakerTimeout is the period of the open state, after which the state of CircuitBreaker becomes half-open. If Timeout is 0, the timeout value of CircuitBreaker is set to 60 seconds.

func Durable

func Durable(durable bool) Option

Durable indicates whether the queue will survive broker restarts. Defaults to true.

func OnStateChange

func OnStateChange(fn OnStateChangeFunc) Option

OnStateChange is called whenever the state of CircuitBreaker changes.

func PassiveExchange

func PassiveExchange(isExchangePassive bool) Option

PassiveExchange forces passive declaration of all exchanges, using amqp's ExchangeDeclarePassive instead of the default ExchangeDeclare.

func PrefetchCount

func PrefetchCount(count int) Option

PrefetchCount limits the number of unacknowledged messages.

func PrefetchSize

func PrefetchSize(size int) Option

PrefetchSize sets the prefetch window in bytes. When it is greater than zero, the server will try to keep at least that many bytes of deliveries flushed to the network before receiving acknowledgments from the consumers.

func QosGlobal

func QosGlobal(global bool) Option

QosGlobal sets the scope of the Qos settings. When global is true, these Qos settings apply to all existing and future consumers on all channels on the same connection. When false, the Channel.Qos settings will apply to all existing and future consumers on this channel. RabbitMQ does not implement the global flag.

func Sleep

func Sleep(sleep time.Duration) Option

Sleep is the sleep time of the retry mechanism.

func Threshold

func Threshold(threshold uint32) Option

Threshold is the number of failures after which future calls to the broker will not run. During this state, the circuit breaker will periodically allow a call to run and, if it is successful, will start running the function again. Default value is 5.
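How Threshold, BreakerTimeout and OnStateChange relate can be illustrated with a tiny state machine. This is a sketch of the general circuit breaker idea, not the breaker rabbus uses: the breaker type, its methods, the demo helper, and the choice to count consecutive failures are all assumptions. The callback has the same shape as OnStateChangeFunc.

```go
package main

import (
	"fmt"
	"time"
)

// breaker is a hypothetical, single-goroutine circuit breaker.
type breaker struct {
	name      string
	threshold uint32        // consecutive failures that open the breaker
	timeout   time.Duration // how long the breaker stays open
	onChange  func(name, from, to string)
	failures  uint32
	state     string // "closed", "open" or "half-open"
	openedAt  time.Time
}

func newBreaker(name string, threshold uint32, timeout time.Duration, onChange func(name, from, to string)) *breaker {
	return &breaker{name: name, threshold: threshold, timeout: timeout, onChange: onChange, state: "closed"}
}

// setState switches state and notifies the callback on a real change.
func (b *breaker) setState(to string) {
	if b.state == to {
		return
	}
	from := b.state
	b.state = to
	if b.onChange != nil {
		b.onChange(b.name, from, to)
	}
}

// allow reports whether a call may run at time now. An open breaker moves
// to half-open once the timeout has elapsed, which lets calls through again.
func (b *breaker) allow(now time.Time) bool {
	if b.state == "open" {
		if now.Sub(b.openedAt) < b.timeout {
			return false
		}
		b.setState("half-open")
	}
	return true
}

// record updates the breaker with the outcome of a call made at time now.
func (b *breaker) record(now time.Time, success bool) {
	if success {
		b.failures = 0
		b.setState("closed")
		return
	}
	b.failures++
	if b.state == "half-open" || b.failures >= b.threshold {
		b.openedAt = now
		b.setState("open")
	}
}

// demo runs a fixed scenario and returns the observed state transitions.
func demo() []string {
	var transitions []string
	b := newBreaker("publisher", 2, time.Minute, func(name, from, to string) {
		transitions = append(transitions, from+"->"+to)
	})
	start := time.Unix(0, 0)
	b.record(start, false)                   // first failure, still closed
	b.record(start, false)                   // threshold reached, breaker opens
	b.allow(start.Add(30 * time.Second))     // rejected, timeout not elapsed
	b.allow(start.Add(2 * time.Minute))      // timeout elapsed, half-open
	b.record(start.Add(2*time.Minute), true) // trial call succeeded, closed
	return transitions
}

func main() {
	fmt.Println(demo())
}
```

Passing the current time in as an argument keeps the sketch deterministic; a production breaker would read the clock itself and guard its state with a mutex.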

type Rabbus

type Rabbus struct {
	AMQP
	// contains filtered or unexported fields
}

Rabbus is the client type. It embeds AMQP and implements the Emitter interface.

func New

func New(dsn string, options ...Option) (*Rabbus, error)

New returns a new Rabbus configured with the given options, or a non-nil error if an error occurred while creating the connection and channel.

func (*Rabbus) Close

func (r *Rabbus) Close() error

Close closes the internal channels and attempts to close the AMQP channel and connection.

func (*Rabbus) EmitAsync

func (r *Rabbus) EmitAsync() chan<- Message

EmitAsync emits a message to RabbitMQ, but does not wait for the response from broker.

func (*Rabbus) EmitErr

func (r *Rabbus) EmitErr() <-chan error

EmitErr returns a channel that receives an error if encoding the payload fails, if the circuit breaker is open, or if the retry attempts are exhausted.

func (*Rabbus) EmitOk

func (r *Rabbus) EmitOk() <-chan struct{}

EmitOk returns a channel that receives a value when the message was sent.

func (*Rabbus) Listen

func (r *Rabbus) Listen(c ListenConfig) (chan ConsumerMessage, error)

Listen consumes messages from RabbitMQ. It returns an error if the exchange, queue name, or function handler is not passed, or if an error occurred while creating the amqp consumer.

func (*Rabbus) Run

func (r *Rabbus) Run(ctx context.Context) error

Run starts the rabbus channels for emitting and listens for amqp connection close notifications. It returns the ctx error, if any.
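Since Run takes a context and returns its error, a common way to drive a method of this shape is to start it in a goroutine and stop it by cancelling the context. The sketch below uses a hypothetical runner type with the same Run signature. That Run blocks until the context is done is an assumption of this sketch, not something the documentation states.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// runner is a hypothetical stand-in with the same Run signature as *Rabbus.
type runner struct{}

// Run blocks until ctx is done and then returns the context's error.
func (runner) Run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// runAndCancel starts Run in a goroutine, cancels the context, and
// returns the error that Run reported.
func runAndCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	errc := make(chan error, 1)
	go func() { errc <- runner{}.Run(ctx) }()
	cancel()
	return <-errc
}

// isCancelled reports whether err comes from context cancellation.
func isCancelled(err error) bool {
	return errors.Is(err, context.Canceled)
}

func main() {
	err := runAndCancel()
	fmt.Println(isCancelled(err), err)
}
```

The buffered error channel lets the goroutine finish even if the caller stops waiting for the result.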

Documentation was rendered with GOOS=linux and GOARCH=amd64.
