microbatch

v0.0.3
Published: Dec 10, 2023 License: Apache-2.0 Imports: 5 Imported by: 1

README

Micro Batcher

Micro-batching is a technique often used in stream processing to achieve near real-time computation while reducing the overhead compared to single record processing. It balances latency versus throughput and enables simplified parallelization while optimizing resource utilization.

Popular systems that use micro-batching include Spark Structured Streaming and Apache Kafka.

Usage

Implement Job and JobResult
type (
	Job struct {
		ID      string
		Request string
	}

	JobResult struct {
		ID       string
		Response string
	}
)

func correlateRequest(j *Job) string      { return j.ID }
func correlateResult(r *JobResult) string { return r.ID }
Implement the Batch Processor
type RemoteProcessor struct{}

func (*RemoteProcessor) ProcessJobs(jobs []*Job) ([]*JobResult, error) {
    ... // Send the jobs downstream for processing and return the results
}
Use the Batcher
// Initialize
processor := &RemoteProcessor{}
const batchSize = 5
const batchDuration = 1 * time.Millisecond
batcher := microbatch.NewBatcher(processor, correlateRequest, correlateResult, batchSize, batchDuration)

var wg sync.WaitGroup

// Submit jobs
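ctx := context.Background()
wg.Add(1)
go func() {
	defer wg.Done()
	result, err := batcher.ExecuteJob(ctx, &Job{ID: "1"})
	if err != nil {
		return // handle the error in real code
	}
	fmt.Println(result.Response)
}()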

// Shut down
wg.Wait()
batcher.Shutdown()

Documentation

Overview

Package microbatch simplifies asynchronous microbatching.

Example
// Initialize
processor := &RemoteProcessor{}
batcher := microbatch.NewBatcher(
	processor,
	func(j *Job) JobID { return j.ID },
	func(r *JobResult) JobID { return r.ID },
	3,
	10*time.Millisecond,
)

ctx := context.Background()
const iterations = 5
var wg sync.WaitGroup

// Submit jobs
for i := 1; i <= iterations; i++ {
	wg.Add(1)
	go func(i int) {
		result, _ := batcher.ExecuteJob(ctx, &Job{ID: JobID(i)})
		fmt.Println(result.Body)
		wg.Done()
	}(i) // https://go.dev/doc/faq#closures_and_goroutines
}

// Shut down
wg.Wait()
batcher.Shutdown()
Output (order may vary, because the jobs are submitted from concurrent goroutines):

Processed job 1
Processed job 2
Processed job 3
Processed job 4
Processed job 5

Constants

This section is empty.

Variables

var (
	// ErrBatcherTerminated is returned when the batcher is terminated.
	ErrBatcherTerminated = errors.New("batcher terminated")
	// ErrNoResult is returned when the response from [BatchProcessor] is missing a
	// matching correlation ID.
	ErrNoResult = errors.New("no result")
)

Errors returned from Batcher.ExecuteJob.

Functions

This section is empty.

Types

type BatchProcessor

type BatchProcessor[QQ, SS any] interface {
	ProcessJobs(jobs QQ) (SS, error)
}

BatchProcessor is the interface your batch processor needs to implement.

Example
package main

import (
	"fmt"
)

type (
	JobID int

	Job struct {
		ID JobID
	}

	JobResult struct {
		ID   JobID
		Body string
	}

	Jobs       []*Job
	JobResults []*JobResult
)

type RemoteProcessor struct{}

func (p *RemoteProcessor) ProcessJobs(jobs Jobs) (JobResults, error) {
	results := make(JobResults, 0, len(jobs))
	for _, job := range jobs {
		result := &JobResult{
			ID:   job.ID,
			Body: fmt.Sprintf("Processed job %d", job.ID),
		}
		results = append(results, result)
	}

	return results, nil
}

func main() {
	processor := &RemoteProcessor{}
	results, _ := processor.ProcessJobs(Jobs{&Job{ID: 1}, &Job{ID: 2}})
	for _, result := range results {
		fmt.Println(result.Body)
	}
}
Output:

Processed job 1
Processed job 2

type Batcher

type Batcher[Q, S any] struct {
	// contains filtered or unexported fields
}

Batcher accepts individual jobs through ExecuteJob and passes them to a BatchProcessor in batches.

func NewBatcher

func NewBatcher[Q, S any, K comparable, QQ ~[]Q, SS ~[]S](
	batchProcessor BatchProcessor[QQ, SS],
	correlateRequest func(Q) K,
	correlateResult func(S) K,
	size int,
	duration time.Duration,
) *Batcher[Q, S]

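NewBatcher creates a new Batcher that sends batches to batchProcessor. correlateRequest and correlateResult return the correlation key used to match each result to its request; size and duration set the batch size and the batch duration.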

func (*Batcher[Q, S]) ExecuteJob

func (b *Batcher[Q, S]) ExecuteJob(ctx context.Context, request Q) (S, error)

ExecuteJob submits a job and waits for the result. It may return ErrBatcherTerminated or ErrNoResult, as described under Variables.

func (*Batcher[Q, S]) Shutdown

func (b *Batcher[Q, S]) Shutdown()

Shutdown must be called to send the last batch and terminate the batcher's goroutine. Calls to Batcher.ExecuteJob made after Shutdown are not accepted.
