Documentation
¶
Index ¶
- func GetRoutingKey(routingKey string, headers amqp.Table, queueName string) (string, error)
- func HaltErrorf(format string, a ...any) error
- func NewDeadletterError() error
- type Action
- type Consumer
- type ConsumerOptions
- type DeadletterError
- type ErrorFunc
- type HaltError
- type HaltFunc
- type HandlerFunc
- type TransitionCode
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetRoutingKey ¶
GetRoutingKey searches for the original routing key in headers or returns the passed routingKey if it's recognized as original.
func HaltErrorf ¶
func NewDeadletterError ¶
func NewDeadletterError() error
Types ¶
type Action ¶
type Action int
const ( // Ack default ack this msg after you have successfully processed this // delivery. Ack Action = iota // NackDiscard the message will be dropped or delivered to a server // configured dead-letter queue. NackDiscard // NackRequeue deliver this message to a different consumer. NackRequeue // ManualAck leaves acknowledgement to the user using the msg.Ack() // method. ManualAck )
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Example ¶
package main
import (
"context"
"fmt"
"log"
"log/slog"
"os"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/ttab/briar"
)
func main() {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
slog.SetDefault(logger)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
c := briar.NewConsumer(logger, "amqp://guest:guest@localhost",
briar.ConsumerOptions{
ExchangeName: "test",
QueueName: "briar",
}, func(_ context.Context, delivery amqp.Delivery) error {
logger.Info("got message",
"payload", string(delivery.Body))
err := delivery.Ack(false)
if err != nil {
return fmt.Errorf("ack message: %w", err)
}
return nil
})
err := c.Run(ctx)
if err != nil {
log.Printf("failed to run: %v", err)
}
}
Output:
func NewConsumer ¶
func NewConsumer( logger *slog.Logger, uri string, opts ConsumerOptions, handler HandlerFunc, ) *Consumer
NewConsumer creates a rabbitmq consumer that is started by calling Run().
type ConsumerOptions ¶
type ConsumerOptions struct {
ConnectionName string
ExchangeName string
QueueName string
// DeliveryLimit is the number of times we'll try to handle a message
// before deadlettering it.
DeliveryLimit int64
// RetryHandlingWait is the time we wait before retrying a failed message.
RetryHandlingWait time.Duration
RoutingKeys []string
Exclusive bool
RetryWait time.Duration
ConnectRetryWait time.Duration
ChannelRetryWait time.Duration
HaltFunction HaltFunc
OnError ErrorFunc
}
type DeadletterError ¶
type DeadletterError struct{}
func (*DeadletterError) Error ¶
func (he *DeadletterError) Error() string
type ErrorFunc ¶
type ErrorFunc func(code TransitionCode, count int, err error)
type HaltFunc ¶
type HaltFunc func(code TransitionCode, count int, err error) bool
func HaltOnNthFailure ¶
func HaltOnNthFailure(code TransitionCode, n int) HaltFunc
HaltOnNthFailure is a convenience function for stopping retries after a number of consecutive failures.
type HandlerFunc ¶
func AckHandler ¶
func AckHandler( fn func(ctx context.Context, delivery amqp.Delivery) (Action, error), ) HandlerFunc
AckHandler is a convenience wrapper that allows the handler to return the Ack/Nack action as a value. Returning an error will cause the channel to be closed and re-opened. Return a HaltError to stop the consumer.
type TransitionCode ¶
type TransitionCode string
TransitionCode are used to describe what step of the consumer process failed.
const ( CodeConnect TransitionCode = "connect" CodeOpenChan TransitionCode = "channel" CodeDeclare TransitionCode = "declare" CodeConsume TransitionCode = "consume" CodeDisconnected TransitionCode = "disconnected" CodeChanClosed TransitionCode = "chan_closed" CodeHandlerFail TransitionCode = "handler_fail" CodeCancelled TransitionCode = "cancelled" )
Click to show internal directories.
Click to hide internal directories.