rabbitmq

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package rabbitmq provides a RabbitMQ implementation of the queue interface.

Index

Constants

View Source
const Name = "rabbitmq"

Name of the queue.

Variables

This section is empty.

Functions

func Get

Get returns a setup Queue, or set it up.

func Set

Set the Queue, primarily used for testing.

Types

type Config

type Config struct {
	EnableConfirms bool `json:"enableConfirms"`
	Global         bool `json:"global"`
	PrefetchCount  int  `json:"prefetchCount"`
	PrefetchSize   int  `json:"prefetchSize"`
}

Config is the RabbitMQ configuration.

func NewConfig

func NewConfig() *Config

NewConfig returns a default RabbitMQ configuration.

type PublishParams

type PublishParams struct {
	queue.PublishParams

	// ContentType specifies the content type of the message.
	ContentType string `default:"application/json" json:"contentType"`

	// Confirm enables publisher confirmation mode for this message.
	// Only works if publisher confirms are enabled at the channel level.
	Confirm bool `default:"false" json:"confirm"`

	// ConfirmCh is the channel where confirmation will be sent after publishing.
	// This channel must be initialized by the caller if Confirm is true.
	// The confirmation will contain:
	// - Ack: true if message was confirmed, false if nacked
	// - DeliveryTag: the sequence number of this delivery
	ConfirmCh chan amqp.Confirmation `json:"-"`

	// DeliveryMode. Transient means higher throughput but messages will not be
	// restored on broker restart. The delivery mode of publishings is unrelated
	// to the durability of the queues they reside on. Transient messages will
	// not be restored to durable queues, persistent messages will be restored
	// to durable queues and lost on non-durable queues during server restart.
	// This remains typed as uint8 to match Publishing.DeliveryMode. Other
	// delivery modes specific to custom queue implementations are not
	// enumerated here.
	//
	// - Transient  uint8 = 1
	// - Persistent uint8 = 2
	DeliveryMode uint8 `default:"2" json:"deliveryMode"`

	// Exchange specifies the exchange to publish to.
	Exchange string `json:"exchange"`

	// Immediate delivers the message to the first available consumer immediately.
	Immediate bool `default:"false" json:"immediate"`

	// Mandatory ensures the message is delivered to at least one consumer.
	Mandatory bool `default:"false" json:"mandatory"`
}

PublishParams defines the parameters for publishing a message to a queue.

func NewPublishParams

func NewPublishParams() *PublishParams

NewPublishParams returns a default PublishParams.

type RabbitMQ

type RabbitMQ struct {
	*queue.Queue

	// Client is the RabbitMQ client.
	Client *amqp.Channel `json:"-" validate:"required"`

	// Config is the RabbitMQ configuration.
	Config *Config `json:"-" validate:"required"`
	// contains filtered or unexported fields
}

RabbitMQ queue definition.

func New

func New(ctx context.Context, url string, cfg *Config) (*RabbitMQ, error)

New creates a new RabbitMQ Queue.

func (*RabbitMQ) GetClient

func (m *RabbitMQ) GetClient() any

GetClient returns the client.

func (*RabbitMQ) Publish

func (m *RabbitMQ) Publish(ctx context.Context, queueName string, msg *queue.Message, prm *PublishParams, options ...queue.OptionsFunc[PublishParams, SubscribeParams]) error

Publish data.

func (*RabbitMQ) Subscribe

func (m *RabbitMQ) Subscribe(ctx context.Context, queueName string, cb queue.CallbackFunc, prm *SubscribeParams, options ...queue.OptionsFunc[PublishParams, SubscribeParams]) error

Subscribe to channel.

type SubscribeParams

type SubscribeParams struct {
	queue.SubscribeParams

	// Args are optional arguments that have specific semantics for the queue or
	// server.
	Args amqp.Table `json:"args"`

	// AutoAck acknowledges deliveries to this consumer prior to writing the
	// delivery to the network.
	AutoAck bool `default:"false" json:"autoAck"`

	// AutoDelete deletes the queue when the last consumer unsubscribes.
	AutoDelete bool `default:"false" json:"autoDelete"`

	// Consumer is a unique string that identifies the consumer.
	Consumer string `json:"consumer"`

	// Durability ensures the queue survives a broker restart.
	Durable bool `default:"true" json:"durable"`

	// Exclusive ensures that this is the sole consumer from this queue.
	Exclusive bool `default:"false" json:"exclusive"`

	// NoLocal is not supported by RabbitMQ.
	NoLocal bool `default:"false" json:"noLocal"`

	// NoWait does not wait for the server to confirm the request and immediately
	// begins deliveries.
	NoWait bool `default:"false" json:"noWait"`
}

SubscribeParams defines the parameters for subscribing to a queue.

func NewSubscribeParams

func NewSubscribeParams() *SubscribeParams

NewSubscribeParams creates a new SubscribeParams.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL