retrypool

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 7 Imported by: 0

README

Go Reference Go Coverage CI Status Stability Go Report

RetryPool

The RetryPool package provides a mechanism for executing tasks with automatic retry and error handling capabilities. It allows you to define a work function that performs the actual task, an error handling function that determines whether to retry or stop based on the error, and a success handling function that is called when the task is successfully completed.

The RetryPool is created with a specified number of worker goroutines and a maximum number of tasks that can be queued. Tasks are added to the pool using the Add method, and the pool executes them concurrently. The pool automatically retries failed tasks based on the error handling function and configurable retry settings.

Its also supports various configuration options, such as maximum age for a task, retry delay, maximum retry delay, and processing delay. These options allow you to fine-tune the behavior of the RetryPool according to your specific requirements.

Installation

go get github.com/n-r-w/retrypool

Usage

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/n-r-w/retrypool"
)

type Data struct {
    Value int
}

func main() {
    // Define your work function
    workFunc := func(ctx context.Context, v Data, age time.Duration) error {
        if age < time.Second/2 {
            fmt.Printf("Processing with error: %v after %v\n", v, age)
            return fmt.Errorf("error %v", v)
        }

        fmt.Printf("Processing: %v after %v\n", v, age)
        return nil
    }

    // Define your error handling function
    errorFunc := func(err error, v Data, age time.Duration, closing bool) bool {
        fmt.Printf("Error: %v after %v\n", v, age)

        if closing {
            fmt.Printf("Closing: %v after %v\n", v, age)
        }

        return age < time.Minute // Retry for a minute
    }

    // Define your success handling function
    successFunc := func(v Data, age time.Duration) {
        fmt.Printf("Success: %v after %v\n", v, age)
    }

    // Create a new RetryPool
    pool := retrypool.New(100, 100, 2, workFunc, errorFunc,
        retrypool.WithMaxAge[Data](time.Minute*10),
        retrypool.WithRetryDelay[Data](time.Second/2),
        retrypool.WithMaxRetryDelay[Data](time.Second),
        retrypool.WithProcessingDelay[Data](time.Hour),
        retrypool.WithSuccessFunc[Data](successFunc))

    // Add tasks to the pool
    pool.Add(Data{Value: 1})
    
    time.Sleep(time.Second * 2)

    pool.Stop()
}

Output:

Processing with error: {1} after 19.191µs
Error: {1} after 36.772µs
Processing: {1} after 1.000079775s
Success: {1} after 1.000079775s

Documentation

Overview

Package retrypool Execution of tasks for processing data in multiple workers with error control. In case of an error, the data is deferred to a buffer and periodic retries are made. Data is deleted from the error buffer through the ErrorFunc function.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrorFunc

type ErrorFunc[T any] func(err error, v T, age time.Duration, closing bool) (keep bool)

ErrorFunc is a function for error handling. The parameter "closing" is true when the last attempt is made before stopping.

type LogPanicFunc

type LogPanicFunc func(ctx context.Context, recoverInfo any, stack []byte)

LogPanicFunc is a function for logging panics.

type MetricCounterFunc

type MetricCounterFunc func()

MetricCounterFunc is a function for modifying Counter metrics

type MetricGaugeFunc

type MetricGaugeFunc func(float64)

MetricGaugeFunc is a function for modifying Gauge metrics

type Option

type Option[T any] func(*options[T])

func WithMaxAge

func WithMaxAge[T any](age time.Duration) Option[T]

WithMaxAge is the maximum message lifetime. Used for monitoring purposes.

func WithMaxRetryDelay

func WithMaxRetryDelay[T any](delay time.Duration) Option[T]

WithMaxRetryDelay is the maximum pause between retries. The retry time will start with RetryDelay and double after each unsuccessful error queue processing.

func WithMetricFuncs

func WithMetricFuncs[T any](
	dataLostCountFunc, metricRetryCountFunc, metricFirstCount MetricCounterFunc,
	metricTimeToMaxFunc, metricRetryDelayToMaxFunc MetricGaugeFunc,
) Option[T]

WithMetricFuncs functions for working with metrics

func WithProcessingDelay

func WithProcessingDelay[T any](delay time.Duration) Option[T]

WithProcessingDelay is the timeout based on which a context will be created and passed to the data processing function.

func WithRetryDelay

func WithRetryDelay[T any](delay time.Duration) Option[T]

WithRetryDelay initial pause between retries

func WithSuccessFunc

func WithSuccessFunc[T any](f SuccessFunc[T]) Option[T]

WithSuccessFunc. Called when data is successfully sent after an error

type RetryPool

type RetryPool[T any] struct {
	// contains filtered or unexported fields
}

RetryPool ...

func New

func New[T any](inboundQueueSize, retryQueueSize, workerCount int, workFunc WorkFunc[T], errorFunc ErrorFunc[T], opts ...Option[T]) *RetryPool[T]

New creates a RetryPool

func (*RetryPool[T]) Add

func (d *RetryPool[T]) Add(v T) bool

Add adds data to the processing queue. Returns false if the queue is full or if Stop was called.

func (*RetryPool[T]) Exec

func (d *RetryPool[T]) Exec(ctx context.Context, v T) error

Exec executes the processing immediately and if it returns an error, adds it to the queue. Returns an error if it occurred and it was not possible to add it to the queue due to overflow or stop.

func (*RetryPool[T]) InboundQueueLen

func (d *RetryPool[T]) InboundQueueLen() int64

InboundQueueLen returns the current size of the queue for processing data plus the number of items being processed. Used for statistics and debugging.

func (*RetryPool[T]) InboundQueueMax

func (d *RetryPool[T]) InboundQueueMax() int

InboundQueueMax is the maximum size of the queue for processing data

func (*RetryPool[T]) MaxAge

func (d *RetryPool[T]) MaxAge() time.Duration

MaxAge is the maximum lifetime of messages. Used for monitoring the state.

func (*RetryPool[T]) MaxRetryDelay

func (d *RetryPool[T]) MaxRetryDelay() time.Duration

MaxRetryDelay is the maximum pause between retries

func (*RetryPool[T]) ProcessingDelay

func (d *RetryPool[T]) ProcessingDelay() time.Duration

ProcessingDelay is the timeout based on which a context will be created and passed to the data processing function

func (*RetryPool[T]) RetryDelay

func (d *RetryPool[T]) RetryDelay() time.Duration

RetryDelay initial pause between retries

func (*RetryPool[T]) RetryQueueLen

func (d *RetryPool[T]) RetryQueueLen() int64

RetryQueueLen returns the current size of the error queue (data from this queue periodically goes back to InboundQueue) plus the number of items being retried. Used for statistics and debugging.

func (*RetryPool[T]) RetryQueueMax

func (d *RetryPool[T]) RetryQueueMax() int

RetryQueueMax is the maximum size of the error queue

func (*RetryPool[T]) Stop

func (d *RetryPool[T]) Stop()

Stop closes the reception of new tasks (Add will return false), then tries to process the buffer one last time.

type StateInformer

type StateInformer interface {
	InboundQueueLen() int64
	RetryQueueLen() int64
	InboundQueueMax() int
	RetryQueueMax() int
}

StateInformer interface for collecting usage statistics without the need to specify a pointer to the RetryPool itself, which requires typing

type SuccessFunc

type SuccessFunc[T any] func(v T, age time.Duration)

SuccessFunc is called when data is successfully sent after an error

type WorkFunc

type WorkFunc[T any] func(ctx context.Context, v T, age time.Duration) error

WorkFunc is the main function for processing data.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL