retry

package
v0.3.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 21, 2021 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Backoff

type Backoff interface {
	// RetryAfter should return the duration that must elapse before the
	// message is queued for its next retry.
	RetryAfter(attempts int) time.Duration
}

Backoff represents a backoff strategy to be used by the Retrier.

func ConstBackoff

func ConstBackoff(interval time.Duration) Backoff

ConstBackoff implements a constant interval Backoff strategy.
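
As a rough illustration of what a constant-interval strategy looks like, the following sketch implements the Backoff interface locally. The names constBackoff and newConstBackoff are hypothetical stand-ins; the package's own implementation is unexported and may differ.

```go
package main

import (
	"fmt"
	"time"
)

// Backoff mirrors the interface documented above.
type Backoff interface {
	RetryAfter(attempts int) time.Duration
}

// constBackoff is a hypothetical implementation, not the package's own type.
type constBackoff time.Duration

// RetryAfter ignores the attempt count and always returns the same interval.
func (c constBackoff) RetryAfter(attempts int) time.Duration {
	return time.Duration(c)
}

// newConstBackoff is a hypothetical stand-in for retry.ConstBackoff.
func newConstBackoff(interval time.Duration) Backoff {
	return constBackoff(interval)
}

func main() {
	b := newConstBackoff(2 * time.Second)
	for attempts := 1; attempts <= 3; attempts++ {
		fmt.Println(attempts, b.RetryAfter(attempts))
	}
}
```

Any type with a RetryAfter method of this shape satisfies Backoff, so a custom strategy can be assigned to the Retrier's Backoff field in the same way.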

func ExpBackoff

func ExpBackoff(base float64, initialTimeout, maxTimeout time.Duration) Backoff

ExpBackoff provides a simple exponential Backoff strategy for retries.
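
The documentation does not spell out the formula. A common choice, assumed here, is initialTimeout multiplied by base raised to the attempt count, capped at maxTimeout. The sketch below uses the hypothetical names expBackoff and newExpBackoff and should not be read as the package's actual implementation.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Backoff mirrors the interface documented above.
type Backoff interface {
	RetryAfter(attempts int) time.Duration
}

// expBackoff is a hypothetical exponential strategy.
type expBackoff struct {
	base    float64
	initial time.Duration
	max     time.Duration
}

// RetryAfter returns initial * base^attempts, capped at max.
// Assumption: the exponent is the raw attempt count.
func (e expBackoff) RetryAfter(attempts int) time.Duration {
	d := float64(e.initial) * math.Pow(e.base, float64(attempts))
	if d > float64(e.max) {
		return e.max // also covers overflow to +Inf
	}
	return time.Duration(d)
}

// newExpBackoff is a hypothetical stand-in for retry.ExpBackoff.
func newExpBackoff(base float64, initialTimeout, maxTimeout time.Duration) Backoff {
	return expBackoff{base: base, initial: initialTimeout, max: maxTimeout}
}

func main() {
	b := newExpBackoff(2, time.Second, 10*time.Second)
	for attempts := 0; attempts <= 5; attempts++ {
		fmt.Println(attempts, b.RetryAfter(attempts))
	}
}
```

Capping in floating point before converting to time.Duration avoids integer overflow for large attempt counts.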

type DelayQueue

type DelayQueue interface {
	// Enqueue must save the message with priority based on the timestamp set.
	// If no timestamp is set, current timestamp should be assumed.
	Enqueue(item Item) error

	// Dequeue should read one message whose timestamp has expired and call
	// readFn with it. Success or failure of readFn must be treated as an ACK
	// or a nACK respectively. When no message is available, Dequeue should
	// not block but return ErrNoMessage. The queue can return EOF to indicate
	// that it is fully drained. Other errors from the queue will be logged
	// and ignored.
	Dequeue(ctx context.Context, readFn ReadFn) error
}

A DelayQueue implementation maintains messages in timestamp-based order. The Retrier uses it to schedule retries.

type InMemQ

type InMemQ struct {
	// contains filtered or unexported fields
}

InMemQ implements an in-memory min-heap based message queue.

func (*InMemQ) Dequeue

func (q *InMemQ) Dequeue(ctx context.Context, readFn ReadFn) error

Dequeue reads a message from the in-memory heap, if one is available, and calls readFn with it. Otherwise it returns ErrNoMessage.

func (*InMemQ) Enqueue

func (q *InMemQ) Enqueue(item Item) error

Enqueue pushes the message into the in-mem heap with timestamp as its priority. If timestamp is not set, current timestamp will be assumed.
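
To make the Enqueue/Dequeue contract concrete, here is a minimal sketch of a min-heap delay queue built on container/heap. It is not the InMemQ source: the Item type is simplified (Message is a plain string rather than fusion.Msg), and memQueue, errNoMessage and the re-enqueue-on-nACK behaviour are assumptions made for illustration.

```go
package main

import (
	"container/heap"
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Item is a simplified stand-in for retry.Item.
type Item struct {
	Message     string
	Attempts    int
	NextAttempt time.Time
}

// ReadFn mirrors the handler type documented in this package.
type ReadFn func(ctx context.Context, item Item) error

// errNoMessage is a hypothetical sentinel meaning "nothing is due yet".
var errNoMessage = errors.New("no message available")

// itemHeap orders items by NextAttempt, earliest first.
type itemHeap []Item

func (h itemHeap) Len() int           { return len(h) }
func (h itemHeap) Less(i, j int) bool { return h[i].NextAttempt.Before(h[j].NextAttempt) }
func (h itemHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *itemHeap) Push(x interface{}) { *h = append(*h, x.(Item)) }
func (h *itemHeap) Pop() interface{} {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

// memQueue is a hypothetical in-memory delay queue.
type memQueue struct {
	mu    sync.Mutex
	items itemHeap
}

// Enqueue stores the item, defaulting NextAttempt to the current time.
func (q *memQueue) Enqueue(item Item) error {
	if item.NextAttempt.IsZero() {
		item.NextAttempt = time.Now()
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	heap.Push(&q.items, item)
	return nil
}

// Dequeue hands the earliest due item to readFn without blocking.
func (q *memQueue) Dequeue(ctx context.Context, readFn ReadFn) error {
	q.mu.Lock()
	if len(q.items) == 0 || q.items[0].NextAttempt.After(time.Now()) {
		q.mu.Unlock()
		return errNoMessage
	}
	item := heap.Pop(&q.items).(Item)
	q.mu.Unlock()

	if err := readFn(ctx, item); err != nil {
		// nACK (assumed behaviour): put the item back and report the error.
		_ = q.Enqueue(item)
		return err
	}
	return nil
}

// demo enqueues three items and drains those that are already due.
func demo() []string {
	q := &memQueue{}
	now := time.Now()
	_ = q.Enqueue(Item{Message: "late", NextAttempt: now.Add(-1 * time.Second)})
	_ = q.Enqueue(Item{Message: "early", NextAttempt: now.Add(-2 * time.Second)})
	_ = q.Enqueue(Item{Message: "future", NextAttempt: now.Add(time.Hour)})

	var seen []string
	for {
		err := q.Dequeue(context.Background(), func(_ context.Context, it Item) error {
			seen = append(seen, it.Message)
			return nil
		})
		if errors.Is(err, errNoMessage) {
			return seen
		}
	}
}

func main() {
	fmt.Println(demo())
}
```

The lock is released before readFn runs so that a slow handler does not block concurrent Enqueue calls. Whether the real InMemQ does the same is not stated in this documentation.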

type Item

type Item struct {
	Message     fusion.Msg `json:"message"`
	Attempts    int        `json:"attempts"`
	NextAttempt time.Time  `json:"next_attempt"`
	LastAttempt time.Time  `json:"last_attempt"`
}

Item is maintained by the delay queue and tracks the retry state of a message: the number of attempts made so far and the times of the last and next attempts.

type ReadFn

type ReadFn func(ctx context.Context, item Item) error

A ReadFn implementation is called by the message queue to handle a message.

type Retrier

type Retrier struct {
	// Proc is the fusion Proc that must be executed for each message.
	// This field must be set.
	Proc fusion.Proc

	// Queue can be set to use a delay queue. If not set, an in-memory
	// queue will be used.
	Queue DelayQueue

	// Backoff can be set to configure a backoff strategy to be used
	// during retries. If not set, constant backoff strategy will be
	// used with 1s intervals.
	Backoff Backoff

	// MaxRetries can be set to control how many retries should be done
	// before returning Fail status. Defaults to 3.
	MaxRetries int

	// EnqueueWorkers is the number of worker threads to use for moving
	// messages from stream to the queue.
	EnqueueWorkers int

	// ProcWorkers is the number of main worker threads to use for running
	// proc.
	ProcWorkers int

	// OnFailure is called when a message fails and exhausts all retries.
	// If not set, such messages will be logged and discarded.
	OnFailure func(item Item)

	// Log can be set to customise logging mechanism used by retrier. If
	// not set, logging will be disabled.
	Log fusion.Log
}

Retrier is a fusion Proc that can wrap another Proc implementation to provide automatic retries.
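
The interplay of MaxRetries, Backoff and OnFailure can be pictured with the self-contained simulation below. It does not use the fusion package and is not the Retrier's code. The helper names recordFailure and simulate, the simplified Item, and the exact counting rule (one initial run plus up to MaxRetries retries) are all assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Item is a simplified stand-in for retry.Item.
type Item struct {
	Message     string
	Attempts    int
	NextAttempt time.Time
	LastAttempt time.Time
}

// recordFailure updates the item after a failed run at time now.
// It returns false once the assumed retry budget is exhausted.
func recordFailure(item Item, now time.Time, maxRetries int, retryAfter func(int) time.Duration) (Item, bool) {
	item.Attempts++
	item.LastAttempt = now
	if item.Attempts > maxRetries {
		return item, false
	}
	item.NextAttempt = now.Add(retryAfter(item.Attempts))
	return item, true
}

// simulate runs an always-failing proc against a simulated clock and
// reports how often the proc ran and how many items reached onFailure.
func simulate(maxRetries int) (runs int, failures int) {
	proc := func(msg string) error {
		runs++
		return errors.New("always fails")
	}
	backoff := func(attempts int) time.Duration { return time.Second }
	onFailure := func(item Item) { failures++ }

	item := Item{Message: "hello"}
	now := time.Now()
	for {
		if err := proc(item.Message); err == nil {
			return runs, failures
		}
		next, retry := recordFailure(item, now, maxRetries, backoff)
		if !retry {
			onFailure(next)
			return runs, failures
		}
		item = next
		now = item.NextAttempt // jump the simulated clock; no real sleeping
	}
}

func main() {
	runs, failures := simulate(3)
	fmt.Println("runs:", runs, "failures:", failures)
}
```

In the real Retrier the waiting is handled by the DelayQueue rather than a loop, so the sketch only illustrates the bookkeeping on each Item.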

func (*Retrier) Run added in v0.3.1

func (ret *Retrier) Run(ctx context.Context, stream <-chan fusion.Msg) error

Run starts the proc workers and the retry worker threads and blocks until all the workers return.
