Documentation ¶
Overview ¶
Package processor contains several implementations of the batch.Processor interface for common processing scenarios, including:
- Error: For simulating errors with configurable failure rates
- Filter: For filtering items based on custom predicates
- Nil: For testing timing behavior without modifying items
- Transform: For transforming item data values
- Channel: For writing item data to an output channel
Each processor implementation follows a consistent error handling pattern and respects context cancellation.
Basic usage of the Transform processor:
	p := &Transform{Func: func(v interface{}) (interface{}, error) {
		if n, ok := v.(int); ok {
			return n * 2, nil
		}
		return v, nil
	}}
	items := []*batch.Item{{Data: 1}, {Data: 2}}
	res, _ := p.Process(context.Background(), items)
	fmt.Println(res[0].Data, res[1].Data)
Output:
2 4
Types ¶
type Channel ¶ added in v0.3.0
type Channel struct {
	// Output is the channel that receives each item's Data value.
	// If nil, the processor does nothing.
	Output chan<- interface{}
}
Channel is a Processor that sends the Data field of each item to an output channel. Items with existing errors are ignored. The channel is not closed by the processor.
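The behavior described above can be sketched with local stand-in types. This is an illustration, not the package's implementation: `item`, `sendAll`, and `collect` are hypothetical names, and the `select` on `ctx.Done()` is an assumption about how a blocking send might respect cancellation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// item is a hypothetical stand-in for batch.Item, limited to the two
// fields this page mentions.
type item struct {
	Data  interface{}
	Error error
}

// sendAll sends each item's Data to out, skipping items that already
// carry an error. It never closes out; that stays the caller's job.
func sendAll(ctx context.Context, out chan<- interface{}, items []*item) error {
	if out == nil {
		return nil // nil output: do nothing
	}
	for _, it := range items {
		if it.Error != nil {
			continue
		}
		select {
		case out <- it.Data:
		case <-ctx.Done():
			return ctx.Err() // assumption: give up on cancellation
		}
	}
	return nil
}

// collect runs sendAll over a small batch and returns what arrived.
func collect() []interface{} {
	out := make(chan interface{}, 3)
	items := []*item{{Data: "a"}, {Data: "b", Error: errors.New("bad")}, {Data: "c"}}
	if err := sendAll(context.Background(), out, items); err != nil {
		return nil
	}
	close(out) // the caller, not the processor, closes the channel
	var got []interface{}
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect()) // [a c]
}
```

Because the processor leaves the channel open, whoever created it decides when to close it, typically once the whole pipeline has finished.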
type Error ¶
type Error struct {
	// Err is the error to apply to each item.
	// If nil, a default "processor error" will be used.
	Err error
	// FailFraction controls what fraction of items should have errors applied.
	// Value range is 0.0 to 1.0, where:
	// - 0.0 means no items will have errors (processor becomes a pass-through)
	// - 1.0 means all items will have errors (default)
	// - 0.5 means approximately half the items will have errors
	FailFraction float64
}
Error is a Processor that marks incoming items with the given error: every item by default, or approximately the fraction set by FailFraction. It's useful for testing error handling in batch processing pipelines and for simulating scenarios where items fail processing.
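To make the FailFraction range concrete, here is a minimal sketch using local stand-in types. Selecting items with a per-item random draw is an assumption made for illustration; this page does not say how the package chooses which items fail. The names `item`, `markFraction`, and `countFailed` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// item is a hypothetical stand-in for batch.Item.
type item struct {
	Data  interface{}
	Error error
}

// markFraction sets err on each item with probability fraction.
// Per-item random selection is an assumption of this sketch.
func markFraction(items []*item, err error, fraction float64, rng *rand.Rand) {
	if err == nil {
		err = errors.New("processor error") // documented default message
	}
	for _, it := range items {
		// Float64 returns a value in [0.0, 1.0), so a fraction of 0.0
		// never matches and a fraction of 1.0 always does.
		if rng.Float64() < fraction {
			it.Error = err
		}
	}
}

// countFailed builds n items, marks them, and reports how many carry an error.
func countFailed(n int, fraction float64) int {
	items := make([]*item, n)
	for i := range items {
		items[i] = &item{Data: i}
	}
	markFraction(items, nil, fraction, rand.New(rand.NewSource(1)))
	failed := 0
	for _, it := range items {
		if it.Error != nil {
			failed++
		}
	}
	return failed
}

func main() {
	fmt.Println(countFailed(100, 0.0)) // 0: pass-through
	fmt.Println(countFailed(100, 1.0)) // 100: every item fails
	fmt.Println(countFailed(100, 0.5)) // roughly half; depends on the random source
}
```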
type Filter ¶ added in v0.2.0
type Filter struct {
	// Predicate is a function that returns true for items that should be kept
	// and false for items that should be filtered out.
	// If nil, no filtering occurs (all items pass through).
	Predicate FilterFunc
	// InvertMatch inverts the predicate logic: if true, items matching the predicate
	// will be removed instead of kept.
	// Default is false (keep matching items).
	InvertMatch bool
}
Filter is a Processor that filters items based on a predicate function. It can be used to remove items from the pipeline that don't meet certain criteria.
type FilterFunc ¶ added in v0.2.0
FilterFunc is a function that decides whether an item should be included in the output. Return true to keep the item, false to filter it out.
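The interaction between the predicate and InvertMatch can be sketched as follows, using local stand-in types. The `predicate` type takes the whole item, which is an assumption because the FilterFunc signature is not shown on this page. `item`, `filter`, `isEven`, `values`, and `sample` are hypothetical names.

```go
package main

import "fmt"

// item is a hypothetical stand-in for batch.Item.
type item struct {
	Data  interface{}
	Error error
}

// predicate is a hypothetical stand-in for FilterFunc.
type predicate func(it *item) bool

// filter keeps the items for which keep returns true, or the items for
// which it returns false when invert is set. A nil predicate keeps all.
func filter(items []*item, keep predicate, invert bool) []*item {
	if keep == nil {
		return items
	}
	var out []*item
	for _, it := range items {
		if keep(it) != invert {
			out = append(out, it)
		}
	}
	return out
}

// isEven reports whether the item holds an even int.
func isEven(it *item) bool {
	n, ok := it.Data.(int)
	return ok && n%2 == 0
}

// values extracts the Data of each item for printing.
func values(items []*item) []interface{} {
	out := make([]interface{}, 0, len(items))
	for _, it := range items {
		out = append(out, it.Data)
	}
	return out
}

// sample returns a fresh batch holding 1 through 4.
func sample() []*item {
	return []*item{{Data: 1}, {Data: 2}, {Data: 3}, {Data: 4}}
}

func main() {
	fmt.Println(values(filter(sample(), isEven, false))) // [2 4]
	fmt.Println(values(filter(sample(), isEven, true)))  // [1 3]
	fmt.Println(values(filter(sample(), nil, false)))    // [1 2 3 4]
}
```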
type Nil ¶
type Nil struct {
	// Duration specifies how long the processor should sleep before returning.
	// If zero or negative, the processor will return immediately.
	Duration time.Duration
	// MarkCancelled controls whether items should be marked with ctx.Err()
	// when the context is canceled during processing.
	// If false, items are returned unchanged on cancellation.
	MarkCancelled bool
}
Nil is a Processor that sleeps for the configured duration and does nothing else. It's useful for testing timing behavior and simulating time-consuming operations without actually modifying items.
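A cancellable sleep of this kind is commonly written with a timer and a `select`. The sketch below uses local stand-in types and shows how the MarkCancelled option could affect items; it is an illustration under those assumptions, not the package's code. `item`, `sleepOrCancel`, and `cancelledRun` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// item is a hypothetical stand-in for batch.Item.
type item struct {
	Data  interface{}
	Error error
}

// sleepOrCancel waits for d or until ctx is done, whichever comes first.
// On cancellation it records ctx.Err() on every item only if mark is true.
func sleepOrCancel(ctx context.Context, d time.Duration, mark bool, items []*item) {
	if d <= 0 {
		return // zero or negative duration: return immediately
	}
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		// Slept for the full duration; items stay untouched.
	case <-ctx.Done():
		if mark {
			for _, it := range items {
				it.Error = ctx.Err()
			}
		}
	}
}

// cancelledRun cancels the context up front, runs the sleep, and returns
// the error found on the single item afterwards.
func cancelledRun(mark bool) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	items := []*item{{Data: 1}}
	sleepOrCancel(ctx, time.Second, mark, items)
	return items[0].Error
}

func main() {
	fmt.Println(cancelledRun(true))  // context canceled
	fmt.Println(cancelledRun(false)) // <nil>
}
```

Using a timer instead of `time.Sleep` lets the wait end as soon as the context is canceled, so a test does not have to sit out the full duration.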
type Transform ¶ added in v0.2.0
type Transform struct {
	// Func is the transformation function to apply to each item's Data field.
	// If nil, items pass through unchanged.
	Func TransformFunc
	// ContinueOnError determines whether to continue processing items after a transformation error.
	// If true, items with transformation errors will have their Error field set but processing continues.
	// If false, the processor will return an error and stop after the first transformation failure.
	// Default is true (continue processing).
	ContinueOnError bool
}
Transform is a Processor that applies a transformation function to each item's Data field. It can be used to convert, modify, or restructure data during batch processing.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/MasterOfBinary/gobatch/batch"
	"github.com/MasterOfBinary/gobatch/processor"
)

func main() {
	p := &processor.Transform{Func: func(v interface{}) (interface{}, error) {
		if n, ok := v.(int); ok {
			return n * 2, nil
		}
		return v, nil
	}}
	items := []*batch.Item{{Data: 1}, {Data: 2}}
	res, _ := p.Process(context.Background(), items)
	fmt.Println(res[0].Data, res[1].Data)
}
Output: 2 4
type TransformFunc ¶ added in v0.2.0
type TransformFunc func(data interface{}) (interface{}, error)
TransformFunc is a function that transforms an item's Data field. It takes the current Data value and returns the new Data value.
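How an error returned by a TransformFunc plays out under the two ContinueOnError settings can be sketched with local stand-in types. The `transformFunc` type copies the signature shown above; `item`, `apply`, `double`, and `run` are hypothetical names. Leaving the failing item unmarked in stop mode is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// item is a hypothetical stand-in for batch.Item.
type item struct {
	Data  interface{}
	Error error
}

// transformFunc mirrors the documented TransformFunc signature.
type transformFunc func(data interface{}) (interface{}, error)

// apply runs fn over each item's Data. With continueOnError set, a failure
// is recorded on the item and the loop moves on; otherwise the first
// failure is returned and the remaining items are left untouched.
func apply(items []*item, fn transformFunc, continueOnError bool) error {
	if fn == nil {
		return nil // nil function: items pass through unchanged
	}
	for _, it := range items {
		v, err := fn(it.Data)
		if err != nil {
			if !continueOnError {
				return err
			}
			it.Error = err
			continue
		}
		it.Data = v
	}
	return nil
}

// double doubles ints and rejects every other type.
func double(data interface{}) (interface{}, error) {
	n, ok := data.(int)
	if !ok {
		return nil, errors.New("not an int")
	}
	return n * 2, nil
}

// run applies double to a batch whose middle item is not an int.
func run(continueOnError bool) ([]*item, error) {
	items := []*item{{Data: 1}, {Data: "x"}, {Data: 3}}
	err := apply(items, double, continueOnError)
	return items, err
}

func main() {
	items, err := run(true)
	fmt.Println(items[0].Data, items[1].Error, items[2].Data, err) // 2 not an int 6 <nil>
	items, err = run(false)
	fmt.Println(items[0].Data, items[2].Data, err) // 2 3 not an int
}
```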