batchproc

Version: v1.0.2
This package is not in the latest version of its module.

Published: Jun 6, 2022 License: MIT Imports: 4 Imported by: 0

README

batchproc


Generic batch processor

batchproc provides a uniform way to process large volumes of data of different types and to run repetitive computations over that data as required.

batchproc provides:

  • The ability to process any type of indexed, iterable data
  • Customizable batch steps
  • Rungroups that execute batch units concurrently
💾 Installation
go get -u github.com/aryannr97/batchproc
📔 Documentation

Detailed documentation can be found here

🧑‍💻 Usage
1. New
  • Returns a new batch-processing Executor.
// Creation stage
executor := batchproc.New(context.Background(), "unique-id", len(data), data, getBatchUnit)
  • GetBatchUnitFunc is a predefined type:
type GetBatchUnitFunc func() BatchUnit
  • Define this function and pass it to New when creating the executor; the executor uses it to load each batch with a user-defined type that implements the BatchUnit interface
// getBatchUnit returns CustomBatchUnit that implements BatchUnit interface
func getBatchUnit() batchproc.BatchUnit {
	return &CustomBatchUnit{}
}
  • New also accepts an optional final parameter, batchSize, which overrides the default dynamic batch sizing
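The README does not say exactly how New divides the collection into batches. As an illustration only, the sketch below splits totalCount items into half-open [start, end) ranges of a fixed batch size, which matches how the CustomBatchUnit example slices data[start:end]. The batchRange type and batchRanges function are hypothetical and not part of batchproc.

```go
package main

import "fmt"

// batchRange is a hypothetical half-open interval [Start, End) of item indices.
type batchRange struct {
	Start, End int
}

// batchRanges splits total items into consecutive ranges of at most size items.
// The last range is shorter when total is not a multiple of size.
func batchRanges(total, size int) []batchRange {
	if total <= 0 || size <= 0 {
		return nil
	}
	ranges := make([]batchRange, 0, (total+size-1)/size)
	for start := 0; start < total; start += size {
		end := start + size
		if end > total {
			end = total
		}
		ranges = append(ranges, batchRange{Start: start, End: end})
	}
	return ranges
}

func main() {
	// 10 items with a batch size of 4 yield three batches.
	fmt.Println(batchRanges(10, 4)) // [{0 4} {4 8} {8 10}]
}
```

Each range would then be handed to one BatchUnit's Compute(start, end, data) call.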
2. BatchUnit
  • An interface with two methods, Compute and GetResult
type BatchUnit interface {
	Compute(int, int, interface{}) error
	GetResult() interface{}
}

  • Implementing this interface lets you customize the batch steps for your use case
// CustomBatchUnit : define your own struct type
type CustomBatchUnit struct {
	result int
}

// Compute : define custom implementation for this method as per use case & store result in struct attribute
func (t *CustomBatchUnit) Compute(start, end int, collection interface{}) error {
	if data, ok := collection.([]int); ok {
		data = data[start:end]
		for _, value := range data {
			t.result += value
		}

		return nil
	}

	return fmt.Errorf("type assertion error")
}

// GetResult : return result attribute from defined struct type
func (t *CustomBatchUnit) GetResult() interface{} {
	return t.result
}
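Because Compute receives the collection as interface{}, one BatchUnit can support several collection types through a type switch. The sketch below is a hypothetical variant of CustomBatchUnit, named SumBatchUnit here, that sums either []int or []float64 values. It copies the BatchUnit interface locally so it runs without importing batchproc, and it calls Compute directly on two batches instead of going through an Executor.

```go
package main

import "fmt"

// BatchUnit mirrors the interface documented by batchproc, copied here so the
// sketch compiles on its own.
type BatchUnit interface {
	Compute(int, int, interface{}) error
	GetResult() interface{}
}

// SumBatchUnit is a hypothetical unit that sums a slice of numbers.
type SumBatchUnit struct {
	result float64
}

// Compute sums collection[start:end] for the element types it recognises.
func (s *SumBatchUnit) Compute(start, end int, collection interface{}) error {
	switch data := collection.(type) {
	case []int:
		for _, v := range data[start:end] {
			s.result += float64(v)
		}
	case []float64:
		for _, v := range data[start:end] {
			s.result += v
		}
	default:
		return fmt.Errorf("unsupported collection type %T", collection)
	}
	return nil
}

// GetResult returns the accumulated sum.
func (s *SumBatchUnit) GetResult() interface{} {
	return s.result
}

// Compile-time check that SumBatchUnit satisfies BatchUnit.
var _ BatchUnit = (*SumBatchUnit)(nil)

func main() {
	data := []int{1, 2, 3, 4, 5, 6}
	first, second := &SumBatchUnit{}, &SumBatchUnit{}
	_ = first.Compute(0, 3, data)  // sums 1+2+3
	_ = second.Compute(3, 6, data) // sums 4+5+6
	fmt.Println(first.GetResult(), second.GetResult()) // 6 15

	// An unsupported type is reported as an error rather than a panic.
	fmt.Println((&SumBatchUnit{}).Compute(0, 1, []string{"x"}))
}
```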
3. Run
  • Executes batch processing using the loaded batch units
  • Internally, each batch runs in its own rungroup (a separate goroutine)
  • If any single batch fails, the other batches are interrupted and the overall batch processing is cancelled
// Computation stage
if err := executor.Run(); err != nil {
	log.Println(err)
	return
}
4. Aggregate
  • Aggregate performs the final stage of batch processing, combining the results of all batches into an overall result
  • Pass your own aggregation function to combine the batch results as needed
// Aggregation stage
result := executor.Aggregate(aggregation)
  • AggregationFunc is a predefined type, so each use case can supply its own implementation:
type AggregationFunc func([]interface{}) interface{}
// aggregation : treat results gathered from all batches as required
func aggregation(results []interface{}) interface{} {
	var res int

	for _, result := range results {
		if total, ok := result.(int); ok {
			res += total
		}
	}

	return res
}
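Because AggregationFunc operates on []interface{}, aggregation is not limited to numeric totals. As a hypothetical example, if each batch unit returned a map[string]int of word counts, an aggregation function could merge those maps as below. mergeCounts is an illustrative name, and the AggregationFunc type is copied locally so the sketch runs without importing batchproc.

```go
package main

import "fmt"

// AggregationFunc mirrors the type documented by batchproc.
type AggregationFunc func([]interface{}) interface{}

// mergeCounts merges per-batch map[string]int results into one map,
// skipping any result of an unexpected type.
func mergeCounts(results []interface{}) interface{} {
	merged := make(map[string]int)
	for _, result := range results {
		counts, ok := result.(map[string]int)
		if !ok {
			continue
		}
		for word, n := range counts {
			merged[word] += n
		}
	}
	return merged
}

// Compile-time check that mergeCounts has the AggregationFunc signature.
var _ AggregationFunc = mergeCounts

func main() {
	batchResults := []interface{}{
		map[string]int{"go": 2, "batch": 1},
		map[string]int{"go": 1, "proc": 3},
	}
	fmt.Println(mergeCounts(batchResults)) // map[batch:1 go:3 proc:3]
}
```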
🚀 Benchmark results
goos: linux
goarch: amd64
pkg: github.com/aryannr97/batchproc
cpu: Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Test case | b.N iterations | Avg processing time (ns/op)
Addition_of_first_10_million_integers_without_batch_processing-2 | 100 | 10665257
Addition_of_first_10_million_integers_with_batch_processing-2 | 251 | 4620470
goos: linux
goarch: amd64
pkg: github.com/aryannr97/batchproc
cpu: Intel(R) Core(TM) i7-10850H CPU @ 2.70GHz
Test case | b.N iterations | Avg processing time (ns/op)
Addition_of_first_10_million_integers_without_batch_processing-12 | 184 | 6394545
Addition_of_first_10_million_integers_with_batch_processing-12 | 295 | 4074257

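In these runs, batch processing reduced the average time per operation by about 57% on the Xeon machine with GOMAXPROCS=2 (10.67 ms to 4.62 ms, roughly 2.3× faster) and by about 36% on the i7 machine with GOMAXPROCS=12 (6.39 ms to 4.07 ms, roughly 1.6× faster). The b.N column is the iteration count chosen by the Go benchmark harness, so the time per operation is the figure to compare.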
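The benchmark code itself is not reproduced in this README. As a rough, hypothetical illustration of what the two cases compare, the sketch below sums the integers 1 to 10 million once in a single loop and once split into batches, each summed in its own goroutine, then aggregates the partial sums. sumSequential and sumConcurrent are illustrative names; the timings printed are crude wall-clock measurements and will vary by machine.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// sumSequential adds 1..n in a single loop (assumes a 64-bit int for large n).
func sumSequential(n int) int {
	total := 0
	for i := 1; i <= n; i++ {
		total += i
	}
	return total
}

// sumConcurrent splits 1..n into the given number of batches, sums each batch
// in its own goroutine, and then aggregates the partial sums.
func sumConcurrent(n, batches int) int {
	if batches < 1 {
		batches = 1
	}
	partial := make([]int, batches)
	size := (n + batches - 1) / batches
	var wg sync.WaitGroup
	for b := 0; b < batches; b++ {
		start := b*size + 1
		end := start + size - 1
		if end > n {
			end = n
		}
		wg.Add(1)
		go func(b, start, end int) {
			defer wg.Done()
			sum := 0 // local accumulator avoids repeated writes to the shared slice
			for i := start; i <= end; i++ {
				sum += i
			}
			partial[b] = sum
		}(b, start, end)
	}
	wg.Wait()

	total := 0
	for _, p := range partial {
		total += p
	}
	return total
}

func main() {
	const n = 10_000_000

	start := time.Now()
	seq := sumSequential(n)
	seqTime := time.Since(start)

	start = time.Now()
	conc := sumConcurrent(n, runtime.NumCPU())
	concTime := time.Since(start)

	fmt.Println(seq == conc, seq) // true 50000005000000
	fmt.Println("sequential:", seqTime, "concurrent:", concTime)
}
```

For a stable comparison, Go's testing package benchmarks (go test -bench) are preferable to one-off timings like these.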

📂 Example

To run the demo program, run the command: make demo

Documentation


Constants

const TestRun testRunKey = "testRun"

TestRun defines the constant key used in unit tests.

Types

type AggregationFunc

type AggregationFunc func([]interface{}) interface{}

AggregationFunc defines the function signature for result aggregation.

type BatchUnit

type BatchUnit interface {
	Compute(int, int, interface{}) error
	GetResult() interface{}
}

BatchUnit defines the operations performed by a single batch.

type Executor

type Executor struct {
	ElapsedDuration time.Duration
	// contains filtered or unexported fields
}

Executor represents a concurrent batch-processing entity.

func New

func New(ctx context.Context, id string, totalCount int, data interface{}, getBatchUnit GetBatchUnitFunc, batchSize ...int) *Executor

New creates, initializes and returns a new batch Executor.

id is used to label each batch uniquely, e.g. {id}-batch-0.

totalCount is the total size of the collection.

data is the collection to be processed in batches.

getBatchUnit returns the user-defined type implementing the BatchUnit interface and is used to load the batches.

batchSize is optional and can be passed to override dynamic batch sizing.

func (*Executor) Aggregate

func (e *Executor) Aggregate(aggregation AggregationFunc) interface{}

Aggregate aggregates the batch results and marks the end of batch processing. Supply your own aggregation function to combine the results from the different batches.

func (*Executor) GetBatchCount (added in v1.0.2)

func (e *Executor) GetBatchCount() int

GetBatchCount returns the number of batches for the executor.
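The documentation states that batches are labelled {id}-batch-0, {id}-batch-1 and so on, and that GetBatchCount reports how many batches an executor holds. Assuming a fixed batchSize, the count would be the ceiling of totalCount divided by batchSize. The sketch below shows that arithmetic and the labelling scheme; batchCount and batchLabels are hypothetical helpers, and batchproc's dynamic sizing (used when no batchSize is passed) may produce a different count.

```go
package main

import "fmt"

// batchCount returns ceil(total/size) using integer arithmetic,
// assuming a fixed batch size.
func batchCount(total, size int) int {
	if total <= 0 || size <= 0 {
		return 0
	}
	return (total + size - 1) / size
}

// batchLabels builds one "{id}-batch-N" label per batch.
func batchLabels(id string, count int) []string {
	labels := make([]string, count)
	for i := range labels {
		labels[i] = fmt.Sprintf("%s-batch-%d", id, i)
	}
	return labels
}

func main() {
	n := batchCount(10, 4)
	fmt.Println(n)                           // 3
	fmt.Println(batchLabels("unique-id", n)) // [unique-id-batch-0 unique-id-batch-1 unique-id-batch-2]
}
```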

func (*Executor) Run

func (e *Executor) Run() error

Run starts the concurrent goroutines that perform the computation through the batch units.

type GetBatchUnitFunc

type GetBatchUnitFunc func() BatchUnit

GetBatchUnitFunc defines the function signature for returning a user-defined type that implements the BatchUnit interface.
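Because a unit such as CustomBatchUnit keeps its running total in a struct field, each batch presumably needs its own instance, which is what a factory of type GetBatchUnitFunc allows. The sketch below (with a locally copied interface and a hypothetical sumUnit type) contrasts a factory that returns a fresh value on every call with one that returns a shared pointer. With the shared pointer, both batches accumulate into the same field, so the per-batch results are wrong. The batches run sequentially here to keep the example free of data races.

```go
package main

import "fmt"

// BatchUnit and GetBatchUnitFunc mirror the types documented by batchproc.
type BatchUnit interface {
	Compute(int, int, interface{}) error
	GetResult() interface{}
}

type GetBatchUnitFunc func() BatchUnit

// sumUnit is a hypothetical BatchUnit that sums ints in [start, end).
type sumUnit struct{ result int }

func (s *sumUnit) Compute(start, end int, collection interface{}) error {
	data, ok := collection.([]int)
	if !ok {
		return fmt.Errorf("type assertion error")
	}
	for _, v := range data[start:end] {
		s.result += v
	}
	return nil
}

func (s *sumUnit) GetResult() interface{} { return s.result }

// runTwoBatches loads two batches from the factory and runs them one after the other.
func runTwoBatches(get GetBatchUnitFunc, data []int) []interface{} {
	mid := len(data) / 2
	first, second := get(), get()
	_ = first.Compute(0, mid, data)
	_ = second.Compute(mid, len(data), data)
	return []interface{}{first.GetResult(), second.GetResult()}
}

func main() {
	data := []int{1, 2, 3, 4}

	// A fresh instance per call keeps each batch's result separate.
	fresh := func() BatchUnit { return &sumUnit{} }
	fmt.Println(runTwoBatches(fresh, data)) // [3 7]

	// Reusing one instance makes both batches write to the same field.
	shared := &sumUnit{}
	reused := func() BatchUnit { return shared }
	fmt.Println(runTwoBatches(reused, data)) // [10 10]
}
```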
