rabbitmq

package
v1.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2024 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const MaxAttempts = 3

MaxAttempts of retries that will be performed

Variables

This section is empty.

Functions

func GenerateQueueName added in v1.0.0

func GenerateQueueName(ex string, topic string) string

GenerateQueueName is responsible to generate a unique queue for the connector to use It follows the naming schema OpenFaaS_[EXCHANGE_NAME]_[TOPIC]

Types

type Broker added in v1.0.0

type Broker struct{}

Broker is a wrapper around the RabbitMQ Client lib, which allows better unit testing. By abstracting away the RabbitMQ raw types, which are struct based.

func (*Broker) Dial added in v1.0.0

func (b *Broker) Dial(url string) (RBConnection, error)

Dial tries to connect to the providing url, returning either a RBConnection or the received connection error.

func (*Broker) DialTLS added in v1.0.0

func (b *Broker) DialTLS(url string, conf *tls.Config) (RBConnection, error)

DialTLS tries to connect to the providing url using TLS, returning either a RBConnection or the received connection error.

type ChannelConsumer added in v1.0.0

type ChannelConsumer interface {
	Consume(queue string, consumer string, autoAck bool, exclusive bool, noLocal bool, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
	NotifyClose(c chan *amqp.Error) chan *amqp.Error
	Close() error
}

ChannelConsumer are interacting on channels

type ChannelCreator added in v1.0.0

type ChannelCreator interface {
	Channel() (RabbitChannel, error)
}

ChannelCreator interface allows the generations of channels

type ConnectionManager added in v1.0.0

type ConnectionManager struct {
	// contains filtered or unexported fields
}

ConnectionManager is tasked with managing the connection Rabbit MQ

func (*ConnectionManager) Channel added in v1.0.0

func (m *ConnectionManager) Channel() (RabbitChannel, error)

Channel creates a new Rabbit MQ channel on the existing connection

func (*ConnectionManager) Connect added in v1.0.0

func (m *ConnectionManager) Connect(connectionURL string) (<-chan *amqp.Error, error)

Connect uses the provided connection urls and tries up to 3 times to establish a connection. The retries are performed exponentially starting with 2s. It also creates a listener for close notifications.

func (*ConnectionManager) Disconnect added in v1.0.0

func (m *ConnectionManager) Disconnect()

Disconnect closes the connection and frees up the reference

type Connector added in v1.0.0

type Connector interface {
	Connect(connectionURL string) (<-chan *amqp.Error, error)
	Disconnect()
}

Connector is a high level interface for connection related methods

type Exchange added in v1.0.0

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange contains all of the relevant units to handle communication with an exchange

func (*Exchange) Start added in v1.0.0

func (e *Exchange) Start() error

Start s consuming deliveries from a unique queue for the specific exchange. Further creating a listener for channel errors

func (*Exchange) StartConsuming added in v1.0.0

func (e *Exchange) StartConsuming(topic string, deliveries <-chan amqp.Delivery)

StartConsuming will consume deliveries from the provided channel and if the received delivery is for the target topic it will invoke it. If the delivery is not for the correct topic it will reject it so that the delivery is returned to the exchange. Retries are exponential and up to 3 times.

func (*Exchange) Stop added in v1.0.0

func (e *Exchange) Stop()

Stop s consuming messages

type ExchangeFactory added in v1.0.0

type ExchangeFactory struct {
	// contains filtered or unexported fields
}

ExchangeFactory keeps tracks of all the build options provided to it during construction

func (*ExchangeFactory) Build added in v1.0.0

func (f *ExchangeFactory) Build() (ExchangeOrganizer, error)

Build uses the set values and builds a new exchange from them

func (*ExchangeFactory) WithChanCreator added in v1.0.0

func (f *ExchangeFactory) WithChanCreator(creator ChannelCreator) Factory

WithChanCreator sets the channel creator that will be used

func (*ExchangeFactory) WithExchange added in v1.0.0

func (f *ExchangeFactory) WithExchange(ex *types.Exchange) Factory

WithExchange sets the exchange definition and further ensures that the correct type is used

func (*ExchangeFactory) WithInvoker added in v1.0.0

func (f *ExchangeFactory) WithInvoker(client types.Invoker) Factory

WithInvoker sets the invoker which will interact with OpenFaaS

type ExchangeHandler added in v1.0.0

type ExchangeHandler interface {
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
}

ExchangeHandler offers a interface for the decleration of an exchange or the validation against existing exchanges on the RabbitMQ cluster

type ExchangeOrganizer added in v1.0.0

type ExchangeOrganizer interface {
	Starter
	Stopper
}

ExchangeOrganizer combines the ability to start & stop exchanges

func NewExchange added in v1.0.0

func NewExchange(channel ChannelConsumer, client types.Invoker, definition *types.Exchange) ExchangeOrganizer

NewExchange creates a new exchange instance using the provided parameter

type Factory added in v1.0.0

type Factory interface {
	WithInvoker(client types.Invoker) Factory
	WithChanCreator(creator ChannelCreator) Factory
	WithExchange(ex *types.Exchange) Factory
	Build() (ExchangeOrganizer, error)
}

Factory for building a Exchange

func NewFactory added in v1.0.0

func NewFactory() Factory

NewFactory creates a new instance with no defaults set.

type Manager added in v1.0.0

type Manager interface {
	Connector
	ChannelCreator
}

Manager is a interface that combines the relevant methods to connect to Rabbit MQ And create a new channel on an existing connection.

func NewConnectionManager added in v1.0.0

func NewConnectionManager(dialer RBDialer, conf *tls.Config) Manager

NewConnectionManager creates a new instance using the provided dialer

type QueueHandler added in v1.0.0

type QueueHandler interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

QueueHandler offers a interface for the decleration & binding of an queues. Further it allows the validation against existing queues on the RabbitMQ cluster

type RBConnection added in v1.0.0

type RBConnection interface {
	NotifyClose(receiver chan *amqp.Error) chan *amqp.Error
	Close() error
	Channel() (*amqp.Channel, error)
}

RBConnection is a abstraction of a RabbitMQ Connection

type RBDialer added in v1.0.0

type RBDialer interface {
	Dial(url string) (RBConnection, error)
	DialTLS(url string, conf *tls.Config) (RBConnection, error)
}

RBDialer is a abstraction of the RabbitMQ Dial methods

func NewBroker added in v1.0.0

func NewBroker() RBDialer

NewBroker generates a new wrapper around the RabbitMQ Client lib

type RabbitChannel added in v1.0.0

type RabbitChannel interface {
	ExchangeHandler
	QueueHandler
	ChannelConsumer
}

RabbitChannel is a abstraction of a RabbitMQ Channel

type Starter added in v1.0.0

type Starter interface {
	Start() error
}

Starter defines something that can be started

type Stopper added in v1.0.0

type Stopper interface {
	Stop()
}

Stopper defines something that can be stopped

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL