amqp

package
v1.10.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2024 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RequestIDHeader = "requestId"
	AccountIDHeader = "accountId"
	UserIDHeader    = "userId"
	InternalHeader  = "internal"
)
View Source
const AddressConfigKey = "amqp-url"
View Source
const ReconnectDelay = 5 * time.Second

Variables

This section is empty.

Functions

func ExtractAMQPHeaders added in v1.8.1

func ExtractAMQPHeaders(ctx context.Context, headers map[string]interface{}) context.Context

ExtractAMQPHeaders extracts the tracing from the header and puts it into the context

func Flags

func Flags() *pflag.FlagSet

func InjectAMQPHeaders added in v1.8.1

func InjectAMQPHeaders(ctx context.Context) map[string]interface{}

InjectAMQPHeaders injects the tracing from the context into the header map

Types

type Binding

type Binding struct {
	// contains filtered or unexported fields
}

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(options ...Option) *Broker

func (*Broker) Consume

func (b *Broker) Consume(wg *sync.WaitGroup)

func (*Broker) Disconnect

func (b *Broker) Disconnect() error

func (*Broker) EnsureExchange added in v1.1.3

func (b *Broker) EnsureExchange(exchange string)

func (*Broker) HasConsumer added in v1.3.1

func (b *Broker) HasConsumer() bool

func (*Broker) Initialize

func (b *Broker) Initialize() error

Initialize will setup the connections and declare all required amqp bindings for producers and consumers

func (*Broker) Publish

func (b *Broker) Publish(exchange, routingKey string, message *broker.Message) error

func (*Broker) Subscribe

func (b *Broker) Subscribe(exchange, routingKey string, handler broker.Handler) error

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a wrapper for amqp.Connection but adding reconnection functionality.

func NewConnection

func NewConnection(addr string) *Connection

func (*Connection) Channel

func (c *Connection) Channel() (channel *amqp.Channel, err error)

func (*Connection) Connect

func (c *Connection) Connect() (err error)

Consume will dial to the specified AMQP server addr.

func (*Connection) IsConnected

func (c *Connection) IsConnected() bool

func (*Connection) SetName

func (c *Connection) SetName(name string)

func (*Connection) Shutdown

func (c *Connection) Shutdown()

Shutdown the reconnector and terminate any existing connections

func (*Connection) WaitForConnection

func (c *Connection) WaitForConnection()

type Declaration

type Declaration func(Declarator) error

func AutoBinding

func AutoBinding(routingKey, queue, exchange string) Declaration

func AutoExchange

func AutoExchange(name string) Declaration

func AutoQueue

func AutoQueue(name string) Declaration

func DeclareBinding

func DeclareBinding(b *Binding) Declaration

func DeclareExchange

func DeclareExchange(e *Exchange) Declaration

func DeclareQueue

func DeclareQueue(q *Queue) Declaration

type Declarator

type Declarator interface {
	QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
	ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error
	QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
}

Declarator is implemented by amqp.Channel

type Delivery

type Delivery struct {
	amqp.Delivery
}

func (*Delivery) Ack

func (d *Delivery) Ack(multiple bool)

func (*Delivery) Nack

func (d *Delivery) Nack(multiple, requeue bool)

type Event

type Event struct {
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent(queue, routingKey string, delivery amqp.Delivery) *Event

func (*Event) Ack

func (evt *Event) Ack()

func (*Event) Message

func (evt *Event) Message() *broker.Message

func (*Event) Nack

func (evt *Event) Nack(requeue bool)

func (*Event) NackWithTimeout added in v1.7.4

func (evt *Event) NackWithTimeout(requeue bool, timeoutInMs int64)

func (*Event) QueueName

func (evt *Event) QueueName() string

func (*Event) RoutingKey

func (evt *Event) RoutingKey() string

func (*Event) SetContext

func (evt *Event) SetContext(ctx context.Context)

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

type HeadersCarrier added in v1.8.1

type HeadersCarrier map[string]interface{}

func (HeadersCarrier) Get added in v1.8.1

func (a HeadersCarrier) Get(key string) string

func (HeadersCarrier) Keys added in v1.8.1

func (a HeadersCarrier) Keys() []string

func (HeadersCarrier) Set added in v1.8.1

func (a HeadersCarrier) Set(key string, value string)

type Option

type Option func(*Options)

func Address

func Address(address string) Option

func ConsumerName

func ConsumerName(name string) Option

func ConsumerQueue

func ConsumerQueue(queue string) Option

func PrefetchCount

func PrefetchCount(prefetch int) Option

type Options

type Options struct {
	Address         string
	PrefetchCount   int
	SubscriberQueue string
	ConsumerName    string
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL