Documentation ¶
Overview ¶
Package batch provides a generic, type-safe batch processor for efficient API ingestion.
The processor buffers incoming records in a channel-based queue and batches them by size or time interval. It uses configurable worker goroutines for parallel processing and provides graceful shutdown with timeout handling.
The batch processor is designed to work with any type implementing the Sender[T] interface, making it reusable across different API endpoints and data types.
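The size-or-time batching described above can be sketched roughly as follows. This is a minimal illustration, not the package's actual implementation: `batchLoop`, `batchSizes`, and their parameters are hypothetical names introduced only for this example.

```go
package main

import (
	"fmt"
	"time"
)

// batchLoop (hypothetical) reads records from in and calls flush when either
// maxBatchSize records have accumulated or flushInterval has elapsed with a
// non-empty batch. It flushes any remainder once in is closed.
func batchLoop[T any](in <-chan T, maxBatchSize int, flushInterval time.Duration, flush func([]T)) {
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	batch := make([]T, 0, maxBatchSize)
	emit := func() {
		if len(batch) == 0 {
			return
		}
		flush(batch)
		batch = make([]T, 0, maxBatchSize) // fresh slice: flush may retain the old one
	}

	for {
		select {
		case rec, ok := <-in:
			if !ok {
				emit() // channel closed: flush what is left and stop
				return
			}
			batch = append(batch, rec)
			if len(batch) >= maxBatchSize {
				emit() // size limit reached
			}
		case <-ticker.C:
			emit() // interval expired: flush a partial batch
		}
	}
}

// batchSizes feeds n records through batchLoop and returns the size of each
// flushed batch. The long interval keeps the ticker out of the picture.
func batchSizes(n, maxBatchSize int) []int {
	in := make(chan int, n)
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)

	var sizes []int
	batchLoop[int](in, maxBatchSize, time.Hour, func(b []int) {
		sizes = append(sizes, len(b))
	})
	return sizes
}

func main() {
	fmt.Println(batchSizes(7, 3)) // [3 3 1]
}
```

Seven records with a batch size of three produce two full batches and one partial batch, which is flushed when the input channel closes.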
Index ¶
- Variables
- func WithBufferSize(bufferSize int) applyOption
- func WithFlushInterval(flushInterval time.Duration) applyOption
- func WithMaxBatchSize(maxBatchSize int) applyOption
- func WithNumWorkers(numWorkers int) applyOption
- func WithShutdownTimeout(shutdownTimeout time.Duration) applyOption
- type Config
- type Processor
- type Sender
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func WithBufferSize ¶
func WithBufferSize(bufferSize int) applyOption
WithBufferSize sets the capacity of the internal record channel (recordCh). If the channel is full, Submit returns an error. Default is 1000 records.
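A common way to get "return an error when the buffer is full" behavior is a non-blocking channel send. The sketch below assumes that approach; `trySubmit`, `submitAll`, and `errBufferFull` are hypothetical names, and the package's real Submit signature and error value are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferFull is a hypothetical sentinel error used only in this sketch.
var errBufferFull = errors.New("record buffer is full")

// trySubmit attempts a non-blocking send on a buffered channel and reports
// errBufferFull instead of blocking when no slot is free.
func trySubmit[T any](recordCh chan T, rec T) error {
	select {
	case recordCh <- rec:
		return nil
	default:
		return errBufferFull
	}
}

// submitAll submits n records to a channel of the given capacity that nobody
// drains, and counts how many were accepted and rejected.
func submitAll(bufferSize, n int) (accepted, rejected int) {
	recordCh := make(chan int, bufferSize)
	for i := 0; i < n; i++ {
		if err := trySubmit(recordCh, i); err != nil {
			rejected++
			continue
		}
		accepted++
	}
	return accepted, rejected
}

func main() {
	accepted, rejected := submitAll(2, 5)
	fmt.Println(accepted, rejected) // 2 3
}
```

Callers of such an API typically decide whether to drop, retry, or apply backpressure when the error is returned.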
func WithFlushInterval ¶
func WithFlushInterval(flushInterval time.Duration) applyOption
WithFlushInterval sets the time interval for automatic batch flushing. A batch is sent when this interval elapses even if it is not full. Default is 3 seconds.
func WithMaxBatchSize ¶
func WithMaxBatchSize(maxBatchSize int) applyOption
WithMaxBatchSize sets the maximum number of records to send in a single batch. Default is 100 records per batch.
func WithNumWorkers ¶
func WithNumWorkers(numWorkers int) applyOption
WithNumWorkers sets the number of worker goroutines for processing batches. More workers enable higher concurrency but use more resources. Default is 1.
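As an illustration of how several workers might share the batch load, the sketch below starts a fixed number of goroutines that drain a channel of batches. It is an assumption about the general pattern, not the package's code; `processWithWorkers` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processWithWorkers (hypothetical) starts numWorkers goroutines that drain
// batchCh concurrently and returns the total number of records handled.
func processWithWorkers(batches [][]int, numWorkers int) int64 {
	batchCh := make(chan []int)
	var total int64
	var wg sync.WaitGroup

	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range batchCh {
				// A real worker would hand the batch to a Sender here.
				atomic.AddInt64(&total, int64(len(b)))
			}
		}()
	}

	for _, b := range batches {
		batchCh <- b
	}
	close(batchCh) // lets every worker's range loop end
	wg.Wait()
	return atomic.LoadInt64(&total)
}

func main() {
	batches := [][]int{{1, 2}, {3}, {4, 5, 6}}
	fmt.Println(processWithWorkers(batches, 3)) // 6
}
```

With more than one worker, batches may be delivered out of order, so this pattern suits destinations that do not depend on ordering.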
func WithShutdownTimeout ¶
func WithShutdownTimeout(shutdownTimeout time.Duration) applyOption
WithShutdownTimeout sets the maximum time to wait for graceful shutdown. If the processor does not shut down within this time, an error is returned. Default is 30 seconds.
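A bounded graceful shutdown is often built by waiting on a sync.WaitGroup in a helper goroutine and racing that against a timer. The sketch below shows that pattern under the assumption that workers are tracked by a WaitGroup; `waitWithTimeout`, `shutdownDemo`, and `errShutdownTimeout` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errShutdownTimeout is a hypothetical sentinel error used only in this sketch.
var errShutdownTimeout = errors.New("shutdown timed out")

// waitWithTimeout waits for wg to finish and gives up after timeout.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return nil
	case <-timer.C:
		return errShutdownTimeout
	}
}

// shutdownDemo starts one worker that runs for workMillis milliseconds and
// waits up to timeoutMillis milliseconds for it to finish.
func shutdownDemo(workMillis, timeoutMillis int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMillis) * time.Millisecond)
	}()
	return waitWithTimeout(&wg, time.Duration(timeoutMillis)*time.Millisecond)
}

func main() {
	fmt.Println(shutdownDemo(10, 500)) // worker finishes in time
	fmt.Println(shutdownDemo(500, 10)) // worker outlives the timeout
}
```

Note that a timeout only stops the waiting; in this sketch the slow worker keeps running in the background until it finishes on its own.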
Types ¶
type Config ¶
type Config struct {
// MaxBatchSize defines the maximum number of records to send in a single batch.
// Default is 100.
MaxBatchSize int
// FlushInterval defines the interval at which the processor will flush the records
// even if the batch size is not reached.
// Default is 3 seconds.
FlushInterval time.Duration
// BufferSize defines the size of the internal recordCh for incoming records.
// If the recordCh is full, Submit will return an error.
// Default is MaxBatchSize * 10.
BufferSize int
// NumWorkers defines the number of worker goroutines that will process the batches.
// Default is 1.
NumWorkers int
// ShutdownTimeout defines the maximum time to wait for the processor to shut down gracefully.
// If the processor does not shut down within this time, an error will be returned.
// Default is 30 seconds.
ShutdownTimeout time.Duration
}
Config holds the configuration for the batch processor.
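The With* functions follow the functional options pattern. The sketch below shows how such options could be applied on top of defaults. It uses stand-in names (`config`, `option`, `newConfig`, `withMaxBatchSize`, `withNumWorkers`) because the package's unexported applyOption type is not documented here, and the default values are the ones quoted in the With* function descriptions.

```go
package main

import (
	"fmt"
	"time"
)

// config is a stand-in that mirrors the documented Config fields.
type config struct {
	MaxBatchSize    int
	FlushInterval   time.Duration
	BufferSize      int
	NumWorkers      int
	ShutdownTimeout time.Duration
}

// option is a hypothetical stand-in for the package's applyOption type.
type option func(*config)

func withMaxBatchSize(n int) option { return func(c *config) { c.MaxBatchSize = n } }
func withNumWorkers(n int) option   { return func(c *config) { c.NumWorkers = n } }

// newConfig starts from the defaults quoted in the With* docs and applies the
// given options in order, so later options win over earlier ones.
func newConfig(opts ...option) config {
	c := config{
		MaxBatchSize:    100,
		FlushInterval:   3 * time.Second,
		BufferSize:      1000,
		NumWorkers:      1,
		ShutdownTimeout: 30 * time.Second,
	}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withMaxBatchSize(50), withNumWorkers(2))
	fmt.Println(c.MaxBatchSize, c.NumWorkers, c.BufferSize) // 50 2 1000
}
```

Fields that no option touches keep their defaults, which is what lets callers pass only the settings they care about.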
type Processor ¶
type Processor[T any] struct {
// contains filtered or unexported fields
}
Processor is a generic, type-safe batch processor that efficiently collects and sends records.
The processor uses a channel-based architecture with configurable batching by size and time. It supports multiple worker goroutines for parallel processing and provides graceful shutdown with timeout handling. Records are buffered in memory and automatically flushed when batch size limits are reached or flush intervals expire.
The processor is thread-safe and can be used concurrently from multiple goroutines.
func NewProcessor ¶
NewProcessor creates a new Processor instance with the provided Sender and optional configuration.
The processor starts immediately with the configured number of worker goroutines. Use the provided With* option functions to customize batch size, flush interval, buffer size, number of workers, and shutdown timeout.
Example:
processor := NewProcessor(sender,
	WithMaxBatchSize(50),
	WithFlushInterval(5*time.Second),
	WithNumWorkers(2),
)
type Sender ¶
Sender defines the interface for sending batched records to an external service.
Implementations should handle the actual HTTP requests or other transport mechanisms to deliver the batched records. The Send method receives a context for cancellation and a slice of records to be sent as a batch.
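The interface definition itself is not reproduced on this page. Based on the description above, a Sender might look roughly like the sketch below. The method signature (`Send(ctx context.Context, records []T) error`) is an assumption inferred from the prose, and `sender`, `memorySender`, and `sendDemo` are hypothetical names used to keep the example self-contained.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// sender is an assumed shape of the package's Sender[T] interface; the real
// method signature may differ.
type sender[T any] interface {
	Send(ctx context.Context, records []T) error
}

// memorySender stores each batch in memory instead of performing I/O, which
// makes it convenient for tests.
type memorySender[T any] struct {
	mu      sync.Mutex
	batches [][]T
}

func (s *memorySender[T]) Send(ctx context.Context, records []T) error {
	if err := ctx.Err(); err != nil {
		return err // honor cancellation before doing any work
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	// Copy the slice in case the caller reuses its backing array.
	s.batches = append(s.batches, append([]T(nil), records...))
	return nil
}

// Compile-time check that memorySender satisfies the assumed interface.
var _ sender[string] = (*memorySender[string])(nil)

// sendDemo sends one batch with a live context and one with a canceled
// context, returning the number of stored batches and the second error.
func sendDemo() (stored int, canceledErr error) {
	s := &memorySender[string]{}
	_ = s.Send(context.Background(), []string{"a", "b"})

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	canceledErr = s.Send(ctx, []string{"c"})
	return len(s.batches), canceledErr
}

func main() {
	stored, err := sendDemo()
	fmt.Println(stored, err) // 1 context canceled
}
```

An HTTP-backed implementation would follow the same outline, building the request with the supplied context so that cancellation aborts the in-flight call.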