microbatching

package module
v0.0.0-...-3f99b97 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2024 License: MIT Imports: 5 Imported by: 0

README

micro-batching

Micro batching library in Go

Installation

go get github.com/antklim/micro-batching

Usage

package main

func main() {
    // create and run a micro-batching service
    srv := mb.NewService(mb.WithFrequency(10 * time.Millisecond))
    // provide a batch processor
    srv.Run(&BatchProcessor{})

    jobsSize := 7

    for i := 0; i < jobsSize; i++ {
        // add jobs to be processed
        srv.AddJob(Job{i})
    }

    // wait for the jobs to be processed
    time.Sleep(20 * time.Millisecond)

    // get the jobs results
    for i := 0; i < jobsSize; i++ {
        r, err := srv.JobResult(i)

        if err != nil {
            fmt.Println(err)
        } else {
            fmt.Printf("Job ID: %s, State: %s\n", r.JobID, r.State)
        }
    }

    srv.Shutdown()
}

Service Options

The service can be configured with the following options:

  • WithBatchSize - sets the batch size (default is 3)
  • WithFrequency - sets the frequency of the calling the batch processor (default is 1 second)
  • WithLogger - sets the logger for the service
  • WithQueueSize - sets the queue size for the jobs (default is 100). In case the queue is full, the AddJob call will be blocked until the queue has space.
  • WithShutdownTimeout - sets the timeout for the service shutdown (default is 5 seconds)

API

The NewService function creates a new service with the provided options.

The service provides the following API:

  • AddJob - adds a job to the queue
  • JobResult - returns the result of the job by its ID
  • Run - runs the service
  • Shutdown - shuts down the service

Architecture

+---------+  jobs channel  +-------+  batches channel  +-----------------+
| Service | -------------> | Batch | ----------------> | Batch processor |
+---------+                +-------+                   +-----------------+
    ^                                                           |
    |                  notifications channel                    |
    +-----------------------------------------------------------+
Running service

Service needs batch processor to be provided. The batch processor is responsible for processing the batches of jobs. The batch processor should implement the BatchProcessor interface. To run the service, call the Run method with the batch processor instance.

Processing jobs

When AddJob is called, the job is added to the jobs channel. The batching go-routine listens to the jobs channel and creates batches of jobs. When the batch is ready or when the frequency time ticks, the batches are sent to the batch runner go-routine.

The batch runner listens to the batch channel and accumulates the batches. When the frequency timer ticks the batches are sent to the batch processor. The results of the processed batches are sent to the notifications channel.

The service listens to the notifications channel and updates the job results.

Job results

Job results are available via the JobResult method.

Service shutdown

When the Shutdown method is called, the service closes the jobs channel and waits for the submitted jobs to be processed.

Closing jobs channel causes batch go-routine to send all the remaining batches to the batch runner. The batch go-routine closes the batches channel.

Closing the batches channel causes the batch runner to send all the remaining batches to the batch processor. The batch runner closes the notifications channel.

Closing the notifications channel causes the service to stop listening to the notifications channel and send the final event to the done channel.

The Shutdown method is waiting for the event in done channel or for the timeout to occur.

Testing

To test the library, run the following command:

go test -v

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrJobNotFound = errors.New("microbatching: Job not found")

ErrJobNotFound is returned by the Service.JobResult method when the job is not found.

View Source
var ErrServiceClosed = errors.New("microbatching: Service closed")

ErrServiceClosed is returned by the Service.AddJob methods after a call to Service.Shutdown.

Functions

func Batch

func Batch[V any](
	batchSize int,
	in <-chan V,
	out chan<- []V,
	freq time.Duration,
)

Batch takes values from an 'in' channel and sends them in batches to an 'out' channel.

Types

type BatchProcessor

type BatchProcessor interface {
	Process(jobs []Job) []ProcessingResult
}

BatchProcessor describes a batch processor interface.

type Job

type Job interface {
	ID() string
	Do() JobResult
}

Job describes a job interface. Job is a unit of work executed by the batch processor.

type JobExtendedResult

type JobExtendedResult struct {
	JobID string
	State JobState
	JobResult
}

JobExtendedResult describes job result with state.

type JobResult

type JobResult struct {
	Err    error
	Result interface{}
}

JobResult describes job result.

type JobState

type JobState int
const (
	Submitted JobState = iota
	Processing
	Completed
)

func (JobState) String

func (s JobState) String() string

type Logger

type Logger interface {
	Print(...interface{})
	Printf(format string, v ...any)
	Println(...interface{})
}

type ProcessingResult

type ProcessingResult struct {
	JobID string
	JobResult
}

ProcessingResult describes job processing result by the batch processor.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner is a micro-batching runner. It reads batches from the channel and stores them into a queue. It processes the queue in a batch when the ticker ticks. It notifies the results to the notification channel.

func NewRunner

func NewRunner(
	bp BatchProcessor,
	bc <-chan []Job,
	nc chan<- JobExtendedResult,
	freq time.Duration,
) *Runner

func (*Runner) Run

func (r *Runner) Run()

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is a micro-batching service that processes jobs in batches.

Example
srv := mb.NewService(mb.WithFrequency(10 * time.Millisecond))
srv.Run(&mockBatchProcessor{})

jobsSize := 7
testJobs := makeMockJobs(jobsSize)

for _, j := range testJobs {
	srv.AddJob(j)
}

time.Sleep(50 * time.Millisecond)

for _, j := range testJobs {
	r, err := srv.JobResult(j.ID())

	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Printf("Job ID: %s, State: %s\n", r.JobID, r.State)
	}

}

srv.Shutdown()
Output:

Job ID: 0, State: Completed
Job ID: 1, State: Completed
Job ID: 2, State: Completed
Job ID: 3, State: Completed
Job ID: 4, State: Completed
Job ID: 5, State: Completed
Job ID: 6, State: Completed

func NewService

func NewService(opt ...ServiceOption) *Service

func (*Service) AddJob

func (s *Service) AddJob(j Job) error

AddJob adds a job to the queue. It returns an error if the service is closed.

func (*Service) JobResult

func (s *Service) JobResult(jobID string) (JobExtendedResult, error)

JobResult returns the result of a job. It returns an error if the job is not found.

func (*Service) Run

func (s *Service) Run(bp BatchProcessor)

func (*Service) Shutdown

func (s *Service) Shutdown()

Shutdown stops the service.

type ServiceOption

type ServiceOption interface {
	// contains filtered or unexported methods
}

ServiceOption sets service options such as batch size and frequency.

func WithBatchSize

func WithBatchSize(v int) ServiceOption

WithBatchSize returns a ServiceOption that sets batch size.

func WithFrequency

func WithFrequency(v time.Duration) ServiceOption

WithFrequency returns a ServiceOption that sets frequency.

func WithLogger

func WithLogger(v Logger) ServiceOption

WithLogger returns a ServiceOption that sets service logger.

func WithQueueSize

func WithQueueSize(v int) ServiceOption

WithQueueSize returns a ServiceOption that sets jobs queue size.

func WithShutdownTimeout

func WithShutdownTimeout(v time.Duration) ServiceOption

WithShutdownTimeout returns a ServiceOption that sets service shutdown timeout.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL