batch

package
v0.0.0-...-4e0b803 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrBatcherClosed = errors.New("batch: add on closed batcher")

ErrBatcherClosed is returned by Add when the Batcher has been closed.

Functions

This section is empty.

Types

type Batcher

type Batcher[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Batcher groups items by key and flushes them together to a user-supplied function. Items with the same key always land in the same shard and the same flush call. Flush is triggered either when a key's item count reaches MaxSize or when the flush interval elapses. Close gracefully drains all pending items before stopping.

func New

func New[K comparable, V any](flushFn func(context.Context, K, []V) error, opts ...Option[K, V]) *Batcher[K, V]

New creates a new Batcher. flushFn is required; it is called with (ctx, key, items) whenever a batch is ready. Panics if flushFn is nil after all options are applied.

func (*Batcher[K, V]) Add

func (b *Batcher[K, V]) Add(ctx context.Context, key K, val V) error

Add enqueues val under key to be flushed by a shard goroutine. Returns ErrBatcherClosed if the Batcher is closed. Returns ctx.Err() if ctx is already done before the item is enqueued.

func (*Batcher[K, V]) Close

func (b *Batcher[K, V]) Close() error

Close stops the Batcher from accepting new items, drains all pending items, and waits for all shard goroutines to finish. It is safe to call multiple times.

type Option

type Option[K comparable, V any] func(*options[K, V])

Option configures a Batcher.

func WithFlushFunc

func WithFlushFunc[K comparable, V any](fn func(context.Context, K, []V) error) Option[K, V]

WithFlushFunc overrides the flush function set via New.

func WithFlushInterval

func WithFlushInterval[K comparable, V any](d time.Duration) Option[K, V]

WithFlushInterval sets the interval at which each shard flushes all pending items (default: 10ms).

func WithMaxSize

func WithMaxSize[K comparable, V any](n int) Option[K, V]

WithMaxSize sets the maximum number of items per key before a flush is triggered (default: 100).

func WithOnPanic

func WithOnPanic[K comparable, V any](fn func(r any)) Option[K, V]

WithOnPanic sets a hook called when a flush function panics. r is the recovered panic value.

func WithShardKey

func WithShardKey[K comparable, V any](fn func(K) int) Option[K, V]

WithShardKey sets a custom shard-selection function. The function receives a key and returns an integer; the batcher maps it to a shard via abs(n) % numShards. Default: FNV-32a hash of the key.

func WithShards

func WithShards[K comparable, V any](n int) Option[K, V]

WithShards sets the number of shards (default: 16).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL