batch

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2025 License: MIT Imports: 5 Imported by: 6

Documentation

Overview

Package batch contains the core batch processing functionality. The main type is Batch, which can be created using New. It reads from an implementation of the Source interface, and items are processed in batches by one or more implementations of the Processor interface. Some Source and Processor implementations may be provided in related packages, or you can create your own based on your needs.

Batch uses the MinTime, MinItems, MaxTime, and MaxItems configuration parameters in Config to determine when and how many items are processed at once.

These parameters may conflict, however; for example, during a slow time, MaxTime may be reached before MinItems are read. Thus it is necessary to prioritize the parameters in some way. They are prioritized as follows (with EOF signifying the end of the input data):

MaxTime = MaxItems > EOF > MinTime > MinItems

A few examples:

MinTime = 2s. After 1s the input channel is closed. The items are processed right away.

MinItems = 10, MinTime = 2s. After 1s, 10 items have been read. They are not processed until 2s has passed (along with all other items that have been read up to the 2s mark).

MaxItems = 10, MinTime = 2s. After 1s, 10 items have been read. They are processed right away, because MaxItems takes priority over MinTime.

Multiple processors can be chained together when calling Go(). In a processor chain, each processor receives the output items from the previous processor, allowing for multi-stage processing pipelines:

// Chain three processors together
b.Go(ctx, source, validator, transformer, enricher)

Note that the timers and item counters are relative to the time when the previous batch started processing. Just before the timers and counters are started the config is read from the Config interface. This is so that the configuration can be changed at any time during processing.

Package batch provides a flexible batch processing pipeline for handling data.

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
	"github.com/MasterOfBinary/gobatch/source"
)

// printProcessor is a Processor that prints items in batches.
// To demonstrate how errors can be handled, it fails to process the number 5.
// printProcessor is a Processor that prints each batch of items.
// To demonstrate per-item error handling, it refuses to process the number 5.
type printProcessor struct{}

// Process prints the batch (minus the failing item) and marks item 5 as failed.
func (p printProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	// Gather everything printable, then emit the whole batch in one line.
	printable := make([]interface{}, 0, len(items))
	for _, it := range items {
		num, isInt := it.Data.(int)
		if isInt && num == 5 {
			// Per-item failure: record it on the item instead of failing the batch.
			it.Error = errors.New("cannot process 5")
			continue
		}
		printable = append(printable, it.Data)
	}
	fmt.Println(printable)
	return items, nil
}

func main() {
	// Group items into batches of at least 5.
	cfg := batch.NewConstantConfig(&batch.ConfigValues{
		MinItems: 5,
	})
	proc := &printProcessor{}
	b := batch.New(cfg)

	// source.Channel is a Source that reads from a channel until it is closed.
	input := make(chan interface{})
	src := source.Channel{
		Input: input,
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errs := b.Go(ctx, &src, proc)

	// Feed 20 integers into the source, then close it to signal EOF.
	go func() {
		for i := 0; i < 20; i++ {
			time.Sleep(time.Millisecond * 10)
			input <- i
		}
		close(input)
	}()

	// Draining the error channel doubles as waiting for completion:
	// it is closed once the pipeline is fully drained.
	var lastErr error
	for err := range errs {
		lastErr = err
	}

	fmt.Println("Finished processing.")
	if lastErr != nil {
		fmt.Println("Found error:", lastErr.Error())
	}
}
Output:
[0 1 2 3 4]
[6 7 8 9]
[10 11 12 13 14]
[15 16 17 18 19]
Finished processing.
Found error: processor error: cannot process 5
Example (CustomSourceAndProcessor)
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
)

// stringSource is a custom Source that generates string data.
type stringSource struct {
	strings []string
}

// Read implements the Source interface by sending strings to the output channel.
func (s *stringSource) Read(ctx context.Context) (<-chan interface{}, <-chan error) {
	out := make(chan interface{})
	errs := make(chan error)

	go func() {
		defer close(out)
		defer close(errs)

		for _, str := range s.strings {
			select {
			case <-ctx.Done():
				return
			case out <- str:

				time.Sleep(time.Millisecond * 5)
			}
		}
	}()

	return out, errs
}

// uppercaseProcessor is a custom Processor that converts strings to uppercase.
type uppercaseProcessor struct{}

// Process implements the Processor interface by converting strings to uppercase.
func (p *uppercaseProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	processed := make([]interface{}, 0, len(items))

	for _, item := range items {

		if item.Error != nil {
			continue
		}

		select {
		case <-ctx.Done():
			return items, ctx.Err()
		default:

		}

		if str, ok := item.Data.(string); ok {
			item.Data = fmt.Sprintf("[%s]", str)
			processed = append(processed, item.Data)
		} else {
			item.Error = errors.New("not a string")
		}
	}

	if len(processed) > 0 {
		fmt.Printf("Processed batch: %v\n", processed)
	}

	return items, nil
}

// filterShortStringsProcessor filters out strings shorter than specified length
type filterShortStringsProcessor struct {
	minLength int
}

// Process implements the Processor interface by filtering short strings
func (p *filterShortStringsProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	result := make([]*batch.Item, 0, len(items))
	filtered := make([]string, 0)

	for _, item := range items {

		if item.Error != nil {
			result = append(result, item)
			continue
		}

		if str, ok := item.Data.(string); ok {
			if len(str) >= p.minLength {
				result = append(result, item)
			} else {
				filtered = append(filtered, str)
			}
		} else {

			result = append(result, item)
		}
	}

	if len(filtered) > 0 {
		fmt.Printf("Filtered out short strings: %v\n", filtered)
	}

	return result, nil
}

func main() {
	// Custom source producing a fixed list of strings.
	src := &stringSource{
		strings: []string{"hello", "world", "go", "batch", "processing", "is", "fun"},
	}

	// Batch between 2 and 3 items at a time, waiting at least 10ms per batch.
	cfg := batch.NewConstantConfig(&batch.ConfigValues{
		MinItems: 2,
		MaxItems: 3,
		MinTime:  10 * time.Millisecond,
	})

	// Processor chain: filter short strings first, then transform the survivors.
	filterProc := &filterShortStringsProcessor{minLength: 4}
	transformProc := &uppercaseProcessor{}

	b := batch.New(cfg)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	fmt.Println("Starting custom source and processor example...")

	// Start the pipeline asynchronously.
	errs := b.Go(ctx, src, filterProc, transformProc)

	// Draining the error channel also waits for the pipeline to finish.
	var collected []error
	for err := range errs {
		collected = append(collected, err)
	}

	fmt.Println("Processing complete")
	if len(collected) > 0 {
		fmt.Println("Errors occurred during processing:")
		for _, err := range collected {
			fmt.Println("-", err)
		}
	}

}
Output:
Starting custom source and processor example...
Processed batch: [[hello] [world]]
Filtered out short strings: [go]
Processed batch: [[batch]]
Filtered out short strings: [is]
Processed batch: [[processing]]
Filtered out short strings: [fun]
Processing complete

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollectErrors added in v0.2.1

func CollectErrors(errs <-chan error) []error

CollectErrors collects all errors from the error channel into a slice. This is useful when you need to process all errors after batch processing completes.

Example usage:

errs := batch.CollectErrors(myBatch.Go(ctx, source, processor))
<-myBatch.Done()
for _, err := range errs {
	log.Printf("Error: %v", err)
}

func ExecuteBatches added in v0.2.1

func ExecuteBatches(ctx context.Context, configs ...*BatchConfig) []error

ExecuteBatches runs multiple batches concurrently and waits for all to complete. It returns all errors from all batches as a slice. This is useful when you need to process multiple data sources in parallel.

Example usage:

errs := batch.ExecuteBatches(ctx,
	&batch.BatchConfig{B: batch1, S: source1, P: []Processor{proc1}},
	&batch.BatchConfig{B: batch2, S: source2, P: []Processor{proc2}},
)

func IgnoreErrors

func IgnoreErrors(errs <-chan error)

IgnoreErrors starts a goroutine that reads errors from errs but ignores them. It can be used with Batch.Go if errors aren't needed. Since the error channel is unbuffered, one cannot just throw away the error channel like this:

// NOTE: bad - this can cause a deadlock!
_ = batch.Go(ctx, s, p)

Instead, IgnoreErrors can be used to safely throw away all errors:

batch.IgnoreErrors(myBatch.Go(ctx, s, p))

func RunBatchAndWait added in v0.2.1

func RunBatchAndWait(ctx context.Context, b *Batch, s Source, procs ...Processor) []error

RunBatchAndWait is a convenience function that runs a batch with the given source and processors, waits for it to complete, and returns all errors encountered. This is useful for simple batch processing where you don't need to handle errors asynchronously.

Example usage:

errs := batch.RunBatchAndWait(ctx, myBatch, source, processor1, processor2)
if len(errs) > 0 {
	// Handle errors
}

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch provides batch processing given a Source and one or more Processors. Data is read from the Source and processed in batches through the Processors. Any errors are wrapped in either a SourceError or a ProcessorError, so the caller can determine where the errors came from.

To create a new Batch, call the New function. Creating one using &Batch{} will return the default Batch.

// The following are equivalent
defaultBatch1 := &batch.Batch{}
defaultBatch2 := batch.New(nil)
defaultBatch3 := batch.New(batch.NewConstantConfig(&batch.ConfigValues{}))

The defaults (with nil Config) provide a usable, but likely suboptimal, Batch where items are processed as soon as they are retrieved from the source. Processing is done in the background using as many goroutines as necessary.

Batch runs asynchronously until the source is exhausted and all items have been processed. There are two ways for the caller to know when processing is complete: the error channel returned from Go() is closed, or the channel returned from Done() is closed.

The first way can be used if errors need to be processed elsewhere. A simple loop could look like this:

errs := b.Go(ctx, s, p)
for err := range errs {
  // Log the error here...
  log.Print(err.Error())
}
// Now batch processing is done

If the errors don't need to be processed, the IgnoreErrors function can be used to drain the error channel. Then the Done channel can be used to determine whether or not batch processing is complete:

batch.IgnoreErrors(b.Go(ctx, s, p))
<-b.Done()
// Now batch processing is done

For synchronous processing where you want to wait for completion and collect all errors, the RunBatchAndWait helper function provides a convenient way to do this:

errors := batch.RunBatchAndWait(ctx, b, source, processor)
// Process errors here
for _, err := range errors {
	log.Printf("Error: %v", err)
}

Note that the errors returned on the error channel may be wrapped in a batch.Error so the caller knows whether they come from the source or the processor (or neither). Errors from the source will be of type SourceError, and errors from the processor will be of type ProcessorError. Errors from Batch itself will be neither.

func New

func New(config Config) *Batch

New creates a new Batch based on specified config. If config is nil, the default config is used as described in Batch.

To avoid race conditions, the config cannot be changed after the Batch is created. Instead, implement the Config interface to support changing values.

func (*Batch) Done

func (b *Batch) Done() <-chan struct{}

Done provides a channel that is closed when processing is complete. This can be used to block until batch processing finishes or to be notified of completion in a select statement.

Example usage:

// Start processing and ignore errors
batch.IgnoreErrors(b.Go(ctx, source, processor))

// Wait for completion
<-b.Done()
fmt.Println("Processing complete")

Or with a select statement:

select {
case <-b.Done():
	fmt.Println("Processing complete")
case <-ctx.Done():
	fmt.Println("Context canceled")
case <-time.After(timeout):
	fmt.Println("Timed out waiting for completion")
}

func (*Batch) Go

func (b *Batch) Go(ctx context.Context, s Source, procs ...Processor) <-chan error

Go starts batch processing asynchronously and returns a channel on which errors are written. When processing is done and the pipeline is drained, the error channel is closed.

Go launches the entire batch processing pipeline: 1. The Source reads items and passes them to the batch collector 2. The batch collector groups items according to Config parameters 3. Each batch is processed through all Processors in sequence 4. Errors from Source and Processors are forwarded to the returned channel

Multiple processors can be chained together in order:

// Chain three processors in sequence
errCh := b.Go(ctx, source, validateProc, transformProc, enrichProc)

Even though Go has several goroutines running concurrently, concurrent calls to Go are not allowed. If Go is called before a previous call completes, the second one will panic.

// NOTE: bad - this will panic!
errs := b.Go(ctx, s, p)
errs2 := b.Go(ctx, s, p) // this call panics

Note that Go does not stop if ctx is done. Otherwise loss of data could occur. Suppose the source reads item A and then ctx is canceled. If Go were to return right away, item A would not be processed and it would be lost.

To avoid situations like that, a proper way to handle context completion is for the source to check for ctx done and then close its channels. The batch processor realizes the source is finished reading items and it sends all remaining items to the processor for processing. Once the processor is done, it closes its error channel to signal to the batch processor. Finally, the batch processor signals to its caller that processing is complete and the entire pipeline is drained.

Example usage with error handling:

// Start processing
errCh := b.Go(ctx, source, processor)

// Handle errors as they occur
go func() {
	for err := range errCh {
		if sourceErr, ok := err.(*batch.SourceError); ok {
			log.Printf("Source error: %v", sourceErr.Unwrap())
		} else if procErr, ok := err.(*batch.ProcessorError); ok {
			log.Printf("Processor error: %v", procErr.Unwrap())
		} else {
			log.Printf("Other error: %v", err)
		}
	}
	// Processing is complete when the error channel is closed
	log.Println("Processing complete")
}()

type BatchConfig added in v0.2.1

type BatchConfig struct {
	B *Batch      // The Batch instance to use
	S Source      // The Source to read items from
	P []Processor // The processors to apply to the items
}

BatchConfig holds the configuration for a single batch execution. It combines a Batch instance, a Source to read from, and a list of Processors to apply to the data from the source. This is used primarily with the ExecuteBatches function to run multiple batch operations concurrently.

type Config

type Config interface {
	// Get returns the values for configuration.
	//
	// If MinItems > MaxItems or MinTime > MaxTime, the min value will be
	// set to the maximum value.
	//
	// If the config values may be modified during batch processing, Get
	// must properly handle concurrency issues.
	Get() ConfigValues
}

Config retrieves the config values used by Batch. If these values are constant, NewConstantConfig can be used to create an implementation of the interface.

The Config interface allows for dynamic configuration of the batching behavior, which can be adjusted during runtime. This is useful for tuning the system under different load scenarios or adapting to changing performance requirements.

type ConfigValues

type ConfigValues struct {
	// MinTime specifies that a minimum amount of time that should pass
	// before processing items. The exception to this is if a max number
	// of items was specified and that number is reached before MinTime;
	// in that case those items will be processed right away.
	//
	// This parameter is useful to prevent processing very small batches
	// too frequently when items arrive at a slow but steady rate.
	MinTime time.Duration `json:"minTime"`

	// MinItems specifies that a minimum number of items should be
	// processed at a time. Items will not be processed until MinItems
	// items are ready for processing. The exceptions to that are if MaxTime
	// is specified and that time is reached before the minimum number of
	// items is available, or if all items have been read and are ready
	// to process.
	//
	// This parameter helps optimize processing by ensuring batches are
	// large enough to amortize the overhead of processing across multiple items.
	MinItems uint64 `json:"minItems"`

	// MaxTime specifies that a maximum amount of time should pass before
	// processing. Once that time has been reached, items will be processed
	// whether or not MinItems items are available.
	//
	// This parameter ensures that items don't wait in the queue for too long,
	// which is important for latency-sensitive applications.
	MaxTime time.Duration `json:"maxTime"`

	// MaxItems specifies that a maximum number of items should be available
	// before processing. Once that number of items is available, they will
	// be processed whether or not MinTime has been reached.
	//
	// This parameter prevents the system from accumulating too many items
	// in a single batch, which could lead to memory pressure or processing
	// spikes.
	MaxItems uint64 `json:"maxItems"`
}

ConfigValues is a struct that contains the Batch config values. These values control the timing and sizing behavior of batches in the pipeline. The batch system uses these parameters to determine when to process a batch based on time elapsed and number of items collected.

type ConstantConfig

type ConstantConfig struct {
	// contains filtered or unexported fields
}

ConstantConfig is a Config with constant values. Create one with NewConstantConfig.

This implementation is safe to use concurrently since the values never change after initialization.

func NewConstantConfig

func NewConstantConfig(values *ConfigValues) *ConstantConfig

NewConstantConfig returns a Config with constant values. If values is nil, the default values are used as described in Batch.

This is a convenience function for creating a configuration that doesn't change during the lifetime of the batch processing. It's the simplest way to provide configuration to the Batch system.

func (*ConstantConfig) Get

func (b *ConstantConfig) Get() ConfigValues

Get implements the Config interface. Returns the constant configuration values stored in this ConstantConfig.

type Item

type Item struct {
	ID    uint64      // Do not modify - unique identifier for tracking items
	Data  interface{} // Safe to modify during processing - contains the item data
	Error error       // Populated by processors to track per-item failure
}

Item represents a single data item being processed through the batch pipeline. Each item has a unique ID, the data payload being processed, and an optional error field that processors can set when item-specific processing fails.

type Processor

type Processor interface {
	// Process receives a batch of items, performs operations on them, and returns
	// the processed items along with any processor-wide error.
	//
	// Each implementation should:
	// 1. Process items in the batch synchronously
	// 2. Set item.Error on individual items that fail processing
	// 3. Return processor-wide errors separately from per-item errors
	// 4. Respect context cancellation
	//
	// The returned slice may contain a different number of items than the input slice.
	// Items can be added, removed, or reordered as needed.
	//
	// Processors can be chained together in the Go() method, with each processor
	// receiving the output from the previous one:
	//
	//	// Chain three processors together
	//	batch.Go(ctx, source, processor1, processor2, processor3)
	//
	// In a processor chain, each processor receives the output items from the
	// previous processor, allowing for multi-stage processing pipelines. This is
	// useful for separating different processing concerns:
	//
	//	// First processor validates items
	//	// Second processor transforms items
	//	// Third processor enriches items with additional data
	//	batch.Go(ctx, source, validationProc, transformProc, enrichmentProc)
	//
	// Example implementation:
	//
	//	func (p *MyProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	//		for _, item := range items {
	//			// Skip items that already have errors
	//			if item.Error != nil {
	//				continue
	//			}
	//
	//			// Check for context cancellation
	//			select {
	//			case <-ctx.Done():
	//				return items, ctx.Err()
	//			default:
	//				// Continue processing
	//			}
	//
	//			// Process the item
	//			result, err := p.processItem(item.Data)
	//			if err != nil {
	//				item.Error = err
	//				continue
	//			}
	//
	//			// Update with processed data
	//			item.Data = result
	//		}
	//		return items, nil
	//	}
	//
	// Important:
	// - Never modify the ID field of items
	// - Individual item errors should be set on the item.Error field
	// - Return a non-nil error only for processor-wide failures
	Process(ctx context.Context, items []*Item) ([]*Item, error)
}

Processor processes items in batches.

type ProcessorError

type ProcessorError struct {
	Err error
}

ProcessorError is returned when a processor fails. It wraps the original error from the processor to maintain the error chain while providing context about the source of the error.

func (ProcessorError) Error

func (e ProcessorError) Error() string

Error implements the error interface, returning a formatted error message that includes the wrapped processor error.

func (ProcessorError) Unwrap added in v0.2.0

func (e ProcessorError) Unwrap() error

Unwrap returns the underlying error for compatibility with errors.Is and errors.As.

type Source

type Source interface {
	// Read reads items from a data source and returns two channels:
	// - out: for data items
	// - errs: for any errors encountered during reading
	//
	// Each implementation should:
	// 1. Set up appropriate channels for data and errors
	// 2. Start a goroutine to read from the source
	// 3. Close both channels when reading is complete
	// 4. Respect context cancellation
	//
	// Example implementation:
	//
	//	func (s *MySource) Read(ctx context.Context) (out <-chan interface{}, errs <-chan error) {
	//		dataCh := make(chan interface{})
	//		errCh := make(chan error)
	//
	//		go func() {
	//			defer close(dataCh)
	//			defer close(errCh)
	//
	//			// Read items until done...
	//			for _, item := range s.items {
	//				select {
	//				case <-ctx.Done():
	//					errCh <- ctx.Err()
	//					return
	//				case dataCh <- item:
	//					// Item sent successfully
	//				}
	//			}
	//		}()
	//
	//		return dataCh, errCh
	//	}
	//
	// Important: Both channels must be properly closed when reading is finished
	// or when the context is canceled. The source should never leave channels open
	// indefinitely.
	Read(ctx context.Context) (out <-chan interface{}, errs <-chan error)
}

Source reads items that are to be batch processed.

type SourceError

type SourceError struct {
	Err error
}

SourceError is returned when a source fails. It wraps the original error from the source to maintain the error chain while providing context about the source of the error.

func (SourceError) Error

func (e SourceError) Error() string

Error implements the error interface, returning a formatted error message that includes the wrapped source error.

func (SourceError) Unwrap added in v0.2.0

func (e SourceError) Unwrap() error

Unwrap returns the underlying error for compatibility with errors.Is and errors.As.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL