microbatch

package module
v0.1.1
Published: Jan 19, 2024 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package microbatch groups tasks into small batches, e.g. to reduce the number of round trips.

See also github.com/joeycumines/go-longpoll, for a similar, lower-level implementation, e.g. if you require more control over the batching or concurrency behavior.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchProcessor

type BatchProcessor[Job any] func(ctx context.Context, jobs []Job) error

BatchProcessor processes a batch of jobs, using arbitrary behavior. Individual job results (and similar per-job outputs) should be assigned to the jobs themselves. Any returned error will be propagated via JobResult.Wait.
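
For illustration, a minimal sketch of a BatchProcessor (the lookupJob type is hypothetical, and imports are elided, as in the examples below):

type lookupJob struct {
	ID   int
	Name string // populated by the batch processor, i.e. a per-job result
}

// a BatchProcessor[*lookupJob] servicing the whole slice in one round trip;
// returning a non-nil error fails every JobResult.Wait in the batch
var processor microbatch.BatchProcessor[*lookupJob] = func(ctx context.Context, jobs []*lookupJob) error {
	if err := ctx.Err(); err != nil {
		return err // fail fast if the batcher is closing
	}
	for _, job := range jobs {
		// in practice, e.g. a single bulk query keyed by all the IDs
		job.Name = fmt.Sprintf(`user-%d`, job.ID)
	}
	return nil
}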

type Batcher

type Batcher[Job any] struct {
	// contains filtered or unexported fields
}

Batcher accepts jobs, batching them into small groups. Instances must be initialized using the NewBatcher factory.

Example (BulkInsert)

Demonstrates the basic pattern in a scenario where individual job results are unnecessary.

const batchSize = 3
batcher := microbatch.NewBatcher(&microbatch.BatcherConfig{
	MaxSize:       batchSize, // small batch size, for demonstrative purposes
	FlushInterval: -1,        // (DON'T DO THIS FOR ACTUAL USE) disable flush interval, to make the output stable
}, func(ctx context.Context, rows [][]any) error {
	if ctx.Err() != nil {
		panic(`expected ctx not to be canceled`)
	}
	// in practice, this would be interacting with your chosen database
	fmt.Printf("Inserted %d rows:\n", len(rows))
	for _, row := range rows {
		b, _ := json.Marshal(row)
		fmt.Printf("%s\n", b)
		// you wouldn't normally do this - this is for the benefit of the test suite being useful for race detection
		row[0] = nil
	}
	return nil
})
defer batcher.Close()

// the following is very contrived - in practice you would simply call
// batcher.Submit to insert the row, then wait for the result

rows := make([][]any, batchSize*5+2)

var wg sync.WaitGroup
wg.Add(len(rows))

for i := 0; i < len(rows); i += batchSize {
	for j := 0; j < batchSize && i+j < len(rows); j++ {
		row := [1]any{fmt.Sprintf("row %d", i+j)}
		rows[i+j] = row[:]

		result, err := batcher.Submit(context.Background(), rows[i+j])
		if err != nil {
			panic(err)
		}
		if (*[1]any)(result.Job) != &row {
			panic(result.Job)
		}

		go func() {
			if err := result.Wait(context.Background()); err != nil {
				panic(err)
			}
			if row[0] != nil {
				panic(`expected the value to be cleared`)
			}
			wg.Done()
		}()
	}
}

// will send the last, partial batch
if err := batcher.Shutdown(context.Background()); err != nil {
	panic(err)
}

wg.Wait()
Output:

Inserted 3 rows:
["row 0"]
["row 1"]
["row 2"]
Inserted 3 rows:
["row 3"]
["row 4"]
["row 5"]
Inserted 3 rows:
["row 6"]
["row 7"]
["row 8"]
Inserted 3 rows:
["row 9"]
["row 10"]
["row 11"]
Inserted 3 rows:
["row 12"]
["row 13"]
["row 14"]
Inserted 2 rows:
["row 15"]
["row 16"]
Example (IndependentOperations)

Demonstrates how Batcher can be used to model logically independent operations that are batched together for efficiency.

// this can be any type you wish, though references are necessary to pass on results
type Job struct {
	Callback func(ctx context.Context) (int, error)
	Result   int
	Err      error
}

// note: see the example output for the final value (implementation irrelevant to the example)
mu, maxRunningBatchProcessorCount, incDecBatchProcessorCount := newMaxRunningTracker()

// the default batching configuration is "sensible", but may not be suitable for all use cases
// (e.g. you may want to allow more than one concurrent batch, as in this example, which allows 10)
batcher := microbatch.NewBatcher(&microbatch.BatcherConfig{MaxConcurrency: 10}, func(ctx context.Context, jobs []*Job) error {
	defer incDecBatchProcessorCount()() // not relevant - you can ignore this

	// this guard is optional, but a good idea to ensure Batcher.Close exits promptly
	if err := ctx.Err(); err != nil {
		return err
	}

	// in practice, this might involve calling out to a remote service
	for _, job := range jobs {
		// any potentially long-running operation should accept and respect the context
		job.Result, job.Err = job.Callback(ctx)
	}

	time.Sleep(time.Millisecond * 20) // simulating load

	return nil
})
defer batcher.Close() // always remember to close the Batcher

// for the sake of this example, we just run a bunch of operations, concurrently

const (
	numOpsPerWorker = 1000
	numWorkers      = 5
	numOps          = numOpsPerWorker * numWorkers
)

var submitWg sync.WaitGroup
submitWg.Add(numOps)
var resultWg sync.WaitGroup
resultWg.Add(numOps)

var callbackCount int64
submit := func() {
	defer submitWg.Done() // note: (potentially) prior to the result being available
	succeed := rand.Intn(2) == 0
	expectedResult := rand.Int()
	var expectedErr error
	if !succeed {
		expectedErr = errors.New("expected error")
	}
	result, err := batcher.Submit(context.Background(), &Job{
		Callback: func(ctx context.Context) (int, error) {
			atomic.AddInt64(&callbackCount, 1)
			return expectedResult, expectedErr
		},
	})
	if err != nil {
		panic(err)
	}
	go func() {
		defer resultWg.Done()
		if err := result.Wait(context.Background()); err != nil {
			panic(err)
		}
		if result.Job.Result != expectedResult {
			panic(fmt.Sprintf("expected %d, got %d", expectedResult, result.Job.Result))
		}
		if result.Job.Err != expectedErr {
			panic(fmt.Sprintf("expected %v, got %v", expectedErr, result.Job.Err))
		}
	}()
}

for i := 0; i < numWorkers; i++ {
	go func() {
		for j := 0; j < numOpsPerWorker; j++ {
			submit()
		}
	}()
}

submitWg.Wait() // waits for all jobs to be submitted

// note: Shutdown guarantees the batcher is closed by the time it returns,
// but doesn't cancel the batcher's context unless/until ctx is canceled
if err := batcher.Shutdown(context.Background()); err != nil {
	panic(err)
}

resultWg.Wait() // wait for all results to be available (and checked)

mu.Lock()
defer mu.Unlock()

fmt.Println(`total number of callback calls:`, atomic.LoadInt64(&callbackCount))
fmt.Println(`max number of concurrent batch processors:`, *maxRunningBatchProcessorCount)
Output:

total number of callback calls: 5000
max number of concurrent batch processors: 10

func NewBatcher

func NewBatcher[Job any](config *BatcherConfig, processor BatchProcessor[Job]) *Batcher[Job]

NewBatcher initializes a new Batcher, using the provided BatcherConfig and BatchProcessor. The provided config may be nil. A panic will occur if processor is nil, or if an invalid config is provided.

The Batcher.Close method and/or Batcher.Shutdown method should be called when the Batcher is no longer needed.
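
A minimal construction sketch (imports elided; the import path is assumed to be github.com/joeycumines/go-microbatch):

// a nil config selects the defaults (MaxSize 16, FlushInterval 50ms, MaxConcurrency 1)
batcher := microbatch.NewBatcher(nil, func(ctx context.Context, jobs []string) error {
	fmt.Println(`processing`, len(jobs), `jobs`)
	return nil
})
defer batcher.Close()

// by contrast, a config with both MaxSize and FlushInterval negative, e.g.
// &microbatch.BatcherConfig{MaxSize: -1, FlushInterval: -1}, would panic,
// since such a batch could never be flushed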

func (*Batcher[Job]) Close

func (x *Batcher[Job]) Close() error

Close immediately cancels all jobs, and prevents further jobs via Submit, blocking until the Batcher has finished closing.

This method is unsafe to call from within a job or BatchProcessor.

func (*Batcher[Job]) Shutdown

func (x *Batcher[Job]) Shutdown(ctx context.Context) (err error)

Shutdown will immediately prevent further jobs via Submit, then wait for all already running or scheduled jobs to complete. An error will be returned if ctx is canceled before completion, in which case a forced Close is performed.

This method is unsafe to call from within a job or BatchProcessor.
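
For example, a deadline-bounded graceful shutdown might look like the following sketch (the timeout is arbitrary, and batcher is an existing *Batcher):

// allow up to 5s for running and scheduled jobs to complete; if the deadline
// expires first, Shutdown forces a Close and returns the context error
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := batcher.Shutdown(ctx); err != nil {
	log.Printf(`shutdown timed out, batcher was force-closed: %v`, err)
}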

func (*Batcher[Job]) Submit

func (x *Batcher[Job]) Submit(ctx context.Context, job Job) (*JobResult[Job], error)

Submit schedules a job for processing, returning an error if ctx is canceled, or the Batcher is stopped.

The JobResult.Wait method should be used to wait for the job's completion, after which any individual job result(s) may be accessed on the job itself. The job is available via JobResult.Job, for convenience.
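
The typical submit-then-wait flow, sketched as a hypothetical generic helper (not part of this package):

func submitAndWait[Job any](ctx context.Context, batcher *microbatch.Batcher[Job], job Job) (Job, error) {
	result, err := batcher.Submit(ctx, job)
	if err != nil {
		// ctx was canceled, or the batcher was stopped, before acceptance
		return job, err
	}
	if err := result.Wait(ctx); err != nil {
		// the BatchProcessor returned an error for this job's batch
		return job, err
	}
	// any per-job outputs set by the BatchProcessor are now safe to access
	return result.Job, nil
}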

type BatcherConfig

type BatcherConfig struct {
	// MaxSize restricts the maximum number of jobs per batch, if positive.
	// Defaults to 16, if 0, or if BatcherConfig is nil.
	//
	// WARNING: NewBatcher will panic if both MaxSize and FlushInterval are
	// disabled.
	MaxSize int

	// FlushInterval specifies the maximum duration before an "incomplete"
	// batch is passed to the BatchProcessor, if positive.
	// Defaults to 50ms, if 0, or if BatcherConfig is nil.
	// If MaxSize is specified, time-based flushing can be disabled by
	// setting this <= 0.
	//
	// WARNING: NewBatcher will panic if both MaxSize and FlushInterval are
	// disabled.
	FlushInterval time.Duration

	// MaxConcurrency specifies the maximum number of concurrent
	// BatchProcessor calls the Batcher may make, if positive.
	// Defaults to 1, if 0, or if BatcherConfig is nil.
	MaxConcurrency int
}

BatcherConfig models optional configuration for NewBatcher.
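
To make the zero-value semantics concrete, a sketch (the specific values are arbitrary):

config := &microbatch.BatcherConfig{
	MaxSize:        100,                   // flush once 100 jobs are buffered...
	FlushInterval:  10 * time.Millisecond, // ...or after at most 10ms, for an incomplete batch
	MaxConcurrency: 4,                     // at most 4 concurrent BatchProcessor calls
}

// leaving a field as 0 selects its default, so &microbatch.BatcherConfig{}
// behaves like MaxSize: 16, FlushInterval: 50ms, MaxConcurrency: 1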

type JobResult

type JobResult[Job any] struct {
	// Job is the pending job.
	//
	// WARNING: The batch processor may access this value concurrently -
	// consider the implications, e.g. race conditions, if interacting with
	// internal state.
	Job Job
	// contains filtered or unexported fields
}

JobResult models a scheduled job, providing a Wait method that should be called prior to accessing any output/result, which the BatchProcessor may set on the Job.

WARNING: The Job field itself will not be modified, meaning any outputs from the BatchProcessor must be written through references reachable via the Job value.
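
For example (MyJob is hypothetical, and batcher is a Batcher[*MyJob]):

type MyJob struct {
	Input  string
	Output string // set by the BatchProcessor
}

// GOOD: the job type is *MyJob, so writes made by the BatchProcessor are
// visible via result.Job after Wait returns
result, err := batcher.Submit(ctx, &MyJob{Input: `x`})
if err == nil && result.Wait(ctx) == nil {
	fmt.Println(result.Job.Output)
}

// BAD: with Batcher[MyJob] (a value job type), any fields the processor
// sets on the slice elements would not be reflected in result.Job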

func (*JobResult[Job]) Wait

func (x *JobResult[Job]) Wait(ctx context.Context) error

Wait blocks until the Job has been processed. If the BatchProcessor failed with an error, that error will be returned. Any implementation-specific results or errors are surfaced via the JobResult.Job field.
