operation

package
v0.73.70
Warning

This package is not in the latest version of its module.

Published: Nov 20, 2025 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithDescription

func WithDescription(description string) func(*SerialOperation)

func WithInterval

func WithInterval(
	l *zerolog.Logger,
	repo v1.IntervalSettingsRepository,
	maxJitter, startInterval, maxInterval time.Duration,
	incBackoffCount int,
	gauge IntervalGauge,
) func(*SerialOperation)

func WithPoolInterval

func WithPoolInterval(
	repo v1.IntervalSettingsRepository,
	maxJitter, startInterval, maxInterval time.Duration,
	incBackoffCount int,
	gauge IntervalGauge,
) func(*OperationPool)

func WithTimeout

func WithTimeout(timeout time.Duration) func(*SerialOperation)

Types

type ID

type ID interface {
	string | int64
}
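ID is a type constraint rather than an ordinary interface, so it can only be used as a bound on a type parameter. The sketch below shows one way such a constraint is typically consumed. The key helper is hypothetical and is not part of this package.

```go
package main

import "fmt"

// ID restates the constraint from the listing above.
type ID interface {
	string | int64
}

// key is a hypothetical helper that formats any ID as a lookup key.
func key[T ID](prefix string, id T) string {
	return fmt.Sprintf("%s:%v", prefix, id)
}

func main() {
	fmt.Println(key("tenant", "abc"))
	fmt.Println(key("task", int64(42)))
}
```

Since the union is written without the ~ prefix, only the exact types string and int64 satisfy it; a named type such as `type UserID string` does not.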

type Interval

type Interval struct {
	// contains filtered or unexported fields
}

func NewInterval

func NewInterval(
	l *zerolog.Logger,
	repo v1.IntervalSettingsRepository,
	operationId, resourceId string,
	maxJitter, startInterval, maxInterval time.Duration,
	incBackoffCount int,
	gauge IntervalGauge,
) *Interval

func (*Interval) RunInterval

func (i *Interval) RunInterval(ctx context.Context) <-chan struct{}

RunInterval sends a struct{} on the returned channel at the configured interval and exits when the context is cancelled.

func (*Interval) SetIntervalGauge

func (i *Interval) SetIntervalGauge(rowsModified int)

type IntervalGauge

type IntervalGauge func(ctx context.Context, resourceId string) (int, error)

IntervalGauge is a function that determines whether the interval is increased or reset. If the returned integer is greater than 0, the interval is reset to the start interval. If it is 0, the no-rows count is incremented; once that count exceeds incBackoffCount, the interval is doubled.

type OpMethod

type OpMethod func(ctx context.Context, id string) (shouldContinue bool, err error)

type OperationPool

type OperationPool struct {
	// contains filtered or unexported fields
}

func NewOperationPool

func NewOperationPool(p *partition.Partition, ql *zerolog.Logger, operationId string, timeout time.Duration, description string, method OpMethod, fs ...func(*OperationPool)) *OperationPool

func (*OperationPool) Cleanup

func (p *OperationPool) Cleanup()

func (*OperationPool) RunOrContinue

func (p *OperationPool) RunOrContinue(id string)

type SerialOperation

type SerialOperation struct {
	// contains filtered or unexported fields
}

SerialOperation represents a method that can only run serially. Each operation can optionally run on an interval with jitter. This interval can be configured by reading from an external source like a database.

When RunOrContinue is called, the operation runs if it is not already running. Each method passed to the operation must return a boolean indicating whether the operation should continue running, along with an error (see OpMethod). The OpMethod signature carries no row count; rowsModified appears in SetIntervalGauge and in the integer returned by IntervalGauge.

Intervals configured with withBackoff=true double their interval once rowsModified=0 has been reported more than incBackoffCount times. The interval resets to the start interval when rowsModified>0 is reported.

The jitter is disabled by default (maxJitter=0). The jitter is applied to the interval after any backoff is applied. This is designed to help prevent the "thundering herd" problem when many operations might start at the same time.

func NewSerialOperation

func NewSerialOperation(
	l *zerolog.Logger,
	id string,
	operationId string,
	method OpMethod,
	fs ...func(*SerialOperation),
) *SerialOperation

func (*SerialOperation) Run

func (o *SerialOperation) Run(l *zerolog.Logger)

func (*SerialOperation) RunOrContinue

func (o *SerialOperation) RunOrContinue(l *zerolog.Logger)
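The SerialOperation description says that RunOrContinue runs the operation only if it is not already running. One common way to get that guarantee is a compare-and-swap guard, sketched below. The serialRunner type is hypothetical and runs the function synchronously for clarity; whether the real method runs in a goroutine, and what it does when a run is already in flight, is not described here.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// serialRunner is a hypothetical guard that allows one run at a time.
type serialRunner struct {
	running atomic.Bool
}

// runOrContinue executes fn only if no run is in flight and reports whether
// fn was started by this call.
func (s *serialRunner) runOrContinue(fn func()) bool {
	if !s.running.CompareAndSwap(false, true) {
		return false // a run is already in progress
	}
	defer s.running.Store(false)
	fn()
	return true
}

func main() {
	s := &serialRunner{}
	nested := true
	started := s.runOrContinue(func() {
		// A second call made while the first is running is rejected.
		nested = s.runOrContinue(func() {})
	})
	fmt.Println(started, nested)
}
```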

func (*SerialOperation) Stop

func (o *SerialOperation) Stop()
