batchman

package module
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 21, 2024 License: MIT Imports: 4 Imported by: 0

README

🦇 batchman

batchman provides an in-memory batcher that calls a function with a batch of items when the batch is full or a timeout is reached.

It is useful for batching requests to a service to reduce the total number of calls made.

For example, you may want to send events to your analytics service only in batches of 1,000 items or after 10 seconds, whichever comes first.

Batchman gives you the controls needed to implement a graceful shutdown without losing any data.

Features

  • Supports batching up to a maximum number of items or a maximum time duration.
  • Strongly typed with generics.
  • Thread-safe.
  • Context-aware.
  • Buffered to avoid blocking the caller, with a configurable buffer size.
  • Support for graceful shutdown without losing any items.

Usage

You define a function that will be called with a batch of items. It is never called with an empty slice, and calls never overlap: the next call is made only after the previous one has returned.

func Flush(ctx context.Context, items []MyItemType) {
	// Handle the items.
}
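The guarantees above (no empty batches, no overlapping calls, a flush triggered by size or by delay, whichever comes first) can be illustrated with a simplified batching loop. This is a hypothetical sketch, not batchman's source: runBatcher, stopTimer and collectSizes are names invented for illustration, and a buffered channel stands in for the batcher's buffer. Because flush is only ever called from the single loop goroutine, two calls can never run at the same time.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// stopTimer stops t and discards any pending tick so a later Reset starts clean.
func stopTimer(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}
}

// runBatcher is a simplified, hypothetical batching loop (not batchman's code).
// It flushes when maxSize items are queued or maxDelay has passed since the
// batch's first item. flush only runs on this goroutine, so calls never
// overlap, and emit skips empty batches. After ctx is cancelled it drains in,
// flushes the rest (with the cancelled ctx) in chunks of at most maxSize, and
// closes done.
func runBatcher[T any](ctx context.Context, in chan T, maxSize int, maxDelay time.Duration,
	flush func(context.Context, []T), done chan<- struct{}) {
	defer close(done)
	batch := make([]T, 0, maxSize)
	timer := time.NewTimer(maxDelay)
	stopTimer(timer) // idle until the first item arrives

	emit := func() {
		stopTimer(timer)
		if len(batch) > 0 {
			flush(ctx, batch)
			batch = make([]T, 0, maxSize) // fresh slice: flush may keep the old one
		}
	}

	for {
		select {
		case item := <-in:
			if len(batch) == 0 {
				timer.Reset(maxDelay) // the delay counts from the batch's first item
			}
			batch = append(batch, item)
			if len(batch) >= maxSize {
				emit()
			}
		case <-timer.C:
			emit()
		case <-ctx.Done():
			for {
				select {
				case item := <-in:
					batch = append(batch, item)
					if len(batch) >= maxSize {
						emit()
					}
				default: // buffer is empty: flush the remainder and stop
					emit()
					return
				}
			}
		}
	}
}

// collectSizes pushes n ints through runBatcher and returns the batch sizes.
func collectSizes(n, maxSize int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan int, n) // stands in for the batcher's buffer
	done := make(chan struct{})
	var sizes []int
	go runBatcher(ctx, in, maxSize, time.Second, func(_ context.Context, items []int) {
		sizes = append(sizes, len(items)) // safe: flush calls are sequential
	}, done)
	for i := 0; i < n; i++ {
		in <- i
	}
	cancel()
	<-done // closed only after the final flush
	return sizes
}

func main() {
	fmt.Println(collectSizes(7, 3)) // [3 3 1]
}
```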

You then use the builder pattern with batchman.New[MyItemType]() and configure it with MaxSize, MaxDelay and BufferSize.

After that you can start it and push items to it.

ctx, cancel := context.WithCancel(context.Background())
init := batchman.New[MyItemType]().MaxSize(2_000).MaxDelay(10 * time.Second)

batcher, err := init.Start(ctx, Flush)
if err != nil {
	panic(err)
}

err = batcher.Push(MyItemType{Some: "value"})
if err != nil {
	panic(err)
}

When you are done (or shutting down your program), cancel the context passed to Start to stop the batcher. The Done channel will be closed when the batcher has finished flushing the remaining data.

cancel()

<-batcher.Done()

Full example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/friendlycaptcha/batchman"
)

// MyItemType is a type that will be batched.
type MyItemType struct {
	ID   int
	Name string
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Define a function that will be called with a batch of items.
	flush := func(_ context.Context, items []MyItemType) {
		fmt.Println("Flushing a batch of", len(items), "items")
		time.Sleep(1 * time.Second)
	}

	init := batchman.New[MyItemType]().MaxSize(2000).MaxDelay(10 * time.Second)

	// Start the batcher. This is a non-blocking call that returns an error
	// immediately (if the configuration is invalid) or not at all.
	batcher, err := init.Start(ctx, flush)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 5000; i++ {
		// Add items to the batcher; this is a non-blocking call.
		err = batcher.Push(MyItemType{ID: i, Name: "Some Name"})
		if err != nil {
			// This errors if the batcher has been stopped (by cancelling the context), or if the
			// buffer is full and the batcher is unable to accept more items.
			panic(err)
		}
	}

	cancel()

	// Wait for the batcher to finish flushing.
	<-batcher.Done()
}

For more examples, see the example directory.

Tips

Graceful Shutdown

To gracefully shut down the batcher, cancel the context passed to Start. The batcher then flushes any remaining items and returns; its Done channel is closed once flushing has finished.

The context passed to the flush function is cancelled when the batcher is stopped; you can use it to stop long-running flush operations. Note that any data still in the buffer is flushed with an already-cancelled context, so your implementation must decide how to handle that case.

You may want to use context.WithoutCancel (available since Go 1.21) to create a new context that is not cancelled when the batcher is stopped. This is useful if you want to give the last remaining batches a chance to finish when shutting down your program; combining it with a timeout ensures a stuck flush cannot delay shutdown indefinitely.

func MyFlushFunc(ctx context.Context, items []MyItemType) {
	ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second)
	defer cancel()

	// Handle the items.
}

Alternatively, you can use a select { case <-ctx.Done(): ... } statement to check whether the context has been cancelled in the meantime.

Timeout on shutdown

Although unlikely, the buffer may be backed up at shutdown, leaving several batches still to be flushed.

You may therefore want to cap how long you wait for batcher.Done() to close.

select {
case <-batcher.Done():
	// Done flushing all items.
case <-time.After(20 * time.Second):
	// Timed out waiting for the batcher to finish flushing.
}

License

MIT

Documentation

Overview

Package batchman provides an in-memory batching mechanism for items of a given type.

Constants

This section is empty.

Variables

var (
	// ErrInvalidMaxSize is returned when the max size is invalid.
	ErrInvalidMaxSize = errors.New("max size must be greater than 0")
	// ErrInvalidBufferSize is returned when the buffer size is invalid.
	ErrInvalidBufferSize = errors.New("buffer size must be greater than 0")
	// ErrInvalidMaxDelay is returned when the max delay is invalid.
	ErrInvalidMaxDelay = errors.New("max delay must be greater than 0")

	// ErrBufferFull is returned when the buffer is full. This means the item was not added.
	ErrBufferFull = errors.New("buffer is full")

	// ErrBatcherStopped is returned when the batcher has been stopped (through a context cancellation).
	// No more items can be added.
	ErrBatcherStopped = errors.New("batcher has been stopped")
)

Functions

This section is empty.

Types

type Batcher

type Batcher[T any] struct {
	// contains filtered or unexported fields
}

Batcher is a controller that batches items of a given type into batches with a maximum size or after a maximum delay.

func (*Batcher[T]) CurrentBufferSize

func (b *Batcher[T]) CurrentBufferSize() int

CurrentBufferSize returns the current number of items in the buffer.

Note that this is not the total number of items waiting to be flushed: it excludes items that are currently being flushed or being grouped into the next batch.

You can use it to monitor the buffer; once the buffer is full, Push returns an error and no additional items can be added.

func (*Batcher[T]) Done

func (b *Batcher[T]) Done() <-chan struct{}

Done returns a channel that is closed when the batcher has stopped completely. Once a batcher has stopped, no more items can be pushed, and it cannot be started again.

The batcher stops when the parent context is cancelled, but it first flushes the remaining items in the buffer, so the Done channel is closed only after the last item has been flushed. Depending on the implementation of the flush function, this may take some time.

func (*Batcher[T]) Push

func (b *Batcher[T]) Push(item T) error

Push adds an item to the batcher. It returns ErrBufferFull if the buffer is full, and ErrBatcherStopped if the batcher has been stopped.
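A common Go idiom for a push that never blocks is a select with a default case on a buffered channel. The sketch below illustrates that idiom only; it is an assumption, not a description of how Push is implemented, and tryPush and errFull are hypothetical names (errFull reuses the message of ErrBufferFull). It handles only the full-buffer case, not a stopped batcher.

```go
package main

import (
	"errors"
	"fmt"
)

// errFull mirrors the message of batchman.ErrBufferFull; it is a local stand-in.
var errFull = errors.New("buffer is full")

// tryPush (hypothetical) performs a non-blocking send: if buf has no free
// slot, the default case runs at once and the caller gets errFull instead
// of blocking. The item is not added in that case.
func tryPush[T any](buf chan T, item T) error {
	select {
	case buf <- item:
		return nil
	default:
		return errFull
	}
}

func main() {
	buf := make(chan int, 2) // a buffer with room for two items
	for i := 0; i < 3; i++ {
		fmt.Println(i, tryPush(buf, i))
	}
	// Prints:
	// 0 <nil>
	// 1 <nil>
	// 2 buffer is full
}
```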

type Builder

type Builder[T any] struct {
	// contains filtered or unexported fields
}

Builder is a builder for creating a new Batcher.

func New

func New[T any]() *Builder[T]

New creates a new Builder with default values. The default values are a batch size of 1,000, a maximum delay of 10 seconds, and a buffer size of 10,000 items.

func (*Builder[T]) BufferSize

func (b *Builder[T]) BufferSize(bufferSize int) *Builder[T]

BufferSize sets the buffer size for the batcher.

func (*Builder[T]) MaxDelay

func (b *Builder[T]) MaxDelay(maxDelay time.Duration) *Builder[T]

MaxDelay sets the maximum delay before flushing the batch.

func (*Builder[T]) MaxSize

func (b *Builder[T]) MaxSize(maxSize int) *Builder[T]

MaxSize sets the maximum number of items to batch together.

func (*Builder[T]) Start

func (b *Builder[T]) Start(ctx context.Context, flush func(ctx context.Context, values []T)) (*Batcher[T], error)

Start starts a new Batcher with the configured values. It returns an error immediately if the configuration is invalid. When the context is cancelled, the Batcher stops and flushes any remaining items.

Directories

Path: Synopsis
example
  graceful_shutdown (command): Package main provides example binaries that demonstrate how the batcher behaves.
  readme (command): Package readme contains the code snippet from the README.md file.
  stress (command): Package main provides a stress test for the batcher package.
Package mock provides a mock implementation of a batch handler.
