package muster
v0.0.0-...-fd3d795

Published: Jul 8, 2015 License: BSD-3-Clause Imports: 5 Imported by: 21




Package muster provides a framework for writing libraries that internally batch operations.

It will be useful to you if you're building an API that benefits from performing work in batches. Batching is triggered by a maximum number of items per batch, and/or by a timeout on how long a batch waits before it is dispatched. For example, if you're willing to wait for at most 1 minute, just set BatchTimeout and keep adding things. If you want batches of 50, just set MaxBatchSize and a batch will only fire when it is filled. For best results set both.

It would be in your best interest to use this library in a hidden fashion in order to avoid unnecessary coupling. You will typically achieve this by ensuring your implementation of muster.Batch and the use of muster.Client are private.

package main

import (
	"fmt"
	"time"

	"github.com/facebookgo/muster" // import path assumed
)

// The ShoppingClient manages the shopping list and dispatches shoppers.
type ShoppingClient struct {
	MaxBatchSize        uint          // How much a shopper can carry at a time.
	BatchTimeout        time.Duration // How long we wait once we need to get something.
	PendingWorkCapacity uint          // How long our shopping list can be.
	muster              muster.Client
}

// The ShoppingClient has to be started in order to initialize the underlying
// work channel as well as the background goroutine that handles the work.
func (s *ShoppingClient) Start() error {
	s.muster.MaxBatchSize = s.MaxBatchSize
	s.muster.BatchTimeout = s.BatchTimeout
	s.muster.PendingWorkCapacity = s.PendingWorkCapacity
	s.muster.BatchMaker = func() muster.Batch { return &batch{Client: s} }
	return s.muster.Start()
}

// Similarly the ShoppingClient has to be stopped in order to ensure we flush
// pending items and wait for in progress batches.
func (s *ShoppingClient) Stop() error {
	return s.muster.Stop()
}

// The ShoppingClient provides a typed Add method which enqueues the work.
func (s *ShoppingClient) Add(item string) {
	s.muster.Work <- item
}

// The batch is the collection of items that will be dispatched together.
type batch struct {
	Client *ShoppingClient
	Items  []string
}

// The batch provides an untyped Add to satisfy the muster.Batch interface. As
// is the case here, the Batch implementation is internal to the user of muster
// and not exposed to the users of ShoppingClient.
func (b *batch) Add(item interface{}) {
	b.Items = append(b.Items, item.(string))
}

// Once a Batch is ready, it will be Fired. It must call notifier.Done once the
// batch has been processed.
func (b *batch) Fire(notifier muster.Notifier) {
	defer notifier.Done()
	fmt.Println("Delivery", b.Items)
}

func main() {
	sm := &ShoppingClient{
		MaxBatchSize:        3,
		BatchTimeout:        20 * time.Millisecond,
		PendingWorkCapacity: 100,
	}

	// We need to start the muster.
	if err := sm.Start(); err != nil {
		fmt.Println(err)
		return
	}

	// Since our capacity is 3, these 3 will end up in a batch as soon as the
	// third item has been added.
	sm.Add("milk")
	sm.Add("yogurt")
	sm.Add("butter")

	// Since our timeout is 20ms, these 2 will end up in a batch once we Sleep.
	sm.Add("bread")
	sm.Add("cheese")
	time.Sleep(500 * time.Millisecond)

	// Finally this 1 will also get batched as soon as we Stop which flushes.
	sm.Add("pasta")

	// Stopping the muster ensures we wait for all batches to finish.
	if err := sm.Stop(); err != nil {
		fmt.Println(err)
	}
}





type Batch

type Batch interface {
	// This should add the given single item to the Batch. This is the "other
	// end" of the Client.Work channel where your application will send items.
	Add(item interface{})

	// Fire off the Batch. It should call Notifier.Done() when it has finished
	// processing the Batch.
	Fire(notifier Notifier)
}

Batch collects added items. Fire will be called exactly once. The Batch does not need to be safe for concurrent access; synchronization will be handled by the Client.
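As a sketch of what satisfying this interface looks like, consider the following self-contained snippet. The logBatch and doneFlag types are illustrative, not part of muster, and the interface definitions are copied locally so the example stands alone:

```go
package main

import "fmt"

// Local copies of the interfaces documented above, so this sketch stands alone.
type Notifier interface {
	Done()
}

type Batch interface {
	Add(item interface{})
	Fire(notifier Notifier)
}

// logBatch is a hypothetical Batch that collects strings and "processes"
// them by printing. No locking is needed: the Client serializes access.
type logBatch struct {
	items []string
}

// Add receives the untyped item from the Work channel and asserts it back
// to the concrete type this batch expects.
func (b *logBatch) Add(item interface{}) {
	b.items = append(b.items, item.(string))
}

// Fire processes the batch exactly once and signals completion via Done.
func (b *logBatch) Fire(notifier Notifier) {
	defer notifier.Done()
	fmt.Println("flushing", len(b.items), "items")
}

// doneFlag is a trivial Notifier standing in for the Client's internal one.
type doneFlag struct{ called bool }

func (d *doneFlag) Done() { d.called = true }

func main() {
	var b Batch = &logBatch{}
	b.Add("milk")
	b.Add("eggs")
	d := &doneFlag{}
	b.Fire(d)
	fmt.Println("done called:", d.called)
}
```

In a real program the type assertion in Add is safe because only your own typed wrapper (like ShoppingClient.Add above) ever sends into the Work channel.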

type Client

type Client struct {
	// Maximum number of items in a batch. If this is zero batches will only be
	// dispatched upon hitting the BatchTimeout. It is an error for both this and
	// the BatchTimeout to be zero.
	MaxBatchSize uint

	// Duration after which to send a pending batch. If this is zero batches will
	// only be dispatched upon hitting the MaxBatchSize. It is an error for both
	// this and the MaxBatchSize to be zero.
	BatchTimeout time.Duration

	// MaxConcurrentBatches determines how many parallel batches we'll allow to
	// be "in flight" concurrently. Once this many batches are in flight, the
	// PendingWorkCapacity determines when sending to the Work channel will start
	// blocking. In other words, once MaxConcurrentBatches hits, the system
	// starts blocking. This allows for tighter control over memory utilization.
	// If not set, the number of parallel batches in-flight will not be limited.
	MaxConcurrentBatches uint

	// Capacity of work channel. If this is zero, the Work channel will be
	// blocking.
	PendingWorkCapacity uint

	// This function should create a new empty Batch on each invocation.
	BatchMaker func() Batch

	// Once this Client has been started, send work items here to add to batch.
	Work chan interface{}
	// contains filtered or unexported fields
}

The Client manages the background process that makes, populates & fires Batches.

func (*Client) Start

func (c *Client) Start() error

Start the background worker goroutines and get ready to accept requests.

func (*Client) Stop

func (c *Client) Stop() error

Stop gracefully and return once all processing has finished.

type Notifier

type Notifier interface {
	// Calling Done will indicate the batch has finished processing.
	Done()
}

Notifier is used to indicate to the Client when a batch has finished processing.
