package module
Version: v0.0.0-...-82de15b Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2020 License: MIT Imports: 2 Imported by: 0



A nice little package with no dependencies for fan-in batching of highly concurrent workloads

GoDoc Go Report Card Code Coverage

The throughput of APIs, web services and background workers can sometimes be improved by orders of magnitude through the introduction of artificial latency in support of concurrent batching. These efficiency improvements can result in increased service stability and total system throughput while lowering infrastructure costs.

This is a general purpose library for concurrent batching of any sort of operation one might desire. It could be used to batch SQL inserts, API calls, disk writes, queue messages, stream records, emails, etc. The batcher hides asynchronous processing behind a syncronous interface.


Here are a few examples illustrating performance improvements for different use cases


architecture diagram

package batchy

type Processor func(items []interface{}) []error

func New(itemLimit int, waitTime time.Duration, processor Processor) *batcher

type Batcher interface {
    Add(interface{}) error


// 100 max batch size
// 100 milliseconds max batch wait time
var batcher = batchy.New(100, 100*time.Millisecond, func(items []interface{}) (errs []error) {
	q := fmt.Sprintf(`INSERT INTO table1 (data) VALUES %s`,
		strings.Trim(strings.Repeat(`(?),`, len(items)), ","))
	_, err := db.Exec(q, items...)
	if err != nil {
		errs = make([]error, len(items))
		for i := range errs {
			errs[i] = err
// Call to Add blocks calling go routine for up to 100ms + processing time.
// If batch is filled before wait time expires, blocking will be reduced.
// Wait time begins when the first item is added to a batch.
err := batcher.Add("data")


This package makes use of Go's empty interface interface{}. For this reason, it is best not to export any Batcher directly from your package. Instead the batcher should be hidden behind an existing synchronous interface.

Suppose you have the following code that writes bytes to a file:

package repo

import (

type DataWriter interface {
	Write(data []byte) error

type dataWriter struct{}

func (r *dataWriter) Write(data []byte) error {
	return ioutil.WriteFile("test1", data, 0644)

func NewDataWriter() *dataWriter {
	return &dataWriter{}

You could create a batched version that satisfies the same interface:

package repo

import (


type dataWriterBatched struct {
	batcher batchy.Batcher

func (r *dataWriterBatched) Write(data []byte) error {
	return r.batcher.Add(data)

func NewDataWriterBatched(maxItems int, maxWait time.Duration) *dataWriterBatched {
	return &dataWriterBatched{batchy.New(maxItems, maxWait, func(items []interface{}) []error {
		errs := make([]error, len(items))
		var data []byte
		for _, d := range items {
			data = append(data, d.([]byte)...)
		err := ioutil.WriteFile("test2", data, 0644)
		if err != nil {
			for i := range errs {
				errs[i] = err
		return errs

Now during dependency injection just replace

dw := repo.NewDataWriter()


dw := repo.NewDataWriterBatched()

and your code shouldn't need to know the difference because you've used interfaces to effectively hide the implementation details (in this case, the use of batching).


I created this repository because:

  1. I frequently see gophers get concurrent batching wrong.
  2. I frequently see gophers avoid batching altogether because concurrency is hard.
  3. I frequently need this sort of batching and I'd rather not solve the same problem differently for every project.


$ go test ./... -bench=. -benchmem

BenchmarkBatcher/itemLimit_10-12          1000000   1042 ns/op   207 B/op   3 allocs/op
BenchmarkBatcher/itemLimit_20-12          2104995    582 ns/op   110 B/op   2 allocs/op
BenchmarkBatcher/itemLimit_100-12         2558310    479 ns/op    80 B/op   1 allocs/op
BenchmarkBatcher/itemLimit_1000-12        2860184    425 ns/op    66 B/op   1 allocs/op
BenchmarkBatcher100ms/itemLimit_10-12     1002462   1188 ns/op   182 B/op   2 allocs/op
BenchmarkBatcher100ms/itemLimit_20-12     1490853    865 ns/op   110 B/op   1 allocs/op
BenchmarkBatcher100ms/itemLimit_100-12    2189893    592 ns/op    65 B/op   1 allocs/op
BenchmarkBatcher100ms/itemLimit_1000-12   2211993    499 ns/op    51 B/op   1 allocs/op
ok    21.737s

Case Study

Benefits of artificial latency




View Source
const ErrBatcherStopped = err("Batcher Stopped")

ErrBatcherStopped indicates that the batcher is not accepting new items because it has been stopped


This section is empty.


func New

func New(itemLimit int, waitTime time.Duration, processor Processor) *batcher

New returns a new batcher - itemLimit indicates the maximum number of items per batch - waitTime indicates the amount of time to wait before processing a non-full batch - processor is the processing function to call for the batch


type Batcher

type Batcher interface {
	// Add adds an item to the current batch
	Add(interface{}) error

	// Stop stops the batcher

Batcher can add an item, returning the corresponding error

type Processor

type Processor func(items []interface{}) []error

Processor is a function that accepts items and returns a corresponding array of errors


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL