Documentation
¶
Index ¶
- Constants
- type BindingConfig
- type ClientOptions
- func (c *ClientOptions) SetConnectionName(connectionName string) *ClientOptions
- func (c *ClientOptions) SetCredentials(username, password string) *ClientOptions
- func (c *ClientOptions) SetHost(host string) *ClientOptions
- func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions
- func (c *ClientOptions) SetMarshaller(marshaller Marshaller) *ClientOptions
- func (c *ClientOptions) SetMaxRetry(retry uint) *ClientOptions
- func (c *ClientOptions) SetMode(mode string) *ClientOptions
- func (c *ClientOptions) SetPort(port uint) *ClientOptions
- func (c *ClientOptions) SetPublishingCacheSize(size uint64) *ClientOptions
- func (c *ClientOptions) SetPublishingCacheTTL(ttl time.Duration) *ClientOptions
- func (c *ClientOptions) SetRetryDelay(delay time.Duration) *ClientOptions
- func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions
- func (c *ClientOptions) SetVhost(vhost string) *ClientOptions
- type DeliveryMode
- type ExchangeConfig
- type ExchangeType
- type MQTTClient
- type MQTTManager
- type MQTTMessageHandlerFunc
- type MQTTMessageHandlers
- type ManagerOptions
- func (m *ManagerOptions) SetCredentials(username, password string) *ManagerOptions
- func (m *ManagerOptions) SetHost(host string) *ManagerOptions
- func (m *ManagerOptions) SetMarshaller(marshaller Marshaller) *ManagerOptions
- func (m *ManagerOptions) SetMode(mode string) *ManagerOptions
- func (m *ManagerOptions) SetPort(port uint) *ManagerOptions
- func (m *ManagerOptions) SetUseTLS(use bool) *ManagerOptions
- func (m *ManagerOptions) SetVhost(vhost string) *ManagerOptions
- type Marshaller
- type MessageConsumer
- type MessagePriority
- type PublishingOptions
- type QueueConfig
- type RabbitMQEnvs
- type SchemaDefinitions
Constants ¶
const ( Release = "release" Debug = "debug" )
Logging Modes.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BindingConfig ¶
type ClientOptions ¶
type ClientOptions struct {
// Host is the RabbitMQ server host name.
Host string
// Port is the RabbitMQ server port number.
Port uint
// Username is the RabbitMQ server allowed username.
Username string
// Password is the RabbitMQ server allowed password.
Password string
// Vhost is used for CloudAMQP connections to set the specific vhost.
Vhost string
// UseTLS defines whether we use amqp or amqps protocol.
UseTLS bool
// ConnectionName is the client connection name passed on to the RabbitMQ server.
ConnectionName string
// KeepAlive will determine whether the re-connection and retry mechanisms should be triggered.
KeepAlive bool
// RetryDelay will define the delay for the re-connection and retry mechanism.
RetryDelay time.Duration
// MaxRetry will define the number of retries when an amqpMessage could not be processed.
MaxRetry uint
// PublishingCacheTTL defines the time to live for each publishing cache item.
PublishingCacheTTL time.Duration
// PublishingCacheSize defines the max length of the publishing cache.
PublishingCacheSize uint64
// Mode will specify whether logs are enabled or not.
Mode string
// Marshaller defines the content type used for messages and how they're marshalled (default: JSON).
Marshaller Marshaller
}
ClientOptions holds all necessary properties to launch a successful connection with an MQTTClient.
func DefaultClientOptions ¶
func DefaultClientOptions() *ClientOptions
DefaultClientOptions will return a ClientOptions with default values.
func NewClientOptions ¶
func NewClientOptions() *ClientOptions
NewClientOptions is the exported builder for a ClientOptions and will offer setter methods for an easy construction. Any non-assigned field will be set to default through DefaultClientOptions.
func NewClientOptionsFromEnv ¶
func NewClientOptionsFromEnv() *ClientOptions
NewClientOptionsFromEnv will generate a ClientOptions from environment variables. Empty values will be taken as default through the DefaultClientOptions.
func (*ClientOptions) SetConnectionName ¶
func (c *ClientOptions) SetConnectionName(connectionName string) *ClientOptions
SetConnectionName will assign the ConnectionName.
func (*ClientOptions) SetCredentials ¶
func (c *ClientOptions) SetCredentials(username, password string) *ClientOptions
SetCredentials will assign the Username and Password.
func (*ClientOptions) SetHost ¶
func (c *ClientOptions) SetHost(host string) *ClientOptions
SetHost will assign the Host.
func (*ClientOptions) SetKeepAlive ¶
func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions
SetKeepAlive will assign the KeepAlive status.
func (*ClientOptions) SetMarshaller ¶
func (c *ClientOptions) SetMarshaller(marshaller Marshaller) *ClientOptions
SetMarshaller will assign the Marshaller.
func (*ClientOptions) SetMaxRetry ¶
func (c *ClientOptions) SetMaxRetry(retry uint) *ClientOptions
SetMaxRetry will assign the max retry count.
func (*ClientOptions) SetMode ¶
func (c *ClientOptions) SetMode(mode string) *ClientOptions
SetMode will assign the Mode if valid.
func (*ClientOptions) SetPort ¶
func (c *ClientOptions) SetPort(port uint) *ClientOptions
SetPort will assign the Port.
func (*ClientOptions) SetPublishingCacheSize ¶
func (c *ClientOptions) SetPublishingCacheSize(size uint64) *ClientOptions
SetPublishingCacheSize will assign the publishing cache max length.
func (*ClientOptions) SetPublishingCacheTTL ¶
func (c *ClientOptions) SetPublishingCacheTTL(ttl time.Duration) *ClientOptions
SetPublishingCacheTTL will assign the publishing cache item TTL.
func (*ClientOptions) SetRetryDelay ¶
func (c *ClientOptions) SetRetryDelay(delay time.Duration) *ClientOptions
SetRetryDelay will assign the retry delay.
func (*ClientOptions) SetUseTLS ¶
func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions
SetUseTLS will assign the UseTLS status.
func (*ClientOptions) SetVhost ¶
func (c *ClientOptions) SetVhost(vhost string) *ClientOptions
SetVhost will assign the Vhost.
type DeliveryMode ¶
type DeliveryMode uint8
const ( Transient DeliveryMode = 1 Persistent DeliveryMode = 2 )
func (DeliveryMode) Uint8 ¶
func (d DeliveryMode) Uint8() uint8
type ExchangeConfig ¶
type ExchangeConfig struct {
Name string `yaml:"name"`
Type ExchangeType `yaml:"type"`
Persisted bool `yaml:"persisted"`
Args map[string]interface{} `yaml:"args"`
}
type ExchangeType ¶
type ExchangeType string
const ( ExchangeTypeTopic ExchangeType = "topic" ExchangeTypeDirect ExchangeType = "direct" ExchangeTypeFanout ExchangeType = "fanout" ExchangeTypeHeaders ExchangeType = "headers" )
func (ExchangeType) String ¶
func (e ExchangeType) String() string
type MQTTClient ¶
type MQTTClient interface {
// Disconnect launches the disconnection process.
// This operation disables to client permanently.
Disconnect() error
// Publish will send the desired payload through the selected channel.
// - exchange is the name of the exchange targeted for event publishing.
// - routingKey is the route that the exchange will use to forward the message.
// - payload is the object you want to send as a byte array.
// Returns an error if the connection to the RabbitMQ server is down.
Publish(exchange, routingKey string, payload interface{}) error
// PublishWithOptions will send the desired payload through the selected channel.
// - exchange is the name of the exchange targeted for event publishing.
// - routingKey is the route that the exchange will use to forward the message.
// - payload is the object you want to send as a byte array.
// Optionally you can add publishingOptions for extra customization.
// Returns an error if the connection to the RabbitMQ server is down.
PublishWithOptions(exchange, routingKey string, payload interface{}, options *PublishingOptions) error
// RegisterConsumer will register a MessageConsumer for internal queue subscription and message processing.
// The MessageConsumer will hold a list of MQTTMessageHandlers to internalize message processing.
// Based on the return of error of each handler, the process of acknowledgment, rejection and retry of messages is
// fully handled internally.
// Furthermore, connection lost and channel errors are also internally handled by the connectionManager that will keep consumers
// alive if and when necessary.
RegisterConsumer(consumer MessageConsumer) error
// IsReady returns true if the client is fully operational and connected to the RabbitMQ.
IsReady() bool
// IsHealthy returns true if the client is ready (IsReady) and all channels are operating successfully.
IsHealthy() bool
// GetHost returns the host used to initialize the client.
GetHost() string
// GetPort returns the port used to initialize the client.
GetPort() uint
// GetUsername returns the username used to initialize the client.
GetUsername() string
// GetVhost returns the vhost used to initialize the client.
GetVhost() string
// IsDisabled returns whether the client is disabled or not.
IsDisabled() bool
}
MQTTClient is a simple MQTT interface that offers basic client operations such as:
- Publishing
- Consuming
- Disconnecting
- Ready and health checks
func NewClient ¶
func NewClient(options *ClientOptions) MQTTClient
NewClient will instantiate a new MQTTClient. If options is set to nil, the DefaultClientOptions will be used.
func NewClientFromEnv ¶
func NewClientFromEnv() MQTTClient
NewClientFromEnv will instantiate a new MQTTClient from environment variables.
type MQTTManager ¶
type MQTTManager interface {
// Disconnect launches the disconnection process.
// This operation disables to manager permanently.
Disconnect() error
// CreateQueue will create a new queue from QueueConfig.
CreateQueue(config QueueConfig) error
// CreateExchange will create a new exchange from ExchangeConfig.
CreateExchange(config ExchangeConfig) error
// BindExchangeToQueueViaRoutingKey will bind an exchange to a queue via a given routingKey.
// Returns an error if the connection to the RabbitMQ server is down or if the exchange or queue does not exist.
BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error
// GetNumberOfMessages retrieves the number of messages currently sitting in a given queue.
// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
GetNumberOfMessages(queue string) (int, error)
// PushMessageToExchange pushes a message to a given exchange with a given routing key.
// Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist.
PushMessageToExchange(exchange, routingKey string, payload interface{}) error
// PopMessageFromQueue retrieves the first message of a queue. The message can then be auto-acknowledged or not.
// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist or is empty.
PopMessageFromQueue(queue string, autoAck bool) (*amqp.Delivery, error)
// PurgeQueue will empty a queue of all its current messages.
// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
PurgeQueue(queue string) error
// DeleteQueue permanently deletes an existing queue.
// Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
DeleteQueue(queue string) error
// DeleteExchange permanently deletes an existing exchange.
// Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist.
DeleteExchange(exchange string) error
// SetupFromDefinitions loads a definitions.json file and automatically sets up exchanges, queues and bindings.
SetupFromDefinitions(path string) error
// GetHost returns the host used to initialize the manager.
GetHost() string
// GetPort returns the port used to initialize the manager.
GetPort() uint
// GetUsername returns the username used to initialize the manager.
GetUsername() string
// GetVhost returns the vhost used to initialize the manager.
GetVhost() string
// IsDisabled returns whether the manager is disabled or not.
IsDisabled() bool
}
MQTTManager is a simple MQTT interface that offers basic management operations such as:
- Creation of queue, exchange and bindings
- Deletion of queues and exchanges
- Purge of queues
- Queue evaluation (existence and number of messages)
func NewManager ¶
func NewManager(options *ManagerOptions) (MQTTManager, error)
NewManager will instantiate a new MQTTManager. If options is set to nil, the DefaultManagerOptions will be used.
func NewManagerFromEnv ¶
func NewManagerFromEnv() (MQTTManager, error)
NewManagerFromEnv will instantiate a new MQTTManager from environment variables.
type MQTTMessageHandlerFunc ¶
MQTTMessageHandlerFunc is the function that will be called when a delivery is received.
type MQTTMessageHandlers ¶
type MQTTMessageHandlers map[string]MQTTMessageHandlerFunc
MQTTMessageHandlers is a wrapper that holds a map[string]MQTTMessageHandlerFunc.
func (MQTTMessageHandlers) FindFunc ¶
func (mh MQTTMessageHandlers) FindFunc(routingKey string) MQTTMessageHandlerFunc
func (MQTTMessageHandlers) Validate ¶
func (mh MQTTMessageHandlers) Validate() error
Validate verifies that all routing keys in the handlers are properly formatted and allowed.
type ManagerOptions ¶
type ManagerOptions struct {
// Host is the RabbitMQ server host name.
Host string
// Port is the RabbitMQ server port number.
Port uint
// Username is the RabbitMQ server allowed username.
Username string
// Password is the RabbitMQ server allowed password.
Password string
// Vhost is used for CloudAMQP connections to set the specific vhost.
Vhost string
// UseTLS defines whether we use amqp or amqps protocol.
UseTLS bool
// Mode will specify whether logs are enabled or not.
Mode string
// Marshaller defines the content type used for messages and how they're marshalled (default: JSON).
Marshaller Marshaller
}
ManagerOptions holds all necessary properties to launch a successful connection with an MQTTManager.
func DefaultManagerOptions ¶
func DefaultManagerOptions() *ManagerOptions
DefaultManagerOptions will return a ManagerOptions with default values.
func NewManagerOptions ¶
func NewManagerOptions() *ManagerOptions
NewManagerOptions is the exported builder for a ManagerOptions and will offer setter methods for an easy construction. Any non-assigned field will be set to default through DefaultManagerOptions.
func NewManagerOptionsFromEnv ¶
func NewManagerOptionsFromEnv() *ManagerOptions
NewManagerOptionsFromEnv will generate a ManagerOptions from environment variables. Empty values will be taken as default through the DefaultManagerOptions.
func (*ManagerOptions) SetCredentials ¶
func (m *ManagerOptions) SetCredentials(username, password string) *ManagerOptions
SetCredentials will assign the username and password.
func (*ManagerOptions) SetHost ¶
func (m *ManagerOptions) SetHost(host string) *ManagerOptions
SetHost will assign the host.
func (*ManagerOptions) SetMarshaller ¶
func (m *ManagerOptions) SetMarshaller(marshaller Marshaller) *ManagerOptions
SetMarshaller will assign the Marshaller.
func (*ManagerOptions) SetMode ¶
func (m *ManagerOptions) SetMode(mode string) *ManagerOptions
SetMode will assign the mode if valid.
func (*ManagerOptions) SetPort ¶
func (m *ManagerOptions) SetPort(port uint) *ManagerOptions
SetPort will assign the port.
func (*ManagerOptions) SetUseTLS ¶
func (m *ManagerOptions) SetUseTLS(use bool) *ManagerOptions
SetUseTLS will assign the UseTLS status.
func (*ManagerOptions) SetVhost ¶
func (m *ManagerOptions) SetVhost(vhost string) *ManagerOptions
SetVhost will assign the Vhost.
type Marshaller ¶
func NewJSONMarshaller ¶
func NewJSONMarshaller() Marshaller
func NewTextMarshaller ¶
func NewTextMarshaller() Marshaller
type MessageConsumer ¶
type MessageConsumer struct {
// Queue defines the queue from which we want to consume messages.
Queue string
// Name is a unique identifier of the consumer. Should be as explicit as possible.
Name string
// PrefetchSize defines the max size of messages that are allowed to be processed at the same time.
// This property is dropped if AutoAck is set to true.
PrefetchSize int
// PrefetchCount defines the max number of messages that are allowed to be processed at the same time.
// This property is dropped if AutoAck is set to true.
PrefetchCount int
// AutoAck defines whether a message is directly acknowledged or not when being consumed.
AutoAck bool
// ConcurrentProcess will make MQTTMessageHandlers run concurrently for faster consumption, if set to true.
ConcurrentProcess bool
// Handlers is the list of defined handlers.
Handlers MQTTMessageHandlers
}
MessageConsumer holds all the information needed to consume messages.
func (MessageConsumer) HashCode ¶
func (c MessageConsumer) HashCode() string
HashCode returns a unique identifier for the defined consumer.
type MessagePriority ¶
type MessagePriority uint8
const ( PriorityLowest MessagePriority = 1 PriorityVeryLow MessagePriority = 2 PriorityLow MessagePriority = 3 PriorityMedium MessagePriority = 4 PriorityHigh MessagePriority = 5 PriorityHighest MessagePriority = 6 )
func (MessagePriority) Uint8 ¶
func (m MessagePriority) Uint8() uint8
type PublishingOptions ¶
type PublishingOptions struct {
MessagePriority *MessagePriority
DeliveryMode *DeliveryMode
TTL *time.Duration
}
func SendOptions ¶
func SendOptions() *PublishingOptions
func (*PublishingOptions) SetMode ¶
func (m *PublishingOptions) SetMode(mode DeliveryMode) *PublishingOptions
func (*PublishingOptions) SetPriority ¶
func (m *PublishingOptions) SetPriority(priority MessagePriority) *PublishingOptions
func (*PublishingOptions) SetTTL ¶
func (m *PublishingOptions) SetTTL(ttl time.Duration) *PublishingOptions
type QueueConfig ¶
type RabbitMQEnvs ¶
type RabbitMQEnvs struct {
Host string `env:"RABBITMQ_HOST"`
Port uint `env:"RABBITMQ_PORT"`
Username string `env:"RABBITMQ_USERNAME"`
Password string `env:"RABBITMQ_PASSWORD"`
Vhost string `env:"RABBITMQ_VHOST"`
UseTLS bool `env:"RABBITMQ_USE_TLS"`
ConnectionName string `env:"RABBITMQ_CONNECTION_NAME"`
}
type SchemaDefinitions ¶
type SchemaDefinitions struct {
Exchanges []struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments struct {
} `json:"arguments"`
} `json:"exchanges"`
Queues []struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Arguments struct {
} `json:"arguments"`
} `json:"queues"`
Bindings []struct {
Source string `json:"source"`
Vhost string `json:"vhost"`
Destination string `json:"destination"`
DestinationType string `json:"destination_type"`
RoutingKey string `json:"routing_key"`
Arguments struct {
} `json:"arguments"`
} `json:"bindings"`
}


