aggregator

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2024 License: MIT Imports: 2 Imported by: 0

README

Aggregator

Go Reference Go Report Card

This is a generic batch processing library for golang. You can group up and process batch of tasks into group or groups, which can help us to reduce loading of io.

Beta

This project may contain bugs and have not being tested at all. Please feel free to test, improve it.

Golang

golang v1.18+ required

Examples

there are many practical use cases in real world
  • sync a bulk of documents into elasticsearch
  • write a group of data by single rpc call
  • write a group of quota data into redis by hmset
  • write metrics into clickhouse by batch, big batch like 10k rows

How

There is 3 types need to define on your own before init aggregator, which is even more important than aggregator functions.

  • T: task type - the original task waiting to group up. you need to define a wrapper struct with ctx if needed.
  • U: the grouped type - each work will gather tasks into this type before flush (really handler them in batch).
  • V: the result type - the result of each flush action.

how.png

Inspiration

LICENSE

MIT lo

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

type Aggregator[T any, U any, V any] struct {
	DebugMode bool

	WorkerCount int

	TaskCh        chan T
	BatchSize     int
	BatchInterval time.Duration

	NewSum    func() U
	Reduce    func(U, T) U
	BeforeAct func(U) error
	Action    func(U) (V, error)
	AfterAct  func(U, V, error)
	// contains filtered or unexported fields
}

Aggregator - the entry point as a aggregator group.

  • T - tasks type that are going to be processed by this worker.
  • U - type of the intermediate result from the aggregation of tasks (the sum).
  • V - final result type after performing some action on the sum.
  • DebugMode - debug or not. # TODO - implement debug mode.
  • WorkerCount - workers count to create, each worker can be regard as a group to sum up all collected tasks.
  • BatchSize - number of tasks to be collected before performing the action.
  • BatchInterval - time interval to perform the action, if the number of tasks collected is less than BatchSize.
  • NewSum - function to create a new sum object (U) when worker is reset after performing the action.
  • Reduce - function to reduce the sum object (U) after getting a new task (T) with existing sum object (U).
  • BeforeAct - function to be called before performing the action, like validation, optional.
  • Action - function to perform the action on the sum object (U) and return the result (V).
  • AfterAct - function to be called after performing the action, like error tracking, optional.

func (*Aggregator[T, U, V]) Start

func (a *Aggregator[T, U, V]) Start() error

Start - initialize the aggregator and start its workers.

func (*Aggregator[T, U, V]) Stop

func (a *Aggregator[T, U, V]) Stop() error

Stop - stop the aggregator and its workers.

Directories

Path Synopsis
example
mset command
simple command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL