Documentation
Index ¶
- Constants
- Variables
- type BrokerConfig
- type Consumer
- type ConsumerConfig
- func (c *ConsumerConfig) BuildDeadletterQueue(routes *Routes, ch *amqp.Channel, con *amqp.Connection, ex string) (err error)
- func (c *ConsumerConfig) BuildQueue(queueName string, routes *Routes, ch *amqp.Channel, ex string) (err error)
- func (e *ConsumerConfig) GetArgs() map[string]interface{}
- func (c *ConsumerConfig) GetAutoDelete() bool
- func (e *ConsumerConfig) GetDeadletterName() string
- func (c *ConsumerConfig) GetDurable() bool
- func (c *ConsumerConfig) GetExclusive() bool
- func (e *ConsumerConfig) GetHasDeadletter() bool
- func (c *ConsumerConfig) GetName() string
- func (c *ConsumerConfig) GetNoWait() bool
- func (c *ConsumerConfig) GetPrefetchCount() uint
- func (c *ConsumerConfig) GetPrefetchSize() uint
- type Exchange
- type ExchangeConfig
- func (e *ExchangeConfig) BuildExchange(ch *amqp.Channel) (err error)
- func (e *ExchangeConfig) GetArgs() map[string]interface{}
- func (e *ExchangeConfig) GetAutoDelete() bool
- func (e *ExchangeConfig) GetDurable() bool
- func (e *ExchangeConfig) GetInternal() bool
- func (e *ExchangeConfig) GetName() (string, error)
- func (e *ExchangeConfig) GetType() string
- type HandlerFunc
- type Host
- type HostConfig
- type HostMiddleware
- type KeyHandlerFunc
- type MessageHandler
- type MiddlewareList
- type RabbitHost
- func (h *RabbitHost) AddBroker(ctx context.Context, cfg *ExchangeConfig, consumers []Consumer) error
- func (h *RabbitHost) GetConnectionStatus() bool
- func (h *RabbitHost) Middleware(fn ...HostMiddleware)
- func (h *RabbitHost) Run(ctx context.Context) (err error)
- func (h *RabbitHost) Stop(context.Context) error
- type Routes
Constants ¶
const (
TOPIC_EXCHANGE = "topic"
)
Variables ¶
var (
ERRNAMEREQUIRED = errors.New("name is a required exchange field")
)
Functions ¶
This section is empty.
Types ¶
type BrokerConfig ¶
type BrokerConfig struct {
    Exchange  ExchangeConfig
    Consumers map[string]ConsumerConfig
}
type Consumer ¶
type Consumer interface {
    // Init can be used to get a custom
    // consumer config. If it returns nil a consumer
    // with default params will be setup
    Init() (*ConsumerConfig, error)
    // Prefix defines a common prefix to be added
    // to all queue names
    Prefix() string
    // Middleware can be used to implement custom
    // middleware which gets called before messages
    // are passed to handlers
    Middleware(HandlerFunc) HandlerFunc
    // Queues is used to define the queues, keys and handlers
    // Config is passed which can be used to set QOS and consumer name
    Queues(context.Context) map[string]*Routes
}
Consumer is an interface which can be implemented to create a consumer.
A consumer can set up multiple queues, define n routing keys for each queue, and assign a handler to manage the messages received.
Custom middleware can be added using the Middleware method.
Example implementation shown below:
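type MyConsumer struct{}

func NewMyConsumer() events.Consumer {
    return &MyConsumer{}
}

// Init returns nil so that a consumer with default params is set up.
func (c *MyConsumer) Init() (*events.ConsumerConfig, error) {
    return nil, nil
}

func (c *MyConsumer) Prefix() string {
    return ""
}

// BasicMessage and the handler signatures below are carried over from the
// original example; the definitions of HandlerFunc and KeyHandlerFunc are
// not shown on this page.
func (c *MyConsumer) Middleware(h events.HandlerFunc) events.HandlerFunc {
    return func(m events.BasicMessage) error {
        return h(m)
    }
}

// Queues binds mark.queue to two routing keys and a single handler.
func (c *MyConsumer) Queues(ctx context.Context) map[string]*events.Routes {
    return map[string]*events.Routes{
        "mark.queue": {
            Keys:         []string{"test.message", "mark.#"},
            DeliveryFunc: c.TestHandler,
        },
    }
}

func (c *MyConsumer) TestHandler(m events.BasicMessage) error {
    return nil
}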
type ConsumerConfig ¶
type ConsumerConfig struct {
    Name           string
    Durable        *bool
    AutoDelete     *bool
    NoWait         *bool
    Exclusive      *bool
    Ttl            *uint
    PrefetchCount  *uint
    PrefetchSize   *uint
    Args           map[string]interface{}
    HasDeadletter  *bool
    DeadletterName *string
}
ConsumerConfig defines the setup of a consumer. If it isn't set, default values will be used. To set a custom config for a consumer, set up a new consumer struct, pass it the config, and return the config from the Init() method.
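Because the optional fields of ConsumerConfig are pointers, a custom config needs addressable values. The following is a minimal sketch of building one. The `ConsumerConfig` type here is a local stand-in that mirrors only a few of the fields listed above so the example compiles on its own, and `boolPtr`, `uintPtr` and `newOrdersConfig` are hypothetical helpers, not part of the package.

```go
package main

import "fmt"

// ConsumerConfig is a local stand-in mirroring a few fields of the
// package's ConsumerConfig, so that this sketch is self-contained.
type ConsumerConfig struct {
	Name          string
	Durable       *bool
	PrefetchCount *uint
	HasDeadletter *bool
}

// boolPtr and uintPtr are hypothetical helpers that return a pointer
// to a copy of their argument.
func boolPtr(v bool) *bool { return &v }
func uintPtr(v uint) *uint { return &v }

// newOrdersConfig builds the kind of value a consumer's Init method
// could return. Fields left nil fall back to the package defaults.
func newOrdersConfig() *ConsumerConfig {
	return &ConsumerConfig{
		Name:          "orders-consumer",
		Durable:       boolPtr(true),
		PrefetchCount: uintPtr(10),
		// HasDeadletter is intentionally left nil.
	}
}

func main() {
	cfg := newOrdersConfig()
	fmt.Println(cfg.Name, *cfg.Durable, *cfg.PrefetchCount, cfg.HasDeadletter == nil)
}
```

Leaving a field nil rather than setting it to its zero value is what lets the getters tell "not configured" apart from an explicit false or 0.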
func (*ConsumerConfig) BuildDeadletterQueue ¶
func (c *ConsumerConfig) BuildDeadletterQueue(routes *Routes, ch *amqp.Channel, con *amqp.Connection, ex string) (err error)
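func (*ConsumerConfig) BuildQueue ¶
func (c *ConsumerConfig) BuildQueue(queueName string, routes *Routes, ch *amqp.Channel, ex string) (err error)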
func (*ConsumerConfig) GetArgs ¶
func (e *ConsumerConfig) GetArgs() map[string]interface{}
GetArgs gets a table of arbitrary arguments which are passed to the queue.
func (*ConsumerConfig) GetAutoDelete ¶
func (c *ConsumerConfig) GetAutoDelete() bool
GetAutoDelete returns whether the queue is declared as auto-delete, meaning the server may delete it once its last consumer has gone. The default is false.
func (*ConsumerConfig) GetDeadletterName ¶
func (e *ConsumerConfig) GetDeadletterName() string
GetDeadletterName gets the name of the deadletter queue to be set up. If nil, a name of %QueueName%.deadletter is used.
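The naming rule described here can be sketched as a small function. This is an illustration of the documented fallback only, not the package's code; `deadletterName` is a hypothetical name.

```go
package main

import "fmt"

// deadletterName is a hypothetical helper illustrating the documented rule:
// use the configured name when present, otherwise "<queue>.deadletter".
func deadletterName(queueName string, configured *string) string {
	if configured != nil {
		return *configured
	}
	return queueName + ".deadletter"
}

func main() {
	custom := "orders.failed"
	fmt.Println(deadletterName("orders", nil))
	fmt.Println(deadletterName("orders", &custom))
}
```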
func (*ConsumerConfig) GetDurable ¶
func (c *ConsumerConfig) GetDurable() bool
GetDurable returns the durability set in config. If nil, it returns a default of true.
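The getters in this section share one pattern: a nil pointer means "not configured", in which case a default is returned. A minimal sketch of that pattern follows, using a hypothetical `config` type rather than the package's own struct.

```go
package main

import "fmt"

// config is a hypothetical stand-in with a single optional field.
type config struct {
	Durable *bool
}

// getDurable returns the configured value, or true when the receiver
// or the field is nil.
func (c *config) getDurable() bool {
	if c == nil || c.Durable == nil {
		return true
	}
	return *c.Durable
}

func main() {
	off := false
	fmt.Println((&config{}).getDurable())
	fmt.Println((&config{Durable: &off}).getDurable())
}
```

Guarding against a nil receiver is an extra choice made in this sketch; whether the package's getters do the same is not stated on this page.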
func (*ConsumerConfig) GetExclusive ¶
func (c *ConsumerConfig) GetExclusive() bool
GetExclusive returns whether the queue is exclusive. Exclusive queues are only accessible by the connection that declares them and will be deleted when that connection closes. The default is false.
func (*ConsumerConfig) GetHasDeadletter ¶
func (e *ConsumerConfig) GetHasDeadletter() bool
GetHasDeadletter reports whether a deadletter queue is set up for the consumer.
func (*ConsumerConfig) GetName ¶
func (c *ConsumerConfig) GetName() string
GetName returns the consumer name if set in config; otherwise it returns a random UUID.
func (*ConsumerConfig) GetNoWait ¶
func (c *ConsumerConfig) GetNoWait() bool
GetNoWait returns the no-wait setting. When true, the queue is assumed to be declared on the server; a channel exception will arrive if the conditions are met for existing queues or when attempting to modify an existing queue from a different connection. The default is false.
func (*ConsumerConfig) GetPrefetchCount ¶
func (c *ConsumerConfig) GetPrefetchCount() uint
GetPrefetchCount returns the QoS value for the number of messages pulled from the queue at a time. The default is 0, which leaves the count at the default used by most client libraries.
func (*ConsumerConfig) GetPrefetchSize ¶
func (c *ConsumerConfig) GetPrefetchSize() uint
type ExchangeConfig ¶
type ExchangeConfig struct {
    Name string
    // contains filtered or unexported fields
}
ExchangeConfig sets up a new exchange with the provided params. Defaults are enabled, so not all params need to be set, depending on requirements.
func (*ExchangeConfig) BuildExchange ¶
func (e *ExchangeConfig) BuildExchange(ch *amqp.Channel) (err error)
BuildExchange builds an exchange
func (*ExchangeConfig) GetArgs ¶
func (e *ExchangeConfig) GetArgs() map[string]interface{}
GetArgs gets a table of arbitrary arguments which are passed to the exchange
func (*ExchangeConfig) GetAutoDelete ¶
func (e *ExchangeConfig) GetAutoDelete() bool
GetAutoDelete returns the deletion policy set in config. If nil, it returns a default of false.
func (*ExchangeConfig) GetDurable ¶
func (e *ExchangeConfig) GetDurable() bool
GetDurable returns the durability set in config. If nil, it returns a default of true.
func (*ExchangeConfig) GetInternal ¶
func (e *ExchangeConfig) GetInternal() bool
GetInternal determines whether this exchange can only be published to from other exchanges. The default value is false, meaning external sources can publish to this exchange.
func (*ExchangeConfig) GetName ¶
func (e *ExchangeConfig) GetName() (string, error)
GetName returns the exchange name set in config. The name is a required field, so an error (presumably ERRNAMEREQUIRED) is returned when it is not set.
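A getter that returns (string, error) alongside a sentinel such as ERRNAMEREQUIRED suggests a required-field check. The sketch below shows that pattern under the assumption that an empty name yields the sentinel error; `exchangeConfig`, `getName` and `errNameRequired` are local stand-ins, and the package's actual behaviour is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// errNameRequired mirrors the message of the package's ERRNAMEREQUIRED.
var errNameRequired = errors.New("name is a required exchange field")

// exchangeConfig is a hypothetical stand-in for ExchangeConfig.
type exchangeConfig struct {
	Name string
}

// getName returns the name, or the sentinel error when it is empty
// (an assumption made for this sketch).
func (e *exchangeConfig) getName() (string, error) {
	if e.Name == "" {
		return "", errNameRequired
	}
	return e.Name, nil
}

func main() {
	if _, err := (&exchangeConfig{}).getName(); errors.Is(err, errNameRequired) {
		fmt.Println("exchange rejected:", err)
	}
	name, _ := (&exchangeConfig{Name: "orders"}).getName()
	fmt.Println("exchange accepted:", name)
}
```

Callers can compare against the sentinel with errors.Is, which keeps working if the error is later wrapped.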
func (*ExchangeConfig) GetType ¶
func (e *ExchangeConfig) GetType() string
GetType returns the type of exchange set in config. If nil, it returns a default of topic (TOPIC_EXCHANGE).
type HandlerFunc ¶
func JsonHandler ¶
func JsonHandler(h HandlerFunc) HandlerFunc
func MessageDump ¶
func MessageDump(h HandlerFunc) HandlerFunc
MessageDump will output the entire amqp message with the body converted to a string. Handy for debugging.
func (HandlerFunc) HandleMessage ¶
func (f HandlerFunc) HandleMessage(ctx context.Context, m amqp.Delivery)
type Host ¶
type Host interface {
    // AddBroker will register an exchange and n consumers
    // which will consume from that exchange
    AddBroker(context.Context, *ExchangeConfig, []Consumer) error
    // Run will setup all queues and routing keys
    // assigned to each consumer and then in turn start them
    Run(context.Context) (err error)
    // Middleware can be used to implement custom
    // middleware which gets called before messages
    // are passed to handlers
    Middleware(...HostMiddleware)
    // Stop can be called when you wish to shut down the host
    Stop(context.Context) error
    GetConnectionStatus() bool
}
Host is the container used to host all registered consumers. It is responsible for the amqp connection and for starting and gracefully stopping all running consumers.
h := NewRabbitHost().Init(cfg.Host)
h.AddBroker(NewBroker(cfg.Exchange, [])
func NewConsumerHost ¶
func NewConsumerHost(cfg *HostConfig) Host
NewConsumerHost sets up the initial connection & quality of service to be used by all registered consumers.
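A typical way to drive a host's Run and Stop methods is to run it until a context is cancelled and then allow a bounded time for shutdown. The sketch below illustrates that lifecycle with a local `host` interface and a `fakeHost`, both hypothetical stand-ins. It assumes Run blocks until the host is stopped, which this page does not confirm.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// host is a local stand-in for the two lifecycle methods of Host.
type host interface {
	Run(ctx context.Context) error
	Stop(ctx context.Context) error
}

// fakeHost is hypothetical: its Run blocks until Stop is called.
type fakeHost struct {
	done    chan struct{}
	stopped bool
}

func newFakeHost() *fakeHost { return &fakeHost{done: make(chan struct{})} }

func (f *fakeHost) Run(ctx context.Context) error {
	<-f.done
	return nil
}

func (f *fakeHost) Stop(ctx context.Context) error {
	f.stopped = true
	close(f.done)
	return nil
}

// serve runs h until ctx is cancelled, then gives Stop up to grace to finish.
func serve(ctx context.Context, h host, grace time.Duration) error {
	errc := make(chan error, 1)
	go func() { errc <- h.Run(ctx) }()

	select {
	case err := <-errc:
		return err // Run returned on its own
	case <-ctx.Done():
	}

	stopCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	if err := h.Stop(stopCtx); err != nil {
		return err
	}
	return <-errc // wait for Run to unwind
}

// demo cancels the context up front to simulate a shutdown signal.
func demo() bool {
	ctx, cancel := context.WithCancel(context.Background())
	h := newFakeHost()
	cancel()
	err := serve(ctx, h, time.Second)
	return err == nil && h.stopped
}

func main() {
	fmt.Println("clean shutdown:", demo())
}
```

In a real service the context would usually come from signal.NotifyContext, so that an interrupt triggers the same shutdown path.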
type HostConfig ¶
type HostConfig struct {
Address string
}
HostConfig contains global config used for the rabbit connection
type HostMiddleware ¶
type HostMiddleware func(handler HandlerFunc) HandlerFunc
type MessageHandler ¶
MessageHandler works in the same way HTTP handlers do, allowing middleware etc. to be used on consumers.
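The comparison with HTTP handlers refers to the adapter pattern used by net/http, where a function type implements the handler interface by calling itself. Here is a minimal sketch of that pattern. `delivery` stands in for amqp.Delivery, and the lowercase types are local assumptions modelled on the HandleMessage signature listed above, since the definitions of MessageHandler and HandlerFunc are not shown on this page.

```go
package main

import (
	"context"
	"fmt"
)

// delivery is a stand-in for amqp.Delivery.
type delivery struct {
	Body []byte
}

// messageHandler is an assumed shape for the MessageHandler interface.
type messageHandler interface {
	HandleMessage(ctx context.Context, m delivery)
}

// handlerFunc adapts a plain function to messageHandler, in the same way
// http.HandlerFunc adapts a function to http.Handler.
type handlerFunc func(ctx context.Context, m delivery)

// HandleMessage calls f(ctx, m).
func (f handlerFunc) HandleMessage(ctx context.Context, m delivery) {
	f(ctx, m)
}

// demo passes a function value where the interface is expected and
// returns the body the handler received.
func demo() string {
	var got string
	var h messageHandler = handlerFunc(func(ctx context.Context, m delivery) {
		got = string(m.Body)
	})
	h.HandleMessage(context.Background(), delivery{Body: []byte("ping")})
	return got
}

func main() {
	fmt.Println(demo())
}
```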
type MiddlewareList ¶
type MiddlewareList []HostMiddleware
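A list of middleware is usually applied by wrapping the final handler from the last entry to the first, so that the first entry runs outermost. The sketch below illustrates this with local stand-in types. The handler signature, the `wrap` method and the ordering are assumptions made for illustration, not the package's documented behaviour.

```go
package main

import (
	"context"
	"fmt"
)

// delivery is a stand-in for amqp.Delivery.
type delivery struct {
	Body []byte
}

// These types mirror the shapes of HandlerFunc, HostMiddleware and
// MiddlewareList; the handler signature is an assumption.
type handlerFunc func(ctx context.Context, m delivery)
type hostMiddleware func(handlerFunc) handlerFunc
type middlewareList []hostMiddleware

// wrap applies the list in reverse so that the first entry runs outermost.
func (l middlewareList) wrap(h handlerFunc) handlerFunc {
	for i := len(l) - 1; i >= 0; i-- {
		h = l[i](h)
	}
	return h
}

// tag returns a middleware that records its name, then calls next.
func tag(name string, trace *[]string) hostMiddleware {
	return func(next handlerFunc) handlerFunc {
		return func(ctx context.Context, m delivery) {
			*trace = append(*trace, name)
			next(ctx, m)
		}
	}
}

// demo runs one message through two middleware and returns the call order.
func demo() []string {
	var trace []string
	final := func(ctx context.Context, m delivery) {
		trace = append(trace, "handler")
	}
	h := middlewareList{tag("log", &trace), tag("auth", &trace)}.wrap(final)
	h(context.Background(), delivery{Body: []byte("hi")})
	return trace
}

func main() {
	fmt.Println(demo()) // prints [log auth handler]
}
```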
type RabbitHost ¶
type RabbitHost struct {
// contains filtered or unexported fields
}
func (*RabbitHost) AddBroker ¶
func (h *RabbitHost) AddBroker(ctx context.Context, cfg *ExchangeConfig, consumers []Consumer) error
AddBroker will register an exchange and n consumers which will consume from that exchange
func (*RabbitHost) GetConnectionStatus ¶
func (h *RabbitHost) GetConnectionStatus() bool
func (*RabbitHost) Middleware ¶
func (h *RabbitHost) Middleware(fn ...HostMiddleware)
type Routes ¶
type Routes struct {
    Keys         []string
    DeliveryFunc KeyHandlerFunc
}
Routes contains a set of routing keys and the handler func (DeliveryFunc) that will be used to process messages matching those routing keys.
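With a topic exchange, routing keys such as "mark.#" are patterns matched by the broker: words are separated by dots, `*` matches exactly one word and `#` matches zero or more words. The sketch below illustrates those AMQP matching rules for reasoning about which keys reach a handler. It is not code from this package, and `topicMatch` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// match compares a pattern and a key word by word.
func match(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// '#' may absorb zero or more words.
		for i := 0; i <= len(key); i++ {
			if match(pattern[1:], key[i:]) {
				return true
			}
		}
		return false
	case "*":
		// '*' must absorb exactly one word.
		return len(key) > 0 && match(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && match(pattern[1:], key[1:])
	}
}

// topicMatch reports whether a routing key matches a topic binding pattern.
func topicMatch(pattern, key string) bool {
	return match(strings.Split(pattern, "."), strings.Split(key, "."))
}

func main() {
	for _, key := range []string{"mark", "mark.created.v1", "test.message"} {
		fmt.Println(key, topicMatch("mark.#", key))
	}
}
```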