package batch

v0.4.0 Latest
Published: Nov 21, 2025 License: MIT Imports: 5 Imported by: 6

Documentation

Overview

Package batch provides a flexible batch processing pipeline for handling data.

Basic usage:

cfg := NewConstantConfig(&ConfigValues{MinItems: 1})
b := New(cfg)
src := &source.Nil{}
proc := &processor.Nil{}
IgnoreErrors(b.Go(ctx, src, proc))
<-b.Done()

The configuration is reloaded before each batch is collected. This allows dynamic Config implementations to update batch behavior during processing.
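
For example, a Config can return different values on each call to Get. A minimal sketch (the rampConfig type is illustrative, not part of this package; it uses sync/atomic so Get is safe for concurrent use) that admits one more item per batch:

type rampConfig struct{ calls uint64 }

func (c *rampConfig) Get() ConfigValues {
	// Get is called before each batch is collected, so every
	// batch admits one more item than the previous one.
	n := atomic.AddUint64(&c.calls, 1)
	return ConfigValues{MinItems: n, MaxItems: n}
}

b := New(&rampConfig{})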

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
	"github.com/MasterOfBinary/gobatch/processor"
	"github.com/MasterOfBinary/gobatch/source"
)

func main() {
	// Create a batch processor with simple config
	b := batch.New(batch.NewConstantConfig(&batch.ConfigValues{
		MinItems: 2,
		MaxItems: 5,
		MinTime:  10 * time.Millisecond,
		MaxTime:  100 * time.Millisecond,
	}))

	// Create an input channel
	ch := make(chan interface{})

	// Wrap it with source.Channel
	src := &source.Channel{Input: ch}

	// First processor: double the value
	doubleProc := &processor.Transform{
		Func: func(data interface{}) (interface{}, error) {
			if v, ok := data.(int); ok {
				return v * 2, nil
			}
			return data, nil
		},
	}

	// Second processor: print the result
	printProc := &processor.Transform{
		Func: func(data interface{}) (interface{}, error) {
			fmt.Println(data)
			return data, nil
		},
	}

	ctx := context.Background()

	// Start processing with both processors chained
	errs := b.Go(ctx, src, doubleProc, printProc)

	// Ignore errors
	batch.IgnoreErrors(errs)

	// Send some items
	go func() {
		for i := 1; i <= 5; i++ {
			ch <- i
		}
		close(ch)
	}()

	// Wait for completion
	<-b.Done()

}
Output:

2
4
6
8
10
Example (CustomConfig)
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
)

type sliceSource struct {
	items []interface{}
	delay time.Duration
}

func (s *sliceSource) Read(ctx context.Context) (<-chan interface{}, <-chan error) {
	out := make(chan interface{}, 100)
	errs := make(chan error)

	go func() {
		defer close(out)
		defer close(errs)

		for _, item := range s.items {
			if s.delay > 0 {
				time.Sleep(s.delay)
			}
			select {
			case <-ctx.Done():
				return
			case out <- item:
			}
		}
	}()

	return out, errs
}

type loadBasedConfig struct {
	mu          sync.RWMutex
	currentLoad int
	baseMin     uint64
	baseMax     uint64
	baseMinTime time.Duration
	baseMaxTime time.Duration
}

func newLoadBasedConfig(baseMin, baseMax uint64, minTime, maxTime time.Duration) *loadBasedConfig {
	return &loadBasedConfig{
		currentLoad: 50,
		baseMin:     baseMin,
		baseMax:     baseMax,
		baseMinTime: minTime,
		baseMaxTime: maxTime,
	}
}

func (c *loadBasedConfig) Get() batch.ConfigValues {
	c.mu.RLock()
	defer c.mu.RUnlock()

	loadFactor := float64(100-c.currentLoad) / 100.0
	minItems := uint64(float64(c.baseMin) * loadFactor)
	if minItems < 1 {
		minItems = 1
	}
	maxItems := uint64(float64(c.baseMax) * loadFactor)
	if maxItems < minItems {
		maxItems = minItems
	}

	timeFactor := float64(c.currentLoad)/100.0 + 0.5
	minTime := time.Duration(float64(c.baseMinTime) * timeFactor)
	maxTime := time.Duration(float64(c.baseMaxTime) * timeFactor)

	return batch.ConfigValues{
		MinItems: minItems,
		MaxItems: maxItems,
		MinTime:  minTime,
		MaxTime:  maxTime,
	}
}

func (c *loadBasedConfig) UpdateLoad(load int) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if load < 0 {
		load = 0
	} else if load > 100 {
		load = 100
	}

	c.currentLoad = load
	fmt.Printf("System load set to %d%%\n", load)
}

type batchInfoProcessor struct{}

func (p *batchInfoProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	fmt.Printf("Batch of %d items\n", len(items))
	return items, nil
}

func main() {
	cfg := newLoadBasedConfig(
		10,                   // base min items
		50,                   // base max items
		200*time.Millisecond, // base min time
		1*time.Second,        // base max time
	)

	fmt.Println("=== Custom Config Example ===")
	fmt.Println("Initial config:")
	printConfig(cfg.Get())

	b := batch.New(cfg)
	p := &batchInfoProcessor{}

	nums := make([]interface{}, 200)
	for i := range nums {
		nums[i] = i
	}
	src := &sliceSource{items: nums}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	fmt.Println("Starting batch processing...")
	errs := b.Go(ctx, src, p)

	batch.IgnoreErrors(errs)
	<-b.Done()
	fmt.Println("Processing complete")

}

func printConfig(c batch.ConfigValues) {
	fmt.Printf("MinItems=%d, MaxItems=%d, MinTime=%v, MaxTime=%v\n",
		c.MinItems, c.MaxItems, c.MinTime, c.MaxTime)
}
Output:

=== Custom Config Example ===
Initial config:
MinItems=5, MaxItems=25, MinTime=200ms, MaxTime=1s
Starting batch processing...
Batch of 25 items
Batch of 25 items
Batch of 25 items
Batch of 25 items
Batch of 25 items
Batch of 25 items
Batch of 25 items
Batch of 25 items
Processing complete
Example (DynamicConfig)
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
	"github.com/MasterOfBinary/gobatch/source"
)

type batchSizeMonitor struct {
	mu      sync.Mutex
	name    string
	batches int
	items   int
}

func (p *batchSizeMonitor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	p.mu.Lock()
	p.batches++
	p.items += len(items)
	batchSize := len(items)
	name := p.name
	p.mu.Unlock()

	fmt.Printf("[%s] Batch size: %d\n", name, batchSize)
	return items, nil
}

func main() {
	cfg := batch.NewDynamicConfig(&batch.ConfigValues{
		MinItems: 5,
		MaxItems: 10,
	})

	b := batch.New(cfg)
	monitor := &batchSizeMonitor{name: "Dynamic"}

	ch := make(chan interface{})
	src := &source.Channel{Input: ch}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	fmt.Println("=== Dynamic Config Example ===")

	errs := b.Go(ctx, src, monitor)

	// Simulate sending data and changing config dynamically
	go func() {
		for i := 0; i < 100; i++ {
			ch <- i

			switch i {
			case 20:
				fmt.Println("*** Updating batch size: min=10, max=20 ***")
				cfg.UpdateBatchSize(10, 20)
			case 50:
				fmt.Println("*** Updating batch size: min=20, max=30 ***")
				cfg.UpdateBatchSize(20, 30)
			}

			time.Sleep(5 * time.Millisecond)
		}
		close(ch)
	}()

	batch.IgnoreErrors(errs)
	<-b.Done()

	fmt.Println("Processing complete")

}
Output:

=== Dynamic Config Example ===
[Dynamic] Batch size: 5
[Dynamic] Batch size: 5
[Dynamic] Batch size: 5
[Dynamic] Batch size: 5
*** Updating batch size: min=10, max=20 ***
[Dynamic] Batch size: 5
[Dynamic] Batch size: 10
[Dynamic] Batch size: 10
*** Updating batch size: min=20, max=30 ***
[Dynamic] Batch size: 10
[Dynamic] Batch size: 20
[Dynamic] Batch size: 20
[Dynamic] Batch size: 5
Processing complete
Example (ErrorHandling)
package main

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
)

type errorSource struct {
	items     []int
	errorRate int
}

func (s *errorSource) Read(ctx context.Context) (<-chan interface{}, <-chan error) {
	out := make(chan interface{})
	errs := make(chan error)

	go func() {
		defer close(out)
		defer close(errs)

		for i, val := range s.items {
			select {
			case <-ctx.Done():
				errs <- fmt.Errorf("source interrupted: %w", ctx.Err())
				return
			default:
				if s.errorRate > 0 && i > 0 && i%s.errorRate == 0 {
					errs <- fmt.Errorf("source error at item %d", i)
					time.Sleep(10 * time.Millisecond)
					continue
				}
				out <- val
				time.Sleep(5 * time.Millisecond)
			}
		}
	}()

	return out, errs
}

type validationProcessor struct {
	maxValue int
}

func (p *validationProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	for _, item := range items {
		if item.Error != nil {
			continue
		}
		select {
		case <-ctx.Done():
			return items, ctx.Err()
		default:
		}

		if num, ok := item.Data.(int); ok {
			if num > p.maxValue {
				item.Error = fmt.Errorf("value %d exceeds maximum %d", num, p.maxValue)
			}
		} else {
			item.Error = errors.New("expected int")
		}
	}
	return items, nil
}

type errorProneProcessor struct {
	failOnBatch int
	batchCount  int32
}

func (p *errorProneProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	batchNum := atomic.AddInt32(&p.batchCount, 1)

	if int(batchNum) == p.failOnBatch {
		return items, fmt.Errorf("processor failed on batch %d", batchNum)
	}

	for _, item := range items {
		if item.Error != nil {
			continue
		}
		select {
		case <-ctx.Done():
			return items, ctx.Err()
		default:
		}

		if num, ok := item.Data.(int); ok {
			item.Data = "Item: " + strconv.Itoa(num)
		}
	}

	return items, nil
}

type errorLogger struct{}

func (p *errorLogger) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	fmt.Println("Batch:")
	errorCount := 0

	for _, item := range items {
		if item.Error != nil {
			fmt.Printf("- Item %d error: %v\n", item.ID, item.Error)
			errorCount++
		} else {
			fmt.Printf("- Item %d: %v\n", item.ID, item.Data)
		}
	}

	if errorCount > 0 {
		fmt.Printf("Batch had %d error(s)\n", errorCount)
	}

	return items, nil
}

func main() {
	src := &errorSource{
		items:     []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 15, 20, 25},
		errorRate: 5,
	}

	validator := &validationProcessor{maxValue: 10}
	transformer := &errorProneProcessor{failOnBatch: 2}
	logger := &errorLogger{}

	config := batch.NewConstantConfig(&batch.ConfigValues{
		MinItems: 3,
		MaxItems: 5,
	})
	b := batch.New(config)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	fmt.Println("=== Error Handling Example ===")

	errs := batch.RunBatchAndWait(ctx, b, src, validator, transformer, logger)

	fmt.Println("\nSummary:")
	if len(errs) == 0 {
		fmt.Println("No errors")
	} else {
		for i, err := range errs {
			var srcErr *batch.SourceError
			var procErr *batch.ProcessorError

			switch {
			case errors.As(err, &srcErr):
				fmt.Printf("%d. Source error: %v\n", i+1, srcErr.Unwrap())
			case errors.As(err, &procErr):
				fmt.Printf("%d. Processor error: %v\n", i+1, procErr.Unwrap())
			default:
				fmt.Printf("%d. Other error: %v\n", i+1, err)
			}
		}
	}

}
Output:

=== Error Handling Example ===
Batch:
- Item 0: Item: 1
- Item 1: Item: 2
- Item 2: Item: 3
Batch:
- Item 3: 4
- Item 4: 5
- Item 5: 7
Batch:
- Item 6: Item: 8
- Item 7: Item: 9
- Item 8: Item: 10
Batch:
- Item 9 error: value 12 exceeds maximum 10
- Item 10 error: value 15 exceeds maximum 10
- Item 11 error: value 20 exceeds maximum 10
Batch had 3 error(s)
Batch:
- Item 12 error: value 25 exceeds maximum 10
Batch had 1 error(s)

Summary:
1. Source error: source error at item 5
2. Processor error: processor failed on batch 2
3. Source error: source error at item 10
4. Processor error: value 12 exceeds maximum 10
5. Processor error: value 15 exceeds maximum 10
6. Processor error: value 20 exceeds maximum 10
7. Processor error: value 25 exceeds maximum 10
Example (ProcessorChain)
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
)

type textSource struct {
	texts []string
}

func (s *textSource) Read(ctx context.Context) (<-chan interface{}, <-chan error) {
	out := make(chan interface{})
	errs := make(chan error)

	go func() {
		defer close(out)
		defer close(errs)
		for _, text := range s.texts {
			select {
			case <-ctx.Done():
				return
			case out <- text:
				time.Sleep(10 * time.Millisecond)
			}
		}
	}()

	return out, errs
}

type validateProcessor struct {
	minLength int
	maxLength int
}

func (p *validateProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	fmt.Println("Validation:")
	for i, item := range items {
		if item.Error != nil {
			continue
		}
		if text, ok := item.Data.(string); ok {
			switch {
			case len(text) < p.minLength:
				item.Error = fmt.Errorf("too short (min %d)", p.minLength)
			case len(text) > p.maxLength:
				item.Error = fmt.Errorf("too long (max %d)", p.maxLength)
			}
		} else {
			item.Error = errors.New("not a string")
		}
		status := "ok"
		if item.Error != nil {
			status = "error: " + item.Error.Error()
		}
		fmt.Printf("  Item %d: %v (%s)\n", i, item.Data, status)
	}
	return items, nil
}

type formatProcessor struct{}

func (p *formatProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	fmt.Println("Format:")
	for i, item := range items {
		if item.Error != nil {
			continue
		}
		if text, ok := item.Data.(string); ok {
			item.Data = "[" + strings.ToUpper(text) + "]"
			fmt.Printf("  Item %d: formatted to %v\n", i, item.Data)
		}
	}
	return items, nil
}

type enrichProcessor struct {
	metadata map[string]string
}

func (p *enrichProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	fmt.Println("Enrich:")
	for i, item := range items {
		if item.Error != nil {
			continue
		}
		if text, ok := item.Data.(string); ok {
			key := strings.ToLower(strings.Trim(text, "[]"))
			if meta, exists := p.metadata[key]; exists {
				item.Data = struct {
					Text     string
					Metadata string
				}{
					Text:     text,
					Metadata: meta,
				}
				fmt.Printf("  Item %d: enriched with %q\n", i, meta)
			} else {
				fmt.Printf("  Item %d: no metadata\n", i)
			}
		}
	}
	return items, nil
}

type displayProcessor struct{}

func (p *displayProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	fmt.Println("Results:")
	for i, item := range items {
		if item.Error != nil {
			fmt.Printf("  Item %d: ERROR: %v\n", i, item.Error)
		} else {
			fmt.Printf("  Item %d: OK: %v\n", i, item.Data)
		}
	}
	return items, nil
}

func main() {
	src := &textSource{
		texts: []string{
			"hello",
			"a", // too short
			"world",
			"processing",
			"thisisaverylongstringthatwillexceedthemaximumlength",
			"batch",
		},
	}

	meta := map[string]string{
		"hello":      "English greeting",
		"world":      "Planet Earth",
		"processing": "Act of handling data",
		"batch":      "Group of items processed together",
	}

	config := batch.NewConstantConfig(&batch.ConfigValues{
		MinItems: 4, // exceeds MaxItems, so it is clamped to 3 (see Config)
		MaxItems: 3,
	})
	b := batch.New(config)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	fmt.Println("=== Processor Chain Example ===")

	errs := batch.RunBatchAndWait(ctx, b, src,
		&validateProcessor{minLength: 3, maxLength: 15},
		&formatProcessor{},
		&enrichProcessor{metadata: meta},
		&displayProcessor{},
	)

	if len(errs) > 0 {
		fmt.Printf("Total errors: %d\n", len(errs))
	}

}
Output:

=== Processor Chain Example ===
Validation:
  Item 0: hello (ok)
  Item 1: a (error: too short (min 3))
  Item 2: world (ok)
Format:
  Item 0: formatted to [HELLO]
  Item 2: formatted to [WORLD]
Enrich:
  Item 0: enriched with "English greeting"
  Item 2: enriched with "Planet Earth"
Results:
  Item 0: OK: {[HELLO] English greeting}
  Item 1: ERROR: too short (min 3)
  Item 2: OK: {[WORLD] Planet Earth}
Validation:
  Item 0: processing (ok)
  Item 1: thisisaverylongstringthatwillexceedthemaximumlength (error: too long (max 15))
  Item 2: batch (ok)
Format:
  Item 0: formatted to [PROCESSING]
  Item 2: formatted to [BATCH]
Enrich:
  Item 0: enriched with "Act of handling data"
  Item 2: enriched with "Group of items processed together"
Results:
  Item 0: OK: {[PROCESSING] Act of handling data}
  Item 1: ERROR: too long (max 15)
  Item 2: OK: {[BATCH] Group of items processed together}
Total errors: 2
Example (SimpleProcessor)
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/MasterOfBinary/gobatch/batch"
	"github.com/MasterOfBinary/gobatch/source"
)

type simpleProcessor struct{}

func (p *simpleProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	var values []interface{}

	for _, item := range items {
		if val, ok := item.Data.(int); ok && val == 5 {
			item.Error = errors.New("value 5 not allowed")
			continue
		}
		values = append(values, item.Data)
	}

	fmt.Println("Batch:", values)
	return items, nil
}

func main() {
	ch := make(chan interface{})

	go func() {
		for _, v := range []interface{}{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
			ch <- v
			time.Sleep(10 * time.Millisecond)
		}
		close(ch)
	}()

	src := &source.Channel{Input: ch}

	config := batch.NewConstantConfig(&batch.ConfigValues{
		MinItems: 3,
		MaxItems: 5,
	})

	p := &simpleProcessor{}
	b := batch.New(config)

	ctx := context.Background()
	fmt.Println("Starting...")

	errs := batch.RunBatchAndWait(ctx, b, src, p)

	if len(errs) > 0 {
		fmt.Printf("Errors: %d\n", len(errs))
		fmt.Println("Last error:", errs[len(errs)-1])
	}

}
Output:

Starting...
Batch: [1 2 3]
Batch: [4 6]
Batch: [7 8 9]
Batch: [10]
Errors: 1
Last error: processor error: value 5 not allowed

Constants

const (
	// DefaultItemBufferSize is the default buffer size for the items channel.
	// This determines how many items can be queued between the reader and processor.
	DefaultItemBufferSize = 100

	// DefaultIDBufferSize is the default buffer size for the ID generator channel.
	// This should match or exceed the item buffer size to avoid blocking.
	DefaultIDBufferSize = 100

	// DefaultErrorBufferSize is the default buffer size for the error channel.
	// This should be large enough to handle bursts of errors without blocking.
	DefaultErrorBufferSize = 100
)

Default buffer sizes for channels used in batch processing. These values can be tuned based on performance requirements.
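
As a sketch, a higher-throughput pipeline might scale all three buffers up through Batch.WithBufferConfig (the multipliers below are illustrative, not recommendations):

b := batch.New(cfg).WithBufferConfig(batch.BufferConfig{
	ItemBufferSize:  10 * batch.DefaultItemBufferSize,
	IDBufferSize:    10 * batch.DefaultIDBufferSize, // keep >= ItemBufferSize
	ErrorBufferSize: 5 * batch.DefaultErrorBufferSize,
})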

Functions

func CollectErrors added in v0.2.1

func CollectErrors(errs <-chan error) []error

CollectErrors collects all errors from the error channel into a slice. This is useful when you need to process all errors after batch processing completes.

Example usage:

errs := batch.CollectErrors(myBatch.Go(ctx, source, processor))
<-myBatch.Done()
for _, err := range errs {
	log.Printf("Error: %v", err)
}

func ExecuteBatches added in v0.2.1

func ExecuteBatches(ctx context.Context, configs ...*BatchConfig) []error

ExecuteBatches runs multiple batches concurrently and waits for all to complete. It returns all errors from all batches as a slice. This is useful when you need to process multiple data sources in parallel.

Example usage:

errs := batch.ExecuteBatches(ctx,
	&batch.BatchConfig{B: batch1, S: source1, P: []Processor{proc1}},
	&batch.BatchConfig{B: batch2, S: source2, P: []Processor{proc2}},
)

func IgnoreErrors

func IgnoreErrors(errs <-chan error)

IgnoreErrors starts a goroutine that reads errors from errs and discards them. It can be used with Batch.Go when errors aren't needed. Abandoning the returned channel without reading from it can block the pipeline once the error buffer fills. For example:

// NOTE: bad - this can cause a deadlock!
_ = myBatch.Go(ctx, s, p)

Instead, IgnoreErrors can be used to safely discard all errors:

batch.IgnoreErrors(myBatch.Go(ctx, s, p))

func RunBatchAndWait added in v0.2.1

func RunBatchAndWait(ctx context.Context, b *Batch, s Source, procs ...Processor) []error

RunBatchAndWait is a convenience function that runs a batch with the given source and processors, waits for it to complete, and returns all errors encountered. This is useful for simple batch processing where you don't need to handle errors asynchronously.

Example usage:

errs := batch.RunBatchAndWait(ctx, myBatch, source, processor1, processor2)
if len(errs) > 0 {
	// Handle errors
}

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch provides batch processing given a Source and one or more Processors. Data is read from the Source and processed through each Processor in sequence. Any errors are wrapped in either a SourceError or a ProcessorError, so the caller can determine where the errors came from.

To create a new Batch, call New. Creating one using &Batch{} will also work.

// The following are equivalent:
defaultBatch1 := &batch.Batch{}
defaultBatch2 := batch.New(nil)
defaultBatch3 := batch.New(batch.NewConstantConfig(&batch.ConfigValues{}))

If Config is nil, a default configuration is used, where items are processed immediately as they are read.

Batch runs asynchronously after Go is called. When processing is complete, the error channel returned from Go is closed and the channel returned from Done is closed.

A simple way to wait for completion while handling errors:

errs := b.Go(ctx, s, p)
for err := range errs {
	log.Print(err.Error())
}
// Now batch processing is done

If errors don't need to be handled, IgnoreErrors can be used:

batch.IgnoreErrors(b.Go(ctx, s, p))
<-b.Done()
// Now batch processing is done

Errors returned on the error channel may be wrapped. Source errors will be of type SourceError, processor errors will be of type ProcessorError, and Batch errors (internal errors) will be plain.
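
A sketch of draining the channel while classifying errors by origin, using errors.As against the wrapper types (the log lines are illustrative):

for err := range b.Go(ctx, s, p) {
	var srcErr *batch.SourceError
	var procErr *batch.ProcessorError
	switch {
	case errors.As(err, &srcErr):
		log.Println("source:", srcErr.Unwrap())
	case errors.As(err, &procErr):
		log.Println("processor:", procErr.Unwrap())
	default:
		log.Println("internal:", err) // plain Batch error
	}
}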

func New

func New(config Config) *Batch

New creates a new Batch using the provided config. If config is nil, a default configuration is used.

To avoid race conditions, the config cannot be changed after the Batch is created. Instead, implement the Config interface to support changing values.

func (*Batch) Done

func (b *Batch) Done() <-chan struct{}

Done returns a channel that is closed when batch processing is complete.

The Done channel can be used to wait for processing to finish, either by blocking or using a select statement with a timeout or context cancellation.

Example:

b := batch.New(config)
batch.IgnoreErrors(b.Go(ctx, source, processor))

<-b.Done()
fmt.Println("Processing complete")

Or using a select statement:

select {
case <-b.Done():
	fmt.Println("Processing complete")
case <-ctx.Done():
	fmt.Println("Context canceled")
case <-time.After(10 * time.Second):
	fmt.Println("Timed out waiting for processing to finish")
}

func (*Batch) Go

func (b *Batch) Go(ctx context.Context, s Source, procs ...Processor) <-chan error

Go starts batch processing asynchronously and returns an error channel.

The pipeline consists of the following steps:

  • Items are read from the Source.
  • Items are grouped into batches based on the Config.
  • Each batch is processed through the Processors in sequence.

Only one run may be active at a time. Calling Go again while a batch is already running will cause a panic.

Context cancellation:

  • Go does not immediately stop processing when the context is canceled.
  • Any items already read from the Source are still processed to avoid data loss.

Example:

b := batch.New(config)
errs := b.Go(ctx, source, processor)

go func() {
	for err := range errs {
		log.Println("error:", err)
	}
}()

<-b.Done()

Important:

  • The Source must close its channels when reading is complete.
  • Processors must check for context cancellation and stop early if needed.
  • All items that have already been read will be processed even if the context is canceled.
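
The panic applies only to overlapping runs. Assuming a Batch may be reused once the previous run has finished (implied by the rule above, though not stated outright), sequential runs would look like this sketch, where src1, src2, and proc are placeholders:

batch.IgnoreErrors(b.Go(ctx, src1, proc))
<-b.Done() // first run must finish...

batch.IgnoreErrors(b.Go(ctx, src2, proc)) // ...before Go is called again
<-b.Done()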

func (*Batch) WithBufferConfig added in v0.4.0

func (b *Batch) WithBufferConfig(config BufferConfig) *Batch

WithBufferConfig sets custom buffer sizes for the Batch. This must be called before Go() is called.

Example:

b := batch.New(config).WithBufferConfig(batch.BufferConfig{
	ItemBufferSize:  1000,
	IDBufferSize:    1000,
	ErrorBufferSize: 500,
})

Panics if called after Go() has started, to prevent data races.

type BatchConfig added in v0.2.1

type BatchConfig struct {
	B *Batch      // The Batch instance to use
	S Source      // The Source to read items from
	P []Processor // The processors to apply to the items
}

BatchConfig holds the configuration for a single batch execution. It combines a Batch instance, a Source to read from, and a list of Processors to apply to the data from the source. This is used primarily with the ExecuteBatches function to run multiple batch operations concurrently.

type BufferConfig added in v0.4.0

type BufferConfig struct {
	// ItemBufferSize is the buffer size for the items channel.
	// Default: DefaultItemBufferSize
	ItemBufferSize int

	// IDBufferSize is the buffer size for the ID generator channel.
	// Default: DefaultIDBufferSize
	IDBufferSize int

	// ErrorBufferSize is the buffer size for the error channel.
	// Default: DefaultErrorBufferSize
	ErrorBufferSize int
}

BufferConfig configures the internal buffer sizes used by Batch. If not specified, default values are used.

type Config

type Config interface {
	// Get returns the values for configuration.
	//
	// If MinItems > MaxItems or MinTime > MaxTime, the min value will be
	// set to the maximum value.
	//
	// If the config values may be modified during batch processing, Get
	// must properly handle concurrency issues.
	Get() ConfigValues
}

Config retrieves the config values used by Batch. If these values are constant, NewConstantConfig can be used to create an implementation of the interface.

The Config interface allows for dynamic configuration of the batching behavior, which can be adjusted during runtime. This is useful for tuning the system under different load scenarios or adapting to changing performance requirements.
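
A minimal mutable implementation can guard its values with a sync.RWMutex, as in this sketch (mutableConfig is illustrative; the package's own DynamicConfig provides the same capability):

type mutableConfig struct {
	mu   sync.RWMutex
	vals batch.ConfigValues
}

func (c *mutableConfig) Get() batch.ConfigValues {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.vals
}

func (c *mutableConfig) Set(v batch.ConfigValues) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.vals = v
}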

type ConfigValues

type ConfigValues struct {
	// MinTime specifies a minimum amount of time that should pass
	// before items are processed. The exception is when a maximum
	// number of items is specified and that number is reached before
	// MinTime; in that case those items are processed right away.
	//
	// This parameter is useful to prevent processing very small batches
	// too frequently when items arrive at a slow but steady rate.
	MinTime time.Duration `json:"minTime"`

	// MinItems specifies that a minimum number of items should be
	// processed at a time. Items will not be processed until MinItems
	// items are ready for processing. The exceptions are when MaxTime
	// is specified and that time is reached before the minimum number of
	// items is available, or when all items have been read and are ready
	// to process.
	//
	// This parameter helps optimize processing by ensuring batches are
	// large enough to amortize the overhead of processing across multiple items.
	MinItems uint64 `json:"minItems"`

	// MaxTime specifies that a maximum amount of time should pass before
	// processing. Once that time has been reached, items will be processed
	// whether or not MinItems items are available.
	//
	// This parameter ensures that items don't wait in the queue for too long,
	// which is important for latency-sensitive applications.
	MaxTime time.Duration `json:"maxTime"`

	// MaxItems specifies that a maximum number of items should be available
	// before processing. Once that number of items is available, they will
	// be processed whether or not MinTime has been reached.
	//
	// This parameter prevents the system from accumulating too many items
	// in a single batch, which could lead to memory pressure or processing
	// spikes.
	MaxItems uint64 `json:"maxItems"`
}

ConfigValues is a struct that contains the Batch config values. These values control the timing and sizing behavior of batches in the pipeline. The batch system uses these parameters to determine when to process a batch based on time elapsed and number of items collected.
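
A sketch of a latency-bounded configuration, with comments spelling out when a batch fires (the values are illustrative):

cfg := batch.NewConstantConfig(&batch.ConfigValues{
	MinItems: 10,                    // wait for at least 10 items...
	MaxTime:  50 * time.Millisecond, // ...but never wait longer than 50ms
	MaxItems: 100,                   // and flush immediately at 100 items
})

With these values, a batch is processed as soon as 100 items accumulate, once 10 items are available (MinTime is zero, so there is no minimum wait), or when 50ms pass with fewer than 10 items buffered.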

type ConstantConfig

type ConstantConfig struct {
	// contains filtered or unexported fields
}

ConstantConfig is a Config with constant values. Create one with NewConstantConfig.

This implementation is safe to use concurrently since the values never change after initialization.

func NewConstantConfig

func NewConstantConfig(values *ConfigValues) *ConstantConfig

NewConstantConfig returns a Config with constant values. If values is nil, the default values are used as described in Batch.

This is a convenience function for creating a configuration that doesn't change during the lifetime of the batch processing. It's the simplest way to provide configuration to the Batch system.

func (*ConstantConfig) Get

func (b *ConstantConfig) Get() ConfigValues

Get implements the Config interface. Returns the constant configuration values stored in this ConstantConfig.

type DynamicConfig added in v0.3.0

type DynamicConfig struct {
	// contains filtered or unexported fields
}

DynamicConfig implements the Config interface with values that can be modified at runtime. It provides thread-safe access to configuration values and methods to update batch size and timing parameters.

Unlike ConstantConfig, DynamicConfig allows changing batch parameters while the system is running, enabling dynamic adaptation to varying conditions.

func NewDynamicConfig added in v0.3.0

func NewDynamicConfig(values *ConfigValues) *DynamicConfig

NewDynamicConfig creates a configuration that can be adjusted at runtime. It is thread-safe and suitable for use in environments where batch processing parameters need to change dynamically in response to system conditions.

If values is nil, the default values are used as described in Batch.

This is useful for:

  • Systems that need to adapt to changing workloads
  • Services that implement backpressure mechanisms (see the sketch below)
  • Applications that tune batch parameters based on performance metrics
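
A sketch of the backpressure case, where a hypothetical queueDepth metric drives UpdateBatchSize (the metric and thresholds are illustrative):

cfg := batch.NewDynamicConfig(&batch.ConfigValues{MinItems: 10, MaxItems: 50})
b := batch.New(cfg)

// Widen batches while a downstream queue is backed up.
go func() {
	for range time.Tick(time.Second) {
		if queueDepth() > 1000 { // hypothetical load metric
			cfg.UpdateBatchSize(50, 200)
		} else {
			cfg.UpdateBatchSize(10, 50)
		}
	}
}()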

func (*DynamicConfig) Get added in v0.3.0

func (c *DynamicConfig) Get() ConfigValues

Get implements the Config interface by returning the current configuration values. It uses a read lock to ensure thread safety when accessing the values.

func (*DynamicConfig) Update added in v0.3.0

func (c *DynamicConfig) Update(config ConfigValues)

Update replaces all configuration values at once. This method is thread-safe and can be called while batch processing is active.

func (*DynamicConfig) UpdateBatchSize added in v0.3.0

func (c *DynamicConfig) UpdateBatchSize(minItems, maxItems uint64)

UpdateBatchSize updates the batch size parameters. This method is thread-safe and can be called while batch processing is active.

func (*DynamicConfig) UpdateTiming added in v0.3.0

func (c *DynamicConfig) UpdateTiming(minTime, maxTime time.Duration)

UpdateTiming updates the timing parameters. This method is thread-safe and can be called while batch processing is active.

type Item

type Item struct {
	// ID is a unique identifier for the item. It must not be modified by processors.
	ID uint64

	// Data holds the payload being processed. It is safe for processors to modify.
	Data interface{}

	// Error is set by processors to indicate a failure specific to this item.
	Error error
}

Item represents a single data item flowing through the batch pipeline.

type Processor

type Processor interface {
	// Process applies operations to a batch of items.
	// It may modify item data or set item.Error on individual items.
	//
	// Process should respect context cancellation.
	// It returns the modified slice of items and a processor-wide error, if any.
	//
	// Example:
	//
	//	func (p *MyProcessor) Process(ctx context.Context, items []*batch.Item) ([]*batch.Item, error) {
	//		for _, item := range items {
	//			if item.Error != nil {
	//				continue
	//			}
	//
	//			select {
	//			case <-ctx.Done():
	//				return items, ctx.Err()
	//			default:
	//			}
	//
	//			result, err := p.processItem(item.Data)
	//			if err != nil {
	//				item.Error = err
	//				continue
	//			}
	//
	//			item.Data = result
	//		}
	//
	//		return items, nil
	//	}
	Process(ctx context.Context, items []*Item) ([]*Item, error)
}

Processor processes items in batches. Implementations apply operations to each batch and may modify items or set per-item errors. Processors can be chained together to form multi-stage pipelines.

type ProcessorError

type ProcessorError struct {
	Err error
}

ProcessorError is returned when a processor fails. It wraps the original error from the processor to maintain the error chain while indicating that the error originated in a processor.

func (ProcessorError) Error

func (e ProcessorError) Error() string

Error implements the error interface, returning a formatted error message that includes the wrapped processor error.

func (ProcessorError) Unwrap added in v0.2.0

func (e ProcessorError) Unwrap() error

Unwrap returns the underlying error for compatibility with errors.Is and errors.As.

type Source

type Source interface {
	// Read reads items from a data source and returns two channels:
	// one for items, and one for errors.
	//
	// Read must create both channels (never return nil channels), and must close them
	// when reading is finished or when context is canceled.
	//
	// Example:
	//
	//	func (s *MySource) Read(ctx context.Context) (<-chan interface{}, <-chan error) {
	//		out := make(chan interface{})
	//		errs := make(chan error)
	//
	//		go func() {
	//			defer close(out)
	//			defer close(errs)
	//
	//			for _, item := range s.items {
	//				select {
	//				case <-ctx.Done():
	//					errs <- ctx.Err()
	//					return
	//				case out <- item:
	//					// sent successfully
	//				}
	//			}
	//		}()
	//
	//		return out, errs
	//	}
	Read(ctx context.Context) (<-chan interface{}, <-chan error)
}

Source reads items that are to be batch processed.

type SourceError

type SourceError struct {
	Err error
}

SourceError is returned when a source fails. It wraps the original error from the source to maintain the error chain while indicating that the error originated in the source.

func (SourceError) Error

func (e SourceError) Error() string

Error implements the error interface, returning a formatted error message that includes the wrapped source error.

func (SourceError) Unwrap added in v0.2.0

func (e SourceError) Unwrap() error

Unwrap returns the underlying error for compatibility with errors.Is and errors.As.
