rmq

package module
v0.0.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2024 License: MIT Imports: 8 Imported by: 0

README

go-rmq

RabbitMQ Wrappers for amqp091-go

Features:

  • Uses the builder pattern to make working with exchanges, queues, consumers, etc. much easier
  • KeepAlive functionality to persist the connection upon temporary failures

Install

go get -u github.com/aliforever/go-rmq

Usage:

To initialize the connection with 5 retries and a 10-second delay between attempts:

r := rmq.New(address)

errChan, err := r.Connect(5, 10*time.Second, func(err error) {
    log.Println(err)
})
if err != nil {
    return nil, err
}

panic(<-errChan)

Documentation

Index

Constants

View Source
const (
	DataTypeBytes dataType = iota
	DataTypeJSON
)

Variables

View Source
var (
	ConnectionNotSetError                = errors.New("connection_is_not_set")
	DataIsNotBytesError                  = errors.New("data_is_not_of_bytes_type")
	ResponseMapNotSetError               = errors.New("response_map_not_set")
	CorrelationIdNotSetError             = errors.New("correlation_id_not_set")
	PublishResponseInvalidReplyToIdError = errors.New("publish_response_invalid_reply_to_id")
)
View Source
var ConnectionClosedError = fmt.Errorf("connection_closed")

Functions

This section is empty.

Types

type Channel added in v0.0.10

type Channel struct {
	// contains filtered or unexported fields
}

func (*Channel) CloseChan added in v0.0.10

func (c *Channel) CloseChan() <-chan error

func (*Channel) ConsumerBuilder added in v0.0.10

func (c *Channel) ConsumerBuilder(name, queue string) *ConsumerBuilder

func (*Channel) DirectExchangeBuilder added in v0.0.11

func (c *Channel) DirectExchangeBuilder(name string) *ExchangeBuilder

func (*Channel) ExchangeBuilder added in v0.0.11

func (c *Channel) ExchangeBuilder(name string) *ExchangeBuilder

func (*Channel) FanoutExchangeBuilder added in v0.0.11

func (c *Channel) FanoutExchangeBuilder(name string) *ExchangeBuilder

func (*Channel) PublisherBuilder added in v0.0.10

func (c *Channel) PublisherBuilder(exchange string, routingKey string) *PublisherBuilder

func (*Channel) QueueBuilder added in v0.0.10

func (c *Channel) QueueBuilder() *QueueBuilder

func (*Channel) TopicExchangeBuilder added in v0.0.11

func (c *Channel) TopicExchangeBuilder(name string) *ExchangeBuilder

type ChannelImpl added in v0.0.16

type ChannelImpl interface {
	PublisherBuilder(exchange string, routingKey string) PublisherBuilderImpl
	ConsumerBuilder(name, queue string) ConsumerBuilderImpl
	QueueBuilder() QueueBuilderImpl
	ExchangeBuilder(name string) ExchangeBuilderImpl
	FanoutExchangeBuilder(name string) ExchangeBuilderImpl
	DirectExchangeBuilder(name string) ExchangeBuilderImpl
	TopicExchangeBuilder(name string) ExchangeBuilderImpl
	CloseChan() <-chan error
}

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Cancel added in v0.0.5

func (c *Consumer) Cancel() error

Cancel stops the consumer

func (*Consumer) ErrorChan added in v0.0.10

func (c *Consumer) ErrorChan() <-chan error

ErrorChan returns a channel that will receive an error when the consumer is closed

func (*Consumer) Messages added in v0.0.10

func (c *Consumer) Messages() <-chan amqp091.Delivery

type ConsumerBuilder added in v0.0.10

type ConsumerBuilder struct {
	// contains filtered or unexported fields
}

func (*ConsumerBuilder) AddArg added in v0.0.10

func (c *ConsumerBuilder) AddArg(key string, val interface{}) *ConsumerBuilder

func (*ConsumerBuilder) Build added in v0.0.10

func (c *ConsumerBuilder) Build() (*Consumer, error)

func (*ConsumerBuilder) SetAutoAck added in v0.0.10

func (c *ConsumerBuilder) SetAutoAck() *ConsumerBuilder

func (*ConsumerBuilder) SetExclusive added in v0.0.10

func (c *ConsumerBuilder) SetExclusive() *ConsumerBuilder

func (*ConsumerBuilder) SetNoLocal added in v0.0.10

func (c *ConsumerBuilder) SetNoLocal() *ConsumerBuilder

func (*ConsumerBuilder) SetNoWait added in v0.0.10

func (c *ConsumerBuilder) SetNoWait() *ConsumerBuilder

func (*ConsumerBuilder) SetPrefetch added in v0.0.12

func (c *ConsumerBuilder) SetPrefetch(prefetch int) *ConsumerBuilder

type ConsumerBuilderImpl added in v0.0.16

type ConsumerBuilderImpl interface {
	SetAutoAck() ConsumerBuilderImpl
	SetExclusive() ConsumerBuilderImpl
	SetNoLocal() ConsumerBuilderImpl
	SetNoWait() ConsumerBuilderImpl
	SetPrefetch(prefetch int) ConsumerBuilderImpl
	AddArg(key string, val interface{}) ConsumerBuilderImpl
	Build() (ConsumerImpl, error)
}

type ConsumerImpl added in v0.0.16

type ConsumerImpl interface {
	Messages() <-chan amqp091.Delivery
	ErrorChan() <-chan error
	Cancel() error
}

type Delivery

type Delivery struct {
	*amqp091.Delivery
	// contains filtered or unexported fields
}

func (*Delivery) Ack

func (d *Delivery) Ack() error

func (*Delivery) NAck

func (d *Delivery) NAck(multiple, requeue bool) error

type ExchangeBuilder added in v0.0.10

type ExchangeBuilder struct {
	// contains filtered or unexported fields
}

func (*ExchangeBuilder) AddArg added in v0.0.10

func (e *ExchangeBuilder) AddArg(key string, val interface{}) *ExchangeBuilder

func (*ExchangeBuilder) Declare added in v0.0.10

func (e *ExchangeBuilder) Declare() error

func (*ExchangeBuilder) DeleteOnDeclare added in v0.0.10

func (e *ExchangeBuilder) DeleteOnDeclare(ifUnused, noWait bool) *ExchangeBuilder

func (*ExchangeBuilder) SetAutoDelete added in v0.0.10

func (e *ExchangeBuilder) SetAutoDelete() *ExchangeBuilder

func (*ExchangeBuilder) SetDurable added in v0.0.10

func (e *ExchangeBuilder) SetDurable() *ExchangeBuilder

func (*ExchangeBuilder) SetInternal added in v0.0.10

func (e *ExchangeBuilder) SetInternal() *ExchangeBuilder

func (*ExchangeBuilder) SetNoWait added in v0.0.10

func (e *ExchangeBuilder) SetNoWait() *ExchangeBuilder

type ExchangeBuilderImpl added in v0.0.16

type ExchangeBuilderImpl interface {
	DeleteOnDeclare(ifUnused, noWait bool) ExchangeBuilderImpl
	SetDurable() ExchangeBuilderImpl
	SetAutoDelete() ExchangeBuilderImpl
	SetInternal() ExchangeBuilderImpl
	SetNoWait() ExchangeBuilderImpl
	AddArg(key string, val interface{}) ExchangeBuilderImpl
	Declare() error
}

type PublishFields added in v0.0.3

type PublishFields struct {
	// contains filtered or unexported fields
}

func NewPublishFields added in v0.0.3

func NewPublishFields() *PublishFields

func (*PublishFields) AddHeader added in v0.0.3

func (p *PublishFields) AddHeader(key string, val interface{}) *PublishFields

func (*PublishFields) DeliveryModePersistent added in v0.0.3

func (p *PublishFields) DeliveryModePersistent() *PublishFields

func (*PublishFields) DeliveryModeTransient added in v0.0.3

func (p *PublishFields) DeliveryModeTransient() *PublishFields

func (*PublishFields) SetContentType added in v0.0.3

func (p *PublishFields) SetContentType(contentType string) *PublishFields

func (*PublishFields) SetCorrelationID added in v0.0.3

func (p *PublishFields) SetCorrelationID(id string) *PublishFields

func (*PublishFields) SetDataTypeBytes added in v0.0.3

func (p *PublishFields) SetDataTypeBytes() *PublishFields

func (*PublishFields) SetDataTypeJSON added in v0.0.3

func (p *PublishFields) SetDataTypeJSON() *PublishFields

func (*PublishFields) SetExpiration added in v0.0.3

func (p *PublishFields) SetExpiration(dur time.Duration) *PublishFields

func (*PublishFields) SetImmediate added in v0.0.3

func (p *PublishFields) SetImmediate() *PublishFields

func (*PublishFields) SetMandatory added in v0.0.3

func (p *PublishFields) SetMandatory() *PublishFields

func (*PublishFields) SetReplyToID added in v0.0.3

func (p *PublishFields) SetReplyToID(id string) *PublishFields

type PublishFieldsImpl added in v0.0.16

type PublishFieldsImpl interface {
	SetDataTypeBytes() PublisherBuilderImpl
	SetDataTypeJSON() PublisherBuilderImpl
	SetContentType(contentType string) PublisherBuilderImpl
	DeliveryModePersistent() PublisherBuilderImpl
	DeliveryModeTransient() PublisherBuilderImpl
	AddHeader(key string, val interface{}) PublisherBuilderImpl
	SetCorrelationID(id string) PublisherBuilderImpl
	SetReplyToID(id string) PublisherBuilderImpl
	SetExpiration(dur time.Duration) PublisherBuilderImpl
	SetMandatory() PublisherBuilderImpl
	SetImmediate() PublisherBuilderImpl
}

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

func (*Publisher) Fields added in v0.0.10

func (p *Publisher) Fields() *PublishFields

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, data interface{}) error

func (*Publisher) PublishAwaitResponse

func (p *Publisher) PublishAwaitResponse(
	ctx context.Context,
	data interface{},
	responseMap *genericSync.Map[chan amqp091.Delivery],
) (amqp091.Delivery, error)

PublishAwaitResponse creates a channel and stores it in responseMap.

When an outside party writes a response to that map, the response's reply-to ID is checked against the correlation ID.
The call fails if the correlation ID is invalid or if a timeout occurs.

func (*Publisher) PublishWithConfirmation

func (p *Publisher) PublishWithConfirmation(ctx context.Context, data interface{}) (bool, error)

func (*Publisher) WithFields added in v0.0.3

func (p *Publisher) WithFields(fields *PublishFields) *Publisher

type PublisherBuilder added in v0.0.10

type PublisherBuilder struct {
	// contains filtered or unexported fields
}

func (*PublisherBuilder) New added in v0.0.10

func (p *PublisherBuilder) New() *Publisher

func (*PublisherBuilder) NewWithDefaultFields added in v0.0.10

func (p *PublisherBuilder) NewWithDefaultFields() *Publisher

func (*PublisherBuilder) WithFields added in v0.0.10

func (p *PublisherBuilder) WithFields(fields *PublishFields) *PublisherBuilder

type PublisherBuilderImpl added in v0.0.16

type PublisherBuilderImpl interface {
	WithFields(fields *PublishFields) PublisherBuilderImpl
	New() PublisherImpl
	NewWithDefaultFields() PublisherImpl
}

type PublisherImpl added in v0.0.16

type PublisherImpl interface {
	WithFields(fields PublishFieldsImpl) PublisherImpl
	Fields() PublishFieldsImpl
	Publish(ctx context.Context, data interface{}) error
	PublishAwaitResponse(
		ctx context.Context,
		data interface{},
		responseMap *genericSync.Map[chan amqp091.Delivery],
	) (amqp091.Delivery, error)
	PublishWithConfirmation(ctx context.Context, data interface{}) (bool, error)
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func (*Queue) BindToExchange

func (q *Queue) BindToExchange(
	exchange string,
	routingKey string,
	noWait bool,
	args map[string]interface{},
) error

func (*Queue) Name added in v0.0.10

func (q *Queue) Name() string

type QueueBuilder

type QueueBuilder struct {
	// contains filtered or unexported fields
}

func (*QueueBuilder) AddArg

func (q *QueueBuilder) AddArg(key string, val interface{}) *QueueBuilder

func (*QueueBuilder) Declare

func (q *QueueBuilder) Declare() (*Queue, error)

func (*QueueBuilder) SetAutoDelete

func (q *QueueBuilder) SetAutoDelete() *QueueBuilder

func (*QueueBuilder) SetDurable

func (q *QueueBuilder) SetDurable() *QueueBuilder

func (*QueueBuilder) SetExclusive

func (q *QueueBuilder) SetExclusive() *QueueBuilder

func (*QueueBuilder) SetName

func (q *QueueBuilder) SetName(name string) *QueueBuilder

func (*QueueBuilder) SetNoWait

func (q *QueueBuilder) SetNoWait() *QueueBuilder

type QueueBuilderImpl added in v0.0.16

type QueueBuilderImpl interface {
	SetName(name string) QueueBuilderImpl
	SetDurable() QueueBuilderImpl
	SetAutoDelete() QueueBuilderImpl
	SetExclusive() QueueBuilderImpl
	SetNoWait() QueueBuilderImpl
	AddArg(key string, val interface{}) QueueBuilderImpl
	Declare() (QueueImpl, error)
}

type QueueImpl added in v0.0.16

type QueueImpl interface {
	Name() string
	BindToExchange(
		exchange string,
		routingKey string,
		noWait bool,
		args map[string]interface{},
	) error
}

type RMQ added in v0.0.10

type RMQ struct {
	// contains filtered or unexported fields
}

func New added in v0.0.10

func New(address string) *RMQ

func (*RMQ) Close added in v0.0.10

func (r *RMQ) Close() error

func (*RMQ) Connect added in v0.0.10

func (r *RMQ) Connect(retryCount int, retryDelay time.Duration, onRetryError func(err error)) (<-chan error, error)

Connect

Important: onRetryError blocks the reconnection while it runs, so start a goroutine inside the callback for anything slow.

func (*RMQ) NewChannel added in v0.0.10

func (r *RMQ) NewChannel() (*Channel, error)

func (*RMQ) NewChannelWithConfirm added in v0.0.10

func (r *RMQ) NewChannelWithConfirm() (*Channel, error)

type RmqImpl added in v0.0.16

type RmqImpl interface {
	SetOnError(onError func(err error))
	Connect(retryCount int, retryDelay time.Duration, onRetryError func(err error)) (<-chan error, error)
	Close() error
	NewChannel() (ChannelImpl, error)
	NewChannelWithConfirm() (ChannelImpl, error)
}

type RmqOptions added in v0.0.10

type RmqOptions struct {
	// contains filtered or unexported fields
}

func NewRmqOptions added in v0.0.10

func NewRmqOptions() *RmqOptions

func (*RmqOptions) SetReconnectDelay added in v0.0.10

func (r *RmqOptions) SetReconnectDelay(delay int) *RmqOptions

func (*RmqOptions) SetReconnectTries added in v0.0.10

func (r *RmqOptions) SetReconnectTries(tries int) *RmqOptions

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL