batch

package
v0.1.0 Latest
Published: Nov 18, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package batch provides a generic, type-safe batch processor for efficient API ingestion.

The processor buffers incoming records in a channel-based queue and batches them by size or time interval. It uses configurable worker goroutines for parallel processing and provides graceful shutdown with timeout handling.

The batch processor is designed to work with any type implementing the Sender[T] interface, making it reusable across different API endpoints and data types.

Index

Constants

This section is empty.

Variables

var (
	ErrProcessorClosed = errors.New("batch processor is closed")
	ErrBufferFull      = errors.New("event recordCh is full")
	ErrShutdownTimeout = errors.New("shutdown timeout exceeded")
)

Functions

func WithBufferSize

func WithBufferSize(bufferSize int) applyOption

WithBufferSize sets the size of the internal record buffer. If the buffer is full, Submit will return an error. Default is 1000 records.

func WithFlushInterval

func WithFlushInterval(flushInterval time.Duration) applyOption

WithFlushInterval sets the time interval for automatic batch flushing. Batches will be sent after this interval even if not full. Default is 3 seconds.

func WithMaxBatchSize

func WithMaxBatchSize(maxBatchSize int) applyOption

WithMaxBatchSize sets the maximum number of records to send in a single batch. Default is 100 records per batch.

func WithNumWorkers

func WithNumWorkers(numWorkers int) applyOption

WithNumWorkers sets the number of worker goroutines for processing batches. More workers enable higher concurrency but use more resources. Default is 1.
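
To illustrate what running several workers means in practice, here is a minimal worker-pool sketch. It is not the package's implementation: runWorkers and countWithWorkers are hypothetical helpers, and the batches are plain []int slices rather than a generic record type.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts numWorkers goroutines that drain batches and call send
// on each one. It returns once batches is closed and every worker is done.
// Hypothetical helper, not part of the package.
func runWorkers(numWorkers int, batches <-chan []int, send func([]int)) {
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range batches {
				send(b)
			}
		}()
	}
	wg.Wait()
}

// countWithWorkers pushes every batch through the pool and returns the
// total number of records the workers saw.
func countWithWorkers(numWorkers int, input [][]int) int {
	batches := make(chan []int, len(input))
	for _, b := range input {
		batches <- b
	}
	close(batches)

	var mu sync.Mutex // send runs concurrently, so the counter needs a lock
	total := 0
	runWorkers(numWorkers, batches, func(b []int) {
		mu.Lock()
		total += len(b)
		mu.Unlock()
	})
	return total
}

func main() {
	fmt.Println(countWithWorkers(3, [][]int{{1, 2}, {3}, {4, 5, 6}}))
}
```

With more than one worker, batches may be delivered out of order, so a Sender used this way should not rely on ordering across batches.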

func WithShutdownTimeout

func WithShutdownTimeout(shutdownTimeout time.Duration) applyOption

WithShutdownTimeout sets the maximum time to wait for graceful shutdown. If the processor doesn't shut down within this time, an error is returned. Default is 30 seconds.
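
The With* functions above follow Go's functional-options pattern. The applyOption type is unexported and not documented on this page, so the sketch below only assumes it behaves like a function that mutates a configuration struct. The names config, option, withMaxBatchSize, withNumWorkers, and newConfig are hypothetical, and the default values are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors a subset of the documented Config fields.
type config struct {
	MaxBatchSize  int
	FlushInterval time.Duration
	NumWorkers    int
}

// option is a hypothetical stand-in for the unexported applyOption type.
type option func(*config)

func withMaxBatchSize(n int) option {
	return func(c *config) { c.MaxBatchSize = n }
}

func withNumWorkers(n int) option {
	return func(c *config) { c.NumWorkers = n }
}

// newConfig starts from illustrative defaults and applies each option in order,
// so later options override earlier ones.
func newConfig(opts ...option) config {
	c := config{
		MaxBatchSize:  100,
		FlushInterval: 3 * time.Second,
		NumWorkers:    1,
	}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withMaxBatchSize(50), withNumWorkers(2))
	fmt.Println(c.MaxBatchSize, c.FlushInterval, c.NumWorkers)
}
```

Options that are not passed leave the corresponding default untouched, which is why FlushInterval keeps its initial value here.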

Types

type Config

type Config struct {
	// MaxBatchSize defines the maximum number of records to send in a single batch.
	// Default is 32.
	MaxBatchSize int
	// FlushInterval defines the interval at which the processor will flush the records
	// even if the batch size is not reached.
	// Default is 3 seconds.
	FlushInterval time.Duration
	// BufferSize defines the size of the internal buffer for incoming records.
	// If the buffer is full, Submit will return an error.
	// Default is MaxBatchSize * 10.
	BufferSize int
	// NumWorkers defines the number of worker goroutines that will process the batches.
	// Default is 1.
	NumWorkers int
	// ShutdownTimeout defines the maximum time to wait for the processor to shut down gracefully.
	// If the processor does not shut down within this time, an error will be returned.
	// Default is 30 seconds.
	ShutdownTimeout time.Duration
}

Config holds the configuration for the batch processor.

type Processor

type Processor[T any] struct {
	// contains filtered or unexported fields
}

Processor is a generic, type-safe batch processor that efficiently collects and sends records.

The processor uses a channel-based architecture with configurable batching by size and time. It supports multiple worker goroutines for parallel processing and provides graceful shutdown with timeout handling. Records are buffered in memory and automatically flushed when batch size limits are reached or flush intervals expire.

The processor is thread-safe and can be used concurrently from multiple goroutines.
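
The size-or-time batching described above can be sketched with a select over an input channel and a ticker. This is a minimal illustration of the technique, not the package's actual code: batchLoop and collectBatches are hypothetical helpers, and the real processor may differ in details such as when the interval timer resets.

```go
package main

import (
	"fmt"
	"time"
)

// batchLoop groups values from in into slices of at most maxSize and also
// flushes a non-empty batch whenever interval elapses. When in is closed it
// flushes the remainder and returns. Hypothetical helper.
func batchLoop[T any](in <-chan T, maxSize int, interval time.Duration, flush func([]T)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]T, 0, maxSize)
	emit := func() {
		if len(batch) == 0 {
			return
		}
		flush(batch)
		batch = make([]T, 0, maxSize) // fresh slice so flush may retain the old one
	}

	for {
		select {
		case rec, ok := <-in:
			if !ok {
				emit() // input closed: send whatever is left
				return
			}
			batch = append(batch, rec)
			if len(batch) >= maxSize {
				emit() // size limit reached
			}
		case <-ticker.C:
			emit() // interval expired
		}
	}
}

// collectBatches runs batchLoop over a pre-filled, closed channel and
// returns the batches it produced. The long interval keeps the demo
// driven by size only.
func collectBatches(values []int, maxSize int) [][]int {
	in := make(chan int, len(values))
	for _, v := range values {
		in <- v
	}
	close(in)

	var out [][]int
	batchLoop[int](in, maxSize, time.Minute, func(b []int) {
		out = append(out, b)
	})
	return out
}

func main() {
	for _, b := range collectBatches([]int{1, 2, 3, 4, 5, 6, 7}, 3) {
		fmt.Println(b)
	}
}
```

Seven records with a batch size of three yield two full batches and one final partial batch, which is flushed when the input channel closes.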

func NewProcessor

func NewProcessor[T any](sender Sender[T], options ...applyOption) *Processor[T]

NewProcessor creates a new Processor instance with the provided Sender and optional configuration.

The processor is started immediately with the configured number of worker goroutines. Use the provided With* option functions to customize batch size, flush interval, buffer size, number of workers, and shutdown timeout.

Example:

processor := NewProcessor(sender,
	WithMaxBatchSize(50),
	WithFlushInterval(5*time.Second),
	WithNumWorkers(2),
)

func (*Processor[T]) Close

func (p *Processor[T]) Close() error

Close gracefully shuts down the processor, ensuring all pending records are sent. It waits for the shutdown to complete or times out based on the configured ShutdownTimeout.
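
A common way to implement "wait for shutdown or time out" is to wait on a sync.WaitGroup in a helper goroutine and race it against a timer. The sketch below assumes that approach; it is not taken from the package. errShutdownTimeout is a local stand-in for ErrShutdownTimeout, and waitWithTimeout and demo are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errShutdownTimeout is a local stand-in for the package's ErrShutdownTimeout.
var errShutdownTimeout = errors.New("shutdown timeout exceeded")

// waitWithTimeout waits for wg to finish and gives up after timeout.
// On timeout the helper goroutine keeps waiting in the background until
// the workers eventually finish.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return nil
	case <-timer.C:
		return errShutdownTimeout
	}
}

// demo starts one worker that runs for workMs milliseconds and waits for it
// for at most timeoutMs milliseconds.
func demo(workMs, timeoutMs int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()
	return waitWithTimeout(&wg, time.Duration(timeoutMs)*time.Millisecond)
}

func main() {
	fmt.Println("fast worker:", demo(10, 1000))
	fmt.Println("slow worker:", demo(500, 10))
}
```

A caller of Close would typically compare the returned error against ErrShutdownTimeout with errors.Is to decide whether records may have been left unsent.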

func (*Processor[T]) Flush added in v0.0.3

func (p *Processor[T]) Flush()

func (*Processor[T]) Submit

func (p *Processor[T]) Submit(record T) error

Submit adds a record to the processor's internal buffer. If the buffer is full, it returns an error.
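
Returning an error instead of blocking when a buffer is full is usually done with a non-blocking channel send. The sketch below shows that technique together with an errors.Is check on the caller side. It assumes, but the source does not confirm, that Submit works this way. errBufferFull is a local stand-in for ErrBufferFull, and trySubmit and submitAll are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferFull is a local stand-in for the package's ErrBufferFull.
var errBufferFull = errors.New("buffer is full")

// trySubmit attempts a non-blocking send. If the channel has no free slot,
// it returns errBufferFull instead of blocking the caller.
func trySubmit[T any](ch chan<- T, rec T) error {
	select {
	case ch <- rec:
		return nil
	default:
		return errBufferFull
	}
}

// submitAll submits records to a buffer of the given capacity that nobody
// drains, and reports how many were accepted and how many were rejected.
func submitAll(capacity int, records []string) (accepted, dropped int) {
	ch := make(chan string, capacity)
	for _, r := range records {
		if err := trySubmit[string](ch, r); errors.Is(err, errBufferFull) {
			dropped++ // a real caller might retry, back off, or log here
			continue
		}
		accepted++
	}
	return accepted, dropped
}

func main() {
	accepted, dropped := submitAll(2, []string{"a", "b", "c"})
	fmt.Println("accepted:", accepted, "dropped:", dropped)
}
```

Because nothing drains the channel in this demo, only as many records as the buffer capacity are accepted. In a running processor the workers free up slots continuously, so a full buffer signals that producers are outpacing the Sender.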

type Sender

type Sender[T any] interface {
	Send(ctx context.Context, records []T) error
}

Sender defines the interface for sending batched records to an external service.

Implementations should handle the actual HTTP requests or other transport mechanisms to deliver the batched records. The Send method receives a context for cancellation and a slice of records to be sent as a batch.
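
As an illustration, the following sketch implements Sender for a hypothetical event type by writing each batch as a JSON array to an io.Writer. The Sender interface is copied from this page so the example compiles on its own; event, writerSender, and encodeBatch are assumptions made for the example, and a real implementation would more likely issue an HTTP request.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
)

// Sender is copied from the package documentation so this file is self-contained.
type Sender[T any] interface {
	Send(ctx context.Context, records []T) error
}

// event is a hypothetical record type.
type event struct {
	Name string `json:"name"`
}

// writerSender is a hypothetical Sender that writes each batch as one
// JSON array followed by a newline.
type writerSender[T any] struct {
	w io.Writer
}

func (s *writerSender[T]) Send(ctx context.Context, records []T) error {
	// Respect cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return err
	}
	return json.NewEncoder(s.w).Encode(records)
}

// Compile-time check that writerSender satisfies Sender.
var _ Sender[event] = (*writerSender[event])(nil)

// encodeBatch sends one batch through the sender and returns what was written.
func encodeBatch(names ...string) (string, error) {
	var buf bytes.Buffer
	var s Sender[event] = &writerSender[event]{w: &buf}

	batch := make([]event, 0, len(names))
	for _, n := range names {
		batch = append(batch, event{Name: n})
	}
	err := s.Send(context.Background(), batch)
	return buf.String(), err
}

func main() {
	out, err := encodeBatch("signup", "login")
	if err != nil {
		fmt.Println("send failed:", err)
		return
	}
	fmt.Print(out)
}
```

A value like this could be passed as the sender argument to NewProcessor. Since Send may be called from several worker goroutines when NumWorkers is greater than 1, an implementation that shares a writer or other state would need its own synchronization.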
