queueutils

package
v0.73.68

Note: This package is not in the latest version of its module.

Published: Nov 18, 2025 | License: MIT | Imports: 10 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func BatchConcurrent

func BatchConcurrent[T any](batchSize int, things []T, fn func(group []T) error) error

func BatchLinear

func BatchLinear[T any](batchSize int, things []T, fn func(group []T) error) error

func SleepWithExponentialBackoff

func SleepWithExponentialBackoff(base, max time.Duration, retryCount int)

SleepWithExponentialBackoff sleeps for a duration computed with exponential backoff and jitter. base is the base sleep time, max is the maximum sleep time, and retryCount determines the exponential backoff multiplier.

Types

type ID added in v0.73.0

type ID interface {
	string | int64
}

type OpMethod

type OpMethod[T ID] func(ctx context.Context, id T) (bool, error)

type OperationPool

type OperationPool[T ID] struct {
	// contains filtered or unexported fields
}

func NewOperationPool

func NewOperationPool[T ID](ql *zerolog.Logger, timeout time.Duration, description string, method OpMethod[T]) *OperationPool[T]

func (*OperationPool[T]) GetOperation

func (p *OperationPool[T]) GetOperation(id T) *SerialOperation[T]

func (*OperationPool[T]) RunOrContinue

func (p *OperationPool[T]) RunOrContinue(id T)

func (*OperationPool[T]) SetPartitions added in v0.73.0

func (p *OperationPool[T]) SetPartitions(partitions []int64)

func (*OperationPool[T]) SetTenants

func (p *OperationPool[T]) SetTenants(tenants []*dbsqlc.Tenant)

func (*OperationPool[T]) WithJitter

func (p *OperationPool[T]) WithJitter(maxJitter time.Duration) *OperationPool[T]

type SerialOperation

type SerialOperation[T ID] struct {
	// contains filtered or unexported fields
}

SerialOperation represents a method that can only run serially. It can be configured with a maxJitter duration to add a random delay before executing, which helps prevent the "thundering herd" problem when many operations might start at the same time. The jitter is disabled by default (maxJitter=0) and can be enabled via OperationPool.WithJitter().

func (*SerialOperation[T]) Run

func (o *SerialOperation[T]) Run(ql *zerolog.Logger)

func (*SerialOperation[T]) RunOrContinue

func (o *SerialOperation[T]) RunOrContinue(ql *zerolog.Logger)
