Documentation
¶
Index ¶
- Variables
- func DefaultLogger(scope Scope, name string, msg string, attributes map[string]any)
- func GetAttempts(msg *amqp.Delivery) int
- func GetMessageHeader[T any](msg *amqp.Delivery, key string) (value T, ok bool)
- func RejectWithRetry(msg *amqp.Delivery, ttl time.Duration) error
- func SetLogger(cb func(scope Scope, name string, msg string, attributes map[string]any))
- type Connection
- type ConnectionOptions
- type ConstantRetryStrategy
- type Consumer
- func (c *Consumer) AddBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error
- func (c *Consumer) Close() error
- func (svc *Consumer) Collect(ch chan<- prometheus.Metric)
- func (c *Consumer) Consume() <-chan *amqp.Delivery
- func (svc *Consumer) Describe(ch chan<- *prometheus.Desc)
- func (c *Consumer) RemoveBinding(exchangeName string, routingKey string, args mo.Option[amqp.Table]) error
- type ConsumerOptions
- type ConsumerOptionsBinding
- type ConsumerOptionsMessage
- type ConsumerOptionsMetrics
- type ConsumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdError(v float64) ConsumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdErrorFunc(f func() float64) ConsumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdWarning(v float64) ConsumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdWarningAndError(v1 float64, v2 float64) ConsumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdWarningFunc(f func() float64) ConsumerOptionsMetricsThreshold
- func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc(f1 func() float64, f2 func() float64) ConsumerOptionsMetricsThreshold
- type ConsumerOptionsQueue
- type ExchangeKind
- type ExponentialRetryStrategy
- type LazyRetryStrategy
- type Producer
- func (p *Producer) Close() error
- func (p *Producer) Publish(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) error
- func (p *Producer) PublishWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, ...) error
- func (p *Producer) PublishWithDeferredConfirm(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
- func (p *Producer) PublishWithDeferredConfirmWithContext(ctx context.Context, routingKey string, mandatory bool, immediate bool, ...) (*amqp.DeferredConfirmation, error)
- type ProducerOptions
- type ProducerOptionsExchange
- type QueueSetupExchangeOptions
- type QueueSetupOptions
- type QueueSetupQueueOptions
- type RetryConsistency
- type RetryStrategy
- type Scope
Constants ¶
This section is empty.
Variables ¶
View Source
var OLHAwfE = urotoh()
Functions ¶
func DefaultLogger ΒΆ
func GetAttempts ΒΆ
func GetMessageHeader ΒΆ
Types ¶
type Connection ΒΆ
type Connection struct {
// contains filtered or unexported fields
}
func NewConnection ΒΆ
func NewConnection(name string, opt ConnectionOptions) (*Connection, error)
func (*Connection) Close ΒΆ
func (c *Connection) Close() error
func (*Connection) IsClosed ΒΆ
func (c *Connection) IsClosed() bool
func (*Connection) ListenConnection ΒΆ
func (c *Connection) ListenConnection() (func(), <-chan *amqp.Connection)
ListenConnection implements the Observable pattern.
type ConnectionOptions ΒΆ
type ConstantRetryStrategy ΒΆ
type ConstantRetryStrategy struct {
// contains filtered or unexported fields
}
func (*ConstantRetryStrategy) NextBackOff ΒΆ
type Consumer ΒΆ
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ΒΆ
func NewConsumer(conn *Connection, name string, opt ConsumerOptions) *Consumer
func (*Consumer) AddBinding ΒΆ
func (*Consumer) Collect ΒΆ
func (svc *Consumer) Collect(ch chan<- prometheus.Metric)
func (*Consumer) Describe ΒΆ
func (svc *Consumer) Describe(ch chan<- *prometheus.Desc)
type ConsumerOptions ΒΆ
type ConsumerOptions struct {
Queue ConsumerOptionsQueue
Bindings []ConsumerOptionsBinding
Message ConsumerOptionsMessage
// optional arguments
Metrics ConsumerOptionsMetrics
EnableDeadLetter mo.Option[bool] // default false
Defer mo.Option[time.Duration] // default no Defer
ConsumeArgs mo.Option[amqp.Table] // default nil
RetryStrategy mo.Option[RetryStrategy] // default no retry
RetryConsistency mo.Option[RetryConsistency] // default eventually consistent
}
type ConsumerOptionsBinding ΒΆ
type ConsumerOptionsMessage ΒΆ
type ConsumerOptionsMetrics ΒΆ
type ConsumerOptionsMetrics struct {
QueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
QueueMessagesThreshold ConsumerOptionsMetricsThreshold
DeadLetterQueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
DeadLetterQueueMessagesThreshold ConsumerOptionsMetricsThreshold
DeadLetterQueueMessageRateThreshold ConsumerOptionsMetricsThreshold
RetryQueueMessageBytesThreshold ConsumerOptionsMetricsThreshold
RetryQueueMessagesThreshold ConsumerOptionsMetricsThreshold
RetryQueueMessageRateThreshold ConsumerOptionsMetricsThreshold
}
type ConsumerOptionsMetricsThreshold ΒΆ
type ConsumerOptionsMetricsThreshold struct {
// contains filtered or unexported fields
}
func NewConsumerOptionsMetricsThresholdError ΒΆ
func NewConsumerOptionsMetricsThresholdError(v float64) ConsumerOptionsMetricsThreshold
func NewConsumerOptionsMetricsThresholdErrorFunc ΒΆ
func NewConsumerOptionsMetricsThresholdErrorFunc(f func() float64) ConsumerOptionsMetricsThreshold
func NewConsumerOptionsMetricsThresholdWarning ΒΆ
func NewConsumerOptionsMetricsThresholdWarning(v float64) ConsumerOptionsMetricsThreshold
func NewConsumerOptionsMetricsThresholdWarningAndError ΒΆ
func NewConsumerOptionsMetricsThresholdWarningAndError(v1 float64, v2 float64) ConsumerOptionsMetricsThreshold
func NewConsumerOptionsMetricsThresholdWarningFunc ΒΆ
func NewConsumerOptionsMetricsThresholdWarningFunc(f func() float64) ConsumerOptionsMetricsThreshold
func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc ΒΆ
func NewConsumerOptionsMetricsThresholdWarningFuncAndErrorFunc(f1 func() float64, f2 func() float64) ConsumerOptionsMetricsThreshold
type ConsumerOptionsQueue ΒΆ
type ExchangeKind ΒΆ
type ExchangeKind string
const (
	ExchangeKindDirect  ExchangeKind = "direct"
	ExchangeKindFanout  ExchangeKind = "fanout"
	ExchangeKindTopic   ExchangeKind = "topic"
	ExchangeKindHeaders ExchangeKind = "headers"
)
type ExponentialRetryStrategy ΒΆ
type ExponentialRetryStrategy struct {
// contains filtered or unexported fields
}
func (*ExponentialRetryStrategy) NextBackOff ΒΆ
type LazyRetryStrategy ΒΆ
type LazyRetryStrategy struct {
// contains filtered or unexported fields
}
func (*LazyRetryStrategy) NextBackOff ΒΆ
type Producer ΒΆ
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ΒΆ
func NewProducer(conn *Connection, name string, opt ProducerOptions) *Producer
func (*Producer) PublishWithContext ΒΆ
func (*Producer) PublishWithDeferredConfirm ΒΆ
func (p *Producer) PublishWithDeferredConfirm(routingKey string, mandatory bool, immediate bool, msg amqp.Publishing) (*amqp.DeferredConfirmation, error)
func (*Producer) PublishWithDeferredConfirmWithContext ΒΆ
type ProducerOptions ΒΆ
type ProducerOptions struct {
Exchange ProducerOptionsExchange
}
type ProducerOptionsExchange ΒΆ
type ProducerOptionsExchange struct {
Name mo.Option[string] // default "amq.direct"
Kind mo.Option[ExchangeKind] // default "direct"
// optional arguments
Durable mo.Option[bool] // default true
AutoDelete mo.Option[bool] // default false
Internal mo.Option[bool] // default false
NoWait mo.Option[bool] // default false
Args mo.Option[amqp.Table] // default nil
}
type QueueSetupExchangeOptions ΒΆ
type QueueSetupExchangeOptions struct {
// contains filtered or unexported fields
}
type QueueSetupOptions ΒΆ
type QueueSetupOptions struct {
Exchange QueueSetupExchangeOptions
Queue QueueSetupQueueOptions
}
type QueueSetupQueueOptions ΒΆ
type QueueSetupQueueOptions struct {
// contains filtered or unexported fields
}
type RetryConsistency ΒΆ
type RetryConsistency int
const (
	ConsistentRetry           RetryConsistency = 0 // slow
	EventuallyConsistentRetry RetryConsistency = 1 // fast, at *least* once
)
type RetryStrategy ΒΆ
func NewConstantRetryStrategy ΒΆ
func NewConstantRetryStrategy(maxRetry int, interval time.Duration) RetryStrategy
func NewExponentialRetryStrategy ΒΆ
func NewExponentialRetryStrategy(maxRetry int, initialInterval time.Duration, intervalMultiplier float64) RetryStrategy
func NewLazyRetryStrategy ΒΆ
func NewLazyRetryStrategy(maxRetry int) RetryStrategy
NewLazyRetryStrategy returns a retry strategy that never retries automatically. A message is retried only if it is rejected with a TTL. This is useful if you want to retry the message manually with a custom TTL. To do this, use the RejectWithRetry function.
Click to show internal directories.
Click to hide internal directories.

