batch

Version: v1.2.1
Published: Aug 5, 2024 | License: MIT | Imports: 7 | Imported by: 0

README

Go Batch

Batch processing utilities for Go projects.

Usage

go get github.com/mawngo/go-batch

Example

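package main

import (
	"sync/atomic"

	"github.com/mawngo/go-batch"
)

func main() {
	sum := int32(0)
	// Build a slice-backed processor that may use up to 5 goroutines and
	// holds at most 10 items before blocking, then start it with the handler.
	processor := batch.NewProcessor(batch.InitSlice[int], batch.AddToSlice[int]).
		Configure(batch.WithMaxConcurrency(5), batch.WithMaxItem(10)).
		Run(summing(&sum))
	for i := 0; i < 1_000_000; i++ {
		processor.Put(1)
	}
	// Stop the processor without a deadline before reading the result.
	processor.MustClose()
	if atomic.LoadInt32(&sum) != 1_000_000 {
		panic("sum is not 1_000_000")
	}
}

// summing returns a handler that adds every item of a batch to *p.
// The handler may run on several goroutines, so it updates *p atomically.
func summing(p *int32) batch.ProcessBatchFn[[]int] {
	return func(ints []int, _ int64) error {
		for _, num := range ints {
			atomic.AddInt32(p, int32(num))
		}
		return nil
	}
}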

More usage can be found in the tests and examples.

Documentation

Constants

const Disabled = -1
const Unlimited = -1

Functions

func AddToSlice

func AddToSlice[T any](b []T, item T) []T

AddToSlice is a MergeToBatchFn that adds an item to a slice.

func InitChan

func InitChan[T any](i int64) chan T

InitChan is an InitBatchFn that allocates a channel. It should not be used with an unbounded processor (maxItem < 0).

func InitMap

func InitMap[K comparable, V any](i int64) map[K]V

InitMap is an InitBatchFn that allocates a map.

func InitSlice

func InitSlice[T any](i int64) []T

InitSlice is an InitBatchFn that allocates a slice.

func InitType

func InitType[T any](_ int64) T

InitType is an InitBatchFn that allocates a value of type T.

func LoggingErrorHandler

func LoggingErrorHandler[B any](_ B, count int64, err error) error

LoggingErrorHandler is the default error handler. It is always included in the RecoverBatchFn chain unless disabled.

func NewErrorWithRemaining

func NewErrorWithRemaining[B any](err error, remainBatch B, count int64) error

NewErrorWithRemaining creates an *Error with the remaining items.

Types

type Combine

type Combine[T any] func(T, T) T

type Error

type Error[B any] struct {
	// Cause is the underlying error. If not specified, nil is passed to the next error handler.
	Cause error
	// RemainingBatch is the batch to pass to the next handler. RemainingCount must also be specified.
	RemainingBatch B
	// RemainingCount is the number of items to pass to the next handler.
	// If RemainingCount = 0 and Cause != nil, the original batch and count are passed to the next handler.
	RemainingCount int64
}

Error is an error wrapper that supports passing remaining items to the RecoverBatchFn.

func (*Error[B]) Error

func (e *Error[B]) Error() string

func (*Error[B]) String

func (e *Error[B]) String() string
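
The field comments on Error describe what the next handler receives. The following is a minimal standalone sketch of that rule, not the package's implementation: it redeclares a local Error type instead of importing the package, and the helper names nextInput and demoRemaining are hypothetical. How the package treats RemainingCount = 0 with a nil Cause is not documented here, so the sketch simply passes the original batch on in that case.

```go
package main

import (
	"errors"
	"fmt"
)

// Error is a local copy of the documented struct, so the sketch compiles
// without importing the package.
type Error[B any] struct {
	Cause          error
	RemainingBatch B
	RemainingCount int64
}

func (e *Error[B]) Error() string {
	if e.Cause == nil {
		return "batch error"
	}
	return e.Cause.Error()
}

// nextInput is a hypothetical helper that decides what the next handler
// would receive, following the field comments on Error.
func nextInput[B any](batch B, count int64, err error) (B, int64, error) {
	var e *Error[B]
	if !errors.As(err, &e) {
		// Plain error: pass the original batch and count on unchanged.
		return batch, count, err
	}
	if e.RemainingCount == 0 {
		// No remaining items given: fall back to the original batch.
		// Doing this even when Cause is nil is an assumption of this sketch.
		return batch, count, e.Cause
	}
	return e.RemainingBatch, e.RemainingCount, e.Cause
}

// demoRemaining pretends items 1 and 2 succeeded and only item 3 failed.
func demoRemaining() ([]int, int64, error) {
	orig := []int{1, 2, 3}
	failure := &Error[[]int]{
		Cause:          errors.New("write failed"),
		RemainingBatch: orig[2:],
		RemainingCount: 1,
	}
	return nextInput[[]int](orig, 3, failure)
}

func main() {
	b, n, err := demoRemaining()
	fmt.Println(b, n, err) // prints [3] 1 write failed
}
```

With the real package, an error of this shape would presumably be built with NewErrorWithRemaining inside a ProcessBatchFn or RecoverBatchFn.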

type Extractor

type Extractor[T any, V any] func(T) V

type InitBatchFn

type InitBatchFn[B any] func(int64) B

InitBatchFn is a function that creates an empty batch.

type MapRunner added in v1.2.0

type MapRunner[K comparable, T any] interface {
	Runner[T, map[K]T]
}

MapRunner is shorthand for a Runner that merges items into maps.

type MergeToBatchFn

type MergeToBatchFn[B any, T any] func(B, T) B

MergeToBatchFn is a function that adds an item to a batch.
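
As a minimal standalone sketch of how an InitBatchFn and a MergeToBatchFn fit together, the snippet below redeclares the two function types locally (it does not import the package) and folds items into a slice batch. The names initSlice, addToSlice, and collect are illustrative only, and treating the int64 argument as a capacity hint is an assumption.

```go
package main

import "fmt"

// Local copies of the documented function types, so the sketch compiles
// without importing the package.
type InitBatchFn[B any] func(int64) B
type MergeToBatchFn[B any, T any] func(B, T) B

// initSlice allocates an empty slice.
// Using the argument as a capacity hint is an assumption of this sketch.
func initSlice[T any](capacity int64) []T {
	if capacity < 0 {
		capacity = 0
	}
	return make([]T, 0, capacity)
}

// addToSlice appends the item and returns the updated batch.
func addToSlice[T any](b []T, item T) []T {
	return append(b, item)
}

// collect is a hypothetical helper that folds items into a fresh batch
// using the given init and merge functions.
func collect[B any, T any](init InitBatchFn[B], merge MergeToBatchFn[B, T], items []T) B {
	batch := init(int64(len(items)))
	for _, item := range items {
		batch = merge(batch, item)
	}
	return batch
}

func main() {
	batch := collect[[]int, int](initSlice[int], addToSlice[int], []int{1, 2, 3})
	fmt.Println(batch) // prints [1 2 3]
}
```

The merge function returns the batch rather than mutating it in place, which is what lets the same signature work for slices (where append may reallocate) as well as maps.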

func AddSelfToMapUsing

func AddSelfToMapUsing[T any, K comparable](keyExtractor Extractor[T, K]) MergeToBatchFn[map[K]T, T]

AddSelfToMapUsing creates a MergeToBatchFn that adds the item itself to a map, using a key Extractor.

func AddToMapUsing

func AddToMapUsing[T any, K comparable, V any](keyExtractor Extractor[T, K], valueExtractor Extractor[T, V]) MergeToBatchFn[map[K]V, T]

AddToMapUsing creates a MergeToBatchFn that adds an item to a map, using key and value Extractors.

func MergeSelfToMapUsing

func MergeSelfToMapUsing[T any, K comparable](keyExtractor Extractor[T, K], combiner Combine[T]) MergeToBatchFn[map[K]T, T]

MergeSelfToMapUsing creates a MergeToBatchFn that adds the item itself to a map using a key Extractor, and applies Combine when the key already exists. The original value is passed as the first parameter to the Combine function.

func MergeToMapUsing

func MergeToMapUsing[T any, K comparable, V any](keyExtractor Extractor[T, K], valueExtractor Extractor[T, V], combiner Combine[V]) MergeToBatchFn[map[K]V, T]

MergeToMapUsing creates a MergeToBatchFn that adds an item to a map using key and value Extractors, and applies Combine when the key already exists. The original value is passed as the first parameter to the Combine function.
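
The combine-on-duplicate behavior described above can be sketched without the package. The snippet below is an illustration under stated assumptions, not the library's code: Extractor and Combine are redeclared locally, and order, mergeToMap, and totals are hypothetical names.

```go
package main

import "fmt"

// Local copies of the documented function types.
type Extractor[T any, V any] func(T) V
type Combine[T any] func(T, T) T

// order is a hypothetical item type used only for this sketch.
type order struct {
	Customer string
	Amount   int
}

// mergeToMap mimics the documented behavior: insert the value on a new key,
// otherwise store combine(existing, incoming).
func mergeToMap[T any, K comparable, V any](key Extractor[T, K], value Extractor[T, V], combine Combine[V]) func(map[K]V, T) map[K]V {
	return func(b map[K]V, item T) map[K]V {
		k, v := key(item), value(item)
		if old, ok := b[k]; ok {
			b[k] = combine(old, v) // the original value is the first argument
			return b
		}
		b[k] = v
		return b
	}
}

// totals sums order amounts per customer by merging orders one at a time.
func totals(orders []order) map[string]int {
	merge := mergeToMap[order, string, int](
		func(o order) string { return o.Customer },
		func(o order) int { return o.Amount },
		func(old, incoming int) int { return old + incoming },
	)
	batch := make(map[string]int)
	for _, o := range orders {
		batch = merge(batch, o)
	}
	return batch
}

func main() {
	sums := totals([]order{{"ann", 5}, {"bob", 2}, {"ann", 7}})
	fmt.Println(sums["ann"], sums["bob"]) // prints 12 2
}
```

The argument order matters for non-commutative combiners: a combiner that returns its second argument keeps the newest value, which matches what the Replace constructors are documented to do.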

type Option

type Option func(*ProcessorConfig)

Option applies an option to ProcessorConfig.

func WithAggressiveMode

func WithAggressiveMode() Option

WithAggressiveMode enables aggressive mode. In this mode, the processor does not wait for maxWait or maxItems to be reached; it keeps processing items and only merges them into a batch when needed (for example, when the concurrency limit is reached or the dispatcher thread is busy). maxItems still controls the maximum number of items the processor can hold before blocking. WithBlockWhileProcessing is ignored in this mode.

func WithBlockWhileProcessing

func WithBlockWhileProcessing() Option

WithBlockWhileProcessing makes the processor block while processing items. If concurrency is enabled, the processor only blocks once max concurrency is reached. This option has no effect if the processor is in aggressive mode.

func WithDisabledDefaultProcessErrorLog

func WithDisabledDefaultProcessErrorLog() Option

WithDisabledDefaultProcessErrorLog disables the default error logging when a batch processing error occurs.

func WithHardMaxWait

func WithHardMaxWait(wait time.Duration) Option

WithHardMaxWait sets the max waiting time before the processor handles the batch anyway. Unlike WithMaxWait, the batch is processed even if it is empty, which is preferable if the processor must perform periodic tasks. Configure only one of WithMaxWait or WithHardMaxWait, not both.

func WithMaxCloseWait

func WithMaxCloseWait(wait time.Duration) Option

WithMaxCloseWait sets the max waiting time when closing the processor.

func WithMaxConcurrency

func WithMaxConcurrency[I Size](concurrency I) Option

WithMaxConcurrency sets the max number of goroutines this processor can create when processing items. It supports 0 (run on the dispatcher goroutine) and a fixed number. Passing -1 (unlimited) has the same effect as passing math.MaxInt64.
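
To make the three concurrency settings concrete, here is a simplified standalone model built on a buffered channel used as a semaphore. It is not the package's implementation; processAll and sumWithLimit are hypothetical names, and mapping -1 to "one goroutine per batch" is an assumption standing in for "unlimited".

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processAll is a simplified model of a concurrency limit.
// limit == 0 runs every batch on the calling goroutine, limit > 0 runs at
// most limit batches at once, and limit < 0 is treated as unlimited.
func processAll(batches [][]int, limit int, process func([]int)) {
	if limit < 0 {
		limit = len(batches)
	}
	if limit == 0 {
		for _, b := range batches {
			process(b)
		}
		return
	}
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for _, b := range batches {
		sem <- struct{}{} // blocks while limit goroutines are busy
		wg.Add(1)
		go func(b []int) {
			defer wg.Done()
			defer func() { <-sem }()
			process(b)
		}(b)
	}
	wg.Wait()
}

// sumWithLimit sums four small batches and records the peak number of
// handlers that were running at the same time.
func sumWithLimit(limit int) (sum int64, peak int64) {
	batches := [][]int{{1, 2}, {3, 4}, {5, 6}, {7, 8}}
	var running int64
	processAll(batches, limit, func(b []int) {
		cur := atomic.AddInt64(&running, 1)
		for {
			old := atomic.LoadInt64(&peak)
			if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
				break
			}
		}
		for _, n := range b {
			atomic.AddInt64(&sum, int64(n))
		}
		atomic.AddInt64(&running, -1)
	})
	return sum, peak
}

func main() {
	sum, peak := sumWithLimit(2)
	fmt.Println(sum, peak <= 2) // prints 36 true
}
```

Because handlers may run on several goroutines whenever the limit is above 0, any state they share needs synchronization, as in the README example's use of sync/atomic.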

func WithMaxItem

func WithMaxItem[I Size](max I) Option

WithMaxItem sets the max number of items this processor can hold before blocking. It supports a fixed number and -1 (unlimited). When set to unlimited, it never blocks, and the batch handling behavior depends on WithMaxWait. When set to 0, the processor is DISABLED and items are processed directly on the caller thread without batching.

func WithMaxWait

func WithMaxWait(wait time.Duration) Option

WithMaxWait sets the max waiting time before the processor handles the batch anyway. If the batch is empty, it is skipped. The max wait is counted from the last processed time, not on a fixed period. It accepts 0 (no wait), -1 (wait until maxItems is reached), or a time.Duration. If set to -1 and maxItems is unlimited, the processor keeps processing whenever possible without waiting for anything.
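
The interplay of a size trigger and a time trigger can be illustrated with a small standalone loop. This is a simplified model written for this page, not the package's dispatcher: runBatcher, batchSizes, and demoSizes are hypothetical names, and the model assumes maxItem > 0 and maxWait > 0.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher flushes when maxItem items are buffered, when maxWait has
// elapsed since the last flush, or when the input channel is closed.
func runBatcher(in <-chan int, maxItem int, maxWait time.Duration, process func([]int)) {
	buf := make([]int, 0, maxItem)
	timer := time.NewTimer(maxWait)
	defer timer.Stop()

	flush := func() {
		if len(buf) > 0 { // like WithMaxWait, an empty batch is skipped
			process(buf)
			buf = make([]int, 0, maxItem)
		}
		// Restart the wait from the last processed time, not a fixed period.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(maxWait)
	}

	for {
		select {
		case item, ok := <-in:
			if !ok {
				flush() // process the leftover batch before returning
				return
			}
			buf = append(buf, item)
			if len(buf) >= maxItem {
				flush()
			}
		case <-timer.C:
			flush()
		}
	}
}

// batchSizes feeds total items through runBatcher and records batch sizes.
func batchSizes(total, maxItem int, maxWait time.Duration) []int {
	in := make(chan int)
	done := make(chan []int)
	go func() {
		var sizes []int
		runBatcher(in, maxItem, maxWait, func(b []int) { sizes = append(sizes, len(b)) })
		done <- sizes
	}()
	for i := 0; i < total; i++ {
		in <- i
	}
	close(in)
	return <-done
}

// demoSizes uses a wait long enough that only the size trigger fires.
func demoSizes() []int {
	return batchSizes(25, 10, time.Hour)
}

func main() {
	fmt.Println(demoSizes()) // prints [10 10 5]
}
```

With a short maxWait and a slow producer, the same loop would emit smaller batches whenever the timer fires first, which is the trade-off between latency and batch size that these two options control.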

type ProcessBatchFn

type ProcessBatchFn[B any] func(B, int64) error

ProcessBatchFn is a function that processes a batch.

type ProcessBatchIgnoreErrorFn

type ProcessBatchIgnoreErrorFn[B any] func(B, int64)

type ProcessorConfig

type ProcessorConfig struct {
	// contains filtered or unexported fields
}

ProcessorConfig holds the configurable options of a processor.

type ProcessorSetup

type ProcessorSetup[T any, B any] struct {
	ProcessorConfig
	// contains filtered or unexported fields
}

ProcessorSetup is a batch processor in the setup phase (not running). You cannot put items into this processor; use Run to create a RunningProcessor that can accept items. See ProcessorConfig for available options.

func NewIdentityMapProcessor

func NewIdentityMapProcessor[T any, K comparable](keyExtractor Extractor[T, K], combiner Combine[T]) ProcessorSetup[T, map[K]T]

NewIdentityMapProcessor creates a processor backed by a map, using the item itself as the value without extracting.

func NewMapProcessor

func NewMapProcessor[T any, K comparable, V any](keyExtractor Extractor[T, K], valueExtractor Extractor[T, V], combiner Combine[V]) ProcessorSetup[T, map[K]V]

NewMapProcessor creates a processor backed by a map.

func NewProcessor

func NewProcessor[T any, B any](init InitBatchFn[B], merge MergeToBatchFn[B, T]) ProcessorSetup[T, B]

NewProcessor creates a ProcessorSetup using the specified functions. See Configure and Option for available configuration. The resulting ProcessorSetup is in the setup state. Call ProcessorSetup.Run with a handler to create a RunningProcessor that can accept items. It is recommended to set at least maxWait or maxItem. The default processor operates similarly to aggressive mode; use Configure to change its behavior.

func NewReplaceIdentityMapProcessor

func NewReplaceIdentityMapProcessor[T any, K comparable](keyExtractor Extractor[T, K]) ProcessorSetup[T, map[K]T]

NewReplaceIdentityMapProcessor creates a processor backed by a map, using the item itself as the value without extracting. A ProcessorSetup created by this constructor handles duplicate keys by keeping only the last value.

func NewReplaceMapProcessor

func NewReplaceMapProcessor[T any, K comparable, V any](keyExtractor Extractor[T, K], valueExtractor Extractor[T, V]) ProcessorSetup[T, map[K]V]

NewReplaceMapProcessor creates a processor backed by a map. A ProcessorSetup created by this constructor handles duplicate keys by keeping only the last value.

func NewSliceProcessor

func NewSliceProcessor[T any]() ProcessorSetup[T, []T]

NewSliceProcessor creates a processor backed by a slice.

func (ProcessorSetup[T, B]) Configure

func (p ProcessorSetup[T, B]) Configure(options ...Option) ProcessorSetup[T, B]

Configure applies the given Options to this processor. Each Configure call creates a new processor.

func (ProcessorSetup[T, B]) Run

func (p ProcessorSetup[T, B]) Run(process ProcessBatchFn[B], errorHandlers ...RecoverBatchFn[B]) *RunningProcessor[T, B]

Run creates a RunningProcessor that can accept items. It accepts a ProcessBatchFn and a chain of RecoverBatchFn handlers to run when processing fails.

func (ProcessorSetup[T, B]) RunIgnoreError

func (p ProcessorSetup[T, B]) RunIgnoreError(process ProcessBatchIgnoreErrorFn[B]) *RunningProcessor[T, B]

func (ProcessorSetup[T, B]) WithSplitter

func (p ProcessorSetup[T, B]) WithSplitter(split SplitBatchFn[B]) ProcessorSetup[T, B]

WithSplitter splits the batch into multiple smaller batches. When concurrency > 0 and a SplitBatchFn is set, the processor splits the batch and processes it across multiple threads; otherwise the batch is processed on a single thread, and the processor blocks when the concurrency limit is reached. This configuration may be beneficial if you have a very large batch that can be split into smaller batches and processed in parallel.

type RecoverBatchFn

type RecoverBatchFn[B any] func(B, int64, error) error

RecoverBatchFn is a function that handles a failed batch. Each RecoverBatchFn can return an error to trigger the next RecoverBatchFn in the chain. A RecoverBatchFn must never panic.
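
The chaining rule can be sketched in a few lines. The snippet below is a standalone illustration, not the package's dispatcher: RecoverBatchFn is redeclared locally, runChain and demoChain are hypothetical names, and the sketch ignores the remaining-items mechanism of Error and the default LoggingErrorHandler.

```go
package main

import (
	"errors"
	"fmt"
)

// RecoverBatchFn is a local copy of the documented function type.
type RecoverBatchFn[B any] func(B, int64, error) error

// runChain is a hypothetical helper: it calls handlers in order and stops
// as soon as one returns nil. It returns the last unresolved error.
func runChain[B any](batch B, count int64, err error, handlers ...RecoverBatchFn[B]) error {
	for _, h := range handlers {
		if err == nil {
			return nil
		}
		err = h(batch, count, err)
	}
	return err
}

// demoChain runs three handlers and reports which ones were invoked.
func demoChain() (calls []string, final error) {
	retry := func(b []int, n int64, err error) error {
		calls = append(calls, "retry")
		// Returning an error hands the batch to the next handler.
		return fmt.Errorf("retry failed: %w", err)
	}
	deadLetter := func(b []int, n int64, err error) error {
		calls = append(calls, "dead-letter")
		return nil // handled, so the chain stops here
	}
	neverReached := func(b []int, n int64, err error) error {
		calls = append(calls, "never")
		return nil
	}
	final = runChain[[]int]([]int{1, 2}, 2, errors.New("boom"), retry, deadLetter, neverReached)
	return calls, final
}

func main() {
	calls, err := demoChain()
	fmt.Println(calls, err) // prints [retry dead-letter] <nil>
}
```

With the package itself, handlers of this shape would be passed as the variadic errorHandlers argument of Run.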

type Runner

type Runner[T any, B any] interface {
	// Put adds an item to the processor.
	Put(item T)
	// ApproxItemCount returns the approximate number of items currently in the processor.
	ApproxItemCount() int64
	// ItemCount returns the number of items currently in the processor.
	ItemCount() int64
	// Close stops the processor.
	// The implementation of this method may vary, but it must never wait indefinitely.
	Close() error
	// CloseContext stops the processor.
	// This method may process the leftover batch on the caller thread.
	// ctx can be used to provide a deadline for this method.
	CloseContext(ctx context.Context) error
	// StopContext stops the processor.
	// This method does not process the leftover batch.
	StopContext(ctx context.Context) error
	// DrainContext forces batch processing until the batch is empty.
	// This method may process the batch on the caller thread.
	// ctx can be used to provide a deadline for this method.
	DrainContext(ctx context.Context) error
	// FlushContext forces processing of the current batch.
	// This method may process the batch on the caller thread.
	// ctx can be used to provide a deadline for this method.
	FlushContext(ctx context.Context) error
	// Flush forces processing of the current batch.
	// This method may process the batch on the caller thread.
	Flush()
}

Runner provides common methods of a RunningProcessor.

type RunningProcessor

type RunningProcessor[T any, B any] struct {
	ProcessorSetup[T, B]
	// contains filtered or unexported fields
}

RunningProcessor is a processor that is running and can process items.

func (*RunningProcessor[T, B]) ApproxItemCount

func (p *RunningProcessor[T, B]) ApproxItemCount() int64

ApproxItemCount returns the number of items currently in the processor. This method does not block, so the count may not be accurate.

func (*RunningProcessor[T, B]) Close

func (p *RunningProcessor[T, B]) Close() error

Close stops the processor. This method processes the leftover batch on the caller thread. It returns an error if maxCloseWait has passed. See getCloseMaxWait for details.

func (*RunningProcessor[T, B]) CloseContext

func (p *RunningProcessor[T, B]) CloseContext(ctx context.Context) error

CloseContext stops the processor. This method processes the leftover batch on the caller thread. ctx can be used to provide a deadline for this method.

func (*RunningProcessor[T, B]) DrainContext

func (p *RunningProcessor[T, B]) DrainContext(ctx context.Context) error

DrainContext forces batch processing until the batch is empty. This method always processes the batch on the caller thread. ctx can be used to provide a deadline for this method.

func (*RunningProcessor[T, B]) Flush

func (p *RunningProcessor[T, B]) Flush()

Flush forces processing of the current batch. This method may process the batch on the caller thread, depending on the concurrency and blocking settings.

func (*RunningProcessor[T, B]) FlushContext

func (p *RunningProcessor[T, B]) FlushContext(ctx context.Context) error

FlushContext forces processing of the current batch. This method may process the batch on the caller thread, depending on the concurrency and blocking settings. ctx can be used to provide a deadline for this method.

func (*RunningProcessor[T, B]) IsDisabled

func (p *RunningProcessor[T, B]) IsDisabled() bool

IsDisabled reports whether the processor is disabled. A disabled processor does no batching; instead, processing is executed on the caller. All other settings are ignored when the processor is disabled.

func (*RunningProcessor[T, B]) ItemCount

func (p *RunningProcessor[T, B]) ItemCount() int64

ItemCount returns the number of items currently in the processor.

func (*RunningProcessor[T, B]) MustClose

func (p *RunningProcessor[T, B]) MustClose()

MustClose stops the processor without a deadline.

func (*RunningProcessor[T, B]) Put

func (p *RunningProcessor[T, B]) Put(item T)

Put adds an item to the processor.

func (*RunningProcessor[T, B]) StopContext added in v1.1.0

func (p *RunningProcessor[T, B]) StopContext(ctx context.Context) error

StopContext stops the processor. This method does not process the leftover batch.

type Size

type Size interface {
	~int | ~int32 | ~int64
}

type SliceRunner added in v1.2.0

type SliceRunner[T any] interface {
	Runner[T, []T]
}

SliceRunner is shorthand for a Runner that merges items into slices.

type SplitBatchFn

type SplitBatchFn[B any] func(B, int64) []B

SplitBatchFn is a function that splits a batch into multiple smaller batches. A SplitBatchFn must never panic.

func SplitSliceEqually

func SplitSliceEqually[T any, I Size](numberOfChunk I) SplitBatchFn[[]T]

SplitSliceEqually creates a SplitBatchFn that splits a slice into multiple equal chunks.

func SplitSliceSizeLimit

func SplitSliceSizeLimit[T any, I Size](maxSizeOfChunk I) SplitBatchFn[[]T]

SplitSliceSizeLimit creates a SplitBatchFn that splits a slice into multiple chunks of limited size.
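
The two slice splitters differ in what they hold fixed: the number of chunks or the maximum chunk size. The sketch below shows both strategies as standalone functions matching the SplitBatchFn signature. It does not import the package; splitEqually and splitBySize are hypothetical names, and giving the first chunks one extra item when the length is not divisible is an assumption about what "equal" means.

```go
package main

import "fmt"

// SplitBatchFn is a local copy of the documented function type.
type SplitBatchFn[B any] func(B, int64) []B

// splitBySize returns a splitter producing chunks of at most maxSize items.
func splitBySize[T any](maxSize int) SplitBatchFn[[]T] {
	return func(b []T, _ int64) [][]T {
		if maxSize <= 0 || len(b) <= maxSize {
			return [][]T{b}
		}
		chunks := make([][]T, 0, (len(b)+maxSize-1)/maxSize)
		for start := 0; start < len(b); start += maxSize {
			end := start + maxSize
			if end > len(b) {
				end = len(b)
			}
			chunks = append(chunks, b[start:end])
		}
		return chunks
	}
}

// splitEqually returns a splitter producing up to n chunks whose sizes
// differ by at most one item (an assumption of this sketch).
func splitEqually[T any](n int) SplitBatchFn[[]T] {
	return func(b []T, _ int64) [][]T {
		if n <= 1 || len(b) == 0 {
			return [][]T{b}
		}
		parts := n
		if parts > len(b) {
			parts = len(b)
		}
		base, extra := len(b)/parts, len(b)%parts
		chunks := make([][]T, 0, parts)
		start := 0
		for i := 0; i < parts; i++ {
			size := base
			if i < extra {
				size++
			}
			chunks = append(chunks, b[start:start+size])
			start += size
		}
		return chunks
	}
}

func main() {
	items := []int{1, 2, 3, 4, 5, 6, 7}
	fmt.Println(splitBySize[int](3)(items, int64(len(items))))  // prints [[1 2 3] [4 5 6] [7]]
	fmt.Println(splitEqually[int](3)(items, int64(len(items)))) // prints [[1 2 3] [4 5] [6 7]]
}
```

Both splitters return sub-slices that share the original backing array, so handlers running in parallel should treat their chunk as read-only or copy it first.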

Directories

Path Synopsis
maps command
slices command
