batch

Version: v1.1.0
Published: Aug 30, 2023 | License: MIT | Imports: 5 | Imported by: 3

README

What can it be used for?

To increase database-driven web application throughput without sacrificing data consistency and data durability or making source code and architecture complex.

The batch package simplifies writing Go applications that process incoming requests (HTTP, gRPC, etc.) in batches: instead of processing each request separately, they group incoming requests into a batch and run the whole group at once. This method of processing can significantly speed up the application and reduce the consumption of disk, network or CPU.

The batch package can be used to write any type of server that handles thousands of requests per second. Thanks to this small library, you can write relatively simple code without resorting to low-level data structures.

Why does batch processing improve performance?

Normally, a web application uses the following pattern to modify data in the database:

  1. Load the resource from the database. A resource is some portion of data, such as a set of records from a relational database, a document from a document-oriented database or a value from a KV store (in Domain-Driven Design terms it is called an aggregate). Lock the entire resource optimistically by reading its version number.
  2. Apply the change to the data in plain Go.
  3. Save the resource to the database. Release the lock by running an atomic update with a version check.
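
The three steps above can be sketched with an in-memory stand-in for the database. This is only an illustration of optimistic locking in general, not code from the batch package; the `store`, `record` and `increment` names are hypothetical, and a mutex plays the role of the database's atomic update.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// record is a hypothetical resource: a value plus a version number.
type record struct {
	Value   int
	Version int
}

// store is an in-memory stand-in for a database holding one resource.
type store struct {
	mu  sync.Mutex
	rec record
}

var errVersionConflict = errors.New("version conflict")

// load returns a copy of the resource, including the version that was read.
func (s *store) load() record {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.rec
}

// save writes the resource only if its version is unchanged since load
// (the equivalent of an atomic update with a version check).
func (s *store) save(r record) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.rec.Version != r.Version {
		return errVersionConflict
	}
	r.Version++
	s.rec = r
	return nil
}

// increment runs one load-modify-save cycle and retries on conflict.
// It returns the number of attempts that were needed.
func increment(s *store) (attempts int) {
	for {
		attempts++
		r := s.load() // 1. load the resource and remember its version
		r.Value++     // 2. apply the change in plain Go
		if err := s.save(r); err == nil { // 3. save with a version check
			return attempts
		}
	}
}

func main() {
	s := &store{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			increment(s)
		}()
	}
	wg.Wait()
	fmt.Println(s.load().Value) // prints 100: no update is lost
}
```

Every conflicting save forces another load, which is the contention cost described in the next paragraph.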

But such an architecture does not scale well if the number of requests for a single resource is very high (hundreds or thousands of requests per second). Lock contention in such a case is very high and the database is significantly overloaded. Round trips between the application server and the database also add latency. In practice, the number of concurrent requests is severely limited.

One solution to this problem is to reduce the number of costly operations. Because a single resource is loaded and saved thousands of times per second, we can instead:

  1. Load the resource once (let's say once per second)
  2. Execute all the requests from this period of time on an already loaded resource. Run them all sequentially to keep things simple and data consistent.
  3. Save the resource and send responses to all clients if data was stored successfully.

In this example, where 1000 load/save pairs per second are replaced by a single one, such a solution could improve performance by up to a factor of 1000. The resource is still stored in a consistent state.
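
To make the saving concrete, here is a minimal sketch that counts database round trips for both approaches. It is not how the batch package is implemented; `db`, `perRequest` and `batched` are hypothetical names, and the "database" is a plain struct with counters.

```go
package main

import "fmt"

// db is a hypothetical in-memory database that counts round trips.
type db struct {
	value        int
	loads, saves int
}

func (d *db) load() int  { d.loads++; return d.value }
func (d *db) save(v int) { d.saves++; d.value = v }

// perRequest loads and saves the resource once for every operation.
func perRequest(d *db, ops []func(int) int) {
	for _, op := range ops {
		d.save(op(d.load()))
	}
}

// batched loads the resource once, applies all operations sequentially
// in memory, and saves once at the end.
func batched(d *db, ops []func(int) int) {
	v := d.load()
	for _, op := range ops {
		v = op(v)
	}
	d.save(v)
}

func main() {
	ops := make([]func(int) int, 1000)
	for i := range ops {
		ops[i] = func(v int) int { return v + 1 }
	}
	a, b := &db{}, &db{}
	perRequest(a, ops)
	batched(b, ops)
	fmt.Println(a.value, a.loads, a.saves) // prints 1000 1000 1000
	fmt.Println(b.value, b.loads, b.saves) // prints 1000 1 1
}
```

Both approaches end with the same value; only the number of loads and saves differs.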

The batch package does exactly that. You configure the duration of the window and provide functions to load and save the resource. Once a request comes in, you run a function:

// Set up the batch processor:
processor := batch.StartProcessor(
    batch.Options[*YourResource]{ // YourResource is your own Go struct
        MinDuration:  100 * time.Millisecond,
        LoadResource: func(ctx context.Context, resourceKey string) (*YourResource, error){
            // resourceKey uniquely identifies the resource
            ...
        },
        SaveResource: ...,
    },
)

// And use the processor inside an HTTP/gRPC handler or a technology-agnostic service.
// ctx is a standard context.Context and resourceKey can be taken from a request parameter.
err := processor.Run(ctx, resourceKey, func(r *YourResource) {
    // Here you put the code which will be executed sequentially inside the batch
})

For a real-life example, see the example web application.

Installation

# Add batch to your Go module:
go get github.com/elgopher/batch

Please note that at least Go 1.18 is required. The package uses generics, which were added in Go 1.18.

Scaling out

A single Go HTTP server is able to handle up to tens of thousands of requests per second on commodity hardware. This is a lot, but very often you also need:

  • high availability (if one server goes down, you want the others to handle the traffic)
  • the capacity to handle hundreds of thousands or millions of requests per second

In both cases you need to deploy multiple servers and put a load balancer in front of them. Please note, though, that you have to configure the load balancing algorithm carefully. Round-robin is not an option here, because sooner or later you will have problems with locking (multiple server instances will run batches on the same resource). The ideal solution is to route requests based on the URL path or query string parameters. For example, an HTTP query string parameter could carry the resource key. You can instruct the load balancer to calculate a hash of this parameter and always route requests with the same key to the same backend. If a backend is no longer available, the load balancer should route the request to a different server.
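
The routing rule can be sketched in a few lines of Go. Real load balancers have their own configuration for this; the `pickBackend` function and the backend addresses below are hypothetical and only illustrate the idea of hashing the resource key.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickBackend maps a resource key to one of the backends using a hash,
// so requests with the same key always reach the same server.
// The backends slice must not be empty.
func pickBackend(resourceKey string, backends []string) string {
	h := fnv.New32a()
	h.Write([]byte(resourceKey)) // Write on a hash.Hash never returns an error
	return backends[int(h.Sum32()%uint32(len(backends)))]
}

func main() {
	// Hypothetical backend addresses.
	backends := []string{"app-1:8080", "app-2:8080", "app-3:8080"}
	for _, key := range []string{"order-1", "order-2", "order-1"} {
		fmt.Println(key, "->", pickBackend(key, backends))
	}
}
```

Removing an unavailable backend from the slice sends its keys to the remaining servers. With plain modulo hashing, as assumed here, many other keys move as well; consistent hashing is a common way to limit that.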

Documentation

Variables

var OperationCancelled = errors.New("run failed: operation was canceled before it was run")
var ProcessorStopped = errors.New("run failed: processor is stopped")

Types

type Metric (added in v0.5.0)

type Metric struct {
	BatchStart           time.Time
	ResourceKey          string
	OperationCount       int
	LoadResourceDuration time.Duration
	SaveResourceDuration time.Duration
	TotalDuration        time.Duration
	Error                error
}

Metric contains measurements for one finished batch.

type Options

type Options[Resource any] struct {
	// All batches will be run for at least MinDuration.
	//
	// By default, 100ms.
	MinDuration time.Duration

	// Batch will have timeout with MaxDuration. Context with this timeout will be passed to
	// LoadResource and SaveResource functions, which can abort the batch by returning an error.
	//
	// By default, 2*MinDuration.
	MaxDuration time.Duration

	// LoadResource loads resource with given key from a database. Returning an error aborts the batch.
	// This function is called in the beginning of each new batch.
	//
	// Context passed as a first parameter has a timeout calculated using batch MaxDuration.
	// You can watch context cancellation in order to abort loading resource if it takes too long.
	// Context is also cancelled after batch was ended.
	//
	// By default, returns zero-value Resource.
	LoadResource func(_ context.Context, key string) (Resource, error)

	// SaveResource saves resource with given key to a database. Returning an error aborts the batch.
	// This function is called at the end of each batch.
	//
	// Context passed as a first parameter has a timeout calculated using batch MaxDuration.
	// You can watch context cancellation in order to abort saving resource if it takes too long
	// (thus aborting the entire batch). Context is also cancelled after batch was ended.
	//
	// By default, does nothing.
	SaveResource func(_ context.Context, key string, _ Resource) error
}

Options represents the parameters for batch.Processor. It should be passed to the StartProcessor function. All options (as the name suggests) are optional and have default values.

type Processor

type Processor[Resource any] struct {
	// contains filtered or unexported fields
}

Processor represents an instance of a batch processor, which can be used to issue operations that run in batches.

func StartProcessor

func StartProcessor[Resource any](options Options[Resource]) *Processor[Resource]

StartProcessor starts a batch processor which will run operations in batches.

Please note that Processor is a goroutine pool internally and should be stopped when no longer needed. Use the Processor.Stop method to stop it.

func (*Processor[Resource]) Run

func (p *Processor[Resource]) Run(ctx context.Context, key string, _operation func(Resource)) error

Run lets you run an operation on a resource with the given key. The operation will run alongside other operations in batches. If there is no pending batch, a new batch will be started and will run for at least MinDuration. After MinDuration, no new operations will be accepted and the SaveResource function will be called.

Operations are run sequentially. No manual synchronization is required inside an operation. An operation should be fast, which basically means that any I/O should be avoided at all costs. Operations (together with LoadResource and SaveResource) are run on a goroutine dedicated to the batch.

An operation must leave the Resource in a consistent state, so that the next operation in the batch can be executed on the same resource. When an operation cannot be executed because some conditions are not met, it should not change the state of the resource at all. This can be achieved easily by dividing the operation into two sections:

  • the first section validates whether the operation is possible and returns an error if not
  • the second section changes the Resource state
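
A minimal sketch of this two-section structure, using a hypothetical `Account` resource and `withdraw` function that are not part of the batch package:

```go
package main

import (
	"errors"
	"fmt"
)

// Account is a hypothetical resource.
type Account struct {
	Balance int
}

var errInsufficientFunds = errors.New("insufficient funds")

// withdraw validates first and mutates second, so a rejected
// withdrawal leaves the account exactly as it was.
func withdraw(a *Account, amount int) error {
	// Section 1: validation. Nothing is modified here.
	if amount <= 0 {
		return errors.New("amount must be positive")
	}
	if a.Balance < amount {
		return errInsufficientFunds
	}
	// Section 2: state change. Reached only when the operation is valid.
	a.Balance -= amount
	return nil
}

func main() {
	acc := &Account{Balance: 100}
	fmt.Println(withdraw(acc, 30), acc.Balance)  // prints <nil> 70
	fmt.Println(withdraw(acc, 500), acc.Balance) // prints insufficient funds 70
}
```

Because the operation passed to Run has no return value, one way to use such a function is to call it inside the closure and store its error in a variable declared outside the closure.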

Run ends when the entire batch has ended.

An error is returned when the batch is aborted or the processor is stopped. Only the LoadResource and SaveResource functions can abort the batch by returning an error. If an error was reported for a batch, all Run calls assigned to that batch will get this error.

Please always check the returned error. Operations which query the resource get uncommitted data. If there is a problem with saving changes to the database, then you could have a serious inconsistency between your db and what you've just sent to the users.

An operation which is still waiting to be run can be canceled by canceling ctx. If the operation was executed but the batch is still pending, Run waits until the batch ends. When ctx is canceled, the OperationCancelled error is returned.

func (*Processor[Resource]) Stop

func (p *Processor[Resource]) Stop()

Stop ends all running batches. No new operations will be accepted. Stop blocks until all pending batches have ended and their resources have been saved.

func (*Processor[Resource]) SubscribeBatchMetrics (added in v0.5.0)

func (p *Processor[Resource]) SubscribeBatchMetrics() <-chan Metric

SubscribeBatchMetrics subscribes to all batch metrics. The returned channel is closed after the Processor is stopped. It is safe to call this method multiple times. Each call creates a new, separate subscription.

As soon as a subscription is created, all Metric messages must be consumed from the channel. Otherwise, the Processor will block. Please note that a slow consumer could slow down the entire Processor, limiting the number of operations which can be run. The number of batches per second can reach 100k, so be ready to handle such traffic. This basically means that a Metric consumer should not directly do any blocking I/O. Instead, it should aggregate data and publish it asynchronously.
