Documentation ¶
Overview ¶
microbatch simplifies asynchronous microbatching.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // ErrBatcherTerminated is returned when the batcher is terminated. ErrBatcherTerminated = errors.New("batcher terminated") // ErrNoResult is returned when the response from [BatchProcessor] is missing a // matching correlation ID. ErrNoResult = errors.New("no result") )
Errors returned from Batcher.ExecuteJob.
Functions ¶
This section is empty.
Types ¶
type BatchProcessor ¶
BatchProcessor is the interface your batch processor needs to implement.
Example ¶
package main import ( "fmt" "fillmore-labs.com/microbatch" ) type RemoteProcessor struct{} func (p *RemoteProcessor) ProcessJobs(jobs []*Job) ([]*JobResult, error) { result := make([]*JobResult, 0, len(jobs)) for _, job := range jobs { body := fmt.Sprintf("Processed job %d", job.ID) result = append(result, &JobResult{ID: job.ID, Body: body}) } return result, nil } func main() { var processor microbatch.BatchProcessor[*Job, *JobResult] = &RemoteProcessor{} result, _ := processor.ProcessJobs([]*Job{}) fmt.Println(result) }
Output: []
type Batcher ¶
Use the Batcher to submit requests.
Example ¶
// Initialize processor := &RemoteProcessor{} batcher := microbatch.NewBatcher(processor, 3, 1*time.Millisecond) // Submit jobs const iterations = 5 var wg sync.WaitGroup wg.Add(iterations) ctx := context.Background() for i := 1; i <= iterations; i++ { go func(i int) { result, _ := batcher.ExecuteJob(ctx, &Job{ID: JobID(i)}) fmt.Println(result.Body) wg.Done() }(i) // https://go.dev/doc/faq#closures_and_goroutines } wg.Wait() // Shut down batcher.Shutdown()
Output: Processed job 1 Processed job 2 Processed job 3 Processed job 4 Processed job 5
func NewBatcher ¶
func NewBatcher[Q, S Correlatable[K], K comparable]( processor BatchProcessor[Q, S], size int, duration time.Duration, ) *Batcher[Q, S]
NewBatcher creates a new Batcher.
func (*Batcher[Q, S]) ExecuteJob ¶
Submit a job and wait for the result.
func (*Batcher[Q, S]) Shutdown ¶
func (b *Batcher[Q, S]) Shutdown()
Shutdown needs to be called to send the last batch and terminate to goroutine. No calls to Batcher.ExecuteJob after this will be accepted.
type Correlatable ¶
type Correlatable[K any] interface { CorrelationID() K }
Correlatable is the interface your workloads needs to implement.
Example ¶
package main import ( "fmt" "fillmore-labs.com/microbatch" ) type ( JobID int Job struct { ID JobID } JobResult struct { ID JobID Body string } ) func (j *Job) CorrelationID() JobID { return j.ID } func (j *JobResult) CorrelationID() JobID { return j.ID } func main() { printID := func(j microbatch.Correlatable[JobID]) { fmt.Println(j.CorrelationID()) } printID(&Job{ID: 1}) printID(&JobResult{ID: 1}) }
Output: 1 1
Click to show internal directories.
Click to hide internal directories.