Documentation ¶
Overview ¶
Package retrypool executes data-processing tasks across multiple workers with error control. When processing fails, the data is deferred to an error buffer and retried periodically. Data is removed from the error buffer through the ErrorFunc function.
Index ¶
- type ErrorFunc
- type LogPanicFunc
- type MetricCounterFunc
- type MetricGaugeFunc
- type Option
- func WithMaxAge[T any](age time.Duration) Option[T]
- func WithMaxRetryDelay[T any](delay time.Duration) Option[T]
- func WithMetricFuncs[T any](dataLostCountFunc, metricRetryCountFunc, metricFirstCount MetricCounterFunc, ...) Option[T]
- func WithProcessingDelay[T any](delay time.Duration) Option[T]
- func WithRetryDelay[T any](delay time.Duration) Option[T]
- func WithSuccessFunc[T any](f SuccessFunc[T]) Option[T]
- type RetryPool
- func New[T any](inboundQueueSize, retryQueueSize, workerCount int, workFunc WorkFunc[T], errorFunc ErrorFunc[T], opts ...Option[T]) *RetryPool[T]
- func (d *RetryPool[T]) Add(v T) bool
- func (d *RetryPool[T]) Exec(ctx context.Context, v T) error
- func (d *RetryPool[T]) InboundQueueLen() int64
- func (d *RetryPool[T]) InboundQueueMax() int
- func (d *RetryPool[T]) MaxAge() time.Duration
- func (d *RetryPool[T]) MaxRetryDelay() time.Duration
- func (d *RetryPool[T]) ProcessingDelay() time.Duration
- func (d *RetryPool[T]) RetryDelay() time.Duration
- func (d *RetryPool[T]) RetryQueueLen() int64
- func (d *RetryPool[T]) RetryQueueMax() int
- func (d *RetryPool[T]) Stop()
- type StateInformer
- type SuccessFunc
- type WorkFunc
Types ¶
type ErrorFunc ¶
ErrorFunc handles processing errors. Its "closing" parameter is true when the call is the last attempt made before the pool stops.
type LogPanicFunc ¶
LogPanicFunc is a function for logging panics.
type MetricCounterFunc ¶
type MetricCounterFunc func()
MetricCounterFunc is a function for modifying Counter metrics
type MetricGaugeFunc ¶
type MetricGaugeFunc func(float64)
MetricGaugeFunc is a function for modifying Gauge metrics
type Option ¶
type Option[T any] func(*options[T])
func WithMaxAge ¶
WithMaxAge sets the maximum message lifetime. It is used for monitoring purposes.
func WithMaxRetryDelay ¶
WithMaxRetryDelay sets the maximum pause between retries. The pause starts at RetryDelay and doubles after each unsuccessful pass over the error queue, up to this maximum.
func WithMetricFuncs ¶
func WithMetricFuncs[T any]( dataLostCountFunc, metricRetryCountFunc, metricFirstCount MetricCounterFunc, metricTimeToMaxFunc, metricRetryDelayToMaxFunc MetricGaugeFunc, ) Option[T]
WithMetricFuncs sets the functions used to update metrics.
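Since MetricCounterFunc is `func()` and MetricGaugeFunc is `func(float64)`, any metrics backend can be plugged in through closures. The sketch below redeclares the two types locally and binds them to simple in-memory values. `newCounter` and `newGauge` are hypothetical helpers; a real setup would more likely wrap a metrics client.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Local copies of the documented function types, so the sketch is self-contained.
type MetricCounterFunc func()
type MetricGaugeFunc func(float64)

// newCounter returns a counter callback and a reader for its current value.
func newCounter() (MetricCounterFunc, func() int64) {
	n := new(atomic.Int64)
	return func() { n.Add(1) }, n.Load
}

// newGauge returns a gauge callback and a reader for the last value set.
func newGauge() (MetricGaugeFunc, func() float64) {
	var (
		mu sync.Mutex
		v  float64
	)
	set := func(x float64) {
		mu.Lock()
		v = x
		mu.Unlock()
	}
	get := func() float64 {
		mu.Lock()
		defer mu.Unlock()
		return v
	}
	return set, get
}

func main() {
	incRetry, retries := newCounter()
	setDelayRatio, delayRatio := newGauge()

	incRetry()
	incRetry()
	setDelayRatio(0.5)

	fmt.Println(retries(), delayRatio())
}
```

The callbacks are safe to call from several goroutines, which matters because the pool runs multiple workers.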
func WithProcessingDelay ¶
WithProcessingDelay sets the timeout used to create the context that is passed to the data processing function.
func WithRetryDelay ¶
WithRetryDelay sets the initial pause between retries.
func WithSuccessFunc ¶
func WithSuccessFunc[T any](f SuccessFunc[T]) Option[T]
WithSuccessFunc sets the function that is called when data is successfully sent after an error.
type RetryPool ¶
type RetryPool[T any] struct {
	// contains filtered or unexported fields
}
RetryPool ...
func New ¶
func New[T any](inboundQueueSize, retryQueueSize, workerCount int, workFunc WorkFunc[T], errorFunc ErrorFunc[T], opts ...Option[T]) *RetryPool[T]
New creates a RetryPool
func (*RetryPool[T]) Add ¶
Add adds data to the processing queue. Returns false if the queue is full or if Stop was called.
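The documented behaviour (a non-blocking add that reports false on a full or stopped queue) can be sketched with a buffered channel and a `select` with a `default` branch. The `queue` type and its methods are hypothetical and are not claimed to match the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical bounded queue used only for illustration.
type queue[T any] struct {
	mu      sync.Mutex
	items   chan T
	stopped bool
}

func newQueue[T any](size int) *queue[T] {
	return &queue[T]{items: make(chan T, size)}
}

// add never blocks: it returns false when the queue is full or stopped.
func (q *queue[T]) add(v T) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.stopped {
		return false
	}
	select {
	case q.items <- v:
		return true
	default: // buffer is full
		return false
	}
}

// stop makes every later add return false.
func (q *queue[T]) stop() {
	q.mu.Lock()
	q.stopped = true
	q.mu.Unlock()
}

func main() {
	q := newQueue[string](1)
	fmt.Println(q.add("first"))  // accepted
	fmt.Println(q.add("second")) // rejected: queue is full
	q.stop()
	fmt.Println(q.add("third")) // rejected: queue is stopped
}
```

Callers should check the returned bool, since a false result means the data was not accepted and will not be processed.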
func (*RetryPool[T]) Exec ¶
Exec runs the processing immediately and, if it returns an error, adds the data to the queue. It returns an error only if processing failed and the data could not be queued because the queue was full or the pool was stopped.
func (*RetryPool[T]) InboundQueueLen ¶
InboundQueueLen returns the current size of the queue for processing data plus the number of items being processed. Used for statistics and debugging.
func (*RetryPool[T]) InboundQueueMax ¶
InboundQueueMax returns the maximum size of the queue for processing data.
func (*RetryPool[T]) MaxAge ¶
MaxAge returns the maximum lifetime of messages. It is used for monitoring the state.
func (*RetryPool[T]) MaxRetryDelay ¶
MaxRetryDelay returns the maximum pause between retries.
func (*RetryPool[T]) ProcessingDelay ¶
ProcessingDelay returns the timeout used to create the context that is passed to the data processing function.
func (*RetryPool[T]) RetryDelay ¶
RetryDelay returns the initial pause between retries.
func (*RetryPool[T]) RetryQueueLen ¶
RetryQueueLen returns the current size of the error queue (data from this queue periodically goes back to InboundQueue) plus the number of items being retried. Used for statistics and debugging.
func (*RetryPool[T]) RetryQueueMax ¶
RetryQueueMax returns the maximum size of the error queue.
type StateInformer ¶
type StateInformer interface {
	InboundQueueLen() int64
	RetryQueueLen() int64
	InboundQueueMax() int
	RetryQueueMax() int
}
StateInformer is an interface for collecting usage statistics without holding a pointer to the RetryPool itself, which would require specifying its type parameter.
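Because the interface has no type parameter, monitoring code can treat pools with different element types uniformly. The sketch below redeclares the interface locally and computes fill ratios against a stub. `fakePool`, `ratio`, and `utilization` are hypothetical names introduced for the example.

```go
package main

import "fmt"

// StateInformer is a local copy of the documented interface.
type StateInformer interface {
	InboundQueueLen() int64
	RetryQueueLen() int64
	InboundQueueMax() int
	RetryQueueMax() int
}

// fakePool is a stub standing in for a *RetryPool[T] of any element type.
type fakePool struct {
	inLen, retryLen int64
	inMax, retryMax int
}

func (p fakePool) InboundQueueLen() int64 { return p.inLen }
func (p fakePool) RetryQueueLen() int64   { return p.retryLen }
func (p fakePool) InboundQueueMax() int   { return p.inMax }
func (p fakePool) RetryQueueMax() int     { return p.retryMax }

// ratio guards against a zero or negative maximum.
func ratio(length int64, max int) float64 {
	if max <= 0 {
		return 0
	}
	return float64(length) / float64(max)
}

// utilization reports how full each queue is. The documented lengths include
// items currently being processed, so a ratio may exceed 1.
func utilization(s StateInformer) (inbound, retry float64) {
	return ratio(s.InboundQueueLen(), s.InboundQueueMax()),
		ratio(s.RetryQueueLen(), s.RetryQueueMax())
}

func main() {
	in, re := utilization(fakePool{inLen: 5, inMax: 10, retryLen: 1, retryMax: 4})
	fmt.Printf("inbound %.2f, retry %.2f\n", in, re)
}
```

A slice of StateInformer values can hold pools of different element types, which is what the non-generic interface makes possible.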
type SuccessFunc ¶
SuccessFunc is called when data is successfully sent after an error.