microbatch

package module
v0.0.1
Warning

This package is not in the latest version of its module.

Published: Nov 27, 2023 License: Apache-2.0 Imports: 5 Imported by: 1

README

Micro Batcher


Micro-batching is a technique often used in stream processing to achieve near real-time computation with less overhead than single-record processing. It balances latency against throughput, simplifies parallelization, and improves resource utilization.

Well-known examples include Spark Structured Streaming and Apache Kafka.

Usage

An example project that calls AWS Lambda demonstrates a typical use case.

Implement Job and JobResult
type (
	JobID string

	Job struct {
		ID      JobID
		Payload string
	}

	JobResult struct {
		ID      JobID
		Payload string
	}
)

func (j *Job) CorrelationID() JobID {
	return j.ID
}

func (j *JobResult) CorrelationID() JobID {
	return j.ID
}

var (
	_ microbatch.Correlatable[JobID] = (*Job)(nil)
	_ microbatch.Correlatable[JobID] = (*JobResult)(nil)
)
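The `var _ ... = (*Job)(nil)` lines above are compile-time assertions: the build fails if `*Job` or `*JobResult` stops implementing the interface. The self-contained sketch below shows the idiom with a local stand-in for `microbatch.Correlatable`, so it runs without the package; the stand-in is an assumption that mirrors the interface documented further down.

```go
package main

import "fmt"

// Correlatable is a local stand-in for microbatch.Correlatable.
type Correlatable[K any] interface {
	CorrelationID() K
}

type JobID string

type Job struct {
	ID      JobID
	Payload string
}

func (j *Job) CorrelationID() JobID { return j.ID }

// Compile-time check: this line fails to build if *Job stops satisfying
// Correlatable[JobID]. The typed nil pointer costs nothing at run time.
var _ Correlatable[JobID] = (*Job)(nil)

func main() {
	var c Correlatable[JobID] = &Job{ID: "42", Payload: "hello"}
	fmt.Println(c.CorrelationID()) // prints 42
}
```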
Implement the Batch Processor
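type RemoteProcessor struct{}

func (p *RemoteProcessor) ProcessJobs(jobs []*Job) ([]*JobResult, error) {
	// Send the jobs downstream for processing and return the results.
	// Placeholder: echo each payload back under the job's correlation ID.
	results := make([]*JobResult, 0, len(jobs))
	for _, job := range jobs {
		results = append(results, &JobResult{ID: job.ID, Payload: job.Payload})
	}

	return results, nil
}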

var _ microbatch.BatchProcessor[*Job, *JobResult] = (*RemoteProcessor)(nil)
Use the Batcher
// Initialize
processor := &RemoteProcessor{}
const batchSize = 5
const batchDuration = 1 * time.Millisecond
batcher := microbatch.NewBatcher(processor, batchSize, batchDuration)

ctx := context.Background()
var wg sync.WaitGroup

// Submit jobs
wg.Add(1)
go func() {
	defer wg.Done()
	result, err := batcher.ExecuteJob(ctx, &Job{ID: "1", Payload: "hello"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(result.Payload)
}()

// Shut down
wg.Wait()
batcher.Shutdown()

Documentation

Overview

microbatch simplifies asynchronous microbatching.


Constants

This section is empty.

Variables

var (
	// ErrBatcherTerminated is returned when the batcher is terminated.
	ErrBatcherTerminated = errors.New("batcher terminated")
	// ErrNoResult is returned when the response from [BatchProcessor] is missing a
	// matching correlation ID.
	ErrNoResult = errors.New("no result")
)

Errors returned from Batcher.ExecuteJob.

Functions

This section is empty.

Types

type BatchProcessor

type BatchProcessor[Q any, S any] interface {
	ProcessJobs(jobs []Q) ([]S, error)
}

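BatchProcessor is the interface your batch processor must implement. ProcessJobs receives one batch of jobs and returns their results; each result is matched to its job by correlation ID.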

Example
package main

import (
	"fmt"

	"fillmore-labs.com/microbatch"
)

// Job, JobResult and JobID are defined as in the Correlatable example below.
type RemoteProcessor struct{}

func (p *RemoteProcessor) ProcessJobs(jobs []*Job) ([]*JobResult, error) {
	result := make([]*JobResult, 0, len(jobs))
	for _, job := range jobs {
		body := fmt.Sprintf("Processed job %d", job.ID)
		result = append(result, &JobResult{ID: job.ID, Body: body})
	}

	return result, nil
}

func main() {
	var processor microbatch.BatchProcessor[*Job, *JobResult] = &RemoteProcessor{}

	result, _ := processor.ProcessJobs([]*Job{})
	fmt.Println(result)
}
Output:

[]

type Batcher

type Batcher[Q any, S any] struct {
	// contains filtered or unexported fields
}

Batcher collects jobs submitted through ExecuteJob into batches and passes each batch to a BatchProcessor.

Example
// Initialize
processor := &RemoteProcessor{}
batcher := microbatch.NewBatcher(processor, 3, 1*time.Millisecond)

// Submit jobs
const iterations = 5
var wg sync.WaitGroup
wg.Add(iterations)
ctx := context.Background()
for i := 1; i <= iterations; i++ {
	go func(i int) {
		defer wg.Done()
		result, err := batcher.ExecuteJob(ctx, &Job{ID: JobID(i)})
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(result.Body)
	}(i) // https://go.dev/doc/faq#closures_and_goroutines
}
wg.Wait()

// Shut down
batcher.Shutdown()
Unordered output:

Processed job 1
Processed job 2
Processed job 3
Processed job 4
Processed job 5

func NewBatcher

func NewBatcher[Q, S Correlatable[K], K comparable](
	processor BatchProcessor[Q, S],
	size int,
	duration time.Duration,
) *Batcher[Q, S]

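NewBatcher creates a new Batcher that passes jobs to processor in batches of up to size jobs; duration limits how long a partial batch is held before it is sent. The Batcher runs a background goroutine until Shutdown is called.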
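The signature constrains both job and result types by `Correlatable[K]` and requires `K comparable`. A plausible reason, assumed here rather than confirmed by the source, is that correlation IDs serve as map keys, which Go only allows for comparable types. The sketch uses a local copy of `Correlatable` and a hypothetical `indexByID` helper with the same constraint shape.

```go
package main

import "fmt"

// Correlatable is a local copy of the interface documented below.
type Correlatable[K any] interface {
	CorrelationID() K
}

// indexByID is a hypothetical helper that keys values by correlation ID.
// Map keys must be comparable, hence the K comparable constraint.
func indexByID[S Correlatable[K], K comparable](values []S) map[K]S {
	m := make(map[K]S, len(values))
	for _, v := range values {
		m[v.CorrelationID()] = v
	}
	return m
}

type JobID string

type JobResult struct {
	ID      JobID
	Payload string
}

func (r *JobResult) CorrelationID() JobID { return r.ID }

func main() {
	results := []*JobResult{{ID: "a", Payload: "A"}, {ID: "b", Payload: "B"}}
	// Type arguments are spelled out explicitly for clarity.
	byID := indexByID[*JobResult, JobID](results)
	fmt.Println(byID["b"].Payload) // prints B
}
```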

func (*Batcher[Q, S]) ExecuteJob

func (b *Batcher[Q, S]) ExecuteJob(ctx context.Context, request Q) (S, error)

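ExecuteJob submits a job and waits for its result. It returns ErrBatcherTerminated if the Batcher has been shut down, and ErrNoResult if the processor returned no result with the job's correlation ID.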

func (*Batcher[Q, S]) Shutdown

func (b *Batcher[Q, S]) Shutdown()

Shutdown must be called to send the last batch and terminate the background goroutine. Calls to Batcher.ExecuteJob after Shutdown are not accepted and return ErrBatcherTerminated.

type Correlatable

type Correlatable[K any] interface {
	CorrelationID() K
}

Correlatable is the interface that jobs and results must implement; the correlation ID links each result to the job that produced it.

Example
package main

import (
	"fmt"

	"fillmore-labs.com/microbatch"
)

type (
	JobID int

	Job struct {
		ID JobID
	}

	JobResult struct {
		ID   JobID
		Body string
	}
)

func (j *Job) CorrelationID() JobID {
	return j.ID
}

func (j *JobResult) CorrelationID() JobID {
	return j.ID
}

func main() {
	printID := func(j microbatch.Correlatable[JobID]) {
		fmt.Println(j.CorrelationID())
	}

	printID(&Job{ID: 1})
	printID(&JobResult{ID: 1})
}
Output:

1
1
