Documentation ¶
Overview ¶
Package microbatch simplifies asynchronous microbatching.
Example ¶
// Initialize
processor := &RemoteProcessor{}
batcher := microbatch.NewBatcher(
	processor,
	func(j *Job) JobID { return j.ID },
	func(r *JobResult) JobID { return r.ID },
	3,
	10*time.Millisecond,
)

ctx := context.Background()

const iterations = 5
var wg sync.WaitGroup

// Submit jobs
for i := 1; i <= iterations; i++ {
	wg.Add(1)
	go func(i int) {
		result, _ := batcher.ExecuteJob(ctx, &Job{ID: JobID(i)})
		fmt.Println(result.Body)
		wg.Done()
	}(i) // https://go.dev/doc/faq#closures_and_goroutines
}

// Shut down
wg.Wait()
batcher.Shutdown()
Output:

Processed job 1
Processed job 2
Processed job 3
Processed job 4
Processed job 5
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrBatcherTerminated is returned when the batcher is terminated.
	ErrBatcherTerminated = errors.New("batcher terminated")
	// ErrNoResult is returned when the response from [BatchProcessor] is missing a
	// matching correlation ID.
	ErrNoResult = errors.New("no result")
)
Errors returned from Batcher.ExecuteJob.
Functions ¶
This section is empty.
Types ¶
type BatchProcessor ¶
BatchProcessor is the interface your batch processor needs to implement.
Example ¶
package main

import (
	"fmt"
)

type (
	JobID int

	Job struct {
		ID JobID
	}

	JobResult struct {
		ID   JobID
		Body string
	}

	Jobs       []*Job
	JobResults []*JobResult
)

type RemoteProcessor struct{}

func (p *RemoteProcessor) ProcessJobs(jobs Jobs) (JobResults, error) {
	results := make(JobResults, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{
			ID:   job.ID,
			Body: fmt.Sprintf("Processed job %d", job.ID),
		}
		results = append(results, result)
	}

	return results, nil
}

func main() {
	processor := &RemoteProcessor{}

	results, _ := processor.ProcessJobs(Jobs{&Job{ID: 1}, &Job{ID: 2}})

	for _, result := range results {
		fmt.Println(result.Body)
	}
}
Output:

Processed job 1
Processed job 2
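The interface declaration itself is not reproduced on this page. Judging from the example above and from the BatchProcessor[QQ, SS] parameter of NewBatcher, it plausibly has a single ProcessJobs method that takes a batch and returns a batch of results. The declaration in the sketch below is therefore an assumption, not a copy of the package source, and UpperCaser is a hypothetical processor used only for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// BatchProcessor is an ASSUMED shape, inferred from the RemoteProcessor
// example and the NewBatcher signature. Check the package source for the
// authoritative declaration.
type BatchProcessor[QQ, SS any] interface {
	ProcessJobs(jobs QQ) (SS, error)
}

// UpperCaser is a hypothetical processor that upper-cases a batch of strings.
type UpperCaser struct{}

func (UpperCaser) ProcessJobs(jobs []string) ([]string, error) {
	results := make([]string, 0, len(jobs))
	for _, job := range jobs {
		results = append(results, strings.ToUpper(job))
	}
	return results, nil
}

// Compile-time check that UpperCaser satisfies the assumed interface.
var _ BatchProcessor[[]string, []string] = UpperCaser{}

func main() {
	results, err := UpperCaser{}.ProcessJobs([]string{"a", "b"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(results) // prints [A B]
}
```

A compile-time assertion like the `var _ ... = UpperCaser{}` line is a cheap way to catch a mismatched method signature before the processor is handed to a batcher.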
type Batcher ¶
type Batcher[Q, S any] struct {
	// contains filtered or unexported fields
}
Batcher is used to submit requests.
func NewBatcher ¶
func NewBatcher[Q, S any, K comparable, QQ ~[]Q, SS ~[]S](
	batchProcessor BatchProcessor[QQ, SS],
	correlateRequest func(Q) K,
	correlateResult func(S) K,
	size int,
	duration time.Duration,
) *Batcher[Q, S]
NewBatcher creates a new Batcher.
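The correlateRequest and correlateResult parameters both map to the same comparable key type K, which suggests that results are matched back to their jobs by key rather than by position. The sketch below illustrates that idea only; it is not the package's implementation. correlate is a hypothetical helper, errNoResult is a local stand-in for ErrNoResult, and Job and JobResult are simplified versions of the example types.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoResult is a local stand-in for the package's ErrNoResult.
var errNoResult = errors.New("no result")

type Job struct{ ID int }

type JobResult struct {
	ID   int
	Body string
}

// correlate is a hypothetical helper: it returns one result (or error)
// per job, in job order, matching results to jobs by correlation key.
func correlate[Q, S any, K comparable](
	jobs []Q,
	results []S,
	correlateRequest func(Q) K,
	correlateResult func(S) K,
) ([]S, []error) {
	// Index the results by key so their order does not matter.
	byKey := make(map[K]S, len(results))
	for _, r := range results {
		byKey[correlateResult(r)] = r
	}

	matched := make([]S, len(jobs))
	errs := make([]error, len(jobs))
	for i, j := range jobs {
		if r, ok := byKey[correlateRequest(j)]; ok {
			matched[i] = r
		} else {
			errs[i] = errNoResult // the batch contained no result for this job
		}
	}
	return matched, errs
}

func main() {
	jobs := []*Job{{ID: 1}, {ID: 2}, {ID: 3}}
	// Results arrive out of order, and job 2 has none.
	results := []*JobResult{{ID: 3, Body: "three"}, {ID: 1, Body: "one"}}

	matched, errs := correlate(jobs, results,
		func(j *Job) int { return j.ID },
		func(r *JobResult) int { return r.ID },
	)
	for i, job := range jobs {
		if errs[i] != nil {
			fmt.Printf("job %d: %v\n", job.ID, errs[i])
			continue
		}
		fmt.Printf("job %d: %s\n", job.ID, matched[i].Body)
	}
}
```

Keying by ID means a processor is free to reorder or drop results without a job receiving someone else's answer.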
func (*Batcher[Q, S]) ExecuteJob ¶
ExecuteJob submits a job and waits for the result.
func (*Batcher[Q, S]) Shutdown ¶
func (b *Batcher[Q, S]) Shutdown()
Shutdown needs to be called to send the last batch and terminate the goroutine. No calls to Batcher.ExecuteJob after this will be accepted.