rabbit

package
v0.1.2-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 19, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BusRoutingKey   = "#"
	BusExchangeKind = "direct"
)
View Source
const (
	PropRabbitMqEnabled     = "rabbitmq.enabled"
	PropRabbitMqHost        = "rabbitmq.host"
	PropRabbitMqPort        = "rabbitmq.port"
	PropRabbitMqUsername    = "rabbitmq.username"
	PropRabbitMqPassword    = "rabbitmq.password"
	PropRabbitMqVhost       = "rabbitmq.vhost"
	PropRabbitMqConsumerQos = "rabbitmq.consumer.qos"
)

Configuration Properties for RabbitMQ

View Source
const (
	// Default QOS
	DefaultQos = 68

	// RabbitMQ messages are redelivered every 5 seconds, 180 times is roughly equivalent to 15 minutes retry.
	MaxRetryTimes15Min = 180

	// Header key of rabbitmq message, specify how many times the message can be redelivered.
	//
	// Actual redelivery mechanism is implemented by miso's message listener.
	HeaderRabbitMaxRetry = "miso-rabbitmq-max-retry"

	// Header key of rabbitmq message, specify how many times the message has been redelivered.
	//
	// Actual redelivery mechanism is implemented by miso's message listener.
	HeaderRabbitCurrRetry = "miso-rabbitmq-curr-retry"
)

Variables

This section is empty.

Functions

func AddRabbitListener

func AddRabbitListener(listener RabbitListener)

Register pending message listener.

Listeners will be started in StartRabbitMqClient func when the connection to broker is established.

For any message that the listener is unable to process (returning error), the message is redelivered indefinitively with a delay of 10 seconds until the message is finally processed without error.

func DeclareRabbitBinding

func DeclareRabbitBinding(ch *amqp.Channel, bind BindingRegistration) error

Declare binding using the provided channel immediately

func DeclareRabbitExchange

func DeclareRabbitExchange(ch *amqp.Channel, exchange ExchangeRegistration) error

Declare exchange using the provided channel immediately

func DeclareRabbitQueue

func DeclareRabbitQueue(ch *amqp.Channel, queue QueueRegistration) error

Declare queue using the provided channel immediately

func NewEventBus

func NewEventBus(name string)

Declare event bus.

It basically is to create an direct exchange and a queue identified by the name, and bind them using routing key '#'.

func NewRabbitChan

func NewRabbitChan() (*amqp.Channel, error)

Create new channel from the established connection

func PubEventBus

func PubEventBus(rail miso.Rail, eventObject any, name string) error

Send msg to event bus.

It's identical to sending a message to an exchange identified by the name using routing key '#'.

Before calling this method, the NewEventBus(...) should be called at least once to create the necessary components.

func PubEventBusHeaders

func PubEventBusHeaders(rail miso.Rail, eventObject any, name string, headers map[string]any) error

Send msg to event bus.

It's identical to sending a message to an exchange identified by the name using routing key '#'.

Before calling this method, the NewEventBus(...) should be called at least once to create the necessary components.

func PublishJson

func PublishJson(c miso.Rail, obj any, exchange string, routingKey string) error

Publish json message with confirmation

func PublishJsonHeaders

func PublishJsonHeaders(c miso.Rail, obj any, exchange string, routingKey string, headers map[string]any) error

Publish json message with headers and confirmation

func PublishMsg

func PublishMsg(c miso.Rail, msg []byte, exchange string, routingKey string, contentType string, headers map[string]any) error

Publish message with confirmation

func PublishText

func PublishText(c miso.Rail, msg string, exchange string, routingKey string) error

Publish plain text message with confirmation

func RabbitBootstrap

func RabbitBootstrap(rail miso.Rail) error

func RabbitBootstrapCondition

func RabbitBootstrapCondition(rail miso.Rail) (bool, error)

func RabbitConnected

func RabbitConnected() bool

Check if connection exists

func RabbitDisconnect

func RabbitDisconnect(rail miso.Rail) error

Disconnect from RabbitMQ server

func RabbitMQEnabled

func RabbitMQEnabled() bool

Is RabbitMQ Enabled

func RegisterRabbitBinding

func RegisterRabbitBinding(b BindingRegistration)

Declare binding on client initialization

func RegisterRabbitExchange

func RegisterRabbitExchange(e ExchangeRegistration)

Declare exchange on client initialization

func RegisterRabbitQueue

func RegisterRabbitQueue(q QueueRegistration)

Declare queue on client initialization

func StartRabbitMqClient

func StartRabbitMqClient(rail miso.Rail) error

Start RabbitMQ Client (synchronous for the first time, then auto-reconnect later in another goroutine)

This func will attempt to establish connection to broker, declare queues, exchanges and bindings.

Listeners are also created once the intial setup is done.

When connection is lost, it will attmpt to reconnect to recover, unless the given context is done.

To register listener, please use 'AddListener' func.

func SubEventBus

func SubEventBus[T any](name string, concurrency int, listener func(rail miso.Rail, t T) error)

Subscribe to event bus.

Internally, it calls NewEventBus(...) and registers a listener for the queue identified by the bus name.

Types

type BindingRegistration

type BindingRegistration struct {
	Queue      string
	RoutingKey string
	Exchange   string
}

type EventPipeline

type EventPipeline[T any] struct {
	// contains filtered or unexported fields
}

EventPipeline is a thin wrapper of NewEventBus, SubEventBus and PubEventBus. It's used to make things easier and more consistent.

Use NewEventPipeline to instantiate.

func NewEventPipeline

func NewEventPipeline[T any](name string) *EventPipeline[T]

Create new EventPipeline. NewEventBus is internally called as well.

func (*EventPipeline[T]) Document

func (ep *EventPipeline[T]) Document(name string, desc string, provider string) *EventPipeline[T]

Document EventPipline in the generated apidoc.

func (*EventPipeline[T]) Listen

func (ep *EventPipeline[T]) Listen(concurrency int, listener func(rail miso.Rail, t T) error) *EventPipeline[T]

Call SubEventBus.

func (*EventPipeline[T]) LogPayload

func (ep *EventPipeline[T]) LogPayload() *EventPipeline[T]

Log payload in message consumer.

func (*EventPipeline[T]) MaxRetry

func (ep *EventPipeline[T]) MaxRetry(n int) *EventPipeline[T]

Specify max retry times, by default, it's -1, meaning that the message will be redelivered forever until it's successfully consumed.

The message redelivery mechanism is implemented in miso's message consumer not publisher.

func (*EventPipeline[T]) Name

func (ep *EventPipeline[T]) Name() string

Name of the pipeline.

func (*EventPipeline[T]) Send

func (ep *EventPipeline[T]) Send(rail miso.Rail, event T) error

Call PubEventBus.

type EventPipelineDesc

type EventPipelineDesc struct {
	Name       string
	Desc       string
	PayloadVal any
	Exchange   string
	RoutingKey string
	Queue      string
}

type ExchangeRegistration

type ExchangeRegistration struct {
	Name       string
	Kind       string
	Durable    bool
	Properties map[string]any
}

type JsonMsgListener

type JsonMsgListener[T any] struct {
	QueueName     string
	Handler       func(rail miso.Rail, payload T) error
	NumOfRoutines int
}

Json Message Listener for Queue

func (JsonMsgListener[T]) Concurrency

func (m JsonMsgListener[T]) Concurrency() int

func (JsonMsgListener[T]) Handle

func (m JsonMsgListener[T]) Handle(rail miso.Rail, payload string) error

func (JsonMsgListener[T]) Queue

func (m JsonMsgListener[T]) Queue() string

func (JsonMsgListener[T]) String

func (m JsonMsgListener[T]) String() string

type MsgListener

type MsgListener struct {
	QueueName     string
	Handler       func(rail miso.Rail, payload string) error
	NumOfRoutines int
}

Message Listener for Queue

func (MsgListener) Concurrency

func (m MsgListener) Concurrency() int

func (MsgListener) Handle

func (m MsgListener) Handle(rail miso.Rail, payload string) error

func (MsgListener) Queue

func (m MsgListener) Queue() string

func (MsgListener) String

func (m MsgListener) String() string

type QueueRegistration

type QueueRegistration struct {
	Name    string
	Durable bool
}

type RabbitListener

type RabbitListener interface {
	Queue() string                               // return name of the queue
	Handle(rail miso.Rail, payload string) error // handle message
	Concurrency() int
}

RabbitListener of Queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL