Version: v0.0.0-...-711b27a Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2018 License: MIT Imports: 3 Imported by: 0



Package batch contains the core batch processing functionality. The main class is Batch, which can be created using New. It reads from an implementation of the Source interface, and items are processed in batches by an implementation of the Processor interface. Some Source and Processor implementations are provided in the source and processor packages, respectively, or you can create your own based on your needs.

Batch uses the MinTime, MinItems, MaxTime, and MaxItems configuration parameters in Config to determine when and how many items are processed at once.

These parameters may conflict, however; for example, during a slow time, MaxTime may be reached before MinItems are read. Thus it is necessary to prioritize the parameters in some way. They are prioritized as follows (with EOF signifying the end of the input data):

MaxTime = MaxItems > EOF > MinTime > MinItems

A few examples:

MinTime = 2s. After 1s the input channel is closed. The items are processed right away.

MinItems = 10, MinTime = 2s. After 1s, 10 items have been read. They are not processed until 2s has passed (along with all other items that have been read up to the 2s mark).

MaxItems = 10, MinTime = 2s. After 1s, 10 items have been read. They aren't processed until 2s has passed.

Note that the timers and item counters are relative to the time when the previous batch started processing. Just before the timers and counters are started the config is read from the Config interface. This is so that the configuration can be changed at any time during processing.

package main

import (


// printProcessor is a Processor that prints items in batches.
// To demonstrate how errors can be handled, it fails to process the number 5.
type printProcessor struct{}

// Process prints a batch of items.
func (p printProcessor) Process(ctx context.Context, ps *batch.PipelineStage) {
	// Process needs to close ps after it's done
	defer ps.Close()

	toPrint := make([]interface{}, 0, 5)
	for item := range ps.Input {
		// Get returns the item itself
		if item.Get() == 5 {
			ps.Errors <- errors.New("cannot process 5")

		toPrint = append(toPrint, item.Get())


func main() {
	// Create a batch processor that processes items 5 at a time
	config := batch.NewConstantConfig(&batch.ConfigValues{
		MinItems: 5,
	b := batch.New(config)
	p := &printProcessor{}

	// Channel is a Source that reads from a channel until it's closed
	ch := make(chan interface{})
	s := source.Channel{
		Input: ch,

	// Go runs in the background while the main goroutine processes errors
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errs := b.Go(ctx, &s, p)

	// Spawn a goroutine that simulates loading data from somewhere
	go func() {
		for i := 0; i < 20; i++ {
			time.Sleep(time.Millisecond * 10)
			ch <- i

	// Wait for errors. When the error channel is closed the pipeline has been
	// completely drained. Alternatively, we could wait for Done.
	var lastErr error
	for err := range errs {
		lastErr = err

	fmt.Println("Finished processing.")
	if lastErr != nil {
		fmt.Println("Found error:", lastErr.Error())

[0 1 2 3 4]
[6 7 8 9]
[10 11 12 13 14]
[15 16 17 18 19]
Finished processing.
Found error: cannot process 5




This section is empty.


This section is empty.


func IgnoreErrors

func IgnoreErrors(errs <-chan error)

IgnoreErrors starts a goroutine that reads errors from errs but ignores them. It can be used with Batch.Go if errors aren't needed. Since the error channel is unbuffered, one cannot just throw away the error channel like this:

// NOTE: bad - this can cause a deadlock!
_ = batch.Go(ctx, p, s)

Instead, IgnoreErrors can be used to safely throw away all errors:

batch.IgnoreErrors(myBatch.Go(ctx, p, s))


type Batch

type Batch struct {
	// contains filtered or unexported fields

Batch provides batch processing given an Source and a Processor. Data is read from the Source and processed in batches by the Processor. Any errors are wrapped in either a SourceError or a ProcessorError, so the caller can determine where the errors came from.

To create a new Batch, call the New function. Creating one using &Batch{} will return the default Batch.

// The following are equivalent
defaultBatch1 := &batch.Batch{}
defaultBatch2 := batch.New(nil)
defaultBatch3 := batch.New(batch.NewConstantConfig(&batch.ConfigValues{}))

The defaults (with nil Config) provide a usable, but likely suboptimal, Batch where items are processed as soon as they are retrieved from the source. Processing is done in the background using as many goroutines as necessary.

Both Source and Processor are given a PipelineSource, which contains channels for input and output, as well as an error channel. Items in the channel are wrapped in an Item struct that contains extra metadata used by Batch. For easier usage, the helper function NextItem can be used to read from the input channel, set the data, and return the modified Item:

ps.Output() <- batch.NextItem(ps, item)

Batch runs asynchronously until the source closes its PipelineSource, signaling that there is nothing else to read. Once that happens, and the pipeline has been drained (all items have been processed), there are two ways for the caller to know: the error channel returned from Go is closed, or the channel returned from Done is closed.

The first way can be used if errors need to be processed elsewhere. A simple loop could look like this:

errs := myBatch.Go(ctx, s, p)
for err := range errs {
  // Log the error here...
// Now batch processing is done

If the errors don't need to be processed, the IgnoreErrors function can be used to drain the error channel. Then the Done channel can be used to determine whether or not batch processing is complete:

batch.IgnoreErrors(myBatch.Go(ctx, s, p))
// Now batch processing is done

Note that the errors returned on the error channel may be wrapped in a batch.Error so the caller knows whether they come from the source or the processor (or neither). Errors from the source will be of type SourceError, and errors from the processor will be of type ProcessorError. Errors from Batch itself will be neither.

func New

func New(config Config) *Batch

New creates a new Batch based on specified config. If config is nil, the default config is used as described in Batch.

To avoid race conditions, the config cannot be changed after the Batch is created. Instead, implement the Config interface to support changing values.

func (*Batch) Done

func (b *Batch) Done() <-chan struct{}

Done provides an alternative way to determine when processing is complete. When it is, the channel is closed, signaling that everything is done.

func (*Batch) Go

func (b *Batch) Go(ctx context.Context, s Source, p Processor) <-chan error

Go starts batch processing asynchronously and returns a channel on which errors are written. When processing is done and the pipeline is drained, the error channel is closed.

Even though Go has several goroutines running concurrently, concurrent calls to Go are not allowed. If Go is called before a previous call completes, the second one will panic.

// NOTE: bad - this will panic!
errs := batch.Go(ctx, s, p)
errs2 := batch.Go(ctx, s, p) // this call panics

Note that Go does not stop if ctx is done. Otherwise loss of data could occur. Suppose the source reads item A and then ctx is canceled. If Go were to return right away, item A would not be processed and it would be lost.

To avoid situations like that, a proper way to handle context completion is for the source to check for ctx done and then close its channels. The batch processor realizes the source is finished reading items and it sends all remaining items to the processor for processing. Once the processor is done, it closes its error channel to signal to the batch processor. Finally, the batch processor signals to its caller that processing is complete and the entire pipeline is drained.

type Config

type Config interface {
	// Get returns the values for configuration.
	// If MinItems > MaxItems or MinTime > MaxTime, the min value will be
	// set to the maximum value.
	// If the config values may be modified during batch processing, Get
	// must properly handle concurrency issues.
	Get() ConfigValues

Config retrieves the config values used by Batch. If these values are constant, NewConstantConfig can be used to create an implementation of the interface.

type ConfigValues

type ConfigValues struct {
	// MinTime specifies that a minimum amount of time that should pass
	// before processing items. The exception to this is if a max number
	// of items was specified and that number is reached before MinTime;
	// in that case those items will be processed right away.
	MinTime time.Duration `json:"minTime"`

	// MinItems specifies that a minimum number of items should be
	// processed at a time. Items will not be processed until MinItems
	// items are ready for processing. The exceptions to that are if MaxTime
	// is specified and that time is reached before the minimum number of
	// items is available, or if all items have been read and are ready
	// to process.
	MinItems uint64 `json:"minItems"`

	// MaxTime specifies that a maximum amount of time should pass before
	// processing. Once that time has been reached, items will be processed
	// whether or not MinItems items are available.
	MaxTime time.Duration `json:"maxTime"`

	// MaxItems specifies that a maximum number of items should be available
	// before processing. Once that number of items is available, they will
	// be processed whether or not MinTime has been reached.
	MaxItems uint64 `json:"maxItems"`

ConfigValues is a struct that contains the Batch config values.

type ConstantConfig

type ConstantConfig struct {
	// contains filtered or unexported fields

ConstantConfig is a Config with constant values. Create one with NewConstantConfig.

func NewConstantConfig

func NewConstantConfig(values *ConfigValues) *ConstantConfig

NewConstantConfig returns a Config with constant values. If values is nil, the default values are used as described in Batch.

func (*ConstantConfig) Get

func (b *ConstantConfig) Get() ConfigValues

Get implements the Config interface.

type Error

type Error interface {
	// Original returns the original (unwrapped) error.
	Original() error

Error is a wrapped error message returned on the error channel.

type Item

type Item struct {
	// contains filtered or unexported fields

Item holds a single item in the batch processing pipeline.

func NextItem

func NextItem(ps *PipelineStage, data interface{}) *Item

NextItem retrieves the next source item from the input channel of ps, sets its data, and returns it. If the input channel is closed, it returns nil. NextItem can be used in the source Read function:

func (s *source) Read(ctx context.Context, ps batch.PipelineStage) {
  // Read data into myData...
  items <- batch.NextItem(ps, myData)
  // ...

func (*Item) Get

func (i *Item) Get() interface{}

Get returns the item data.

func (*Item) GetID

func (i *Item) GetID() uint64

GetID returns a unique ID of the current item in the pipeline.

func (*Item) Set

func (i *Item) Set(item interface{})

Set sets the item data.

type MockItemGenerator

type MockItemGenerator struct {
	// contains filtered or unexported fields

MockItemGenerator generates mock Items with unique IDs. Items are generated in a separate goroutine and added to a channel, which can be retrieved by calling GetCh.

func NewMockItemGenerator

func NewMockItemGenerator() *MockItemGenerator

NewMockItemGenerator returns a new MockItemGenerator.

After using it, call Close to prevent a goroutine leak.

func (*MockItemGenerator) Close

func (m *MockItemGenerator) Close()

Close stops a MockItemGenerator's goroutine

func (*MockItemGenerator) GetCh

func (m *MockItemGenerator) GetCh() <-chan *Item

GetCh returns a channel of Items with unique IDs.

type PipelineStage

type PipelineStage struct {
	// Input contains the input items for a pipeline stage.
	Input <-chan *Item

	// Output is for the output of the pipeline stage.
	Output chan<- *Item

	// Error is for any errors encountered during the pipeline stage.
	Errors chan<- error

PipelineStage contains the input and output channels for a single stage of the batch pipeline.

func (*PipelineStage) Close

func (p *PipelineStage) Close()

Close closes the pipeline stage.

Note that it will also close the write channels. Do not close them separately or it will panic.

type Processor

type Processor interface {
	// Process processes items from ps's Input channel and returns any errors
	// encountered on the Errors channel. When it is done, it must close ps
	// to signify that it's finished processing. Simply returning isn't enough.
	//    func (p *processor) Process(ctx context.Context, ps *batch.PipelineStage) {
	//      defer ps.Close()
	//      // Do processing here...
	//    }
	// Batch does not wait for Process to finish, so it can spawn a
	// goroutine and then return, as long as ps is closed at the end.
	//    // This is ok
	//    func (p *processor) Process(ctx context.Context, ps *batch.PipelineStage) {
	//      go func() {
	//        defer ps.Close()
	//        time.Sleep(time.Second)
	//        fmt.Println(items)
	//      }()
	//    }
	// To allow Processors to be chained together, processed items should
	// be returned on the Output channel:
	//    // Process squares values in batches.
	//    func (p *processor) Process(ctx context.Context, ps *batch.PipelineStage) {
	//      defer ps.Close()
	//      for item := range ps.Input() {
	//        value, _ := item.Get().(int64)
	//        item.Set(value*value)
	//        ps.Output() <- item
	//      }
	//    }
	// Process may be run in any number of concurrent goroutines. If
	// concurrency needs to be limited it must be done in Process; for
	// example, by using a semaphore channel.
	Process(ctx context.Context, ps *PipelineStage)

Processor processes items in batches.

type ProcessorError

type ProcessorError struct {
	// contains filtered or unexported fields

ProcessorError is an error returned from the processor.

func (ProcessorError) Error

func (e ProcessorError) Error() string

Error implements error. It returns the error string of the original error.

func (ProcessorError) Original

func (e ProcessorError) Original() error

Original implements Error. It returns the original error.

type Source

type Source interface {
	// Read reads items from somewhere and writes them to the Output
	// channel of ps. Any errors it encounters while reading are written to the
	// Errors channel. The Input channel provides a steady stream of Items that
	// have pre-set metadata so the batch processor can identify them. A helper
	// function, NextItem, can be used to retrieve an item from the channel,
	// set it, and return it:
	//    items <- batch.NextItem(ps, myData)
	// Read is only run in a single goroutine. Any currency must be provided
	// by the implementation.
	// Once reading is finished (or when the program ends), the batch
	// processor needs to be notified. This is done by calling the Close
	// method on ps, which signals to Batch that it should drain the pipeline
	// and finish. It is not enough for Read to return.
	//    func (s source) Read(ctx context.Context, ps *batch.PipelineStage) {
	//      defer ps.Close()
	//      // Read items until done...
	//    }
	// Read should not modify an item after adding it to items.
	Read(ctx context.Context, ps *PipelineStage)

Source reads items that are to be batch processed.

type SourceError

type SourceError struct {
	// contains filtered or unexported fields

SourceError is an error returned from the source.

func (SourceError) Error

func (e SourceError) Error() string

Error implements error. It returns the error string of the original error.

func (SourceError) Original

func (e SourceError) Original() error

Original implements Error. It returns the original error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL