Documentation
Overview
Package microbatch groups tasks into small batches, e.g. to reduce the number of round trips.
See also github.com/joeycumines/go-longpoll for a similar, lower-level implementation, e.g. if you require more control over batching or concurrency behavior.
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type BatchProcessor

type BatchProcessor[Job any] func(ctx context.Context, jobs []Job) error

BatchProcessor runs a batch of jobs, using arbitrary behavior. Individual job results should be assigned to the jobs themselves. Any returned error will be propagated via JobResult.Wait.
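For illustration, a minimal conforming processor might look like the following sketch; lookupJob, its fields, and processLookups are hypothetical, not part of this package:

// lookupJob is a hypothetical job type - results are carried on the job itself
type lookupJob struct {
    Key   string
    Value string
    Err   error
}

// processLookups is assignable to BatchProcessor[*lookupJob]: results are
// written to the jobs themselves, while a non-nil return value would fail
// the whole batch, surfacing via each job's JobResult.Wait
func processLookups(ctx context.Context, jobs []*lookupJob) error {
    if err := ctx.Err(); err != nil {
        return err // e.g. the Batcher was closed
    }
    for _, job := range jobs {
        // in practice, this loop might be replaced by a single bulk call
        job.Value = strings.ToUpper(job.Key)
    }
    return nil
}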
type Batcher

type Batcher[Job any] struct {
    // contains filtered or unexported fields
}
Batcher accepts jobs, batching them into small groups. Instances must be initialized using the NewBatcher factory.
Example (BulkInsert)

Demonstrates the basic pattern in a scenario where individual job results are unnecessary.
const batchSize = 3

batcher := microbatch.NewBatcher(&microbatch.BatcherConfig{
    MaxSize:       batchSize, // small batch size, for demonstrative purposes
    FlushInterval: -1,        // (DON'T DO THIS FOR ACTUAL USE) disable flush interval, to make the output stable
}, func(ctx context.Context, rows [][]any) error {
    if ctx.Err() != nil {
        panic(`expected ctx not to be canceled`)
    }
    // in practice, this would be interacting with your chosen database
    fmt.Printf("Inserted %d rows:\n", len(rows))
    for _, row := range rows {
        b, _ := json.Marshal(row)
        fmt.Printf("%s\n", b)
        // you wouldn't normally do this - this is for the benefit of the
        // test suite being useful for race detection
        row[0] = nil
    }
    return nil
})
defer batcher.Close()

// the following is very contrived - in practice you would simply call
// batcher.Submit to insert the row, then wait for the result
rows := make([][]any, batchSize*5+2)
var wg sync.WaitGroup
wg.Add(len(rows))
for i := 0; i < len(rows); i += batchSize {
    for j := 0; j < batchSize && i+j < len(rows); j++ {
        row := [1]any{fmt.Sprintf("row %d", i+j)}
        rows[i+j] = row[:]
        result, err := batcher.Submit(context.Background(), rows[i+j])
        if err != nil {
            panic(err)
        }
        if (*[1]any)(result.Job) != &row {
            panic(result.Job)
        }
        go func() {
            if err := result.Wait(context.Background()); err != nil {
                panic(err)
            }
            if row[0] != nil {
                panic(`expected the value to be cleared`)
            }
            wg.Done()
        }()
    }
}

// will send the last, partial batch
if err := batcher.Shutdown(context.Background()); err != nil {
    panic(err)
}

wg.Wait()
Output:

Inserted 3 rows:
["row 0"]
["row 1"]
["row 2"]
Inserted 3 rows:
["row 3"]
["row 4"]
["row 5"]
Inserted 3 rows:
["row 6"]
["row 7"]
["row 8"]
Inserted 3 rows:
["row 9"]
["row 10"]
["row 11"]
Inserted 3 rows:
["row 12"]
["row 13"]
["row 14"]
Inserted 2 rows:
["row 15"]
["row 16"]
Example (IndependentOperations)

Demonstrates how Batcher can be used to model logically independent operations that are batched together for efficiency.
// this can be any type you wish, though references are necessary to pass on results
type Job struct {
    Callback func(ctx context.Context) (int, error)
    Result   int
    Err      error
}

// note: see the example output for the final value (implementation irrelevant to the example)
mu, maxRunningBatchProcessorCount, incDecBatchProcessorCount := newMaxRunningTracker()

// the default batching behavior has "sensible" defaults, but may not be suitable for all use cases
// (e.g. you may want to allow more than one concurrent batch, as in this example, which allows 10)
batcher := microbatch.NewBatcher(&microbatch.BatcherConfig{MaxConcurrency: 10}, func(ctx context.Context, jobs []*Job) error {
    defer incDecBatchProcessorCount()() // not relevant - you can ignore this

    // this guard is optional, but a good idea to ensure Batcher.Close exits promptly
    if err := ctx.Err(); err != nil {
        return err
    }

    // in practice, this might involve calling out to a remote service
    for _, job := range jobs {
        // any potentially long-running operation should accept and respect the context
        job.Result, job.Err = job.Callback(ctx)
    }

    time.Sleep(time.Millisecond * 20) // simulating load

    return nil
})
defer batcher.Close() // always remember to close the Batcher

// for the sake of this example, we just run a bunch of operations, concurrently
const (
    numOpsPerWorker = 1000
    numWorkers      = 5
    numOps          = numOpsPerWorker * numWorkers
)

var submitWg sync.WaitGroup
submitWg.Add(numOps)
var resultWg sync.WaitGroup
resultWg.Add(numOps)
var callbackCount int64

submit := func() {
    defer submitWg.Done() // note: (potentially) prior to the result being available

    succeed := rand.Intn(2) == 0
    expectedResult := rand.Int()
    var expectedErr error
    if !succeed {
        expectedErr = errors.New("expected error")
    }

    result, err := batcher.Submit(context.Background(), &Job{
        Callback: func(ctx context.Context) (int, error) {
            atomic.AddInt64(&callbackCount, 1)
            return expectedResult, expectedErr
        },
    })
    if err != nil {
        panic(err)
    }

    go func() {
        defer resultWg.Done()
        if err := result.Wait(context.Background()); err != nil {
            panic(err)
        }
        if result.Job.Result != expectedResult {
            panic(fmt.Sprintf("expected %d, got %d", expectedResult, result.Job.Result))
        }
        if result.Job.Err != expectedErr {
            panic(fmt.Sprintf("expected %v, got %v", expectedErr, result.Job.Err))
        }
    }()
}

for i := 0; i < numWorkers; i++ {
    go func() {
        for j := 0; j < numOpsPerWorker; j++ {
            submit()
        }
    }()
}

submitWg.Wait() // waits for all jobs to be submitted

// note shutdown guarantees close (by the time it exits) but doesn't cancel
// the batcher's context until/unless its context is canceled
if err := batcher.Shutdown(context.Background()); err != nil {
    panic(err)
}

resultWg.Wait() // wait for all results to be available (and checked)

mu.Lock()
defer mu.Unlock()
fmt.Println(`total number of callback calls:`, atomic.LoadInt64(&callbackCount))
fmt.Println(`max number of concurrent batch processors:`, *maxRunningBatchProcessorCount)
Output:

total number of callback calls: 5000
max number of concurrent batch processors: 10
func NewBatcher
func NewBatcher[Job any](config *BatcherConfig, processor BatchProcessor[Job]) *Batcher[Job]
NewBatcher initializes a new Batcher, using the provided BatcherConfig and BatchProcessor. The provided config may be nil. A panic will occur if processor is nil or an invalid config is provided.
The Batcher.Close method and/or Batcher.Shutdown method should be called when the Batcher is no longer needed.
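For example, a typical lifecycle might look like the following sketch, reusing the hypothetical processLookups from above:

batcher := microbatch.NewBatcher(nil, processLookups) // nil config uses the defaults
defer batcher.Close() // guards against unclean exits, e.g. panics

// ... submit jobs, wait on results ...

// prefer a graceful stop where possible
if err := batcher.Shutdown(context.Background()); err != nil {
    panic(err) // Shutdown was interrupted, forcing a Close
}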
func (*Batcher[Job]) Close
func (x *Batcher[Job]) Close() error
Close immediately cancels all jobs, and prevents further jobs via Submit, blocking until the Batcher has finished closing.
This method is unsafe to call from within a job or BatchProcessor.
func (*Batcher[Job]) Shutdown

func (x *Batcher[Job]) Shutdown(ctx context.Context) error

Shutdown immediately prevents further jobs via Submit, then waits for all already-running or scheduled jobs to complete. An error will be returned if ctx is canceled before this completes, in which case a forced Close occurs.
This method is unsafe to call from within a job or BatchProcessor.
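For instance, a graceful stop with a deadline, falling back to the forced Close behavior on timeout (a sketch, assuming a batcher constructed as above):

// allow up to 5 seconds for outstanding batches to drain
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := batcher.Shutdown(ctx); err != nil {
    // the deadline elapsed first, meaning a forced Close occurred,
    // immediately canceling any jobs that remained
    panic(err)
}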
func (*Batcher[Job]) Submit

func (x *Batcher[Job]) Submit(ctx context.Context, job Job) (*JobResult[Job], error)

Submit schedules a job for processing, returning an error if ctx is canceled or the Batcher is stopped.

The JobResult.Wait method should be used to wait for the job's completion, after which any individual job result(s) may be accessed on the job itself. The job is available via JobResult.Job, for convenience.
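Putting it together, a submit-and-wait sequence might look like the following sketch, again using the hypothetical lookupJob:

result, err := batcher.Submit(ctx, &lookupJob{Key: "some-key"})
if err != nil {
    panic(err) // ctx was canceled, or the Batcher is stopped
}
if err := result.Wait(ctx); err != nil {
    panic(err) // e.g. the BatchProcessor returned an error
}
// only now is it safe to read the results set by the BatchProcessor
fmt.Println(result.Job.Value, result.Job.Err)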
type BatcherConfig
type BatcherConfig struct {
    // MaxSize restricts the maximum number of jobs per batch, if positive.
    // Defaults to 16, if 0, or BatcherConfig is nil.
    //
    // WARNING: NewBatcher will panic if both MaxSize and FlushInterval are
    // disabled.
    MaxSize int

    // FlushInterval specifies the maximum duration before an "incomplete"
    // batch is passed to the BatchProcessor, if positive.
    // Defaults to 50ms, if 0, or BatcherConfig is nil.
    // If MaxSize is specified, time-based flushing can be disabled, by
    // setting this <= 0.
    //
    // WARNING: NewBatcher will panic if both MaxSize and FlushInterval are
    // disabled.
    FlushInterval time.Duration

    // MaxConcurrency specifies the maximum number of concurrent
    // BatchProcessor calls, able to be made by the Batcher, if positive.
    // Defaults to 1, if 0, or BatcherConfig is nil.
    MaxConcurrency int
}
BatcherConfig models optional configuration, for NewBatcher.
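For example, a config enabling purely size-based batching, per the field documentation above:

config := &microbatch.BatcherConfig{
    MaxSize:        100, // flush as soon as 100 jobs accumulate
    FlushInterval:  -1,  // <= 0 disables time-based flushing (valid only because MaxSize is set)
    MaxConcurrency: 4,   // allow up to 4 concurrent BatchProcessor calls
}
batcher := microbatch.NewBatcher(config, processLookups)
defer batcher.Close()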
type JobResult
type JobResult[Job any] struct {
    // Job is the pending job.
    //
    // WARNING: Consider that it may be accessed by the batch processor -
    // consider the implications, e.g. race conditions, if interacting with
    // internal state.
    Job Job
    // contains filtered or unexported fields
}
JobResult models a scheduled job, providing a Wait method that should be called prior to accessing any output/result, which the BatchProcessor may set on the Job.
WARNING: The actual value of the Job field will not be modified, meaning any results from the BatchProcessor must be conveyed via references reachable through the Job value.
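In other words, the Job type parameter should be (or contain) a reference; a sketch, using the hypothetical lookupJob from above:

// Job here is a *lookupJob, so writes by the BatchProcessor to Value and
// Err are observable via result.Job, once result.Wait returns
result, err := batcher.Submit(ctx, &lookupJob{Key: "k"})
if err != nil {
    panic(err)
}

// by contrast, if Job were a plain (non-reference) value type, the
// BatchProcessor could only ever modify its own copy, and no result
// would be observable via result.Job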