Documentation ¶
Index ¶
- Constants
- func AddRabbitListener(listener RabbitListener)
- func DeclareRabbitBinding(ch *amqp.Channel, bind BindingRegistration) error
- func DeclareRabbitExchange(ch *amqp.Channel, exchange ExchangeRegistration) error
- func DeclareRabbitQueue(ch *amqp.Channel, queue QueueRegistration) error
- func NewEventBus(name string)
- func NewRabbitChan() (*amqp.Channel, error)
- func PubEventBus(rail miso.Rail, eventObject any, name string) error
- func PubEventBusHeaders(rail miso.Rail, eventObject any, name string, headers map[string]any) error
- func PublishJson(c miso.Rail, obj any, exchange string, routingKey string) error
- func PublishJsonHeaders(c miso.Rail, obj any, exchange string, routingKey string, ...) error
- func PublishMsg(c miso.Rail, msg []byte, exchange string, routingKey string, ...) error
- func PublishText(c miso.Rail, msg string, exchange string, routingKey string) error
- func RabbitBootstrap(rail miso.Rail) error
- func RabbitBootstrapCondition(rail miso.Rail) (bool, error)
- func RabbitConnected() bool
- func RabbitDisconnect(rail miso.Rail) error
- func RabbitMQEnabled() bool
- func RegisterRabbitBinding(b BindingRegistration)
- func RegisterRabbitExchange(e ExchangeRegistration)
- func RegisterRabbitQueue(q QueueRegistration)
- func StartRabbitMqClient(rail miso.Rail) error
- func SubEventBus[T any](name string, concurrency int, listener func(rail miso.Rail, t T) error)
- type BindingRegistration
- type EventPipeline
- func (ep *EventPipeline[T]) Document(name string, desc string, provider string) *EventPipeline[T]
- func (ep *EventPipeline[T]) Listen(concurrency int, listener func(rail miso.Rail, t T) error) *EventPipeline[T]
- func (ep *EventPipeline[T]) LogPayload() *EventPipeline[T]
- func (ep *EventPipeline[T]) MaxRetry(n int) *EventPipeline[T]
- func (ep *EventPipeline[T]) Name() string
- func (ep *EventPipeline[T]) Send(rail miso.Rail, event T) error
- type EventPipelineDesc
- type ExchangeRegistration
- type JsonMsgListener
- type MsgListener
- type QueueRegistration
- type RabbitListener
Constants ¶
const (
	BusRoutingKey   = "#"
	BusExchangeKind = "direct"
)
const (
	PropRabbitMqEnabled     = "rabbitmq.enabled"
	PropRabbitMqHost        = "rabbitmq.host"
	PropRabbitMqPort        = "rabbitmq.port"
	PropRabbitMqUsername    = "rabbitmq.username"
	PropRabbitMqPassword    = "rabbitmq.password"
	PropRabbitMqVhost       = "rabbitmq.vhost"
	PropRabbitMqConsumerQos = "rabbitmq.consumer.qos"
)
Configuration Properties for RabbitMQ
const (
	// Default QOS
	DefaultQos = 68

	// RabbitMQ messages are redelivered every 5 seconds, 180 times is roughly equivalent to 15 minutes retry.
	MaxRetryTimes15Min = 180

	// Header key of rabbitmq message, specify how many times the message can be redelivered.
	//
	// Actual redelivery mechanism is implemented by miso's message listener.
	HeaderRabbitMaxRetry = "miso-rabbitmq-max-retry"

	// Header key of rabbitmq message, specify how many times the message has been redelivered.
	//
	// Actual redelivery mechanism is implemented by miso's message listener.
	HeaderRabbitCurrRetry = "miso-rabbitmq-curr-retry"
)
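The relationship between MaxRetryTimes15Min and the 5-second redelivery interval mentioned in its comment can be checked with a few lines of arithmetic. This is a minimal sketch: the lowercase names and the retryWindow helper are illustrative and not part of the package, and the interval is assumed to be fixed.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// Mirrors MaxRetryTimes15Min from the constants above.
	maxRetryTimes15Min = 180
	// Redelivery interval taken from the constant's comment (assumed to be fixed).
	redeliveryInterval = 5 * time.Second
)

// retryWindow returns the total time covered by n redeliveries at a fixed interval.
func retryWindow(n int, interval time.Duration) time.Duration {
	return time.Duration(n) * interval
}

func main() {
	fmt.Println(retryWindow(maxRetryTimes15Min, redeliveryInterval)) // prints "15m0s"
}
```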
Variables ¶
This section is empty.
Functions ¶
func AddRabbitListener ¶
func AddRabbitListener(listener RabbitListener)
Register pending message listener.
Listeners will be started in StartRabbitMqClient func when the connection to broker is established.
For any message that the listener is unable to process (returning error), the message is redelivered indefinitely with a delay of 10 seconds until the message is finally processed without error.
func DeclareRabbitBinding ¶
func DeclareRabbitBinding(ch *amqp.Channel, bind BindingRegistration) error
Declare binding using the provided channel immediately
func DeclareRabbitExchange ¶
func DeclareRabbitExchange(ch *amqp.Channel, exchange ExchangeRegistration) error
Declare exchange using the provided channel immediately
func DeclareRabbitQueue ¶
func DeclareRabbitQueue(ch *amqp.Channel, queue QueueRegistration) error
Declare queue using the provided channel immediately
func NewEventBus ¶
func NewEventBus(name string)
Declare event bus.
It creates a direct exchange and a queue, both identified by the name, and binds them using routing key '#'.
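Because the exchange kind is "direct", the '#' routing key is matched literally rather than as a topic wildcard, which is why publishers must also use '#'. The sketch below models that exact-match behaviour in plain Go. It does not talk to a broker; the binding type, the routeDirect helper and the "order-created" bus name are hypothetical and exist only for illustration.

```go
package main

import "fmt"

// binding models a queue bound to a direct exchange with a binding key.
type binding struct {
	queue string
	key   string
}

// routeDirect returns the queues whose binding key equals the routing key exactly,
// which is how a direct exchange matches messages.
func routeDirect(bindings []binding, routingKey string) []string {
	var queues []string
	for _, b := range bindings {
		if b.key == routingKey {
			queues = append(queues, b.queue)
		}
	}
	return queues
}

func main() {
	// Hypothetical event bus: the exchange and the queue share the same name.
	bus := []binding{{queue: "order-created", key: "#"}}

	fmt.Println(routeDirect(bus, "#"))       // prints "[order-created]"
	fmt.Println(routeDirect(bus, "created")) // prints "[]"
}
```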
func NewRabbitChan ¶
func NewRabbitChan() (*amqp.Channel, error)
Create new channel from the established connection
func PubEventBus ¶
func PubEventBus(rail miso.Rail, eventObject any, name string) error
Send msg to event bus.
It's identical to sending a message to an exchange identified by the name using routing key '#'.
NewEventBus(...) must be called at least once before this func to create the necessary components.
func PubEventBusHeaders ¶
func PubEventBusHeaders(rail miso.Rail, eventObject any, name string, headers map[string]any) error
Send msg with headers to event bus.
It's identical to sending a message to an exchange identified by the name using routing key '#'.
NewEventBus(...) must be called at least once before this func to create the necessary components.
func PublishJson ¶
func PublishJson(c miso.Rail, obj any, exchange string, routingKey string) error
Publish json message with confirmation
func PublishJsonHeaders ¶
func PublishJsonHeaders(c miso.Rail, obj any, exchange string, routingKey string, headers map[string]any) error
Publish json message with headers and confirmation
func PublishMsg ¶
func PublishMsg(c miso.Rail, msg []byte, exchange string, routingKey string, contentType string, headers map[string]any) error
Publish message with confirmation
func PublishText ¶
func PublishText(c miso.Rail, msg string, exchange string, routingKey string) error
Publish plain text message with confirmation
func RabbitBootstrap ¶
func RegisterRabbitBinding ¶
func RegisterRabbitBinding(b BindingRegistration)
Declare binding on client initialization
func RegisterRabbitExchange ¶
func RegisterRabbitExchange(e ExchangeRegistration)
Declare exchange on client initialization
func RegisterRabbitQueue ¶
func RegisterRabbitQueue(q QueueRegistration)
Declare queue on client initialization
func StartRabbitMqClient ¶
func StartRabbitMqClient(rail miso.Rail) error
Start RabbitMQ Client (synchronous for the first time, then auto-reconnect later in another goroutine)
This func will attempt to establish connection to broker, declare queues, exchanges and bindings.
Listeners are also created once the initial setup is done.
When connection is lost, it will attempt to reconnect to recover, unless the given context is done.
To register listener, please use 'AddRabbitListener' func.
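The reconnect behaviour described above (keep retrying until the context is done) follows a common pattern that can be sketched with the standard library alone. This is not the package's implementation: connectWithRetry, the dial callback, the backoff value and runDemo are all hypothetical stand-ins, and the real client may use different timing.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// connectWithRetry calls dial until it succeeds or ctx is done.
// It returns the number of attempts made and the final error, if any.
func connectWithRetry(ctx context.Context, dial func() error, backoff time.Duration) (int, error) {
	attempts := 0
	for {
		attempts++
		if err := dial(); err == nil {
			return attempts, nil
		}
		// Wait before the next attempt, but give up as soon as ctx is done.
		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(backoff):
		}
	}
}

// runDemo uses a hypothetical dialer that fails twice before succeeding.
func runDemo() (int, error) {
	calls := 0
	dial := func() error {
		calls++
		if calls < 3 {
			return errors.New("broker unreachable")
		}
		return nil
	}
	return connectWithRetry(context.Background(), dial, 10*time.Millisecond)
}

func main() {
	attempts, err := runDemo()
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```

Passing a cancellable context instead of context.Background() makes the loop return ctx.Err() once the context is cancelled, which corresponds to the "unless the given context is done" condition.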
Types ¶
type BindingRegistration ¶
type EventPipeline ¶
type EventPipeline[T any] struct {
	// contains filtered or unexported fields
}
EventPipeline is a thin wrapper of NewEventBus, SubEventBus and PubEventBus. It's used to make things easier and more consistent.
Use NewEventPipeline to instantiate.
func NewEventPipeline ¶
func NewEventPipeline[T any](name string) *EventPipeline[T]
Create new EventPipeline. NewEventBus is internally called as well.
func (*EventPipeline[T]) Document ¶
func (ep *EventPipeline[T]) Document(name string, desc string, provider string) *EventPipeline[T]
Document EventPipeline in the generated apidoc.
func (*EventPipeline[T]) Listen ¶
func (ep *EventPipeline[T]) Listen(concurrency int, listener func(rail miso.Rail, t T) error) *EventPipeline[T]
Call SubEventBus.
func (*EventPipeline[T]) LogPayload ¶
func (ep *EventPipeline[T]) LogPayload() *EventPipeline[T]
Log payload in message consumer.
func (*EventPipeline[T]) MaxRetry ¶
func (ep *EventPipeline[T]) MaxRetry(n int) *EventPipeline[T]
Specify max retry times. By default, it's -1, meaning that the message will be redelivered forever until it's successfully consumed.
The message redelivery mechanism is implemented in miso's message consumer, not the publisher.
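A consumer-side retry check based on the two header keys from the Constants section might look roughly like the sketch below. The exact comparison and header value types used by miso are not stated in this documentation, so the `curr < max` rule, the intHeader helper and the handled integer types are assumptions made for illustration only.

```go
package main

import "fmt"

const (
	// Header keys as documented in the Constants section.
	headerMaxRetry  = "miso-rabbitmq-max-retry"
	headerCurrRetry = "miso-rabbitmq-curr-retry"
)

// intHeader reads an integer header, returning def when the key is absent
// or holds a type other than int, int32 or int64 (assumed value types).
func intHeader(headers map[string]any, key string, def int) int {
	switch v := headers[key].(type) {
	case int:
		return v
	case int32:
		return int(v)
	case int64:
		return int(v)
	}
	return def
}

// shouldRedeliver reports whether a failed message should be redelivered.
// A negative or missing max retry means "retry forever", matching the documented default of -1.
// The curr < max comparison is an assumption, not confirmed by the source.
func shouldRedeliver(headers map[string]any) bool {
	maxRetry := intHeader(headers, headerMaxRetry, -1)
	if maxRetry < 0 {
		return true
	}
	return intHeader(headers, headerCurrRetry, 0) < maxRetry
}

func main() {
	fmt.Println(shouldRedeliver(nil))                                                   // prints "true"
	fmt.Println(shouldRedeliver(map[string]any{headerMaxRetry: 3, headerCurrRetry: 1})) // prints "true"
	fmt.Println(shouldRedeliver(map[string]any{headerMaxRetry: 3, headerCurrRetry: 3})) // prints "false"
}
```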
type EventPipelineDesc ¶
type ExchangeRegistration ¶
type JsonMsgListener ¶
type JsonMsgListener[T any] struct {
	QueueName     string
	Handler       func(rail miso.Rail, payload T) error
	NumOfRoutines int
}
Json Message Listener for Queue
func (JsonMsgListener[T]) Concurrency ¶
func (m JsonMsgListener[T]) Concurrency() int
func (JsonMsgListener[T]) Handle ¶
func (m JsonMsgListener[T]) Handle(rail miso.Rail, payload string) error
func (JsonMsgListener[T]) Queue ¶
func (m JsonMsgListener[T]) Queue() string
func (JsonMsgListener[T]) String ¶
func (m JsonMsgListener[T]) String() string
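The method set above suggests that Handle receives the raw string payload, decodes it into T, and passes the result to Handler. The following stand-alone sketch shows that idea with placeholder types so it compiles without miso: rail stands in for miso.Rail, jsonListener mirrors the struct shape, and orderCreated is a hypothetical event type. How the real Handle treats decode errors is not documented here, so returning the error is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rail is a placeholder for miso.Rail so the sketch compiles on its own.
type rail struct{}

// jsonListener mirrors the shape of JsonMsgListener[T].
type jsonListener[T any] struct {
	QueueName     string
	Handler       func(r rail, payload T) error
	NumOfRoutines int
}

func (m jsonListener[T]) Queue() string    { return m.QueueName }
func (m jsonListener[T]) Concurrency() int { return m.NumOfRoutines }

// Handle decodes the raw JSON payload into T and passes it to Handler.
func (m jsonListener[T]) Handle(r rail, payload string) error {
	var t T
	if err := json.Unmarshal([]byte(payload), &t); err != nil {
		return fmt.Errorf("decode payload for queue %q: %w", m.QueueName, err)
	}
	return m.Handler(r, t)
}

// orderCreated is a hypothetical event type used only for this example.
type orderCreated struct {
	OrderNo string `json:"orderNo"`
}

func main() {
	l := jsonListener[orderCreated]{
		QueueName:     "order-created",
		NumOfRoutines: 2,
		Handler: func(r rail, evt orderCreated) error {
			fmt.Println("received order", evt.OrderNo)
			return nil
		},
	}
	if err := l.Handle(rail{}, `{"orderNo":"A1"}`); err != nil {
		fmt.Println("handle failed:", err)
	}
}
```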
type MsgListener ¶
type MsgListener struct {
QueueName string
Handler func(rail miso.Rail, payload string) error
NumOfRoutines int
}
Message Listener for Queue
func (MsgListener) Concurrency ¶
func (m MsgListener) Concurrency() int
func (MsgListener) Queue ¶
func (m MsgListener) Queue() string
func (MsgListener) String ¶
func (m MsgListener) String() string