processor

package
v0.5.0
Published: Feb 15, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package processor contains several implementations of the batch.Processor interface for common processing scenarios, including:

- Error: For simulating errors with configurable failure rates
- Filter: For filtering items based on custom predicates
- Nil: For testing timing behavior without modifying items
- Transform: For transforming item data values
- Channel: For writing item data to an output channel

Each processor implementation follows a consistent error handling pattern. Channel and Nil observe context cancellation; the Process signatures of Error, Filter, and Transform discard the context argument.

Basic usage of the Transform processor:

p := &Transform[int]{Func: func(v int) (int, error) {
    return v * 2, nil
}}

items := []*batch.Item[int]{{Data: 1}, {Data: 2}}
res, _ := p.Process(context.Background(), items)
fmt.Println(res[0].Data, res[1].Data)

Output:

2 4

Types

type Channel added in v0.3.0

type Channel[T any] struct {
	// Output is the channel that receives each item's Data value.
	// If nil, the processor does nothing.
	Output chan<- T
}

Channel is a Processor that sends the Data field of each item to an output channel. Items with existing errors are ignored. The channel is not closed by the processor.

func (*Channel[T]) Process added in v0.3.0

func (p *Channel[T]) Process(ctx context.Context, items []*batch.Item[T]) ([]*batch.Item[T], error)

Process implements the Processor interface by forwarding item data to the Output channel until the context is canceled.
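The forwarding behavior described for Channel can be sketched as follows. This is a minimal illustration, not the package's implementation: Item is a local stand-in for batch.Item that models only the Data and Error fields, forward and collect are hypothetical helper names, and returning ctx.Err() on cancellation is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Item is a local stand-in for batch.Item[T]; only the Data and Error
// fields mentioned in the documentation are modeled.
type Item[T any] struct {
	Data  T
	Error error
}

// forward sends each item's Data to out, skipping items that already
// carry an error. A nil channel makes it a no-op. It never closes out.
func forward[T any](ctx context.Context, out chan<- T, items []*Item[T]) error {
	if out == nil {
		return nil
	}
	for _, it := range items {
		if it.Error != nil {
			continue // items with existing errors are ignored
		}
		select {
		case out <- it.Data:
		case <-ctx.Done():
			return ctx.Err() // assumption: cancellation is reported as an error
		}
	}
	return nil
}

// collect runs forward over a fixed batch and gathers what arrives.
func collect() []int {
	out := make(chan int, 3)
	items := []*Item[int]{{Data: 1}, {Data: 2, Error: errors.New("bad")}, {Data: 3}}
	_ = forward(context.Background(), out, items)
	close(out) // the caller owns the channel, so the caller closes it
	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect()) // [1 3]
}
```

Because the processor does not close the channel, the code that created it decides when to close it, typically after the whole pipeline has finished.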

type Error

type Error[T any] struct {
	// Err is the error to apply to each item.
	// If nil, a default "processor error" will be used.
	Err error

	// FailFraction controls what fraction of items should have errors applied.
	// Value range is 0.0 to 1.0, where:
	// - 0.0 means no items will have errors (processor becomes a pass-through)
	// - 1.0 means all items will have errors (default)
	// - 0.5 means approximately half the items will have errors
	FailFraction float64
}

Error is a Processor that marks incoming items with the given error, either all of them or the fraction selected by FailFraction. It's useful for testing error handling in batch processing pipelines and for simulating scenarios where items fail processing.

func (*Error[T]) Process

func (p *Error[T]) Process(_ context.Context, items []*batch.Item[T]) ([]*batch.Item[T], error)

Process implements the Processor interface by marking items with errors according to the configured FailFraction.
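One way to picture FailFraction is as a per-item probability. The sketch below assumes exactly that; the documentation does not say how items are selected, so the random draw is an assumption, and Item, markErrors, and newItems are hypothetical names standing in for the real types.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// Item is a local stand-in for batch.Item[T].
type Item[T any] struct {
	Data  T
	Error error
}

// markErrors sets err on roughly frac of the items and returns how many
// were marked. A nil err falls back to a default "processor error".
// Assumption: each item is selected independently with probability frac.
func markErrors[T any](items []*Item[T], err error, frac float64) int {
	if err == nil {
		err = errors.New("processor error")
	}
	marked := 0
	for _, it := range items {
		// rand.Float64 returns a value in [0.0, 1.0), so frac 0.0 never
		// matches and frac 1.0 always matches.
		if rand.Float64() < frac {
			it.Error = err
			marked++
		}
	}
	return marked
}

// newItems builds n items with Data 0..n-1 and no errors.
func newItems(n int) []*Item[int] {
	items := make([]*Item[int], n)
	for i := range items {
		items[i] = &Item[int]{Data: i}
	}
	return items
}

func main() {
	fmt.Println(markErrors(newItems(4), nil, 0.0))    // 0
	fmt.Println(markErrors(newItems(4), nil, 1.0))    // 4
	fmt.Println(markErrors(newItems(1000), nil, 0.5)) // varies per run
}
```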

type Filter added in v0.2.0

type Filter[T any] struct {
	// Predicate is a function that returns true for items that should be kept
	// and false for items that should be filtered out.
	// If nil, no filtering occurs (all items pass through).
	Predicate FilterFunc[T]

	// InvertMatch inverts the predicate logic: if true, items matching the predicate
	// will be removed instead of kept.
	// Default is false (keep matching items).
	InvertMatch bool
}

Filter is a processor that filters items based on a predicate function. It can be used to remove items from the pipeline that don't meet certain criteria.

func (*Filter[T]) Process added in v0.2.0

func (p *Filter[T]) Process(_ context.Context, items []*batch.Item[T]) ([]*batch.Item[T], error)

Process implements the Processor interface by filtering items according to the predicate. Items that don't pass the filter are omitted from the returned slice. No error is set on them; they are only excluded from further processing.
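The interaction between Predicate and InvertMatch can be sketched like this. Item and FilterFunc are local stand-ins that mirror the documented shapes, and filter and evens are hypothetical helpers rather than the package's own code.

```go
package main

import "fmt"

// Item is a local stand-in for batch.Item[T].
type Item[T any] struct {
	Data  T
	Error error
}

// FilterFunc mirrors the documented predicate signature.
type FilterFunc[T any] func(item *Item[T]) bool

// filter keeps items for which pred returns true, or the opposite set
// when invert is true. A nil pred lets every item through.
func filter[T any](items []*Item[T], pred FilterFunc[T], invert bool) []*Item[T] {
	if pred == nil {
		return items
	}
	kept := make([]*Item[T], 0, len(items))
	for _, it := range items {
		if pred(it) != invert {
			kept = append(kept, it)
		}
	}
	return kept
}

// evens filters 1..4 with an "is even" predicate and returns the kept Data.
func evens(invert bool) []int {
	var isEven FilterFunc[int] = func(it *Item[int]) bool { return it.Data%2 == 0 }
	items := []*Item[int]{{Data: 1}, {Data: 2}, {Data: 3}, {Data: 4}}
	var out []int
	for _, it := range filter(items, isEven, invert) {
		out = append(out, it.Data)
	}
	return out
}

func main() {
	fmt.Println(evens(false)) // [2 4]
	fmt.Println(evens(true))  // [1 3]
}
```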

type FilterFunc added in v0.2.0

type FilterFunc[T any] func(item *batch.Item[T]) bool

FilterFunc is a function that decides whether an item should be included in the output. Return true to keep the item, false to filter it out.

type Nil

type Nil[T any] struct {
	// Duration specifies how long the processor should sleep before returning.
	// If zero or negative, the processor will return immediately.
	Duration time.Duration

	// MarkCancelled controls whether items should be marked with ctx.Err()
	// when the context is canceled during processing.
	// If false, items are returned unchanged on cancellation.
	MarkCancelled bool
}

Nil is a Processor that sleeps for the configured duration and does nothing else. It's useful for testing timing behavior and simulating time-consuming operations without actually modifying items.

func (*Nil[T]) Process

func (p *Nil[T]) Process(ctx context.Context, items []*batch.Item[T]) ([]*batch.Item[T], error)

Process implements the Processor interface by waiting for the specified duration and returning the items unchanged. If the context is canceled during the wait and MarkCancelled is true, the items are marked with ctx.Err().
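A cancellable sleep of this kind is commonly written with a timer and a select. The sketch below assumes that approach; Item is a local stand-in for batch.Item, and wait and cancelledRun are hypothetical names. What the real Process returns as its error value on cancellation is not stated in the documentation and is left out here.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Item is a local stand-in for batch.Item[T].
type Item[T any] struct {
	Data  T
	Error error
}

// wait blocks for d or until ctx is canceled, whichever comes first.
// On cancellation it marks items with ctx.Err() only if mark is true.
func wait[T any](ctx context.Context, d time.Duration, mark bool, items []*Item[T]) []*Item[T] {
	if d <= 0 {
		return items // zero or negative duration: return immediately
	}
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		// Full duration elapsed; items are untouched.
	case <-ctx.Done():
		if mark {
			for _, it := range items {
				it.Error = ctx.Err()
			}
		}
	}
	return items
}

// cancelledRun calls wait with an already-canceled context and reports
// the error left on the single item.
func cancelledRun(mark bool) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	items := []*Item[int]{{Data: 1}}
	wait(ctx, time.Hour, mark, items)
	return items[0].Error
}

func main() {
	fmt.Println(cancelledRun(false)) // <nil>
	fmt.Println(cancelledRun(true))  // context canceled
}
```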

type Transform added in v0.2.0

type Transform[T any] struct {
	// Func is the transformation function to apply to each item's Data field.
	// If nil, items pass through unchanged.
	Func TransformFunc[T]

	// StopOnError determines whether to stop processing items after a transformation error.
	// If true, the processor will return an error and stop after the first transformation failure.
	// If false, items with transformation errors will have their Error field set but processing continues.
	// Default is false (continue processing).
	StopOnError bool
}

Transform is a processor that applies a transformation function to each item's Data field. It can be used to convert, modify, or restructure data during batch processing.

Example
package main

import (
	"context"
	"fmt"

	"github.com/MasterOfBinary/gobatch/batch"
	"github.com/MasterOfBinary/gobatch/processor"
)

func main() {
	p := &processor.Transform[int]{Func: func(v int) (int, error) {
		return v * 2, nil
	}}

	items := []*batch.Item[int]{{Data: 1}, {Data: 2}}
	res, _ := p.Process(context.Background(), items)
	fmt.Println(res[0].Data, res[1].Data)
}
Output:
2 4

func (*Transform[T]) Process added in v0.2.0

func (p *Transform[T]) Process(_ context.Context, items []*batch.Item[T]) ([]*batch.Item[T], error)

Process implements the Processor interface by applying the transformation function to each item's Data field.
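The effect of StopOnError is easiest to see with a function that fails for some inputs. The following is a sketch under assumptions: Item and TransformFunc are local stand-ins, transform and run are hypothetical helpers, and setting the item's Error before stopping is a guess about behavior the documentation leaves open.

```go
package main

import (
	"errors"
	"fmt"
)

// Item is a local stand-in for batch.Item[T].
type Item[T any] struct {
	Data  T
	Error error
}

// TransformFunc mirrors the documented transformation signature.
type TransformFunc[T any] func(data T) (T, error)

// transform applies fn to each item's Data. On failure the item's Error
// is set; with stopOnError the first failure is returned and later
// items are left untouched.
func transform[T any](items []*Item[T], fn TransformFunc[T], stopOnError bool) error {
	if fn == nil {
		return nil // nil function: items pass through unchanged
	}
	for _, it := range items {
		v, err := fn(it.Data)
		if err != nil {
			it.Error = err // assumption: the failing item is marked in both modes
			if stopOnError {
				return err
			}
			continue
		}
		it.Data = v
	}
	return nil
}

// run halves 2, 3, 4 (odd values fail) and reports the resulting Data
// plus whether transform returned an error.
func run(stopOnError bool) ([]int, bool) {
	var halve TransformFunc[int] = func(v int) (int, error) {
		if v%2 != 0 {
			return v, errors.New("odd value")
		}
		return v / 2, nil
	}
	items := []*Item[int]{{Data: 2}, {Data: 3}, {Data: 4}}
	err := transform(items, halve, stopOnError)
	data := make([]int, len(items))
	for i, it := range items {
		data[i] = it.Data
	}
	return data, err != nil
}

func main() {
	fmt.Println(run(false)) // [1 3 2] false
	fmt.Println(run(true))  // [1 3 4] true
}
```

With StopOnError left false, the third item is still halved after the second one fails; with it set, processing ends at the failure and the third item keeps its original value.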

type TransformFunc added in v0.2.0

type TransformFunc[T any] func(data T) (T, error)

TransformFunc is a function that transforms an item's Data field. It takes the current Data value and returns the new Data value.
