rabbit

package
v0.3.4-beta.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BusRoutingKey   = "#"
	BusExchangeKind = "direct"
)
View Source
const (
	// misoconfig-prop: enable RabbitMQ client | false
	PropRabbitMqEnabled = "rabbitmq.enabled"

	// misoconfig-prop: RabbitMQ server host | localhost
	PropRabbitMqHost = "rabbitmq.host"

	// misoconfig-prop: RabbitMQ server port | 5672
	PropRabbitMqPort = "rabbitmq.port"

	// misoconfig-prop: username used to connect to server | guest
	PropRabbitMqUsername = "rabbitmq.username"

	// misoconfig-prop: password used to connect to server | guest
	PropRabbitMqPassword = "rabbitmq.password"

	// misoconfig-prop: virtual host
	PropRabbitMqVhost = "rabbitmq.vhost"

	// misoconfig-prop: consumer QOS | 68
	PropRabbitMqConsumerQos = "rabbitmq.consumer.qos"

	// misoconfig-prop: publisher channel pool size | 20
	PropRabbitMqPublisherChanPoolSize = "rabbitmq.publisher.channel-pool-size"
)

misoconfig-section: RabbitMQ Configuration

View Source
const (
	// Default QOS
	DefaultQos = 68

	// RabbitMQ messages are redelivered every 5 seconds, 180 times is roughly equivalent to 15 minutes retry.
	MaxRetryTimes15Min = 180

	// Header key of rabbitmq message, specify how many times the message can be redelivered.
	//
	// Actual redelivery mechanism is implemented by miso's message listener.
	HeaderRabbitMaxRetry = "miso-rabbitmq-max-retry"

	// Header key of rabbitmq message, specify how many times the message has been redelivered.
	//
	// Actual redelivery mechanism is implemented by miso's message listener.
	HeaderRabbitCurrRetry = "miso-rabbitmq-curr-retry"
)

Variables

This section is empty.

Functions

func AddRabbitListener

func AddRabbitListener(listener RabbitListener)

Register pending message listener.

Listeners will be started in StartRabbitMqClient func when the connection to broker is established.

For any message that the listener is unable to process (returning error), the message is redelivered indefinitively with a delay of 10 seconds until the message is finally processed without error.

func NewEventBus

func NewEventBus(name string)

Declare event bus.

It basically is to create an direct exchange and a queue identified by the name, and bind them using routing key '#'.

func PubEventBus

func PubEventBus(rail miso.Rail, eventObject any, name string) error

Send msg to event bus.

It's identical to sending a message to an exchange identified by the name using routing key '#'.

Before calling this method, the NewEventBus(...) should be called at least once to create the necessary components.

func PubEventBusHeaders

func PubEventBusHeaders(rail miso.Rail, eventObject any, name string, headers map[string]any) error

Send msg to event bus.

It's identical to sending a message to an exchange identified by the name using routing key '#'.

Before calling this method, the NewEventBus(...) should be called at least once to create the necessary components.

func PubRetryEventBus added in v0.1.13

func PubRetryEventBus(rail miso.Rail, eventObject any, name string, retry int) error

Send msg to event bus with "miso-rabbitmq-max-retry" header.

Notice that redelivery mechanism is implemented on the consumer side (by miso).

It's identical to sending a message to an exchange identified by the name using routing key '#'.

Before calling this method, the NewEventBus(...) should be called at least once to create the necessary components.

func PublishJson

func PublishJson(c miso.Rail, obj any, exchange string, routingKey string) error

Publish json message with confirmation

func PublishJsonHeaders

func PublishJsonHeaders(c miso.Rail, obj any, exchange string, routingKey string, headers map[string]any) error

Publish json message with headers and confirmation

func PublishMsg

func PublishMsg(c miso.Rail, msg []byte, exchange string, routingKey string, contentType string, headers map[string]any) error

Publish message with confirmation

func PublishText

func PublishText(c miso.Rail, msg string, exchange string, routingKey string) error

Publish plain text message with confirmation

func RabbitConnected

func RabbitConnected() bool

func RabbitDisconnect

func RabbitDisconnect(rail miso.Rail) error

func RegisterRabbitBinding

func RegisterRabbitBinding(b BindingRegistration)

Declare binding on client initialization

func RegisterRabbitExchange

func RegisterRabbitExchange(e ExchangeRegistration)

Declare exchange on client initialization

func RegisterRabbitQueue

func RegisterRabbitQueue(q QueueRegistration)

Declare queue on client initialization

func StartRabbitMqClient

func StartRabbitMqClient(rail miso.Rail) error

Start RabbitMQ Client (synchronous for the first time, then auto-reconnect later in another goroutine)

This func will attempt to establish connection to broker, declare queues, exchanges and bindings.

Listeners are also created once the intial setup is done.

When connection is lost, it will attmpt to reconnect to recover, unless the given context is done.

To register listener, please use 'AddRabbitListener' func.

func SubEventBus

func SubEventBus[T any](name string, concurrency int, listener func(rail miso.Rail, t T) error)

Subscribe to event bus.

Internally, it calls NewEventBus(...) and registers a listener for the queue identified by the bus name.

func SubEventBusQos added in v0.1.13

func SubEventBusQos[T any](name string, concurrency int, qos int, listener func(rail miso.Rail, t T) error)

Subscribe to event bus.

Internally, it calls NewEventBus(...) and registers a listener for the queue identified by the bus name.

Types

type BindingRegistration

type BindingRegistration struct {
	Queue      string
	RoutingKey string
	Exchange   string
}

type EventPipeline

type EventPipeline[T any] struct {
	// contains filtered or unexported fields
}

EventPipeline is a thin wrapper of NewEventBus, SubEventBus and PubEventBus. It's used to make things easier and more consistent.

Use NewEventPipeline to instantiate.

func NewEventPipeline

func NewEventPipeline[T any](name string) *EventPipeline[T]

Create new EventPipeline. NewEventBus is internally called as well.

func (*EventPipeline[T]) Document

func (ep *EventPipeline[T]) Document(name string, desc string, provider string) *EventPipeline[T]

Document EventPipline in the generated apidoc.

func (*EventPipeline[T]) Listen

func (ep *EventPipeline[T]) Listen(concurrency int, listener func(rail miso.Rail, t T) error) *EventPipeline[T]

Call SubEventBus.

func (*EventPipeline[T]) ListenerQos added in v0.1.13

func (ep *EventPipeline[T]) ListenerQos(v int) *EventPipeline[T]

Specify QOS for listener, should be called before #Listen func.

func (*EventPipeline[T]) LogPayload

func (ep *EventPipeline[T]) LogPayload() *EventPipeline[T]

Log payload in message consumer.

func (*EventPipeline[T]) MaxRetry

func (ep *EventPipeline[T]) MaxRetry(n int) *EventPipeline[T]

Specify max retry times, by default, it's -1, meaning that the message will be redelivered forever until it's successfully consumed.

The message redelivery mechanism is implemented in miso's message consumer not publisher.

func (*EventPipeline[T]) Name

func (ep *EventPipeline[T]) Name() string

Name of the pipeline.

func (*EventPipeline[T]) Send

func (ep *EventPipeline[T]) Send(rail miso.Rail, event T) error

Call PubEventBus.

type EventPipelineDesc

type EventPipelineDesc struct {
	Name       string
	Desc       string
	PayloadVal any
	Exchange   string
	RoutingKey string
	Queue      string
}

type ExchangeRegistration

type ExchangeRegistration struct {
	Name       string
	Kind       string
	Durable    bool
	Properties map[string]any
}

type JsonMsgListener

type JsonMsgListener[T any] struct {
	QueueName     string
	Handler       func(rail miso.Rail, payload T) error
	NumOfRoutines int
	Qos           int
}

Json Message Listener for Queue

func (JsonMsgListener[T]) Concurrency

func (m JsonMsgListener[T]) Concurrency() int

func (JsonMsgListener[T]) Handle

func (m JsonMsgListener[T]) Handle(rail miso.Rail, payload string) error

func (JsonMsgListener[T]) QosSpec added in v0.1.13

func (m JsonMsgListener[T]) QosSpec() int

func (JsonMsgListener[T]) Queue

func (m JsonMsgListener[T]) Queue() string

type MsgListener

type MsgListener struct {
	QueueName     string
	Handler       func(rail miso.Rail, payload string) error
	NumOfRoutines int
	Qos           int
}

Message Listener for Queue

func (MsgListener) Concurrency

func (m MsgListener) Concurrency() int

func (MsgListener) Handle

func (m MsgListener) Handle(rail miso.Rail, payload string) error

func (MsgListener) QosSpec added in v0.1.13

func (m MsgListener) QosSpec() int

func (MsgListener) Queue

func (m MsgListener) Queue() string

type QueueRegistration

type QueueRegistration struct {
	Name    string
	Durable bool
}

type RabbitListener

type RabbitListener interface {
	Queue() string                               // return name of the queue
	Handle(rail miso.Rail, payload string) error // handle message
	Concurrency() int
	QosSpec() int
}

RabbitListener of Queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL