Documentation
¶
Index ¶
- Variables
- func DefaultConsumerErrorHandler(err error)
- func DefaultErrorHandler(err error)
- func DefaultProducerErrorHandler(err ProducerError)
- func SetLogging()
- type Broker
- type ConnectionConfig
- func (config *ConnectionConfig) SetAutoReconnect(autoReconnect bool)
- func (config *ConnectionConfig) SetErrorHandler(errorHandler func(error))
- func (config *ConnectionConfig) SetHost(host string)
- func (config *ConnectionConfig) SetPassword(password string)
- func (config *ConnectionConfig) SetPort(port int)
- func (config *ConnectionConfig) SetReconnectDelay(reconnectDelay time.Duration)
- func (config *ConnectionConfig) SetUser(user string)
- type Consumer
- type Exchange
- func (e *Exchange) SetArgs(args amqp.Table)
- func (e *Exchange) SetAutoDelete(autoDelete bool)
- func (e *Exchange) SetDurable(durable bool)
- func (e *Exchange) SetInternal(internal bool)
- func (e *Exchange) SetName(name string)
- func (e *Exchange) SetNoWait(noWait bool)
- func (e *Exchange) SetType(exchangeType ExchangeType) error
- type ExchangeType
- type MockBroker
- type MockConsumer
- type MockProducer
- type Producer
- type ProducerError
- type Queue
- type RabbitBroker
- type RabbitConsumer
- type RabbitProducer
Constants ¶
This section is empty.
Variables ¶
var DefaultConfig = &ConnectionConfig{ user: "guest", password: "guest", host: "localhost", port: 5672, autoReconnect: true, reconnectDelay: time.Second * 10, errorHandler: DefaultErrorHandler, }
DefaultConfig is the default configuration for RabbitMQ.
user: "guest", password: "guest", host: "localhost", port: 5672, autoReconnect: true, reconnectDelay: time.Second * 10, errorHandler: DefaultErrorHandler
Functions ¶
func DefaultConsumerErrorHandler ¶
func DefaultConsumerErrorHandler(err error)
DefaultConsumerErrorHandler handles the errors of this consumer
func DefaultErrorHandler ¶
func DefaultErrorHandler(err error)
DefaultErrorHandler is the default Connection error handler
func DefaultProducerErrorHandler ¶
func DefaultProducerErrorHandler(err ProducerError)
DefaultProducerErrorHandler is the default producer error handler
Types ¶
type Broker ¶ added in v0.1.8
type Broker interface { CreateConsumer(queue *Queue, bindingKey string, consumerTag string, errorHandler func(error)) (Consumer, error) CreateProducer(exchange *Exchange, errorHandler func(ProducerError)) (Producer, error) }
A Broker models a broker
type ConnectionConfig ¶
type ConnectionConfig struct {
// contains filtered or unexported fields
}
ConnectionConfig is a config structure to use when setting up a RabbitMQ connection
func CreateConfig ¶ added in v0.1.3
func CreateConfig(user string, password string, host string, port int, autoReconnect bool, reconnectDelay time.Duration, errorHandler func(error)) *ConnectionConfig
CreateConfig creates a connection configuration with the supplied parameters
func (*ConnectionConfig) SetAutoReconnect ¶
func (config *ConnectionConfig) SetAutoReconnect(autoReconnect bool)
SetAutoReconnect sets whether the connection should try reconnecting
func (*ConnectionConfig) SetErrorHandler ¶
func (config *ConnectionConfig) SetErrorHandler(errorHandler func(error))
SetErrorHandler sets the error handler this connection uses
func (*ConnectionConfig) SetHost ¶
func (config *ConnectionConfig) SetHost(host string)
SetHost sets the broker connection host URI
func (*ConnectionConfig) SetPassword ¶
func (config *ConnectionConfig) SetPassword(password string)
SetPassword sets the password to use for the connection to the broker
func (*ConnectionConfig) SetPort ¶
func (config *ConnectionConfig) SetPort(port int)
SetPort sets the broker connection port
func (*ConnectionConfig) SetReconnectDelay ¶
func (config *ConnectionConfig) SetReconnectDelay(reconnectDelay time.Duration)
SetReconnectDelay sets the delay between reconnection attempts
func (*ConnectionConfig) SetUser ¶
func (config *ConnectionConfig) SetUser(user string)
SetUser sets the user to use for the connection to the broker
type Consumer ¶
type Consumer interface { ConsumeMessages(args amqp.Table, autoAck bool, messageHandler func(amqp.Delivery)) Shutdown() error }
A Consumer models a broker consumer
type Exchange ¶
type Exchange struct {
// contains filtered or unexported fields
}
Exchange models a RabbitMQ exchange
func CreateDefaultExchange ¶
func CreateDefaultExchange(name string, exchangeType ExchangeType) (*Exchange, error)
CreateDefaultExchange returns an exchange with the following specifications:
durable: true, autoDelete: false, internal: false, noWait: false, args: nil
func CreateExchange ¶
func CreateExchange(name string, exchangeType ExchangeType, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table) (*Exchange, error)
CreateExchange creates an exchange according to the specified arguments
func (*Exchange) SetAutoDelete ¶
SetAutoDelete sets exchange autoDelete
func (*Exchange) SetDurable ¶
SetDurable sets exchange durability
func (*Exchange) SetInternal ¶
SetInternal sets whether exchange is internal
func (*Exchange) SetType ¶
func (e *Exchange) SetType(exchangeType ExchangeType) error
SetType sets the exchange type
type ExchangeType ¶
type ExchangeType string
ExchangeType denotes the types of exchanges RabbitMQ has
const ( // Direct delivers messages to queues based on the message routing key Direct ExchangeType = "direct" // Fanout delivers messages to all connected queues Fanout ExchangeType = "fanout" // Topic delivers messages based on the matching between the message routing key and the pattern used to bind queues to the exchange Topic ExchangeType = "topic" // Headers delivers messages based on header values, similar to direct routing Headers ExchangeType = "headers" )
func (*ExchangeType) IsValid ¶
func (t *ExchangeType) IsValid() bool
IsValid determines whether an exchangeType is valid
func (ExchangeType) String ¶
func (t ExchangeType) String() string
type MockBroker ¶ added in v0.1.8
type MockBroker struct { Messages map[*Queue]chan amqp.Delivery // The messages sent in a queue // contains filtered or unexported fields }
func CreateMockBroker ¶ added in v0.1.8
func CreateMockBroker() *MockBroker
func (*MockBroker) CreateConsumer ¶ added in v0.1.8
func (*MockBroker) CreateProducer ¶ added in v0.1.8
func (b *MockBroker) CreateProducer(exchange *Exchange, errorHandler func(ProducerError)) (Producer, error)
type MockConsumer ¶ added in v0.1.8
type MockConsumer struct { ReceivedMessages []amqp.Delivery // contains filtered or unexported fields }
func (*MockConsumer) ConsumeMessages ¶ added in v0.1.8
func (c *MockConsumer) ConsumeMessages(args amqp.Table, autoAck bool, messageHandler func(amqp.Delivery))
ConsumeMessages consumes messages sent to the consumer
func (*MockConsumer) Shutdown ¶ added in v0.1.8
func (c *MockConsumer) Shutdown() error
Shutdown shuts down the consumer
type MockProducer ¶ added in v0.1.8
type MockProducer struct {
// contains filtered or unexported fields
}
A MockProducer is a mocked producer
func (*MockProducer) PublishMessage ¶ added in v0.1.8
func (p *MockProducer) PublishMessage(msg []byte, key *string, headers *amqp.Table)
PublishMessage publishes a message
func (*MockProducer) Shutdown ¶ added in v0.1.8
func (p *MockProducer) Shutdown() error
Shutdown shuts this producer down
type Producer ¶
type Producer interface { PublishMessage(msg []byte, key *string, headers *amqp.Table) Shutdown() error }
A Producer models a broker producer
type ProducerError ¶ added in v0.1.4
type ProducerError struct {
// contains filtered or unexported fields
}
A ProducerError is an error the producer can raise and which needs to be handled by the producer error handler
func (*ProducerError) Error ¶ added in v0.1.4
func (pe *ProducerError) Error() string
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue models a RabbitMQ queue
func CreateDefaultQueue ¶
CreateDefaultQueue creates and returns a queue with the following parameters:
durable: true, exclusive: false, autoDelete: false, noWait: false, args: nil
func CreateQueue ¶
func CreateQueue(exchange *Exchange, name string, durable bool, exclusive bool, autoDelete bool, noWait bool, arguments amqp.Table) *Queue
CreateQueue returns a queue created with the accompanied specifications
func (*Queue) SetAutoDelete ¶
SetAutoDelete sets queue auto deletion
func (*Queue) SetDurable ¶
SetDurable sets queue durability
func (*Queue) SetExclusive ¶
SetExclusive sets queue exclusivity
type RabbitBroker ¶ added in v0.1.8
type RabbitBroker struct {
// contains filtered or unexported fields
}
A RabbitBroker is a RabbitMQ broker
func CreateBroker ¶ added in v0.1.8
func CreateBroker(config *ConnectionConfig) (*RabbitBroker, error)
CreateBroker creates a RabbitBroker
config: *ConnectionConfig, the connection configuration that should be used to connect to the broker. Returns: *RabbitBroker and a possible error
func (*RabbitBroker) CreateConsumer ¶ added in v0.1.8
func (b *RabbitBroker) CreateConsumer(queue *Queue, bindingKey string, consumerTag string, errorHandler func(error)) (Consumer, error)
CreateConsumer creates a consumer
queue: *Queue, the queue this consumer should bind to; bindingKey: string, the key with which this consumer binds to the queue; errorHandler: func(error), the function to handle possible consumer errors. Returns: Consumer and a possible error
func (*RabbitBroker) CreateProducer ¶ added in v0.1.8
func (b *RabbitBroker) CreateProducer(exchange *Exchange, errorHandler func(ProducerError)) (Producer, error)
CreateProducer creates a producer
exchange: *Exchange, the exchange this producer will produce to; errorHandler: func(ProducerError), the error handler for this producer. Returns: Producer and a possible error
type RabbitConsumer ¶ added in v0.1.8
type RabbitConsumer struct {
// contains filtered or unexported fields
}
RabbitConsumer models a RabbitMQ consumer
func (*RabbitConsumer) ConsumeMessages ¶ added in v0.1.8
func (c *RabbitConsumer) ConsumeMessages(args amqp.Table, autoAck bool, messageHandler func(amqp.Delivery))
ConsumeMessages starts the consumption of messages from the queue the consumer is bound to
args: amqp.Table, additional arguments for this consumer; autoAck: bool, whether to automatically acknowledge messages; messageHandler: func(amqp.Delivery), a handler for incoming messages. For every message, the handler is called in a new goroutine
func (*RabbitConsumer) ReconnectChannel ¶ added in v0.1.16
func (c *RabbitConsumer) ReconnectChannel() error
ReconnectChannel tries to re-open this consumer's channel
func (*RabbitConsumer) Shutdown ¶ added in v0.1.8
func (c *RabbitConsumer) Shutdown() error
Shutdown shuts down the consumer
type RabbitProducer ¶ added in v0.1.8
type RabbitProducer struct {
// contains filtered or unexported fields
}
RabbitProducer models a RabbitMQ producer
func (*RabbitProducer) PublishMessage ¶ added in v0.1.8
func (p *RabbitProducer) PublishMessage(msg []byte, key *string, headers *amqp.Table)
PublishMessage publishes a message with the given routing key
func (*RabbitProducer) ReconnectChannel ¶ added in v0.1.8
func (p *RabbitProducer) ReconnectChannel()
ReconnectChannel tries to re-open this producer's channel
func (*RabbitProducer) Shutdown ¶ added in v0.1.8
func (p *RabbitProducer) Shutdown() error
Shutdown closes this producer's channel