Documentation
¶
Overview ¶
Package microbatch simplifies asynchronous microbatching.
Example (Asynchronous) ¶
Example (Asynchronous) demonstrates how to use [Batcher.SubmitJob] with a timeout. Note that you can shut down the batcher without waiting for the jobs to finish.
package main
import (
"context"
"fmt"
"sync"
"fillmore-labs.com/microbatch"
)
type (
JobID int
Job struct {
ID JobID
}
JobResult struct {
ID JobID
Body string
}
Jobs []*Job
JobResults []*JobResult
)
func (j *Job) JobID() JobID { return j.ID }
func (j *JobResult) JobID() JobID { return j.ID }
// unwrap unwraps a JobResult to payload and error.
func unwrap(r *JobResult, err error) (string, error) {
if err != nil {
return "", err
}
return r.Body, nil
}
type RemoteProcessor struct{}
func (p *RemoteProcessor) ProcessJobs(jobs Jobs) (JobResults, error) {
results := make(JobResults, 0, len(jobs))
for _, job := range jobs {
result := &JobResult{
ID: job.ID,
Body: fmt.Sprintf("Processed job %d", job.ID),
}
results = append(results, result)
}
return results, nil
}
func main() {
// Initialize
processor := &RemoteProcessor{}
batcher := microbatch.NewBatcher(
processor.ProcessJobs,
(*Job).JobID,
(*JobResult).JobID,
microbatch.WithSize(3),
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
const iterations = 5
var wg sync.WaitGroup
for i := 1; i <= iterations; i++ {
future := batcher.Submit(&Job{ID: JobID(i)})
wg.Add(1)
go func(i int) {
defer wg.Done()
result, err := unwrap(future.Await(ctx))
if err == nil {
fmt.Println(result)
} else {
fmt.Printf("Error executing job %d: %v\n", i, err)
}
}(i)
}
// Shut down
batcher.Send()
wg.Wait()
}
Output: Processed job 1 Processed job 2 Processed job 3 Processed job 4 Processed job 5
Example (Blocking) ¶
Example (Blocking) demonstrates how to use [Batcher.SubmitJob] in a single line.
package main
import (
"context"
"fmt"
"sync"
"time"
"fillmore-labs.com/microbatch"
)
type (
JobID int
Job struct {
ID JobID
}
JobResult struct {
ID JobID
Body string
}
Jobs []*Job
JobResults []*JobResult
)
func (j *Job) JobID() JobID { return j.ID }
func (j *JobResult) JobID() JobID { return j.ID }
// unwrap unwraps a JobResult to payload and error.
func unwrap(r *JobResult, err error) (string, error) {
if err != nil {
return "", err
}
return r.Body, nil
}
type RemoteProcessor struct{}
func (p *RemoteProcessor) ProcessJobs(jobs Jobs) (JobResults, error) {
results := make(JobResults, 0, len(jobs))
for _, job := range jobs {
result := &JobResult{
ID: job.ID,
Body: fmt.Sprintf("Processed job %d", job.ID),
}
results = append(results, result)
}
return results, nil
}
func main() {
// Initialize
processor := &RemoteProcessor{}
batcher := microbatch.NewBatcher(
processor.ProcessJobs,
(*Job).JobID,
(*JobResult).JobID,
microbatch.WithSize(3),
microbatch.WithTimeout(10*time.Millisecond),
)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
const iterations = 5
var wg sync.WaitGroup
// Submit jobs
for i := 1; i <= iterations; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
if result, err := unwrap(batcher.Execute(ctx, &Job{ID: JobID(i)})); err == nil {
fmt.Println(result)
}
}(i) // https://go.dev/doc/faq#closures_and_goroutines
}
// Shut down
wg.Wait()
batcher.Send()
}
Output: Processed job 1 Processed job 2 Processed job 3 Processed job 4 Processed job 5
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // ErrNoResult is returned when the response from processJobs is missing a // matching correlation ID. ErrNoResult = errors.New("no result") // ErrDuplicateID is returned when a job has an already existing correlation ID. ErrDuplicateID = errors.New("duplicate correlation ID") )
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher[Q, R any] struct { // contains filtered or unexported fields }
Batcher handles submitting requests in batches and returning results through channels.
func NewBatcher ¶
func NewBatcher[Q, R any, C comparable, QQ ~[]Q, RR ~[]R]( processJobs func(jobs QQ) (RR, error), correlateRequest func(request Q) C, correlateResult func(result R) C, opts ...Option, ) *Batcher[Q, R]
NewBatcher creates a new Batcher.
- batchProcessor is used to process batches of jobs.
- correlateRequest and correlateResult functions are used to get a common key from a job and result for correlating results back to jobs.
- opts are used to configure the batch size and timeout.
The batch collector is run in a goroutine which must be terminated with [Batcher.Shutdown].
type Option ¶ added in v0.1.0
type Option interface {
// contains filtered or unexported methods
}
Option defines configurations for NewBatcher.
func WithTimeout ¶ added in v0.1.0
WithTimeout is an option to configure the batch timeout.
Click to show internal directories.
Click to hide internal directories.