batch

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 18, 2025 | License: MIT | Imports: 5 | Imported by: 0

Documentation

Overview

Package batch provides generic batch processing capabilities for pocket workflows.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Filter

func Filter[T any](
	extract func(context.Context, pocket.Store) ([]T, error),
	predicate func(context.Context, T) (bool, error),
	opts ...Option,
) pocket.Processor

Filter creates a batch processor that extracts items from the store and keeps those for which predicate returns true.

func ForEach

func ForEach[T any](
	extract func(context.Context, pocket.Store) ([]T, error),
	process func(context.Context, T) error,
	opts ...Option,
) pocket.Processor

ForEach creates a batch processor that runs process on each extracted item without aggregating results.

func MapReduce

func MapReduce[T, R any](
	extract func(context.Context, pocket.Store) ([]T, error),
	mapper func(context.Context, T) (R, error),
	reducer func(context.Context, []R) (any, error),
	opts ...Option,
) pocket.Processor

MapReduce creates a batch processor that applies mapper to each extracted item and passes the collected results to reducer.

Types

type Option

type Option func(*options)

Option configures a batch processor.
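The declaration func(*options) matches the common Go functional-options pattern. As a rough sketch of how such options are usually applied: the options fields, the default of one worker, and the lowercase helper names below are all assumptions, since the package's unexported fields are not documented here.

```go
package main

import "fmt"

// options holds hypothetical settings; the real struct is unexported
// and its fields are not documented.
type options struct {
	concurrency int
	ordered     bool
}

// Option mutates an options value, mirroring the documented type.
type Option func(*options)

// withConcurrency and withOrdered are illustrative counterparts of
// WithConcurrency and WithOrdered.
func withConcurrency(n int) Option { return func(o *options) { o.concurrency = n } }
func withOrdered() Option          { return func(o *options) { o.ordered = true } }

// applyOptions starts from assumed defaults and applies each option in order.
func applyOptions(opts ...Option) options {
	o := options{concurrency: 1} // assumed default
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withConcurrency(4), withOrdered())
	fmt.Printf("%+v\n", o) // {concurrency:4 ordered:true}
}
```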

func WithConcurrency

func WithConcurrency(n int) Option

WithConcurrency sets the maximum number of concurrent workers to n.

func WithOrdered

func WithOrdered() Option

WithOrdered ensures results maintain input order.

type Processor

type Processor[T, R any] struct {
	// Extract retrieves items to process.
	Extract func(ctx context.Context, store pocket.Store) ([]T, error)

	// Transform processes a single item.
	Transform func(ctx context.Context, item T) (R, error)

	// Reduce combines results into a final output.
	Reduce func(ctx context.Context, results []R) (any, error)
	// contains filtered or unexported fields
}

Processor processes a batch of items of type T, transforming each into a result of type R.

func NewProcessor

func NewProcessor[T, R any](
	extract func(context.Context, pocket.Store) ([]T, error),
	transform func(context.Context, T) (R, error),
	reduce func(context.Context, []R) (any, error),
	opts ...Option,
) *Processor[T, R]

NewProcessor creates a new batch processor from the given extract, transform, and reduce functions, configured by opts.

func (*Processor[T, R]) Process

func (p *Processor[T, R]) Process(ctx context.Context, input any) (any, error)

Process implements the pocket.Processor interface.
