batch

package module
v0.6.0
Published: Feb 3, 2025 License: MIT Imports: 5 Imported by: 0

README

batch

batch is a library that makes concurrent work batchable and reliable. Each worker either gets its work committed or receives an error.

Hope is not a strategy. (from the Google SRE book)

No more batch operations that add their data to a batch and walk away hoping it will be committed.

All of this without timeouts, additional goroutines, allocations, or channels.

How it works

  • Each worker adds its work to a shared batch.
  • If there are no more workers in the queue, the last one executes the commit; the others wait.
  • Every worker in the batch gets the same result and error.

Usage

// General pattern is
// Queue.In -> Enter -> defer Exit -> add work to the batch -> Commit/Cancel/panic/return

var sum int

bc := batch.Coordinator[int]{
	// Required
	CommitFunc: func(ctx context.Context) (int, error) {
		// commit sum
		return sum, nil
	},
}

for j := 0; j < N; j++ {
	go func(j int) {
		ctx := context.WithValue(context.Background(), workerID{}, j) // can be obtained in Coordinator.CommitFunc

		bc.Queue().In() // let others know we are going to join

		data := 1 // prepare data

		idx := bc.Enter(true)
		defer bc.Exit()

		if idx == 0 { // we are first in the batch, reset it
			sum = 0
		}

		sum += data // add data to the batch

		res, err := bc.Commit(ctx)
		if err != nil { // works the same as if we had an independent commit in each goroutine
			_ = err
		}

		// batching is transparent for the worker
		_ = res
	}(j)
}

See all the available options in the documentation.

Batch is error- and panic-proof, which means the user code can return an error or panic at any point. As soon as all the workers have left the batch, its state is reset. But not the external state: it's the caller's responsibility to keep that consistent.
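
For example, a worker may leave with an error after Enter as long as it hasn't modified the shared state yet. A minimal sketch in the spirit of the Usage example above (valid and errBadData are hypothetical):

idx := bc.Enter(true)
defer bc.Exit() // deferred Exit keeps the batch consistent even if we return early or panic

if idx == 0 { // we are first in the batch, reset it
	sum = 0
}

if !valid(data) {
	return errBadData // we leave before touching sum, so the other workers are unaffected
}

sum += data

res, err := bc.Commit(ctx)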

Documentation

Overview

batch made easy.

batch is a safe and efficient way to combine results of multiple parallel tasks into one heavy "commit" (save) operation. batch ensures each task either receives a successful result if its work has been committed (possibly as part of a batch) or an error otherwise.

Logic for Coordinator

                    -------------------------
-> worker0 ------> | common cumulative state |            /--> worker0
-> worker1 ------> | collected from multiple | -> result  ---> worker1
-> worker2 ------> | workers and committed   |            \--> worker2
-> worker3 ------> | all at once             |             \-> worker3
                    -------------------------

Logic for Multi

Workers are distributed among multiple coaches and then each coach works the same as in Coordinator case.

                    -------------------------
              | -> | workers are distributed |            /--> worker0
-> worker0 -> | -> | among free coaches      | -> result  ---> worker1
-> worker1 -> |     -------------------------
-> worker2 -> |
-> worker3 -> |     -------------------------
              | -> | state is combined in a  |            /--> worker2
              | -> | few independent coaches | -> result  ---> worker3
                    -------------------------

Full Guide

bc.Queue().In() // get in the queue

// Get data ready for the commit.
// All the independent from other workers operations should be done here.
data := 1

if leave() {
	bc.Queue().Out() // Get out of the queue.
	bc.Notify()      // Notify waiting goroutines we won't come.
	return errNoNeed
}

idx := bc.Enter(true) // true to block and wait, false for a non-blocking return if the batch is in the process of committing.
                     // Just like with the Mutex there are no guarantees who will enter first.
if idx < 0 {        // In non-blocking mode we didn't Enter the batch, it's always >= 0 in blocking mode.
	return errBusy // No need to do anything here.
}

defer bc.Exit() // if we entered we must exit;
                // calling it with defer ensures state consistency in case of panic

// We are inside the locked Mutex between Enter and Exit,
// so the shared state can be modified safely.
// That also means that all the long and heavy operations
// should be done before Enter.

if idx == 0 { // we are first in the batch, reset the state
	sum = 0
}

if full() {         // If we didn't change the state we can just leave.
	bc.Trigger()    // Optionally we can trigger the batch.
	                // A common use case is to flush the data if we won't fit.
	return errRetry // Then we may return and retry in the next batch.
}

sum += data // adding our work to the batch

if spoilt() {                         // If we spoiled the state and want to abort the commit,
	_, err = bc.Cancel(ctx, causeErr) // cancel the batch. Commit isn't done, all workers get causeErr.
	                                  // If causeErr is nil it's set to Canceled.
}

if failed() {                         // Suppose some library panicked here.
	panic("we can safely panic here") // The panic will be propagated to the caller,
	                                  // other goroutines in the batch will get PanicError.
}

if urgent() {    // If we want to commit immediately,
	bc.Trigger() // trigger it.
	             // Workers that already added their job will be committed too.
	             // Workers that haven't entered the batch will go to the next one.
}

res, err := bc.Commit(ctx) // Call Commit.
                           // The last goroutine that entered the batch calls the actual Coordinator.CommitFunc.
                           // All the others wait and get the same res and error.

Batch is a safe wrapper around Coordinator. It calls any missed methods if that makes sense or panics otherwise.

Multi is a coordinator with N available parallel batches. Suppose you have 3 db replicas: you can distribute the load across them. Multi.Enter will enter the first free coach (replica in our example) and return its index. The rest is similar to Coordinator. Custom logic for choosing a coach can be set via Multi.Balancer.

MultiBatch is a safe wrapper around Multi just like Batch wraps Coordinator.

Index

Examples

Constants

This section is empty.

Variables

var Canceled = errors.New("batch canceled")

Canceled is a default error returned to workers if Cancel was called with nil err.
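
Since every worker in the batch receives the same error, a cancelled batch can be detected with errors.Is. A minimal sketch, assuming bc is a *batch.Coordinator[int] and the errors package is imported:

res, err := bc.Commit(ctx)
if errors.Is(err, batch.Canceled) {
	// the batch was cancelled with a nil cause; res is the zero value
	// and our data was not committed
}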

Functions

This section is empty.

Types

type Batch

type Batch[Res any] struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"context"
	"errors"
	"log"
	"sync"

	"nikand.dev/go/batch"
)

type (
	SafeService struct {
		sum int // state we collect to commit together

		bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if you don't need it
	}

	contextKeySafe struct{}
)

func NewSafeService() *SafeService {
	s := &SafeService{}

	s.bc = batch.New(s.commit)

	return s
}

func (s *SafeService) commit(ctx context.Context) (int, error) {
	// suppose some heavy operation here
	// update a file or write to db

	log.Printf("* * *  commit %2d  * * *", s.sum)

	return s.sum, nil
}

func (s *SafeService) DoWork(ctx context.Context, data int) (int, error) {
	b := batch.By(s.bc)
	defer b.Exit() // it's like Mutex.Unlock, but works safely even if we didn't enter.
	// Must be called with defer to outlive panics.

	b.QueueIn() // let others know we are going to join

	_ = data // prepare data

	idx := b.Enter(true) // true for blocking, false if we want to leave instead of waiting
	if idx < 0 {         // we haven't entered the batch in non-blocking mode
		return 0, errors.New("not this time") // we have to leave in that case
	}

	if idx == 0 { // we are first in the batch, reset the state
		s.sum = 0
		log.Printf("* * * reset batch * * *")
	}

	log.Printf("worker %2d got in with index %2d", ctx.Value(contextKeySafe{}), idx)

	s.sum += data // add our work to the batch

	// only one of return/Cancel/Commit must be called and only once
	res, err := b.Commit(ctx)
	if err != nil { // batch failed, each worker in it will get the same error
		return 0, err
	}

	log.Printf("worker %2d got result %v %v", ctx.Value(contextKeySafe{}), res, err)

	// if we are here, all of the workers have their work committed

	return res, nil
}

func main() {
	const jobs = 5

	s := NewSafeService()

	// let's spin up some workers
	var wg sync.WaitGroup

	for j := 0; j < jobs; j++ {
		j := j
		wg.Add(1)

		go func() {
			defer wg.Done()

			ctx := context.Background() // passed to commit function
			ctx = context.WithValue(ctx, contextKeySafe{}, j)

			res, err := s.DoWork(ctx, 1)
			_, _ = res, err
		}()
	}

	wg.Wait()
}

func By added in v0.5.0

func By[Res any](c *Coordinator[Res]) Batch[Res]

func (*Batch[Res]) Cancel added in v0.5.0

func (b *Batch[Res]) Cancel(ctx context.Context, err error) (Res, error)

func (*Batch[Res]) Commit

func (b *Batch[Res]) Commit(ctx context.Context) (Res, error)

func (*Batch[Res]) Enter added in v0.3.1

func (b *Batch[Res]) Enter(blocking bool) int

func (*Batch[Res]) Exit added in v0.3.0

func (b *Batch[Res]) Exit() int

func (*Batch[Res]) QueueIn added in v0.5.0

func (b *Batch[Res]) QueueIn() int

func (*Batch[Res]) Trigger added in v0.5.0

func (b *Batch[Res]) Trigger()

type Coordinator added in v0.5.0

type Coordinator[Res any] struct {
	// CommitFunc is called to commit the shared state.
	//
	// It's called with the critical section already owned. An Enter-Exit cycle must not be run from it.
	//
	// Required.
	CommitFunc func(ctx context.Context) (Res, error)
	// contains filtered or unexported fields
}

Coordinator coordinates workers to update the shared state, commit it, and deliver the result to all participating workers.

Example
package main

import (
	"context"
	"errors"
	"log"
	"sync"

	"nikand.dev/go/batch"
)

type (
	Service struct {
		sum int // state we collect to commit together

		bc *batch.Coordinator[int] // [int] is the result value type, set to struct{} if you don't need it
	}

	contextKey struct{}
)

func NewService() *Service {
	s := &Service{}

	s.bc = batch.New(s.commit)

	return s
}

func (s *Service) commit(ctx context.Context) (int, error) {
	// suppose some heavy operation here
	// update a file or write to db

	log.Printf("* * *  commit %2d  * * *", s.sum)

	return s.sum, nil
}

func (s *Service) DoWork(ctx context.Context, data int) (int, error) {
	s.bc.Queue().In() // let others know we are going to join

	_ = data // prepare data

	idx := s.bc.Enter(true) // true for blocking, false if we want to leave instead of waiting
	if idx < 0 {            // we haven't entered the batch in non-blocking mode
		return 0, errors.New("not this time") // we have to leave in that case
	}

	defer s.bc.Exit() // it's like Mutex.Unlock. It's the pair to a successful Enter.
	// Must be called with defer to outlive panics.

	if idx == 0 { // we are first in the batch, reset the state
		s.sum = 0
		log.Printf("* * * reset batch * * *")
	}

	log.Printf("worker %2d got in with index %2d", ctx.Value(contextKey{}), idx)

	s.sum += data // add our work to the batch

	// only one of return/Cancel/Commit must be called and only once
	res, err := s.bc.Commit(ctx)
	if err != nil { // batch failed, each worker in it will get the same error
		return 0, err
	}

	log.Printf("worker %2d got result %v %v", ctx.Value(contextKey{}), res, err)

	// if we are here, all of the workers have their work committed

	return res, nil
}

func main() {
	const jobs = 5

	s := NewService()

	// let's spin up some workers
	var wg sync.WaitGroup

	for j := 0; j < jobs; j++ {
		j := j
		wg.Add(1)

		go func() {
			defer wg.Done()

			ctx := context.Background() // passed to commit function
			ctx = context.WithValue(ctx, contextKey{}, j)

			res, err := s.DoWork(ctx, 1)
			_, _ = res, err
		}()
	}

	wg.Wait()
}

func New

func New[Res any](f func(ctx context.Context) (Res, error)) *Coordinator[Res]

New creates a new Coordinator.

func (*Coordinator[Res]) Cancel added in v0.5.0

func (c *Coordinator[Res]) Cancel(ctx context.Context, err error) (Res, error)

Cancel aborts the current batch and returns the same error to all the workers that have already added their data to the batch. Coordinator.CommitFunc is not called. Workers waiting in the queue but not yet Entered into the critical section are not affected.

func (*Coordinator[Res]) Commit added in v0.5.0

func (c *Coordinator[Res]) Commit(ctx context.Context) (Res, error)

Commit waits for the waiting workers to add their data to the batch, calls Coordinator.CommitFunc only once for the batch, and returns the same shared result to all the workers.

func (*Coordinator[Res]) Enter added in v0.5.0

func (c *Coordinator[Res]) Enter(blocking bool) int

Enter enters the batch. When the call returns we are in the critical section and shared resources can be used safely. It's similar to Mutex.Lock. The pair method Exit must be called if Enter was successful (returned value >= 0). It returns the index of the entered worker: 0 means we are the first in the batch and should reset the shared state. If blocking == false and the batch is not available, a negative value is returned. Enter also removes the worker from the queue.
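
A minimal sketch of the non-blocking mode described above (errBusy is hypothetical):

bc.Queue().In()

idx := bc.Enter(false) // don't wait if the batch is busy committing
if idx < 0 {
	return errBusy // Enter already removed us from the queue, nothing to clean up
}
defer bc.Exit()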

func (*Coordinator[Res]) Exit added in v0.5.0

func (c *Coordinator[Res]) Exit() int

Exit exits the critical section. It should be called with defer just after we successfully Entered the batch. It's similar to Mutex.Unlock. It returns the number of workers that have not Exited yet: 0 means we are the last one exiting the batch, and the state can be reset here. But remember the case when a worker has panicked.
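
A minimal sketch of using the returned counter (resetState is hypothetical):

defer func() {
	if left := bc.Exit(); left == 0 {
		resetState() // we are the last one out; but remember the state
		             // may have been spoiled by a panicking worker
	}
}()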

func (*Coordinator[Res]) Init added in v0.5.0

func (c *Coordinator[Res]) Init(f func(ctx context.Context) (Res, error))

Init initializes a zero Coordinator. It can also be used as a Reset, but not in parallel with other usage.
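
A minimal sketch of initializing a zero Coordinator in place, as an alternative to New:

var c batch.Coordinator[int]

c.Init(func(ctx context.Context) (int, error) {
	// commit the shared state here
	return 0, nil
})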

func (*Coordinator[Res]) Notify added in v0.5.0

func (c *Coordinator[Res]) Notify()

Notify wakes up waiting workers.

Must be called if the worker left the Queue before Enter.

func (*Coordinator[Res]) Queue added in v0.5.0

func (c *Coordinator[Res]) Queue() *Queue

Queue returns the queue of waiting workers.

A worker can leave the Queue before Enter, but then it must call Notify to wake up the waiting workers.
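
A minimal sketch of leaving the queue before Enter (needCommit is hypothetical):

bc.Queue().In()

if !needCommit() {
	bc.Queue().Out() // get out of the queue
	bc.Notify()      // wake up workers that may be waiting for us
	return nil
}

idx := bc.Enter(true)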

func (*Coordinator[Res]) Trigger added in v0.5.0

func (c *Coordinator[Res]) Trigger()

Trigger triggers the batch to Commit. After that we can call either Commit (if we added our data to the batch) or Exit (if we didn't), so we will be part of the batch or not, respectively.
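
A minimal sketch of flushing a full batch without joining it, as in the Full Guide above (full and errRetry are hypothetical):

if full() {
	bc.Trigger()    // ask the current batch to commit
	return errRetry // we didn't add our data, so the deferred Exit is enough
}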

type Multi added in v0.4.0

type Multi[Res any] struct {
	// CommitFunc is called to commit coach `coach`.
	//
	// It's called with the critical section already owned. An Enter-Exit cycle must not be run from it.
	CommitFunc func(ctx context.Context, coach int) (Res, error)

	// Balancer chooses a coach among the available ones, or it can choose to wait for more.
	// bitset is the set of available coaches.
	// Coach n is available <=> bitset & (1<<n) != 0.
	// If the returned value is >= 0 and that coach is available, the worker proceeds with it.
	// If the returned value is < 0 or that coach is not available,
	// the worker acts as if there were no available coaches.
	Balancer func(bitset uint64) int

	// BigBalancer is used if 64 coaches are not enough.
	// Coach n is available <=> bitset[n/64] & (1<<(n%64)) != 0.
	BigBalancer func(bitset []uint64) int
	// contains filtered or unexported fields
}

Multi is a coordinator for multiple parallel batches. Multi can't be created as a literal, it must be initialized either by NewMulti or Init.
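
A Balancer receives the set of available coaches as a bitset. A minimal sketch of a custom balancer that always picks the lowest free coach (commit is assumed to be defined elsewhere; math/bits must be imported):

m := batch.NewMulti(4, commit)

m.Balancer = func(bitset uint64) int {
	if bitset == 0 {
		return -1 // nothing is free: act as if there were no available coaches
	}

	return bits.TrailingZeros64(bitset) // index of the lowest available coach
}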

Example
package main

import (
	"context"
	"errors"
	"log"
	"sync"
	"time"

	"nikand.dev/go/batch"
)

type (
	ServiceMulti struct {
		sum []int // state we collect to commit together

		bc *batch.Multi[int] // [int] is the result value type, set to struct{} if you don't need it
	}
)

func NewServiceMulti(coaches int) *ServiceMulti {
	s := &ServiceMulti{
		sum: make([]int, coaches),
	}

	s.bc = batch.NewMulti(coaches, s.commit)

	return s
}

func (s *ServiceMulti) commit(ctx context.Context, coach int) (int, error) {
	// suppose some heavy operation here
	// update a file or write to db

	log.Printf("* * *  coach: %2d, commit %2d  * * *", coach, s.sum[coach])

	time.Sleep(time.Millisecond) // let other workers use another coach

	return s.sum[coach], nil
}

func (s *ServiceMulti) DoWork(ctx context.Context, data int) (int, error) {
	s.bc.Queue().In() // let others know we are going to join

	_ = data // prepare data

	coach, idx := s.bc.Enter(true) // true for blocking, false if we want to leave instead of waiting
	if idx < 0 {                   // we haven't entered the batch in non-blocking mode
		return 0, errors.New("not this time") // we have to leave in that case
	}

	defer s.bc.Exit(coach) // it's like Mutex.Unlock. It's the pair to a successful Enter.
	// Must be called with defer to outlive panics.

	if idx == 0 { // we are first in the batch, reset the state
		s.sum[coach] = 0
		log.Printf("* * * coach %2d, reset batch * * *", coach)
	}

	log.Printf("worker %2d got into coach %2d with index %2d", ctx.Value(contextKey{}), coach, idx)

	if data == 0 { // if we didn't spoil the batch state
		return 0, nil // we can leave freely
	}

	s.sum[coach] += data // add our work to the batch

	if s.sum[coach] >= 3 { // if batch is already big
		s.bc.Trigger(coach) // trigger it
	}

	// only one of return/Cancel/Commit must be called and only once
	res, err := s.bc.Commit(ctx, coach)
	if err != nil { // batch failed, each worker in it will get the same error
		return 0, err
	}

	log.Printf("worker %2d got result for coach %2d: %v %v", ctx.Value(contextKey{}), coach, res, err)

	// if we are here, all of the workers have their work committed

	return res, nil
}

func main() {
	const jobs = 5

	s := NewServiceMulti(2)

	// let's spin up some workers
	var wg sync.WaitGroup

	for j := 0; j < jobs; j++ {
		j := j
		wg.Add(1)

		go func() {
			defer wg.Done()

			ctx := context.Background() // passed to commit function
			ctx = context.WithValue(ctx, contextKey{}, j)

			res, err := s.DoWork(ctx, 1)
			_, _ = res, err
		}()
	}

	wg.Wait()
}

func NewMulti added in v0.4.0

func NewMulti[Res any](n int, f func(ctx context.Context, coach int) (Res, error)) *Multi[Res]

NewMulti creates a new Multi coordinator with n parallel batches.

func (*Multi[Res]) Cancel added in v0.5.0

func (c *Multi[Res]) Cancel(ctx context.Context, coach int, err error) (Res, error)

Cancel indicates the current worker is done with the shared data, but it must not be committed. All the workers from the same batch receive a zero Res and the same error.

It can be used if the batch shared state has been spoiled as a result of an error.

func (*Multi[Res]) Commit added in v0.5.0

func (c *Multi[Res]) Commit(ctx context.Context, coach int) (Res, error)

Commit indicates the current worker is done with the shared state and is ready to commit it. Commit blocks until the batch is committed. The same Res and error are returned to all the workers in the batch; the (Res, error) pair is what Multi.CommitFunc returns.

func (*Multi[Res]) Enter added in v0.4.0

func (c *Multi[Res]) Enter(blocking bool) (coach, idx int)

Enter enters an available batch. It returns (-1, -1) if blocking == false and no batches are available. Enter also removes the worker from the queue.

See also documentation for Coordinator.Enter.

Coach choice can be configured by setting a custom Multi.Balancer.

func (*Multi[Res]) Exit added in v0.5.0

func (c *Multi[Res]) Exit(coach int) int

Exit exits the batch. It should be called with defer. It works similarly to Mutex.Unlock in the sense that it unlocks shared resources.

func (*Multi[Res]) Init added in v0.5.0

func (c *Multi[Res]) Init(n int, f func(ctx context.Context, coach int) (Res, error))

Init initializes a zero Multi. It can also be used as a Reset, but not in parallel with other usage.

func (*Multi[Res]) Notify added in v0.5.0

func (c *Multi[Res]) Notify()

Notify wakes up waiting workers.

Must be called if the worker left the Queue before Enter.

func (*Multi[Res]) Queue added in v0.5.0

func (c *Multi[Res]) Queue() *Queue

Queue returns the common queue for all coaches. Getting into it means batches that have already started will wait for this worker before committing.

A worker can leave the Queue before Enter, but then it must call Notify to wake up the waiting workers.

func (*Multi[Res]) Trigger added in v0.5.0

func (c *Multi[Res]) Trigger(coach int)

Trigger triggers the batch to commit. It must be called inside the Enter-Exit section.

It can be used to flush the current batch, with our data or without, so that we can retry. The commit happens when the current worker Exits the critical section, so to retry you need to Exit, then get into the Queue and Enter again.
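
A minimal sketch of that retry pattern, with an explicit Exit instead of a deferred one:

s.bc.Trigger(coach) // flush the current batch without our data
s.bc.Exit(coach)    // the commit happens as the batch drains

s.bc.Queue().In() // get back in line
coach, idx = s.bc.Enter(true) // and retry in the next batch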

type MultiBatch added in v0.5.0

type MultiBatch[Res any] struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"context"
	"errors"
	"log"
	"sync"
	"time"

	"nikand.dev/go/batch"
)

type (
	SafeServiceMulti struct {
		sum []int // state we collect to commit together

		bc *batch.Multi[int] // [int] is the result value type, set to struct{} if you don't need it
	}
)

func NewSafeServiceMulti(coaches int) *SafeServiceMulti {
	s := &SafeServiceMulti{
		sum: make([]int, coaches),
	}

	s.bc = batch.NewMulti(coaches, s.commit)

	return s
}

func (s *SafeServiceMulti) commit(ctx context.Context, coach int) (int, error) {
	// suppose some heavy operation here
	// update a file or write to db

	log.Printf("* * *  coach %2d, commit %2d  * * *", coach, s.sum[coach])

	time.Sleep(time.Millisecond) // let other workers use another coach

	return s.sum[coach], nil
}

func (s *SafeServiceMulti) DoWork(ctx context.Context, data int) (int, error) {
	b := batch.ByMulti(s.bc)
	defer b.Exit() // it's like Mutex.Unlock, but works safely even if we didn't enter.
	// Must be called with defer to outlive panics.

	b.QueueIn() // let others know we are going to join

	_ = data // prepare data

	coach, idx := b.Enter(true) // true for blocking, false if we want to leave instead of waiting
	if idx < 0 {                // we haven't entered the batch in non-blocking mode
		return 0, errors.New("not this time") // we have to leave in that case
	}

	if idx == 0 { // we are first in the batch, reset the state
		s.sum[coach] = 0
		log.Printf("* * * coach %2d, reset batch * * *", coach)
	}

	log.Printf("worker %2d got into coach %2d with index %2d", ctx.Value(contextKeySafe{}), coach, idx)

	if data == 0 { // if we didn't spoil the batch state
		return 0, nil // we can leave freely
	}

	s.sum[coach] += data // add our work to the batch

	if s.sum[coach] >= 3 { // if batch is already big
		b.Trigger() // trigger it
	}

	// only one of return/Cancel/Commit must be called and only once
	res, err := b.Commit(ctx)
	if err != nil { // batch failed, each worker in it will get the same error
		return 0, err
	}

	log.Printf("worker %2d got result for coach %2d: %v %v", ctx.Value(contextKeySafe{}), coach, res, err)

	// if we are here, all of the workers have their work committed

	return res, nil
}

func main() {
	const jobs = 5

	s := NewSafeServiceMulti(2)

	// let's spin up some workers
	var wg sync.WaitGroup

	for j := 0; j < jobs; j++ {
		j := j
		wg.Add(1)

		go func() {
			defer wg.Done()

			ctx := context.Background() // passed to commit function
			ctx = context.WithValue(ctx, contextKeySafe{}, j)

			res, err := s.DoWork(ctx, 1)
			_, _ = res, err
		}()
	}

	wg.Wait()
}

func ByMulti added in v0.5.0

func ByMulti[Res any](c *Multi[Res]) MultiBatch[Res]

func (*MultiBatch[Res]) Cancel added in v0.5.0

func (b *MultiBatch[Res]) Cancel(ctx context.Context, err error) (Res, error)

func (*MultiBatch[Res]) Commit added in v0.5.0

func (b *MultiBatch[Res]) Commit(ctx context.Context) (Res, error)

func (*MultiBatch[Res]) Enter added in v0.5.0

func (b *MultiBatch[Res]) Enter(blocking bool) (coach, idx int)

func (*MultiBatch[Res]) Exit added in v0.5.0

func (b *MultiBatch[Res]) Exit() int

func (*MultiBatch[Res]) QueueIn added in v0.5.0

func (b *MultiBatch[Res]) QueueIn() int

func (*MultiBatch[Res]) Trigger added in v0.5.0

func (b *MultiBatch[Res]) Trigger()

type PanicError

type PanicError struct {
	Panic interface{}
}

PanicError is returned to all the workers in the batch if one of them panicked. The panicking worker gets the panic itself, not an error.
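
A minimal sketch of detecting a sibling's panic after Commit (assuming log is imported):

_, err := bc.Commit(ctx)
if pe, ok := batch.AsPanicError(err); ok {
	log.Printf("another worker panicked: %v", pe.Panic)
}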

func AsPanicError added in v0.5.0

func AsPanicError(err error) (PanicError, bool)

AsPanicError unwraps a PanicError from err.

func (PanicError) Error

func (e PanicError) Error() string

type Queue added in v0.5.0

type Queue int32

Queue is the queue of workers waiting to Enter the batch.

func (*Queue) In added in v0.5.0

func (q *Queue) In() int

In gets into the queue.

func (*Queue) Len added in v0.5.0

func (q *Queue) Len() int

Len is the number of workers in the queue.

func (*Queue) Out added in v0.5.0

func (q *Queue) Out() int

Out gets out of the queue.
