queuer

package
v0.0.0-...-34c5bf1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2020 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// Errors is a channel over which non-fatal errors are
	// sent. This channel must have a listener otherwise
	// deadlock will arise.
	Errors chan error
	// contains filtered or unexported fields
}

Consumer uses Redis Streams to publish and subscribe

func NewConsumer

func NewConsumer(opts *ConsumerOptions) *Consumer

NewConsumer returns an initialised Consumer

func (*Consumer) Listen

func (c *Consumer) Listen(ctx context.Context) error

Listen subscribes to the channels and listens for messages

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(channel string, handler Handler, deadLetter ...Handler)

Subscribe registers a handler for the given channel

type ConsumerOptions

type ConsumerOptions struct {
	// Group is the name of the consumer group used when
	// listening for new messages from Redis. This must
	// be set before calling Listen().
	Group string

	// Consumer is the name of this particular consumer
	// within the consumer group. This must be set before
	// calling Listen().
	Consumer string

	// Redis is an instance of *redis.Client for use by
	// the client. This must be set before using the client.
	Redis *redis.Client

	// ReadTimeout is the duration for which the XREADGROUP
	// call blocks for. A duration of zero means the client
	// will block indefinitely. It is recommended to set
	// this to a non-zero duration so that the client is
	// able to gracefully shutdown.
	ReadTimeout time.Duration

	// HandlerTimeout is the duration after which the
	// context passed to handlers is cancelled. Note that
	// handlers are not forcefully stopped after this time.
	// It is up to them to handle context cancellation.
	// A duration of zero means handlers never timeout.
	HandlerTimeout time.Duration

	// PendingTimeout is the duration for which a message
	// can be pending before the consumer tries to claim it.
	//
	// This value should not be shorter than HandlerTimeout
	// otherwise you risk claiming messages that are still
	// being processed.
	PendingTimeout time.Duration

	// ClaimInterval is the time between attempts to claim
	// any messages that have been pending for longer than
	// the PendingTimeout. If this value is zero, then the
	// consumer will not try to claim pending messages.
	ClaimInterval time.Duration

	// MaxRetry is the number of times a message will be
	// retried before it is passed to the dead-letter
	// consumer(s) for the stream. If < 0, then the
	// message will never be dead-lettered.
	MaxRetry int

	// Concurrency is the number of goroutines that are
	// spawned to concurrently handle incoming messages.
	// A value of zero is equal to a value of one.
	Concurrency int

	// BufferSize is the size of the channel that holds
	// incoming messages and therefore determines how many
	// messages the consumer can read from Redis in a
	// single call. A value of zero will create an
	// unbuffered channel.
	BufferSize int

	// Backoff is used to retry requests to Redis in the
	// case of network failures. If this value is nil, a
	// Backoff with sensible defaults will be used.
	Backoff *backoff.Backoff

	// NetworkRetry is the number of times to retry failed
	// network requests to Redis before returning a fatal
	// error. A value of zero means requests will not be
	// retried. A value of < 0 means requests will be
	// retried indefinitely until the context is cancelled.
	NetworkRetry int
}

ConsumerOptions contains options to configure the Consumer

type Handler

type Handler interface {
	HandleEvent(ctx context.Context, m *Message) Result
}

Handler processes messages. HandleEvent should return a result that tells the client whether the handling was successful or not.

type HandlerError

type HandlerError struct {
	Err error
	Msg *Message
}

HandlerError wraps any errors returned from message handlers

func (*HandlerError) Error

func (e *HandlerError) Error() string

Error returns a formatted error string

func (*HandlerError) Unwrap

func (e *HandlerError) Unwrap() error

Unwrap returns the underlying error

type HandlerFunc

type HandlerFunc func(ctx context.Context, m *Message) Result

HandlerFunc is an adapter that allows ordinary functions to be used as event handlers. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.

func (HandlerFunc) HandleEvent

func (f HandlerFunc) HandleEvent(ctx context.Context, m *Message) Result

HandleEvent calls f(e)

type HandlerPanic

type HandlerPanic struct {
	Err error
	Msg *Message
}

HandlerPanic is returned if a message handler panics. The panic is recovered and converted to an error.

func (*HandlerPanic) Error

func (e *HandlerPanic) Error() string

Error returns a formatted error string

func (*HandlerPanic) Unwrap

func (e *HandlerPanic) Unwrap() error

Unwrap returns the underlying error

type Message

type Message struct {
	// ID is the Redis ID of the message. When publishing,
	// this can be set to "*" to instruct Redis to generate
	// an ID. This is usually what you want. If left blank,
	// it will automatically be set to "*".
	ID string

	// Stream is the name of the stream this message should
	// be published to or was received from.
	Stream string

	// Values represents the message's payload
	Values map[string]interface{}
	// contains filtered or unexported fields
}

Message represents a message published to or received from a Redis stream.

type NetworkError

type NetworkError struct {
	Err      net.Error
	Retrying bool
	Backoff  time.Duration
}

NetworkError is returned if a network error occurs when communicating with Redis. The Retrying field tells you whether the library is planning to retry the request. If this is true, Backoff will be set.

func (*NetworkError) Error

func (e *NetworkError) Error() string

Error returns a formatted error string

func (*NetworkError) Unwrap

func (e *NetworkError) Unwrap() error

Unwrap returns the underlying error

type Publisher

type Publisher struct {
	// Errors is a channel over which non-fatal errors are
	// sent. This channel must have a listener otherwise
	// deadlock will arise.
	//
	// If NetworkRetry is set to 0, nothing will be sent
	// over this channel. It doesn't need a listener in
	// that case.
	Errors chan error
	// contains filtered or unexported fields
}

Publisher is capable of publishing messages to Redis Streams. It should be created via NewPublisher().

func NewPublisher

func NewPublisher(opts *PublisherOptions) *Publisher

NewPublisher returns an initialised Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, m *Message) error

Publish publishes the message to a Redis Stream

type PublisherOptions

type PublisherOptions struct {
	// StreamMaxLength sets the MAXLEN option when calling
	// XADD. This limits the size of the stream. Old entries
	// are automatically evicted when the specified length
	// is reached.
	StreamMaxLength int64

	// ApproximateMaxLength is an optimisation that allows
	// the stream to be capped more efficiently, as long as
	// an exact length is not required.
	ApproximateMaxLength bool

	// Redis is an instance of *redis.Client for use by
	// the client. This must be set before using the client.
	Redis *redis.Client

	// Backoff is used to retry requests to Redis in the
	// case of network failures. If this value is nil, a
	// Backoff with sensible defaults will be used.
	Backoff *backoff.Backoff

	// NetworkRetry is the number of times to retry failed
	// network requests to Redis before returning a fatal
	// error. A value of zero means requests will not be
	// retried. A value of < 0 means requests will be
	// retried indefinitely until the context is cancelled.
	NetworkRetry int
}

PublisherOptions contains options to configure the Publisher

type RedisError

type RedisError struct {
	Err error
}

RedisError is returned if the Redis client returns an error that is not a network error.

func (*RedisError) Error

func (e *RedisError) Error() string

Error returns a formatted error string

func (*RedisError) Unwrap

func (e *RedisError) Unwrap() error

Unwrap returns the underlying error

type Result

type Result struct {
	Retry   bool
	Err     error
	Backoff time.Duration
}

Result defines the result of a handler

func Discard

func Discard(err error) Result

Discard should be returned when the message was not successfully processed but should not be retried. It will be acknowledged and therefore not re-processed by any consumers. The error will be enqueued on the error channel.

func Fail

func Fail(err error) Result

Fail should be returned when the message was not successfully processed and should be retried. The message will not be acknowledged and therefore retried. It will not necessarily be retried by the same consumer. The error will be enqueued on the error channel.

func Success

func Success() Result

Success should be returned when the message was successfully processed. The message will be acknowledged and therefore not re-processed by any consumers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL