Documentation ¶
Index ¶
- Constants
- func AddToSlice[T any](b []T, item T) []T
- func InitChan[T any](i int64) chan T
- func InitMap[K comparable, V any](i int64) map[K]V
- func InitSlice[T any](i int64) []T
- func InitType[T any](_ int64) T
- func LoggingErrorHandler[B any](_ B, count int64, err error) error
- func NewErrorWithRemaining[B any](err error, remainBatch B, count int64) error
- type Combine
- type CombineFn
- type Error
- type ExtractFn
- type Extractor
- type InitBatchFn
- type MapRunner
- type MergeToBatchFn
- func AddSelfToMapUsing[T any, K comparable](keyExtractor ExtractFn[T, K]) MergeToBatchFn[map[K]T, T]
- func AddToMapUsing[T any, K comparable, V any](keyExtractor ExtractFn[T, K], valueExtractor ExtractFn[T, V]) MergeToBatchFn[map[K]V, T]
- func MergeSelfToMapUsing[T any, K comparable](keyExtractor ExtractFn[T, K], combiner CombineFn[T]) MergeToBatchFn[map[K]T, T]
- func MergeToMapUsing[T any, K comparable, V any](keyExtractor ExtractFn[T, K], valueExtractor ExtractFn[T, V], ...) MergeToBatchFn[map[K]V, T]
- type Option
- func WithAggressiveMode() Option
- func WithBlockWhileProcessing() Option
- func WithDisabledDefaultProcessErrorLog() Option
- func WithHardMaxWait(wait time.Duration) Option
- func WithMaxCloseWait(wait time.Duration) Option
- func WithMaxConcurrency[I Size](concurrency I) Option
- func WithMaxItem[I Size](maxItem I) Option
- func WithMaxWait(wait time.Duration) Option
- type ProcessBatchFn
- type ProcessBatchIgnoreErrorFn
- type ProcessorConfig
- type ProcessorSetup
- func NewIdentityMapProcessor[T any, K comparable](keyExtractor ExtractFn[T, K], combiner CombineFn[T]) ProcessorSetup[T, map[K]T]
- func NewMapProcessor[T any, K comparable, V any](keyExtractor ExtractFn[T, K], valueExtractor ExtractFn[T, V], ...) ProcessorSetup[T, map[K]V]
- func NewProcessor[T any, B any](init InitBatchFn[B], merge MergeToBatchFn[B, T]) ProcessorSetup[T, B]
- func NewReplaceIdentityMapProcessor[T any, K comparable](keyExtractor ExtractFn[T, K]) ProcessorSetup[T, map[K]T]
- func NewReplaceMapProcessor[T any, K comparable, V any](keyExtractor ExtractFn[T, K], valueExtractor ExtractFn[T, V]) ProcessorSetup[T, map[K]V]
- func NewSliceProcessor[T any]() ProcessorSetup[T, []T]
- func (p ProcessorSetup[T, B]) Configure(options ...Option) ProcessorSetup[T, B]
- func (p ProcessorSetup[T, B]) Run(process ProcessBatchFn[B], errorHandlers ...RecoverBatchFn[B]) *RunningProcessor[T, B]
- func (p ProcessorSetup[T, B]) RunIgnoreError(process ProcessBatchIgnoreErrorFn[B]) *RunningProcessor[T, B]
- func (p ProcessorSetup[T, B]) WithSplitter(split SplitBatchFn[B]) ProcessorSetup[T, B]
- type RecoverBatchFn
- type Runner
- type RunningProcessor
- func (p *RunningProcessor[T, B]) ApproxItemCount() int64
- func (p *RunningProcessor[T, B]) Close() error
- func (p *RunningProcessor[T, B]) CloseContext(ctx context.Context) error
- func (p *RunningProcessor[T, B]) DrainContext(ctx context.Context) error
- func (p *RunningProcessor[T, B]) Flush()
- func (p *RunningProcessor[T, B]) FlushContext(ctx context.Context) error
- func (p *RunningProcessor[T, B]) IsDisabled() bool
- func (p *RunningProcessor[T, B]) ItemCount() int64
- func (p *RunningProcessor[T, B]) ItemCountContext(ctx context.Context) (int64, bool)
- func (p *RunningProcessor[T, B]) MustClose()
- func (p *RunningProcessor[T, B]) Put(item T)
- func (p *RunningProcessor[T, B]) PutAll(items []T)
- func (p *RunningProcessor[T, B]) PutAllContext(ctx context.Context, items []T) int
- func (p *RunningProcessor[T, B]) PutContext(ctx context.Context, item T) bool
- func (p *RunningProcessor[T, B]) StopContext(ctx context.Context) error
- type Size
- type SliceRunner
- type SplitBatchFn
Constants ¶
const Disabled = -1
Disabled is a special value for WithMaxWait. Deprecated: use Unset.
const Unlimited = -1
Unlimited is a special value for WithMaxItem. Deprecated: use Unset.
const Unset = -1
Unset is a special value for various Option functions, usually meaning unrestricted, unlimited, or disabled. Read the doc of the corresponding function to know what this value does there.
Variables ¶
This section is empty.
Functions ¶
func AddToSlice ¶
func AddToSlice[T any](b []T, item T) []T
AddToSlice is a MergeToBatchFn that adds an item to a slice.
func InitChan ¶
func InitChan[T any](i int64) chan T
InitChan is an InitBatchFn that allocates a channel. It should not be used with an unbounded processor (maxItem < 0).
func InitMap ¶
func InitMap[K comparable, V any](i int64) map[K]V
InitMap is an InitBatchFn that allocates a map.
func LoggingErrorHandler ¶
func LoggingErrorHandler[B any](_ B, count int64, err error) error
LoggingErrorHandler is the default error handler, always included in the RecoverBatchFn chain unless disabled.
Types ¶
type Combine ¶
Combine is an alias for CombineFn for backward compatibility. Deprecated: use CombineFn.
type CombineFn ¶ added in v1.4.2
type CombineFn[T any] func(T, T) T
CombineFn is a function to combine two values into one.
type Error ¶
type Error[B any] struct {
	// Cause the error cause. If not specified, then nil will be passed to the next error handler.
	Cause error
	// RemainingBatch the batch to pass to the next handler. The RemainingCount must be specified.
	RemainingBatch B
	// RemainingCount number of items to pass to the next handler.
	// If RemainingCount = 0 and Cause != nil then pass the original batch and count to the next handler.
	RemainingCount int64
}
Error is an error wrapper that supports passing remaining items to the RecoverBatchFn.
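The sketch below shows one way an error-handler chain could hand the unprocessed tail of a batch to the next handler via NewErrorWithRemaining. The import path example.com/batch, the sendAll/send helpers, and the exact shapes of ProcessBatchFn and RecoverBatchFn (the latter inferred from LoggingErrorHandler's signature) are assumptions, not part of the documented API.

package main

import (
	"fmt"
	"time"

	batch "example.com/batch" // hypothetical import path; substitute the real module path
)

func main() {
	p := batch.NewSliceProcessor[string]().
		Configure(batch.WithMaxItem(100), batch.WithMaxWait(time.Second)).
		Run(
			// Assumed ProcessBatchFn shape: func(batch B, count int64) error.
			func(items []string, count int64) error {
				return sendAll(items)
			},
			// Assumed RecoverBatchFn shape, inferred from LoggingErrorHandler: func(batch B, count int64, err error) error.
			func(items []string, count int64, err error) error {
				for i, item := range items {
					if retryErr := send(item); retryErr != nil {
						// Hand only the still-unsent tail to the next handler in the chain.
						return batch.NewErrorWithRemaining(retryErr, items[i:], int64(len(items)-i))
					}
				}
				return nil // fully recovered, stop the chain
			},
		)
	defer p.MustClose()

	p.Put("hello")
}

func sendAll(items []string) error { return fmt.Errorf("downstream unavailable") }

func send(item string) error { return nil }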
type Extractor ¶
Extractor is an alias for ExtractFn for backward compatibility. Deprecated: use ExtractFn.
type InitBatchFn ¶
InitBatchFn is a function to create an empty batch.
type MapRunner ¶ added in v1.2.0
type MapRunner[K comparable, T any] interface {
	Runner[T, map[K]T]
}
MapRunner is shorthand for a Runner that merges items into a map.
type MergeToBatchFn ¶
MergeToBatchFn is a function to add an item to a batch.
func AddSelfToMapUsing ¶
func AddSelfToMapUsing[T any, K comparable](keyExtractor ExtractFn[T, K]) MergeToBatchFn[map[K]T, T]
AddSelfToMapUsing creates a MergeToBatchFn that adds the item itself to a map using the key ExtractFn.
func AddToMapUsing ¶
func AddToMapUsing[T any, K comparable, V any](keyExtractor ExtractFn[T, K], valueExtractor ExtractFn[T, V]) MergeToBatchFn[map[K]V, T]
AddToMapUsing creates a MergeToBatchFn that adds an item to a map using the key and value ExtractFn.
func MergeSelfToMapUsing ¶
func MergeSelfToMapUsing[T any, K comparable](keyExtractor ExtractFn[T, K], combiner CombineFn[T]) MergeToBatchFn[map[K]T, T]
MergeSelfToMapUsing creates a MergeToBatchFn that adds the item itself to a map using the key ExtractFn and applies the CombineFn when a key is duplicated. The original value is passed as the first parameter to the CombineFn.
func MergeToMapUsing ¶
func MergeToMapUsing[T any, K comparable, V any](keyExtractor ExtractFn[T, K], valueExtractor ExtractFn[T, V], combiner CombineFn[V]) MergeToBatchFn[map[K]V, T]
MergeToMapUsing creates a MergeToBatchFn that adds an item to a map using the key and value ExtractFn and applies the CombineFn when a key is duplicated. The original value is passed as the first parameter to the CombineFn.
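A sketch of wiring MergeToMapUsing into NewProcessor together with InitMap, summing amounts per key within a batch. The import path, the event type, and the ExtractFn/ProcessBatchFn shapes used for the literals below are assumptions; the CombineFn receives the existing value first, as documented.

package main

import (
	"time"

	batch "example.com/batch" // hypothetical import path
)

type event struct {
	UserID string
	Amount int64
}

func main() {
	// Sum Amount per UserID inside the batch; the original (existing) value is
	// passed as the first parameter to the CombineFn.
	merge := batch.MergeToMapUsing(
		func(e event) string { return e.UserID }, // assumed ExtractFn shape: func(T) K
		func(e event) int64 { return e.Amount },  // assumed ExtractFn shape: func(T) V
		func(old, add int64) int64 { return old + add },
	)

	p := batch.NewProcessor(batch.InitMap[string, int64], merge).
		Configure(batch.WithMaxItem(1000), batch.WithMaxWait(time.Second)).
		// Assumed ProcessBatchFn shape: func(batch B, count int64) error.
		Run(func(totals map[string]int64, count int64) error {
			// flush the per-user totals downstream
			return nil
		})
	defer p.MustClose()

	p.Put(event{UserID: "u1", Amount: 5})
	p.Put(event{UserID: "u1", Amount: 7}) // combined with the previous value: 12
}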
type Option ¶
type Option func(*ProcessorConfig)
Option applies an option to ProcessorConfig.
func WithAggressiveMode ¶
func WithAggressiveMode() Option
WithAggressiveMode enables aggressive mode. In this mode, the processor does not wait until maxWait or maxItems is reached; it keeps processing items and only merges them into a batch when needed (for example, when the concurrency limit is reached or the dispatcher thread is busy). The maxItems configured by WithMaxItem still controls the maximum number of items the processor can hold before blocking. WithBlockWhileProcessing is ignored in this mode.
func WithBlockWhileProcessing ¶
func WithBlockWhileProcessing() Option
WithBlockWhileProcessing makes the processor block while processing items. If concurrency is enabled, the processor only blocks when max concurrency is reached. This option has no effect if the processor is in aggressive mode.
func WithDisabledDefaultProcessErrorLog ¶
func WithDisabledDefaultProcessErrorLog() Option
WithDisabledDefaultProcessErrorLog disables the default error logging when a batch processing error occurs.
func WithHardMaxWait ¶
func WithHardMaxWait(wait time.Duration) Option
WithHardMaxWait sets the maximum waiting time before the processor handles the batch anyway. Unlike WithMaxWait, the batch is processed even if it is empty, which is preferable if the processor must perform some periodic task. You should configure ONLY WithMaxWait OR WithHardMaxWait, NOT BOTH.
func WithMaxCloseWait ¶
func WithMaxCloseWait(wait time.Duration) Option
WithMaxCloseWait sets the maximum waiting time when closing the processor.
func WithMaxConcurrency ¶
func WithMaxConcurrency[I Size](concurrency I) Option
WithMaxConcurrency sets the maximum number of goroutines this processor can create when processing items. Supports 0 (run on the dispatcher goroutine) or a fixed number. Passing -1 (Unset, unlimited) to this function has the same effect as passing math.MaxInt64.
func WithMaxItem ¶
func WithMaxItem[I Size](maxItem I) Option
WithMaxItem sets the maximum number of items this processor can hold before blocking. Supports a fixed number or -1 (Unset, unlimited). When set to unlimited, it never blocks, and the batch handling behavior depends on WithMaxWait. When set to 0, the processor is DISABLED and items are processed directly on the caller thread without batching.
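A small sketch of the maxItem = 0 case described above: per the WithMaxItem and IsDisabled docs, the processor should report itself as disabled and process each item directly on the caller. The import path and the ProcessBatchFn shape are assumptions.

package main

import (
	"fmt"

	batch "example.com/batch" // hypothetical import path
)

func main() {
	// WithMaxItem(0) disables batching: processing runs directly on the caller.
	p := batch.NewSliceProcessor[int]().
		Configure(batch.WithMaxItem(0)).
		// Assumed ProcessBatchFn shape: func(batch B, count int64) error.
		Run(func(items []int, count int64) error {
			fmt.Println("processing", items)
			return nil
		})
	defer p.MustClose()

	fmt.Println(p.IsDisabled()) // expected: true
	p.Put(42)                   // processed on this goroutine, no batching
}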
func WithMaxWait ¶
func WithMaxWait(wait time.Duration) Option
WithMaxWait sets the maximum waiting time before the processor handles the batch anyway. If the batch is empty, it is skipped. The max wait starts counting from the last processed time; it is not a fixed period. Accepts 0 (no wait), -1 (Unset, wait until maxItems is reached), or a time.Duration. If set to -1 (Unset) and maxItems is unlimited, the processor keeps processing whenever possible without waiting for anything.
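The snippet below gathers the options above into one hypothetical configuration; only documented Option constructors are used, while the import path and the request type are placeholders.

package main

import (
	"time"

	batch "example.com/batch" // hypothetical import path
)

type request struct{ ID string }

func newSetup() batch.ProcessorSetup[request, []request] {
	// Batch up to 500 items, flush at most 2 seconds after the last processed
	// batch, run at most 4 processing goroutines, block callers once max
	// concurrency is reached, and allow 10 seconds for Close.
	return batch.NewSliceProcessor[request]().Configure(
		batch.WithMaxItem(500),
		batch.WithMaxWait(2*time.Second),
		batch.WithMaxConcurrency(4),
		batch.WithBlockWhileProcessing(),
		batch.WithMaxCloseWait(10*time.Second),
	)
}

func main() { _ = newSetup() }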
type ProcessBatchFn ¶
ProcessBatchFn is a function to process a batch.
type ProcessorConfig ¶
type ProcessorConfig struct {
// contains filtered or unexported fields
}
ProcessorConfig holds the configurable options of a processor.
type ProcessorSetup ¶
type ProcessorSetup[T any, B any] struct {
	ProcessorConfig
	// contains filtered or unexported fields
}
ProcessorSetup is a batch processor in the setup phase (not running). You cannot put items into this processor; use Run to create a RunningProcessor that can accept items. See ProcessorConfig for available options.
func NewIdentityMapProcessor ¶
func NewIdentityMapProcessor[T any, K comparable](keyExtractor ExtractFn[T, K], combiner CombineFn[T]) ProcessorSetup[T, map[K]T]
NewIdentityMapProcessor prepares a processor backed by a map, using the item itself as the value without extracting.
func NewMapProcessor ¶
func NewMapProcessor[T any, K comparable, V any](keyExtractor ExtractFn[T, K], valueExtractor ExtractFn[T, V], combiner CombineFn[V]) ProcessorSetup[T, map[K]V]
NewMapProcessor prepares a processor backed by a map.
func NewProcessor ¶
func NewProcessor[T any, B any](init InitBatchFn[B], merge MergeToBatchFn[B, T]) ProcessorSetup[T, B]
NewProcessor creates a ProcessorSetup using the specified functions. See ProcessorSetup.Configure and Option for available configuration. The resulting ProcessorSetup is in the setup state; call ProcessorSetup.Run with a handler to create a RunningProcessor that can accept items. It is recommended to set at least maxWait via WithMaxWait or maxItem via WithMaxItem. By default, the processor operates similarly to aggressive mode; use Configure to change its behavior.
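A sketch of a fully custom batch built with NewProcessor: the batch type keeps only a count and a sum instead of storing items. The InitBatchFn and MergeToBatchFn literal shapes are inferred from the documented InitMap and AddToSlice signatures; the ProcessBatchFn shape and the import path are assumptions.

package main

import (
	"fmt"
	"time"

	batch "example.com/batch" // hypothetical import path
)

// stats is a custom batch type that aggregates values instead of storing them.
type stats struct {
	n   int64
	sum int64
}

func main() {
	p := batch.NewProcessor(
		// Assumed InitBatchFn shape func(int64) B, matching the documented InitMap/InitSlice helpers.
		func(sizeHint int64) *stats { return &stats{} },
		// Assumed MergeToBatchFn shape func(B, T) B, matching the documented AddToSlice helper.
		func(b *stats, v int64) *stats {
			b.n++
			b.sum += v
			return b
		},
	).Configure(
		batch.WithMaxItem(batch.Unset),   // unlimited items, never block on Put
		batch.WithMaxWait(5*time.Second), // flush at most every 5 seconds
	).Run(
		// Assumed ProcessBatchFn shape: func(batch B, count int64) error.
		func(b *stats, count int64) error {
			fmt.Printf("flushed %d values, sum=%d\n", b.n, b.sum)
			return nil
		},
	)
	defer p.MustClose()

	p.PutAll([]int64{1, 2, 3})
}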
func NewReplaceIdentityMapProcessor ¶
func NewReplaceIdentityMapProcessor[T any, K comparable](keyExtractor ExtractFn[T, K]) ProcessorSetup[T, map[K]T]
NewReplaceIdentityMapProcessor prepares a processor backed by a map, using the item itself as the value without extracting. A ProcessorSetup created by this constructor handles duplicated keys by keeping only the last value.
func NewReplaceMapProcessor ¶
func NewReplaceMapProcessor[T any, K comparable, V any](keyExtractor ExtractFn[T, K], valueExtractor ExtractFn[T, V]) ProcessorSetup[T, map[K]V]
NewReplaceMapProcessor prepares a processor backed by a map. A ProcessorSetup created by this constructor handles duplicated keys by keeping only the last value.
func NewSliceProcessor ¶
func NewSliceProcessor[T any]() ProcessorSetup[T, []T]
NewSliceProcessor prepares a processor backed by a slice.
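A minimal end-to-end sketch with a slice-backed processor. The import path and the ProcessBatchFn shape are assumptions; everything else uses constructors and options documented on this page.

package main

import (
	"fmt"
	"time"

	batch "example.com/batch" // hypothetical import path
)

func main() {
	p := batch.NewSliceProcessor[string]().
		Configure(batch.WithMaxItem(10), batch.WithMaxWait(time.Second)).
		// Assumed ProcessBatchFn shape: func(batch B, count int64) error.
		Run(func(lines []string, count int64) error {
			fmt.Println("writing", len(lines), "lines")
			return nil
		})
	defer p.MustClose()

	for i := 0; i < 25; i++ {
		p.Put(fmt.Sprintf("line-%d", i))
	}
	p.Flush() // force the partially filled batch to be processed
}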
func (ProcessorSetup[T, B]) Configure ¶
func (p ProcessorSetup[T, B]) Configure(options ...Option) ProcessorSetup[T, B]
Configure applies Options to this processor. Each Configure call creates a new processor.
func (ProcessorSetup[T, B]) Run ¶
func (p ProcessorSetup[T, B]) Run(process ProcessBatchFn[B], errorHandlers ...RecoverBatchFn[B]) *RunningProcessor[T, B]
Run creates a RunningProcessor that can accept items. It accepts a ProcessBatchFn and a chain of RecoverBatchFn to handle errors.
func (ProcessorSetup[T, B]) RunIgnoreError ¶
func (p ProcessorSetup[T, B]) RunIgnoreError(process ProcessBatchIgnoreErrorFn[B]) *RunningProcessor[T, B]
func (ProcessorSetup[T, B]) WithSplitter ¶
func (p ProcessorSetup[T, B]) WithSplitter(split SplitBatchFn[B]) ProcessorSetup[T, B]
WithSplitter splits the batch into multiple smaller batches. When concurrency > 0 and a SplitBatchFn is set, the processor splits the batch and processes the chunks across multiple threads; otherwise the batch is processed on a single thread, blocking when the concurrency limit is reached. This configuration may be beneficial if you have a very large batch that can be split into smaller batches and processed in parallel.
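A sketch combining WithSplitter with SplitSliceSizeLimit and WithMaxConcurrency so large slices are processed as parallel chunks. The import path, the deleteRows helper, and the ProcessBatchFn shape are assumptions.

package main

import (
	"time"

	batch "example.com/batch" // hypothetical import path
)

func main() {
	// Collect up to 10_000 IDs, split the slice into chunks of at most 500,
	// and process the chunks on up to 8 goroutines.
	p := batch.NewSliceProcessor[int64]().
		WithSplitter(batch.SplitSliceSizeLimit[int64](500)).
		Configure(
			batch.WithMaxItem(10_000),
			batch.WithMaxWait(time.Second),
			batch.WithMaxConcurrency(8),
		).
		// Assumed ProcessBatchFn shape: func(batch B, count int64) error.
		Run(func(chunk []int64, count int64) error {
			return deleteRows(chunk) // hypothetical downstream call
		})
	defer p.MustClose()

	p.Put(1)
}

func deleteRows(ids []int64) error { return nil }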
type RecoverBatchFn ¶
RecoverBatchFn is a function to handle a failed batch. Each RecoverBatchFn can in turn return an error to trigger the next RecoverBatchFn in the chain. A RecoverBatchFn must never panic.
type Runner ¶
type Runner[T any, B any] interface {
	// Put adds an item to the processor.
	// This method can block until the processor is available for processing a new batch,
	// and may block indefinitely.
	Put(item T)
	PutAll(items []T)
	// PutContext adds an item to the processor.
	// If the context is canceled and the item is not added, then this method returns false.
	// The context passed in only controls the put step; after the item is added to the processor,
	// the processing will not be canceled by this context.
	PutContext(ctx context.Context, item T) bool
	// PutAllContext adds all items to the processor.
	// If the context is canceled, then this method returns the number of items added to the processor.
	PutAllContext(ctx context.Context, items []T) int
	// ApproxItemCount returns the number of items currently in the processor, approximately.
	ApproxItemCount() int64
	// ItemCount returns the number of items currently in the processor.
	ItemCount() int64
	// ItemCountContext returns the number of items currently in the processor.
	// If the context is canceled, then this method returns the approximate item count and false.
	ItemCountContext(ctx context.Context) (int64, bool)
	// Close stops the processor.
	// The implementation of this method may vary, but it must never wait indefinitely.
	Close() error
	// CloseContext stops the processor.
	// This method may process the leftover batch on the caller thread.
	// The context can be used to provide a deadline for this method.
	CloseContext(ctx context.Context) error
	// StopContext stops the processor.
	// This method does not process the leftover batch.
	StopContext(ctx context.Context) error
	// DrainContext forces batch processing until the batch is empty.
	// This method may process the batch on the caller thread.
	// The context can be used to provide a deadline for this method.
	DrainContext(ctx context.Context) error
	// FlushContext forces processing of the current batch.
	// This method may process the batch on the caller thread.
	// The context can be used to provide a deadline for this method.
	FlushContext(ctx context.Context) error
	// Flush forces processing of the current batch.
	// This method may process the batch on the caller thread.
	Flush()
	// MustClose stops the processor and panics if there is any error.
	// This method should only be used in tests.
	MustClose()
}
Runner provides common methods of a RunningProcessor.
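Because the batch-specific constructors all end in a RunningProcessor, code that only needs to put items can depend on the Runner interface instead of a concrete processor type. The sketch assumes *RunningProcessor satisfies Runner (its method set matches the interface above); the import path and the ProcessBatchFn shape are assumptions.

package main

import (
	"context"

	batch "example.com/batch" // hypothetical import path
)

// ingest depends only on the Runner interface, so it works with any running
// processor whose item type is string and batch type is []string.
func ingest(ctx context.Context, r batch.Runner[string, []string], lines []string) int {
	return r.PutAllContext(ctx, lines)
}

func main() {
	p := batch.NewSliceProcessor[string]().
		Configure(batch.WithMaxItem(100)).
		Run(func(lines []string, count int64) error { return nil }) // assumed ProcessBatchFn shape
	defer p.MustClose()

	ingest(context.Background(), p, []string{"a", "b", "c"})
}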
type RunningProcessor ¶
type RunningProcessor[T any, B any] struct {
	ProcessorSetup[T, B]
	// contains filtered or unexported fields
}
RunningProcessor is a processor that is running and can process items.
func (*RunningProcessor[T, B]) ApproxItemCount ¶
func (p *RunningProcessor[T, B]) ApproxItemCount() int64
ApproxItemCount returns the approximate number of items currently in the processor. This method does not block, so the counter may not be accurate.
func (*RunningProcessor[T, B]) Close ¶
func (p *RunningProcessor[T, B]) Close() error
Close stops the processor. This method processes the leftover batch on the caller thread. It returns an error if maxCloseWait has passed. The timeout can be configured with WithMaxCloseWait. See getCloseMaxWait for details.
func (*RunningProcessor[T, B]) CloseContext ¶
func (p *RunningProcessor[T, B]) CloseContext(ctx context.Context) error
CloseContext stops the processor. This method processes the leftover batch on the caller thread. The context can be used to provide a deadline for this method.
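A shutdown sketch: bound the time spent processing the leftover batch with a context deadline. The import path and the ProcessBatchFn shape are assumptions.

package main

import (
	"context"
	"log"
	"time"

	batch "example.com/batch" // hypothetical import path
)

func main() {
	p := batch.NewSliceProcessor[string]().
		Configure(batch.WithMaxItem(100), batch.WithMaxWait(time.Second)).
		Run(func(lines []string, count int64) error { return nil }) // assumed ProcessBatchFn shape

	p.Put("last item")

	// Give the processor up to 30 seconds to handle the leftover batch on this
	// goroutine before giving up.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := p.CloseContext(ctx); err != nil {
		log.Println("close timed out, some items may not have been processed:", err)
	}
}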
func (*RunningProcessor[T, B]) DrainContext ¶
func (p *RunningProcessor[T, B]) DrainContext(ctx context.Context) error
DrainContext forces batch processing until the batch is empty. This method always processes the batch on the caller thread. ctx can be used to provide a deadline for this method.
func (*RunningProcessor[T, B]) Flush ¶
func (p *RunningProcessor[T, B]) Flush()
Flush forces processing of the current batch. This method may process the batch on the caller thread, depending on the concurrency and blocking settings. It is recommended to use [FlushContext] instead.
func (*RunningProcessor[T, B]) FlushContext ¶
func (p *RunningProcessor[T, B]) FlushContext(ctx context.Context) error
FlushContext forces processing of the current batch. This method may process the batch on the caller thread, depending on the concurrency and blocking settings. The context can be used to provide a deadline for this method.
func (*RunningProcessor[T, B]) IsDisabled ¶
func (p *RunningProcessor[T, B]) IsDisabled() bool
IsDisabled reports whether the processor is disabled. A disabled processor does not do batching; instead, processing is executed on the caller. All other settings are ignored when the processor is disabled.
func (*RunningProcessor[T, B]) ItemCount ¶
func (p *RunningProcessor[T, B]) ItemCount() int64
ItemCount returns the number of items currently in the processor. This method blocks the processor for accurate counting. It is recommended to use [ItemCountContext] instead.
func (*RunningProcessor[T, B]) ItemCountContext ¶ added in v1.3.0
func (p *RunningProcessor[T, B]) ItemCountContext(ctx context.Context) (int64, bool)
ItemCountContext returns the number of items currently in the processor. If the context is canceled, this method returns the approximate item count and false.
func (*RunningProcessor[T, B]) MustClose ¶
func (p *RunningProcessor[T, B]) MustClose()
MustClose stops the processor without a deadline and panics if there is any error. It should only be used in tests.
func (*RunningProcessor[T, B]) Put ¶
func (p *RunningProcessor[T, B]) Put(item T)
Put adds an item to the processor. This method can block until the processor is available to process a new batch. It is recommended to use [PutContext] instead.
func (*RunningProcessor[T, B]) PutAll ¶ added in v1.4.0
func (p *RunningProcessor[T, B]) PutAll(items []T)
PutAll adds all items to the processor. This method blocks until all items have been put into the processor. It is recommended to use [PutAllContext] instead.
func (*RunningProcessor[T, B]) PutAllContext ¶ added in v1.4.0
func (p *RunningProcessor[T, B]) PutAllContext(ctx context.Context, items []T) int
PutAllContext adds all items to the processor. If the context is canceled, this method returns the number of items that were added to the processor. The processing order is the same as the input list, so the return value can also be used to determine the next item to process if you want to retry or continue.
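A sketch using the return value of PutAllContext to find the items that were not accepted before the context deadline; since items are added in input order, the tail items[n:] is exactly what remains. The import path and the ProcessBatchFn shape are assumptions.

package main

import (
	"context"
	"time"

	batch "example.com/batch" // hypothetical import path
)

func main() {
	p := batch.NewSliceProcessor[string]().
		Configure(batch.WithMaxItem(100)).
		Run(func(lines []string, count int64) error { return nil }) // assumed ProcessBatchFn shape
	defer p.MustClose()

	items := []string{"a", "b", "c", "d"}

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	n := p.PutAllContext(ctx, items)
	if n < len(items) {
		remaining := items[n:] // items are added in order, so the tail was not accepted
		_ = remaining          // retry later or fall back to synchronous handling
	}
}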
func (*RunningProcessor[T, B]) PutContext ¶ added in v1.3.0
func (p *RunningProcessor[T, B]) PutContext(ctx context.Context, item T) bool
PutContext adds an item to the processor. If the context is canceled and the item is not added, this method returns false.
func (*RunningProcessor[T, B]) StopContext ¶ added in v1.1.0
func (p *RunningProcessor[T, B]) StopContext(ctx context.Context) error
StopContext stops the processor. This method does not process the leftover batch.
type SliceRunner ¶ added in v1.2.0
SliceRunner is shorthand for a Runner that merges items into a slice.
type SplitBatchFn ¶
SplitBatchFn is a function to split a batch into multiple smaller batches. A SplitBatchFn must never panic.
func SplitSliceEqually ¶
func SplitSliceEqually[T any, I Size](numberOfChunk I) SplitBatchFn[[]T]
SplitSliceEqually creates a SplitBatchFn that splits a slice into multiple equal chunks.
func SplitSliceSizeLimit ¶
func SplitSliceSizeLimit[T any, I Size](maxSizeOfChunk I) SplitBatchFn[[]T]
SplitSliceSizeLimit creates a SplitBatchFn that splits a slice into multiple chunks of limited size.