Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher[T, R any] struct { // contains filtered or unexported fields }
Batcher collects items concurrently passed to Add into batches and calls a processing function on them a batch at a time. The processing function will be called on batches of items in a single-threaded manner, and Add will block while fn is running.
func NewBatcher ¶
func NewBatcher[T, R any](fn func([]T) R, opts BatcherOptions, timeSource clock.TimeSource) *Batcher[T, R]
NewBatcher creates a Batcher. `fn` is the processing function, `opts` are the timing options. `clock` is usually clock.NewRealTimeSource but can be a fake time source for testing.
func (*Batcher[T, R]) Add ¶
Add adds an item to the stream and returns when it has been processed, or if the context is canceled or times out. It returns two values: the value that the batch processor returned for the whole batch that the item ended up in, and a context error. Even if Add returns a context error, the item may still be processed in the future!
type BatcherOptions ¶
type BatcherOptions struct { // MaxItems is the maximum number of items in a batch. MaxItems int // MinDelay is how long to wait for no more items to come in after any item before // finishing the batch. MinDelay time.Duration // MaxDelay is the maximum time to wait after the first item in a batch before finishing // the batch. MaxDelay time.Duration // IdleTime is the time after which the internal goroutine will exit, to avoid wasting // resources on idle streams. IdleTime time.Duration }