Documentation ¶
Overview ¶
Package rconsumer provides an easy-to-use consumer abstraction for setting up delivery consumers with minimal boilerplate.
Index ¶
- type AmqpArgs
- type Consumer
- type DeliveryProcessor
- type Middleware
- func (config *Middleware) AddCleanupChannel(processorMiddleware middleware.CleanupChannel)
- func (config *Middleware) AddDelivery(processorMiddleware middleware.Delivery)
- func (config *Middleware) AddProvider(provider middleware.ProvidesMiddleware) error
- func (config *Middleware) AddSetupChannel(processorMiddleware middleware.SetupChannel)
- type Opts
- func (opts Opts) WithDefaultLogging(log bool) Opts
- func (opts Opts) WithLogDeliveryLevel(level zerolog.Level) Opts
- func (opts Opts) WithLogSuccessLevel(level zerolog.Level) Opts
- func (opts Opts) WithLogger(logger zerolog.Logger) Opts
- func (opts Opts) WithLoggingLevel(level zerolog.Level) Opts
- func (opts Opts) WithMaxWorkers(max int) Opts
- func (opts Opts) WithMiddleware(processorMiddleware Middleware) Opts
Types ¶
type AmqpArgs ¶
type AmqpArgs struct {
	// Queue is the name of the Queue to consume from
	Queue string
	// ConsumerName identifies this consumer with the broker.
	ConsumerName string
	// AutoAck is whether the broker should ack messages automatically as it sends them.
	// Otherwise the consumer will handle acking messages.
	AutoAck bool
	// Exclusive is whether this consumer should be the exclusive consumer for this
	// Queue.
	Exclusive bool
	// Args are additional args to pass to the amqp.Channel.Consume() method.
	Args amqp.Table
}
AmqpArgs are the args the consumer will be created with by calling amqp.Channel.Consume.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a service helper for consuming messages from one or more queues.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/peake100/rogerRabbit-go/pkg/amqp"
	"github.com/peake100/rogerRabbit-go/pkg/amqptest"
	"github.com/peake100/rogerRabbit-go/pkg/roger/rconsumer"
	"github.com/peake100/rogerRabbit-go/pkg/roger/rconsumer/middleware"
)

type BasicProcessor struct {
}

// AmqpArgs returns the args to be made to the consumer's internal
// Channel.Consume() method.
func (processor *BasicProcessor) AmqpArgs() rconsumer.AmqpArgs {
	return rconsumer.AmqpArgs{
		// Queue names the queue declared in SetupChannel below.
		Queue:        "example_consumer_queue",
		ConsumerName: "example_consumer_queue",
		AutoAck:      false,
		Exclusive:    false,
		Args:         nil,
	}
}

// SetupChannel is called before consuming begins, and allows the handler to declare
// any routes, bindings, etc, necessary to handle its route.
func (processor *BasicProcessor) SetupChannel(
	ctx context.Context, amqpChannel middleware.AmqpRouteManager,
) error {
	_, err := amqpChannel.QueueDeclare(
		"example_consumer_queue",
		false,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return fmt.Errorf("error declaring Queue: %w", err)
	}

	return nil
}

// HandleDelivery is the business logic invoked for each delivery.
func (processor *BasicProcessor) HandleDelivery(
	ctx context.Context, delivery amqp.Delivery,
) (requeue bool, err error) {
	// Print the message
	fmt.Println("BODY:", delivery.Body)

	// Returning no error will result in an ACK of the message.
	return false, nil
}

// CleanupChannel allows the route handler to remove any resources necessary on close.
func (processor *BasicProcessor) CleanupChannel(
	ctx context.Context, amqpChannel middleware.AmqpRouteManager,
) error {
	_, err := amqpChannel.QueueDelete(
		"example_consumer_queue", false, false, false,
	)
	if err != nil {
		return fmt.Errorf("error deleting Queue: %w", err)
	}

	return nil
}

func main() {
	// Get a new connection to our test broker.
	connection, err := amqp.Dial(amqptest.TestDialAddress)
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	// Get a new channel from our robust connection.
	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	// Create a new consumer that uses our robust channel.
	consumer := rconsumer.New(channel, rconsumer.DefaultOpts())
	defer consumer.StartShutdown()

	// Create a new delivery processor and register it.
	processor := new(BasicProcessor)
	err = consumer.RegisterProcessor(processor)
	if err != nil {
		panic(err)
	}

	// This method will block forever as the consumer runs.
	err = consumer.Run()
	if err != nil {
		panic(err)
	}
}
func (*Consumer) RegisterProcessor ¶
func (consumer *Consumer) RegisterProcessor( processor DeliveryProcessor, ) error
RegisterProcessor registers a DeliveryProcessor implementation value with the consumer. It panics if called after the consumer has started.
func (*Consumer) Run ¶
func (consumer *Consumer) Run() error
Run runs the consumer. This method blocks until the consumer has completed shutdown.
func (*Consumer) StartShutdown ¶
func (consumer *Consumer) StartShutdown()
StartShutdown begins shutdown of the Consumer. This method returns immediately; it does not block until shutdown is complete.
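The split between a blocking Run and a non-blocking StartShutdown can be sketched with a stand-in type. Everything below (service, newService, run, startShutdown) is hypothetical and written only for illustration; it is not the Consumer implementation, and whether the real StartShutdown may safely be called more than once is not stated above.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a hypothetical stand-in for a consumer-like type.
type service struct {
	stop chan struct{}
	once sync.Once
}

func newService() *service {
	return &service{stop: make(chan struct{})}
}

// run blocks until shutdown has been requested, then returns.
func (s *service) run() error {
	<-s.stop
	// Cleanup work would finish here, before run returns.
	return nil
}

// startShutdown only signals shutdown and returns immediately.
// sync.Once makes repeated calls harmless in this sketch.
func (s *service) startShutdown() {
	s.once.Do(func() { close(s.stop) })
}

func main() {
	s := newService()

	done := make(chan error, 1)
	go func() { done <- s.run() }()

	s.startShutdown() // does not wait for run to finish

	// To know that shutdown is complete, wait for run to return.
	err := <-done
	fmt.Println("run returned:", err)
}
```

In this shape, a caller that needs to know shutdown has finished waits for the blocking call to return rather than relying on the signal call.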
type DeliveryProcessor ¶ added in v0.4.10
type DeliveryProcessor interface {
	// AmqpArgs returns the args that amqp.Channel.Consume should be called with.
	AmqpArgs() AmqpArgs

	// SetupChannel is called before the consumer is created, and is designed to let
	// this handler declare any exchanges or queues necessary to handle deliveries.
	SetupChannel(ctx context.Context, amqpChannel middleware.AmqpRouteManager) error

	// HandleDelivery will be called once per delivery. Returning a non-nil err will
	// result in it being logged and the delivery being nacked. If requeue is true, the
	// nacked delivery will be requeued. If err is nil, requeue is ignored.
	//
	// NOTE: if this method panics, the delivery will be nacked regardless of requeue's
	// value
	HandleDelivery(ctx context.Context, delivery amqp.Delivery) (requeue bool, err error)

	// CleanupChannel is called at shutdown to allow the route handler to clean up any
	// necessary resources.
	CleanupChannel(ctx context.Context, amqpChannel middleware.AmqpRouteManager) error
}
DeliveryProcessor is an interface for handling consuming from a route. Implementors of this interface will be registered with a consumer.
type Middleware ¶
type Middleware struct {
// contains filtered or unexported fields
}
Middleware holds the middleware to register on a consumer.
func (*Middleware) AddCleanupChannel ¶
func (config *Middleware) AddCleanupChannel(processorMiddleware middleware.CleanupChannel)
AddCleanupChannel registers a middleware.CleanupChannel to be applied to the CleanupChannel method of each DeliveryProcessor passed to a Consumer.
func (*Middleware) AddDelivery ¶
func (config *Middleware) AddDelivery(processorMiddleware middleware.Delivery)
AddDelivery registers a middleware.Delivery to be applied to the HandleDelivery method of each DeliveryProcessor passed to a Consumer.
func (*Middleware) AddProvider ¶
func (config *Middleware) AddProvider(provider middleware.ProvidesMiddleware) error
AddProvider adds consume middleware provided by methods of provider.
func (*Middleware) AddSetupChannel ¶
func (config *Middleware) AddSetupChannel(processorMiddleware middleware.SetupChannel)
AddSetupChannel registers a middleware.SetupChannel to be applied to the SetupChannel method of each DeliveryProcessor passed to a Consumer.
type Opts ¶
type Opts struct {
// contains filtered or unexported fields
}
Opts holds options for running a consumer.
func DefaultOpts ¶
func DefaultOpts() Opts
DefaultOpts returns a new Opts value with default settings.
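The With* methods on Opts are declared with a value receiver and return Opts, so each call works on a copy and the result has to be kept or chained. The sketch below shows that behaviour with a hypothetical stand-in type (opts, withMaxWorkers); it does not reproduce the fields of the real Opts, which are unexported.

```go
package main

import "fmt"

// opts is a hypothetical stand-in for rconsumer.Opts with a single setting.
type opts struct {
	maxWorkers int
}

// withMaxWorkers has a value receiver, like the With* methods on Opts:
// it changes a copy and returns it, leaving the original untouched.
func (o opts) withMaxWorkers(n int) opts {
	o.maxWorkers = n
	return o
}

func main() {
	base := opts{}

	// The returned copy is discarded here, so base is unchanged.
	base.withMaxWorkers(5)
	fmt.Println(base.maxWorkers) // prints 0

	// Keeping the return value is what applies the setting.
	configured := base.withMaxWorkers(5)
	fmt.Println(configured.maxWorkers) // prints 5
}
```

With the real package this suggests writing calls in chained form, for example rconsumer.DefaultOpts().WithMaxWorkers(5), and passing the final value on.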
func (Opts) WithDefaultLogging ¶
func (opts Opts) WithDefaultLogging(log bool) Opts
WithDefaultLogging enables the default zerolog.Logger logging middleware. If false, all other logging settings have no effect.
Default: true
func (Opts) WithLogDeliveryLevel ¶
func (opts Opts) WithLogDeliveryLevel(level zerolog.Level) Opts
WithLogDeliveryLevel sets the minimum logging level at which the full delivery object is logged.
Default: zerolog.DebugLevel.
func (Opts) WithLogSuccessLevel ¶
func (opts Opts) WithLogSuccessLevel(level zerolog.Level) Opts
WithLogSuccessLevel sets the minimum logging level at which a successful delivery is logged.
Default: zerolog.DebugLevel.
func (Opts) WithLogger ¶
func (opts Opts) WithLogger(logger zerolog.Logger) Opts
WithLogger sets the zerolog.Logger for the default logging middleware to use. If WithDefaultLogging is false, this setting has no effect.
Default: lockless, pretty-printed logger set to Info level.
func (Opts) WithLoggingLevel ¶
func (opts Opts) WithLoggingLevel(level zerolog.Level) Opts
WithLoggingLevel sets the level of the logger passed to WithLogger.
func (Opts) WithMaxWorkers ¶
func (opts Opts) WithMaxWorkers(max int) Opts
WithMaxWorkers sets the maximum number of workers that can be running at the same time. If 0 or less, no limit will be used.
Default: 0.
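One common way to cap concurrent workers is a buffered channel used as a semaphore. The sketch below shows that technique with the same "0 or less means no limit" rule; runJobs is a hypothetical helper, and nothing here is taken from how rconsumer actually schedules its workers.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runJobs runs each job in its own goroutine, allowing at most maxWorkers
// to run at once. A maxWorkers of 0 or less means no limit. It returns the
// highest number of jobs observed running at the same time.
func runJobs(jobs []func(), maxWorkers int) int64 {
	var sem chan struct{}
	if maxWorkers > 0 {
		sem = make(chan struct{}, maxWorkers)
	}

	var running, peak int64
	var wg sync.WaitGroup
	for _, job := range jobs {
		if sem != nil {
			sem <- struct{}{} // blocks while maxWorkers jobs are in flight
		}
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			if sem != nil {
				defer func() { <-sem }() // free the slot once the job is done
			}

			n := atomic.AddInt64(&running, 1)
			for { // record a new peak if this is the most seen so far
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt64(&running, -1)
		}(job)
	}
	wg.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	jobs := make([]func(), 20)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(5 * time.Millisecond) }
	}

	peak := runJobs(jobs, 3)
	fmt.Println("peak within limit:", peak <= 3)
}
```

Because a slot is taken before a job starts and released only after it finishes, the number of running jobs can never exceed the limit.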
func (Opts) WithMiddleware ¶
func (opts Opts) WithMiddleware(processorMiddleware Middleware) Opts
WithMiddleware sets the Middleware to use. Default: includes all default middleware in the consumer/middleware package.
Default: Middleware{}