stream_batcher

package
v1.27.2 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 27, 2025 License: MIT Imports: 4 Imported by: 0

Documentation

Types

type Batcher

type Batcher[T, R any] struct {
	// contains filtered or unexported fields
}

Batcher collects items passed concurrently to Add into batches and calls a processing function on one batch at a time. The processing function (the `fn` passed to NewBatcher) is never called concurrently with itself, and Add blocks while it is running.

func NewBatcher

func NewBatcher[T, R any](fn func([]T) R, opts BatcherOptions, timeSource clock.TimeSource) *Batcher[T, R]

NewBatcher creates a Batcher. `fn` is the processing function, `opts` are the timing options. `timeSource` is usually clock.NewRealTimeSource but can be a fake time source for testing.

func (*Batcher[T, R]) Add

func (b *Batcher[T, R]) Add(ctx context.Context, t T) (R, error)

Add adds an item to the stream and returns when it has been processed, or if the context is canceled or times out. It returns two values: the value that the batch processor returned for the whole batch that the item ended up in, and a context error. Even if Add returns a context error, the item may still be processed in the future!

type BatcherOptions

type BatcherOptions struct {
	// MaxItems is the maximum number of items in a batch.
	MaxItems int
	// MinDelay is how long to wait for no more items to come in after any item before
	// finishing the batch.
	MinDelay time.Duration
	// MaxDelay is the maximum time to wait after the first item in a batch before finishing
	// the batch.
	MaxDelay time.Duration
	// IdleTime is the time after which the internal goroutine will exit, to avoid wasting
	// resources on idle streams.
	IdleTime time.Duration
}
