batch

package module
v2.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2025 License: MIT Imports: 9 Imported by: 0

README

Go Batch

batch processing utilities for go projects.

This library provides a general batch processor that can apply to various use cases like bulk insert to the database, bulk enqueue, precompute reports, ...

Installation

Require go 1.24+

go get -u github.com/mawngo/go-batch/v2

Usage

Example
package main

import (
	"github.com/mawngo/go-batch/v2"
	"sync/atomic"
	"time"
)

func main() {
	sum := int32(0)
	// First create a batch.ProcessorSetup by specifying 
	// the batch initializer and merger.
	//
	// Initializer will be called to create a new batch, 
	// here the batch.InitSlice[int] will create a slice.
	// Merger will be called to add item to a batch, 
	// here the batch.AddToSlice[int] will add item to the slice.
	//
	// A batch can be anything: slice, map, struct, channel, ...
	// The library already defined some built-in 
	// initializers and mergers for common data types,
	// but you can always define your own initializer and merger.
	setup := batch.NewProcessor(batch.InitSlice[int], batch.AddToSlice[int]).
		// Configure the processor.
		// The batch will be processed when the max item is reached 
		// or the max wait is reached.
		Configure(batch.WithMaxConcurrency(5),
			batch.WithMaxItem(10), batch.WithMaxWait(30*time.Second))

	// Start the processor by specifying a handler to process the batch,
	// and optionally additional run configuration.
	// This will create a *batch.Processor that can accept item.
	processor := setup.Run(summing(&sum))

	for i := 0; i < 1_000_000; i++ {
		// Add item to the processor.
		processor.Put(1)
	}
	// Remember to close the processor before your application stopped.
	// Closing will force the processor to process the left-over item, 
	// any item added after closing is not guarantee to be processed.
	if err := processor.Close(); err != nil {
		panic(err)
	}
	if sum != 1_000_000 {
		panic("sum is not 1_000_000")
	}
}

func summing(p *int32) batch.ProcessBatchFn[[]int] {
	return func(ints []int, _ int64) error {
		for _, num := range ints {
			atomic.AddInt32(p, int32(num))
		}
		return nil
	}
}

More usage can be found in test and examples

Context and Cancellation

This library provides both non-context XXX and context XXXContext variants. However, it is recommended to use context variants, as non-context variants can block indefinitely (except for Close)

Cancelling the context only affects the item that is waiting to be added to the batch (for example, when the waiting batch is full and all batch processing threads are busy), there is no way to cancel the item that is already added to the batch.

You can implement your own logic to cancel the batch using the item context by creating custom batch and item struct as demonstrated in custom context control example.

Waiting for an item to be processed

The processor does not provide a method to wait for or get the result of processing an item, however, you can use the batch.IFuture[T] with custom batch to implement your own waiting logic.

See future example or loader implementation.

Loader

Provide batch loading like Facebook's DataLoader.

package main

import (
	"github.com/mawngo/go-batch/v2"
	"strconv"
	"sync/atomic"
	"time"
)

func main() {
	loadedCount := int32(0)
	// First create a batch.LoaderSetup
	loader := batch.NewLoader[int, string]().
		// Configure the loader.
		// All pending load requests will be processed when one of the 
		// following limits is reached.
		Configure(batch.WithMaxItem(100), batch.WithMaxWait(1*time.Second)).
		// Like when using the processor,
		// start the loader by providing a LoadBatchFn,
		// and optionally additional run configuration.
		// This will create a *batch.Loader that can accept item.
		Run(load(&loadedCount))

	for i := 0; i < 100_000; i++ {
		k := i % 10
		go func() {
			// Use the loader.
			// Alternately, you can use the Load method
			// future := loader.Load(k)
			// ...
			// v, err := future.Get()
			v, err := loader.Get(k)

			if err != nil {
				panic(err)
			}
			if v != strconv.Itoa(k) {
				panic("key mismatch")
			}
		}()
	}
	// Remember to close the running load before your application stopped.
	// Closing will force the loader to load the left-over request,
	// any load request after the loader is closed is not guarantee 
	// to be processed, and may block forever.
	if err := loader.Close(); err != nil {
		panic(err)
	}
	// If you do not want to load left over request, then use StopContext instead.
	// if err := loader.StopContext(context.Background()); err != nil {
	//     panic(err)
	// }
	if loadedCount > 1 {
		panic("loaded too many time")
	}
}

func load(p *int32) batch.LoadBatchFn[int, string] {
	return func(batch batch.LoadKeys[int], count int64) (map[int]string, error) {
		atomic.AddInt32(p, 1)
		if len(batch.Keys) == 0 {
			// This could happen if you provide an alternate counting method.
			return nil, nil
		}

		res := make(map[int]string, len(batch.Keys))
		for _, k := range batch.Keys {
			res[k] = strconv.Itoa(k)
		}
		return res, nil
	}
}

The batch.Loader use batch.Processor for handling batching, so they share the same configuration and options.

However, the default configuration of the Loader is different:

  • It counts the number of pending keys instead of load request, which can be changed by WithBatchLoaderCountInput option.
  • Default max item is 1000
  • Default wait time is 16ms
  • Default concurrency is unlimited.
Caching

This library does not provide caching. You can implement caching by simply checking the cache before Load and add item to the cache in the LoadBatchFn

All Load request before and during load of the same key will share the same Future. Multiple LoadBatchFn can be run concurrently, but they will never share the same keys sets.

See loader cache example.


There is a java version of this library.

Documentation

Index

Constants

View Source
const Unset = -1

Unset is a special value for various Option functions, usually meaning unrestricted, unlimited, or disable. You need to read the doc of the corresponding function to know what this value does.

Variables

View Source
var ErrLoadMissingResult = errors.New("empty missing result for key")

ErrLoadMissingResult is the default error for keys that are missing in the result. Can be configured by LoaderSetup.WithMissingResultError.

Functions

func AddToSlice

func AddToSlice[T any](b []T, item T) []T

AddToSlice is MergeToBatchFn that add item to a slice.

func CountMapKeys added in v2.1.0

func CountMapKeys[V any, K comparable]() func(map[K]V, int64) int64

CountMapKeys create a counter that count keys in map.

func InitMap

func InitMap[K comparable, V any](i int64) map[K]V

InitMap is InitBatchFn that allocate a map. It uses the default size for map, as the size of item may be much larger than the size of map after merged. However, if you properly configured WithBatchCounter to count the size of map and WithMaxItem to a reasonable value, you may benefit from specifying the size of map using your own InitBatchFn.

func InitSlice

func InitSlice[T any](i int64) []T

InitSlice is InitBatchFn that allocate a slice.

func InitType

func InitType[T any](_ int64) T

InitType is an InitBatchFn that allocate a type T.

func LoggingErrorHandler

func LoggingErrorHandler[B any](_ B, count int64, err error) error

LoggingErrorHandler default error handler, always included in RecoverBatchFn chain unless disable.

func NewErrorWithRemaining

func NewErrorWithRemaining[B any](err error, remainBatch B, count int64) error

NewErrorWithRemaining create a *Error with remaining items.

Types

type CombineFn

type CombineFn[T any] func(T, T) T

CombineFn is a function to combine two values in to one.

type Error

type Error[B any] struct {
	// Cause the error cause. If not specified, then nil will be passed to the next error handler.
	Cause error
	// RemainingBatch the batch to pass to the next handler. The RemainingCount must be specified.
	RemainingBatch B
	// RemainingCount number of items to pass to the next handler.
	// If RemainingCount = 0 and Cause != nil then pass the original batch and count to the next handler.
	RemainingCount int64
}

Error is an error wrapper that supports passing remaining items to the RecoverBatchFn.

func (*Error[B]) Error

func (e *Error[B]) Error() string

func (*Error[B]) String

func (e *Error[B]) String() string

func (*Error[B]) Unwrap

func (e *Error[B]) Unwrap() error

Unwrap implements the error Unwrap interface.

type ExtractFn

type ExtractFn[T any, V any] func(T) V

ExtractFn is a function to extract value from item.

type Future

type Future[T any] struct {
	// contains filtered or unexported fields
}

Future implements IFuture. Can be used to get the result of a load request.

func (*Future[T]) Get

func (r *Future[T]) Get() (T, error)

Get wait until the result is available and return the result. This method can block indefinitely. It is recommended to use Future.GetContext instead.

func (*Future[T]) GetContext

func (r *Future[T]) GetContext(ctx context.Context) (T, error)

GetContext wait until the result is available and return the result. The context can be used to cancel the wait (not the load request).

func (*Future[T]) IsDone added in v2.3.0

func (r *Future[T]) IsDone() bool

type IFuture added in v2.3.0

type IFuture[T any] interface {
	// Get wait until the result is available.
	Get() (T, error)
	// GetContext wait until the result is available.
	// The context can be used to cancel the wait (not the task).
	GetContext(ctx context.Context) (T, error)
}

IFuture is a future that can be used to get the result of a task.

type ILoader added in v2.3.0

type ILoader[K comparable, T any] interface {
	// Get registers a key to be loaded and wait for it to be loaded.
	// This method can block until the loader is available for loading new batch.
	// It is recommended to use [ILoader.GetContext] instead.
	Get(key K) (T, error)
	// GetContext registers a key to be loaded and wait for it to be loaded.
	//
	// Context can be used to provide deadline for this method.
	// Cancel the context may stop the loader from loading the key.
	GetContext(ctx context.Context, key K) (T, error)

	// GetAll registers keys to be loaded and wait for all of them to be loaded.
	// This method can block until the loader is available for loading new batch,
	// and may block indefinitely.
	// It is recommended to use [ILoader.GetAllContext] instead.
	GetAll(keys []K) (map[K]T, error)
	// GetAllContext registers keys to be loaded and wait for all of them to be loaded.
	// Context can be used to provide deadline for this method.
	//
	// In the case of context timed out, keys that are loaded will be returned along with error.
	// Cancel the context may stop the loader from loading the key.
	GetAllContext(ctx context.Context, keys []K) (map[K]T, error)

	// Load registers a key to be loaded and return a [Future] for waiting for the result.
	// This method can block until the loader is available for loading new batch.
	// It is recommended to use [ILoader.LoadContext] instead.
	Load(key K) *Future[T]
	// LoadContext registers a key to be loaded and return a [Future] for waiting for the result.
	//
	// Context can be used to provide deadline for this method.
	// Cancel the context may stop the loader from loading the key.
	LoadContext(ctx context.Context, key K) *Future[T]

	// LoadAll registers keys to be loaded and return a [IFuture] for waiting for the combined result.
	// This method can block until the processor is available for processing new batch,
	// and may block indefinitely.
	// It is recommended to use [ILoader.LoadAllContext] instead.
	LoadAll(keys []K) map[K]*Future[T]
	// LoadAllContext registers keys to be loaded and return a [Future] for waiting for the combined result.
	//
	// Context can be used to provide deadline for this method.
	// Cancel the context may stop the loader from loading the key.
	LoadAllContext(ctx context.Context, keys []K) map[K]*Future[T]

	// Close stop the loader.
	// This method may process the leftover branch on caller thread.
	// The implementation of this method may vary, but it must never wait indefinitely.
	Close() error
	// CloseContext stop the loader.
	// This method may load the left-over batch on caller thread.
	// Context can be used to provide deadline for this method.
	CloseContext(ctx context.Context) error
	// StopContext stop the loader.
	// This method does not load leftover batch.
	StopContext(ctx context.Context) error
	// FlushContext force load the current batch.
	// This method may load the batch on caller thread.
	// Context can be used to provide deadline for this method.
	FlushContext(ctx context.Context) error
	// Flush force load the current batch.
	// This method may load the batch on caller thread.
	// It is recommended to use [ILoader.FlushContext] instead.
	Flush()
}

type IProcessor added in v2.2.0

type IProcessor[T any, B any] interface {
	// Put add item to the processor.
	// This method can block until the processor is available for processing new batch,
	// and may block indefinitely.
	// It is recommended to use [IProcessor.PutContext] instead.
	Put(item T)
	// PutAll add all specified item to the processor.
	// This method can block until the processor is available for processing new batch,
	// and may block indefinitely.
	// It is recommended to use [IProcessor.PutAllContext] instead.
	PutAll(items []T)
	// PutContext add item to the processor.
	// If the context is canceled and the item is not added, then this method will return false.
	// The context passed in only control the put step, after item added to the processor,
	// the processing will not be canceled by this context.
	PutContext(ctx context.Context, item T) bool
	// PutAllContext add all items to the processor.
	// If the context is canceled, then this method will return the number of items added to the processor.
	PutAllContext(ctx context.Context, items []T) int

	// Merge add item to the processor using merge function.
	// This method can block until the processor is available for processing new batch,
	// and may block indefinitely.
	// It is recommended to use [IProcessor.MergeContext] instead.
	Merge(item T, merge MergeToBatchFn[B, T])
	// MergeAll add all items to the processor using merge function.
	// This method can block until the processor is available for processing new batch,
	// and may block indefinitely.
	// It is recommended to use [IProcessor.MergeAllContext] instead.
	MergeAll(items []T, merge MergeToBatchFn[B, T])
	// MergeContext add item to the processor using merge function.
	// If the context is canceled and the item is not added, then this method will return false.
	// The context passed in only control the put step, after item added to the processor,
	// the processing will not be canceled by this context.
	MergeContext(ctx context.Context, item T, merge MergeToBatchFn[B, T]) bool
	// MergeAllContext add all items to the processor using merge function.
	// If the context is canceled, then this method will return the number of items added to the processor.
	MergeAllContext(ctx context.Context, items []T, merge MergeToBatchFn[B, T]) int

	// Peek access the current batch using provided function.
	// This method can block until the processor is available.
	// It is recommended to use [IProcessor.PeekContext] instead.
	// This method does not count as processing the batch, the batch will still be processed.
	Peek(reader ProcessBatchFn[B]) error
	// PeekContext access the current batch using provided function.
	// This method does not count as processing the batch, the batch will still be processed.
	PeekContext(ctx context.Context, reader ProcessBatchFn[B]) error

	// ApproxItemCount return number of current item in processor, approximately.
	ApproxItemCount() int64
	// ItemCount return number of current item in processor.
	ItemCount() int64
	// ItemCountContext return number of current item in processor.
	// If the context is canceled, then this method will return approximate item count and false.
	ItemCountContext(ctx context.Context) (int64, bool)
	// Close stop the processor.
	// This method may process the leftover branch on caller thread.
	// The implementation of this method may vary, but it must never wait indefinitely.
	Close() error
	// CloseContext stop the processor.
	// This method may process the left-over batch on caller thread.
	// Context can be used to provide deadline for this method.
	CloseContext(ctx context.Context) error
	// StopContext stop the processor.
	// This method does not process leftover batch.
	StopContext(ctx context.Context) error
	// DrainContext force process batch util the batch is empty.
	// This method may process the batch on caller thread.
	// Context can be used to provide deadline for this method.
	DrainContext(ctx context.Context) error
	// FlushContext force process the current batch.
	// This method may process the batch on caller thread.
	// Context can be used to provide deadline for this method.
	FlushContext(ctx context.Context) error
	// Flush force process the current batch.
	// This method may process the batch on caller thread.
	// It is recommended to use [IProcessor.FlushContext] instead.
	Flush()
}

IProcessor provides common methods of a Processor.

type InitBatchFn

type InitBatchFn[B any] func(int64) B

InitBatchFn function to create empty batch. Accept max item limit, can be 1 (disabled), -1 (unlimited) or any positive number.

type KeyVal

type KeyVal[K any, V any] struct {
	// contains filtered or unexported fields
}

KeyVal is a tuple of key and value.

type LoadBatchFn added in v2.3.0

type LoadBatchFn[K comparable, T any] func(batch LoadKeys[K], count int64) (map[K]T, error)

LoadBatchFn function to load a batch of keys. Return a map of keys to values and an error, if both are non-nil, the error will be set for all missing keys. If the result does not contain all keys and the error is nil, the error of missing keys will be set to ErrLoadMissingResult or any configured error.

The LoadBatchFn should not modify the key content.

type LoadKeys added in v2.3.0

type LoadKeys[K any] struct {
	Ctx  context.Context
	Keys []K
}

LoadKeys is a batch of keys to be loaded.

type Loader added in v2.3.0

type Loader[K comparable, T any] struct {
	I int32
	// contains filtered or unexported fields
}

Loader ILoader that is running and can load item.

func (*Loader[K, T]) Close added in v2.3.0

func (l *Loader[K, T]) Close() error

Close stop the loader. This method will process the leftover branch on caller thread.

func (*Loader[K, T]) CloseContext added in v2.3.0

func (l *Loader[K, T]) CloseContext(ctx context.Context) error

CloseContext stop the loader. This method may load the left-over batch on caller thread. Context can be used to provide deadline for this method.

func (*Loader[K, T]) Flush added in v2.3.0

func (l *Loader[K, T]) Flush()

Flush force load the current batch. This method may load the batch on caller thread. It is recommended to use Loader.FlushContext instead.

func (*Loader[K, T]) FlushContext added in v2.3.0

func (l *Loader[K, T]) FlushContext(ctx context.Context) error

FlushContext force load the current batch. This method may load the batch on caller thread. Context can be used to provide deadline for this method.

func (*Loader[K, T]) Get added in v2.3.0

func (l *Loader[K, T]) Get(key K) (T, error)

Get registers a key to be loaded and wait for it to be loaded. This method can block until the loader is available for loading new batch. It is recommended to use Loader.GetContext instead.

func (*Loader[K, T]) GetAll added in v2.3.0

func (l *Loader[K, T]) GetAll(keys []K) (map[K]T, error)

GetAll registers keys to be loaded and wait for all of them to be loaded. This method can block until the loader is available for loading new batch, and may block indefinitely. It is recommended to use Loader.GetAllContext instead.

func (*Loader[K, T]) GetAllContext added in v2.3.0

func (l *Loader[K, T]) GetAllContext(ctx context.Context, keys []K) (map[K]T, error)

GetAllContext registers keys to be loaded and wait for all of them to be loaded. Context can be used to provide deadline for this method.

In the case of context timed out, keys that are loaded will be returned along with error. Cancel the context may stop the loader from loading the key.

func (*Loader[K, T]) GetContext added in v2.3.0

func (l *Loader[K, T]) GetContext(ctx context.Context, key K) (T, error)

GetContext registers a key to be loaded and wait for it to be loaded.

Context can be used to provide deadline for this method. Cancel the context may stop the loader from loading the key.

func (*Loader[K, T]) Load added in v2.3.0

func (l *Loader[K, T]) Load(key K) *Future[T]

Load registers a key to be loaded and return a IFuture for waiting for the result. This method can block until the loader is available for loading new batch. It is recommended to use Loader.LoadContext instead.

func (*Loader[K, T]) LoadAll added in v2.3.0

func (l *Loader[K, T]) LoadAll(keys []K) map[K]*Future[T]

LoadAll registers keys to be loaded and return a IFuture for waiting for the combined result. This method can block until the processor is available for processing new batch, and may block indefinitely. It is recommended to use Loader.LoadAllContext instead.

func (*Loader[K, T]) LoadAllContext added in v2.3.0

func (l *Loader[K, T]) LoadAllContext(ctx context.Context, keys []K) map[K]*Future[T]

LoadAllContext registers keys to be loaded and return a IFuture for waiting for the combined result.

Context can be used to provide deadline for this method. Cancel the context may stop the loader from loading the key.

func (*Loader[K, T]) LoadContext added in v2.3.0

func (l *Loader[K, T]) LoadContext(ctx context.Context, key K) *Future[T]

LoadContext registers a key to be loaded and return a Future for waiting for the result.

Context can be used to provide deadline for this method. Cancel the context may stop the loader from loading the key.

func (*Loader[K, T]) StopContext added in v2.3.0

func (l *Loader[K, T]) StopContext(ctx context.Context) error

StopContext stop the loader. This method does not load leftover batch.

type LoaderSetup added in v2.3.0

type LoaderSetup[K comparable, T any] struct {
	// contains filtered or unexported fields
}

LoaderSetup batch loader that is in setup phase (not running) You cannot load item using this loader yet, use LoaderSetup.Run to create a Loader that can load item. See Option for available options.

func NewLoader added in v2.3.0

func NewLoader[K comparable, T any]() LoaderSetup[K, T]

NewLoader create a LoaderSetup using specified functions. See LoaderSetup.Configure and Option for available configuration. The result LoaderSetup is in setup state.

Call LoaderSetup.Run with a handler to create a Loader that can load item. By default, the processor operates with the following configuration: - WithMaxConcurrency: Unset (unlimited) - WithMaxItem: 1000 (count keys) - WithMaxWait: 16ms.

func (LoaderSetup[K, T]) Configure added in v2.3.0

func (p LoaderSetup[K, T]) Configure(options ...Option) LoaderSetup[K, T]

Configure apply Option to this loader setup.

func (LoaderSetup[K, T]) Run added in v2.3.0

func (p LoaderSetup[K, T]) Run(loadFn LoadBatchFn[K, T], options ...RunOption[LoadKeys[K]]) *Loader[K, T]

Run create a Loader that can accept item. Accept a LoadBatchFn and a list of RunOption of type LoadKeys.

func (LoaderSetup[K, T]) WithMissingResultError added in v2.3.0

func (p LoaderSetup[K, T]) WithMissingResultError(err error) LoaderSetup[K, T]

WithMissingResultError set the default error for keys that are missing in the result.

type MergeToBatchFn

type MergeToBatchFn[B any, T any] func(B, T) B

MergeToBatchFn function to add an item to batch.

func AddSelfToMapUsing added in v2.1.0

func AddSelfToMapUsing[T any, K comparable](keyExtractor ExtractFn[T, K], combiner CombineFn[T]) MergeToBatchFn[map[K]T, T]

AddSelfToMapUsing create a MergeToBatchFn that add self as item to map using key ExtractFn and apply CombineFn if key duplicated. The original value will be passed as 1st parameter to the CombineFn. If CombineFn is nil, duplicated key will be replaced.

func AddToMapUsing added in v2.1.0

func AddToMapUsing[T any, K comparable, V any](extractor ExtractFn[T, KeyVal[K, V]], combiner CombineFn[V]) MergeToBatchFn[map[K]V, T]

AddToMapUsing create MergeToBatchFn that add item to map using KeyVal ExtractFn and apply CombineFn if key duplicated. The original value will be passed as 1st parameter to the CombineFn. If CombineFn is nil, duplicated key will be replaced.

type Option

type Option func(*processorConfig)

Option general options for batch processor.

func WithAggressiveMode

func WithAggressiveMode() Option

WithAggressiveMode enable the aggressive mode. In this mode, the processor does not wait for the maxWait or maxItems reached, will continue processing item and only merge into batch if needed (for example, reached concurrentLimit, or dispatcher thread is busy). The maxItems configured by WithMaxItem still control the maximum number of items the processor can hold before block. The WithBlockWhileProcessing will be ignored in this mode.

func WithBlockWhileProcessing

func WithBlockWhileProcessing() Option

WithBlockWhileProcessing enable the processor block when processing item. If concurrency enabled, the processor only blocks when reached max concurrency. This method has no effect if the processor is in aggressive mode.

func WithDisabledDefaultProcessErrorLog

func WithDisabledDefaultProcessErrorLog() Option

WithDisabledDefaultProcessErrorLog disable default error logging when batch processing error occurs.

func WithHardMaxWait

func WithHardMaxWait(wait time.Duration) Option

WithHardMaxWait set the max waiting time before the processor will handle the batch anyway. Unlike WithMaxWait, the batch will be processed even if it is empty, which is preferable if the processor must perform some periodic tasks. You should ONLY configure WithMaxWait OR WithHardMaxWait, NOT BOTH.

func WithMaxCloseWait

func WithMaxCloseWait(wait time.Duration) Option

WithMaxCloseWait set the max waiting time when closing the processor.

func WithMaxConcurrency

func WithMaxConcurrency[I size](concurrency I) Option

WithMaxConcurrency set the max number of go routine this processor can create when processing item. Support 0 (run on dispatcher goroutine) and fixed number. Passing -1 Unset (unlimited) to this function has the same effect of passing math.MaxInt64.

func WithMaxItem

func WithMaxItem[I size](maxItem I) Option

WithMaxItem set the max number of items this processor can hold before block. Support fixed number and -1 Unset (unlimited) When set to unlimited, it will never block, and the batch handling behavior depends on WithMaxWait. When set to 0, the processor will be DISABLED and item will be processed directly on caller thread without batching.

func WithMaxWait

func WithMaxWait(wait time.Duration) Option

WithMaxWait set the max waiting time before the processor will handle the batch anyway. If the batch is empty, then it is skipped. The max wait start counting from the last processed time, not a fixed period. Accept 0 (no wait), -1 Unset (wait util maxItems reached), or time.Duration. If set to -1 Unset and the maxItems is unlimited, then the processor will keep processing whenever possible without waiting for anything.

type ProcessBatchFn

type ProcessBatchFn[B any] func(B, int64) error

ProcessBatchFn function to process a batch. Accept the current batch and the input count.

type Processor

type Processor[T any, B any] struct {
	ProcessorSetup[T, B]
	// contains filtered or unexported fields
}

Processor a processor that is running and can process item.

func (*Processor[T, B]) ApproxItemCount

func (p *Processor[T, B]) ApproxItemCount() int64

ApproxItemCount return number of current item in processor. This method does not block, so the counter may not be accurate.

func (*Processor[T, B]) Close

func (p *Processor[T, B]) Close() error

Close stop the processor. This method will process the leftover branch on caller thread. Return error if maxCloseWait passed. Timeout can be configured by WithMaxCloseWait. See getCloseMaxWait for detail.

func (*Processor[T, B]) CloseContext

func (p *Processor[T, B]) CloseContext(ctx context.Context) error

CloseContext stop the processor. This method will process the leftover branch on caller thread. Context can be used to provide deadline for this method.

func (*Processor[T, B]) DrainContext

func (p *Processor[T, B]) DrainContext(ctx context.Context) error

DrainContext force process batch util the batch is empty. This method always processes the batch on caller thread. ctx can be used to provide deadline for this method.

func (*Processor[T, B]) Flush

func (p *Processor[T, B]) Flush()

Flush force process the current batch. This method may process the batch on caller thread, depend on concurrent and block settings. It is recommended to use Processor.FlushContext instead.

func (*Processor[T, B]) FlushContext

func (p *Processor[T, B]) FlushContext(ctx context.Context) error

FlushContext force process the current batch. This method may process the batch on caller thread, depend on concurrent and block settings. Context can be used to provide deadline for this method.

func (*Processor[T, B]) IsDisabled added in v2.2.0

func (p *Processor[T, B]) IsDisabled() bool

IsDisabled whether the processor is disabled. Disabled processor won't do batching, instead the process will be executed on caller. All other settings are ignored when the processor is disabled.

func (*Processor[T, B]) ItemCount

func (p *Processor[T, B]) ItemCount() int64

ItemCount return number of current item in processor. This method will block the processor for accurate counting. It is recommended to use Processor.ItemCountContext instead.

func (*Processor[T, B]) ItemCountContext

func (p *Processor[T, B]) ItemCountContext(ctx context.Context) (int64, bool)

ItemCountContext return number of current item in processor. If the context is canceled, then this method will return approximate item count and false.

func (*Processor[T, B]) Merge

func (p *Processor[T, B]) Merge(item T, merge MergeToBatchFn[B, T])

Merge add item to the processor. This method can block until the processor is available for processing new batch. It is recommended to use Processor.MergeContext instead.

func (*Processor[T, B]) MergeAll

func (p *Processor[T, B]) MergeAll(items []T, merge MergeToBatchFn[B, T])

MergeAll add all item to the processor using merge function. This method will block until all items were put into the processor. It is recommended to use Processor.MergeAllContext instead.

func (*Processor[T, B]) MergeAllContext

func (p *Processor[T, B]) MergeAllContext(ctx context.Context, items []T, merge MergeToBatchFn[B, T]) int

MergeAllContext add all items to the processor using merge function. If the context is canceled, then this method will return the number of items added to the processor. The processing order is the same as the input list, so the output can also be used to determine the next item to process if you want to retry or continue processing.

func (*Processor[T, B]) MergeContext

func (p *Processor[T, B]) MergeContext(ctx context.Context, item T, merge MergeToBatchFn[B, T]) bool

MergeContext add item to the processor using merge function.

func (*Processor[T, B]) Peek

func (p *Processor[T, B]) Peek(reader ProcessBatchFn[B]) error

Peek access the current batch using provided function. This method can block until the processor is available. It is recommended to use Processor.PeekContext instead. This method does not count as processing the batch, the batch will still be processed.

func (*Processor[T, B]) PeekContext

func (p *Processor[T, B]) PeekContext(ctx context.Context, reader ProcessBatchFn[B]) error

PeekContext access the current batch using provided function. This method does not count as processing the batch, the batch will still be processed.

func (*Processor[T, B]) Put

func (p *Processor[T, B]) Put(item T)

Put add item to the processor. This method can block until the processor is available for processing new batch. It is recommended to use Processor.PutContext instead.

func (*Processor[T, B]) PutAll

func (p *Processor[T, B]) PutAll(items []T)

PutAll add all item to the processor. This method will block until all items were put into the processor. It is recommended to use Processor.PutAllContext instead.

func (*Processor[T, B]) PutAllContext

func (p *Processor[T, B]) PutAllContext(ctx context.Context, items []T) int

PutAllContext add all items to the processor. If the context is canceled, then this method will return the number of items added to the processor. The processing order is the same as the input list, so the output can also be used to determine the next item to process if you want to retry or continue processing.

func (*Processor[T, B]) PutContext

func (p *Processor[T, B]) PutContext(ctx context.Context, item T) bool

PutContext add item to the processor.

func (*Processor[T, B]) StopContext

func (p *Processor[T, B]) StopContext(ctx context.Context) error

StopContext stop the processor. This method does not process leftover batch.

type ProcessorSetup

type ProcessorSetup[T any, B any] struct {
	// contains filtered or unexported fields
}

ProcessorSetup batch processor that is in setup phase (not running). You cannot put item into this processor, use ProcessorSetup.Run to create a Processor that can accept item. See Option for available options.

func NewMapProcessor

func NewMapProcessor[T any, K comparable, V any](extractor ExtractFn[T, KeyVal[K, V]], combiner CombineFn[V]) ProcessorSetup[T, map[K]V]

NewMapProcessor prepare a processor that backed by a map. If CombineFn is nil, duplicated key will be replaced.

func NewProcessor

func NewProcessor[T any, B any](init InitBatchFn[B], merge MergeToBatchFn[B, T]) ProcessorSetup[T, B]

NewProcessor create a ProcessorSetup using specified functions. See ProcessorSetup.Configure and Option for available configuration. The result ProcessorSetup is in setup state. Call ProcessorSetup.Run with a handler to create a Processor that can accept item. It is recommended to set at least maxWait by WithMaxWait or maxItem by WithMaxItem. By default, the processor operates similarly to aggressive mode, use Configure to change its behavior.

func NewSelfMapProcessor added in v2.1.0

func NewSelfMapProcessor[T any, K comparable](keyExtractor ExtractFn[T, K], combiner CombineFn[T]) ProcessorSetup[T, map[K]T]

NewSelfMapProcessor prepare a processor that backed by a map, using item as value without extracting. If CombineFn is nil, duplicated key will be replaced.

func NewSliceProcessor

func NewSliceProcessor[T any]() ProcessorSetup[T, []T]

NewSliceProcessor prepare a processor that backed by a slice.

func (ProcessorSetup[T, B]) Configure

func (p ProcessorSetup[T, B]) Configure(options ...Option) ProcessorSetup[T, B]

Configure apply Option to this processor setup.

func (ProcessorSetup[T, B]) Run

func (p ProcessorSetup[T, B]) Run(process ProcessBatchFn[B], options ...RunOption[B]) *Processor[T, B]

Run create a Processor that can accept item. Accept a ProcessBatchFn and a list of RunOption.

type RecoverBatchFn

type RecoverBatchFn[B any] func(B, int64, error) error

RecoverBatchFn function to handle an error batch. Each RecoverBatchFn can further return error to enable the next RecoverBatchFn in the chain. The RecoverBatchFn must never panic.

Accept the current batch and a counter with the previous error. The counter and batch can be controlled by returning an Error, otherwise it will receive the same arguments of ProcessBatchFn.

type RunOption

type RunOption[B any] func(*runConfig[B])

RunOption options for batch processing.

func WithBatchCounter

func WithBatchCounter[B any](count func(B, int64) int64) RunOption[B]

WithBatchCounter provide alternate function to count the number of items in batch. The function receives the current batch and the total input items count of the current batch.

func WithBatchErrorHandlers

func WithBatchErrorHandlers[B any](handlers ...RecoverBatchFn[B]) RunOption[B]

WithBatchErrorHandlers provide a RecoverBatchFn chain to process on error. Each RecoverBatchFn can further return error to enable the next RecoverBatchFn in the chain. The RecoverBatchFn must never panic.

func WithBatchLoaderCountInput added in v2.3.0

func WithBatchLoaderCountInput[K comparable]() RunOption[LoadKeys[K]]

WithBatchLoaderCountInput unset the current count function. Typically used for Loader to specify that it should use the number of pending load requests for limit instead of the number of pending keys.

func WithBatchSplitter

func WithBatchSplitter[B any](split SplitBatchFn[B]) RunOption[B]

WithBatchSplitter split the batch into multiple smaller batch. When concurrency > 0 and SplitBatchFn are set, the processor will split the batch and process across multiple threads, otherwise the batch will be process on a single thread, and block when concurrency is reached. This configuration may be beneficial if you have a very large batch that can be split into smaller batch and processed in parallel.

type SplitBatchFn

type SplitBatchFn[B any] func(B, int64) []B

SplitBatchFn function to split a batch into multiple smaller batches. Accept the current batch and the input count. The SplitBatchFn must never panic.

func SplitSliceEqually

func SplitSliceEqually[T any, I size](numberOfChunk I) SplitBatchFn[[]T]

SplitSliceEqually create a SplitBatchFn that split a slice into multiple equal chuck.

func SplitSliceSizeLimit

func SplitSliceSizeLimit[T any, I size](maxSizeOfChunk I) SplitBatchFn[[]T]

SplitSliceSizeLimit create a SplitBatchFn that split a slice into multiple chuck of limited size.

Directories

Path Synopsis
ctxctrl command
future command
loader command
loadercache command
maps command
slices command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL