aggregator

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 16, 2022 License: MIT Imports: 6 Imported by: 0

README

Aggregator

Go Reference

Aggregator is a batch processing library for Go supports returning values. You can group up and process batch of tasks with keys in a single callback. Using it for grouping up database query or cache can help you to reduce loading of database and network.

THIS PROJECT IS IN BETA

This project may contain bugs and have not being tested at all. Use under your own risk, but feel free to test, make pull request and improve this project.

Features

  • Support multi Aggregators (using AggregatorList) for fallback.
  • Support multi workers to flush tasks.
  • Support Go generics for query keys and result values.
  • Support timeout-only or tasks limit-only.
  • Support singleflight (using singleflight-any).

Install

Currently Go 1.18+ is required (for go generics), backward compatible is planned.

go get github.com/serkodev/aggregator@latest

Example

callback := func(keys []string) (map[string]Book, error) {
    results := db.Query(`SELECT * FROM books WHERE name IN ?`, keys)
    return results, nil
}
agg, _ := aggregator.New(callback, 100*time.Millisecond, 5).Run()

for _, name := range []string{"foo", "foo", "bar", "baz", "baz"} {
    go func(n string) {
        book, err := agg.Query(n).Get()
        if err == nil {
            print(book.Name + ":" + book.Price, " ")
        }
    }(name)
}

// foo:10 foo:10 bar:25 baz:30 baz:30

How it works

flowchart LR;
    subgraph A [Aggregator]
        direction TB
        subgraph cb ["Customize Process (example)"]
        direction TB
            input("Input
            []string{#quot;foo#quot;, #quot;bar#quot;, #quot;baz#quot;}")
            db[("Database

            SELECT price FROM books<br />WHERE name
            IN ('foo', 'bar', 'baz')")]
            output("return map[string]int{
                &nbsp;&nbsp;&nbsp;&nbsp;#quot;foo#quot;: 10,
                &nbsp;&nbsp;&nbsp;&nbsp;#quot;bar#quot;: 25,
                &nbsp;&nbsp;&nbsp;&nbsp;#quot;baz#quot;: 30,
            }")
            input --> db --> output
            style output text-align:left
        end

        Wait -- Reach tasks limit / Timeout -->
        cb --> rt("Return value to each Request")
    end

    req1[Request 1] --> q_foo_("Query(#quot;foo#quot;)"):::bgFoo --> A
    req2[Request 2] --> q_foo2("Query(#quot;foo#quot;)"):::bgFoo --> A
    req3[Request 3] --> q_bar_("Query(#quot;bar#quot;)"):::bgBar --> A
    req4[Request 4] --> q_baz_("Query(#quot;baz#quot;)"):::bgBaz --> A
    req5[Request 5] --> q_baz2("Query(#quot;baz#quot;)"):::bgBaz --> A

    A --- rtn1("return 10"):::bgFoo --> req1_[Request 1]
    A --- rtn2("return 10"):::bgFoo --> req2_[Request 2]
    A --- rtn3("return 25"):::bgBar --> req3_[Request 3]
    A --- rtn4("return 30"):::bgBaz --> req4_[Request 4]
    A --- rtn5("return 30"):::bgBaz --> req5_[Request 5]

    classDef bgFoo fill:green;
    classDef bgBar fill:blue;
    classDef bgBaz fill:purple;

Advance

AggregatorList

AggregatorList contains a slice of Aggregator, you can create it by aggregator.NewList(...). If the prior order aggregator cannot return data for any keys. Then AggregatorList will query data from the next aggregator for fallback.

For example, you create an AggregatorList with cache and database aggregator, when the data has not been cached, it will auto query from database.

cacheAgg := aggregator.New(func(k []string) (map[string]string, error) {
    fmt.Println("fetch from cache...", k)
    return map[string]string{
        "key1": "val1",
        "key2": "val2",
    }, nil
}, 50*time.Millisecond, 10)

databaseAgg := aggregator.New(func(k []string) (map[string]string, error) {
    fmt.Println("fetch from database...", k)
    return map[string]string{
        "key1": "val1",
        "key2": "val2",
        "key3": "val3",
        "key4": "val4",
    }, nil
}, 50*time.Millisecond, 10)

list := aggregator.NewList(cacheAgg, databaseAgg).Run()
results := list.QueryMulti([]string{"key1", "key2", "key3", "key4"})

// fetch from cache... ["key1", "key2", "key3", "key4"]
// fetch from database... ["key3", "key4"]
// results: ["val1", "val2", "val3", "val4"]
singleflight

In some use case you may need to prevent cache breakdown. Aggregator works with singleflight by using singleflight-any (supports Go generics).

Inspiration

LICENSE

MIT License

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NeverFlushTimeout time.Duration = -1

If NeverFlushTimeout is set for the flushMaxWait, the aggregator will never flush with timeout.

View Source
var NoResult = errors.New("No result")

Functions

This section is empty.

Types

type Aggregator

type Aggregator[K comparable, T any] struct {
	// Max waiting time of worker to flush query tasks.
	MaxWait time.Duration

	// Max amount of query tasks, the worker will flush after reaching it.
	MaxSize int
	// contains filtered or unexported fields
}

Aggregator is the instance, it contains the flush and worker settings. Create an instance of Aggregator, by using New()

func New

func New[K comparable, T any](fn func([]K) (map[K]T, error), flushMaxWait time.Duration, flushMaxSize int) *Aggregator[K, T]

New creates a new Aggregator. The flushMaxWait variable sets the maximum timeout of flushing. If flushMaxWait equals to NeverFlushTimeout then the aggregator will never flush with timeout. The flushMaxSize variable sets the maximum size of task. If the flushMaxSize <= 0, the aggregator will never flush with amount of tasks.

func (*Aggregator[K, T]) Debug

func (a *Aggregator[K, T]) Debug() *Aggregator[K, T]

If Debug() is called, the Aggregator will prints debug messages.

func (*Aggregator[K, T]) Query

func (a *Aggregator[K, T]) Query(key K) Result[T]

Query with a key and return with a Result[T] synchronously. It is a shortcut for <-QueryChan(key)

func (*Aggregator[K, T]) QueryChan

func (a *Aggregator[K, T]) QueryChan(key K) <-chan Result[T]

Query with a key and return with a Result[T] channel.

func (*Aggregator[K, T]) QueryMulti

func (a *Aggregator[K, T]) QueryMulti(keys []K) []Result[T]

Query with multiple keys and return a slice of Result[T] synchronously.

func (*Aggregator[K, T]) QueryResult

func (a *Aggregator[K, T]) QueryResult(key K) (T, error)

Query with a key and return with Value and Error of Result[T] synchronously. It is a shortcut for Query(key).Get()

func (*Aggregator[K, T]) QueryValue

func (a *Aggregator[K, T]) QueryValue(key K) T

Query with a key and return with Value Result[T] synchronously. It is a shortcut for Query(key).Value

func (*Aggregator[K, T]) Run

func (a *Aggregator[K, T]) Run() (*Aggregator[K, T], error)

Start run Aggregator with single worker.

func (*Aggregator[K, T]) RunWithWorkers

func (a *Aggregator[K, T]) RunWithWorkers(workers int) (*Aggregator[K, T], error)

Start run Aggregator with n workers.

type AggregatorList

type AggregatorList[K comparable, T any] []*Aggregator[K, T]

AggregatorList is a type defintion of []*Aggregator[K, T].

func NewList

func NewList[K comparable, T any](aggregators ...*Aggregator[K, T]) AggregatorList[K, T]

NewList creates a new slice of Aggregators. When an aggregator returns a NoResult error it will call the next aggregator of the AggregatorList by order.

func (AggregatorList[K, T]) Query

func (aggregators AggregatorList[K, T]) Query(key K) Result[T]

Query result in aggregators of the AggregatorList by order.

func (AggregatorList[K, T]) QueryMulti

func (aggregators AggregatorList[K, T]) QueryMulti(keys []K) []Result[T]

Query result in aggregator of the AggregatorList by order with multiple keys and return a slice of Result[T] synchronously.

func (AggregatorList[K, T]) QueryResult

func (aggregators AggregatorList[K, T]) QueryResult(key K) (T, error)

It is a shortcut for Query(key).Get()

func (AggregatorList[K, T]) QueryValue

func (aggregators AggregatorList[K, T]) QueryValue(key K) T

It is a shortcut for Query(key).Value

func (AggregatorList[K, T]) Run

func (aggregators AggregatorList[K, T]) Run() AggregatorList[K, T]

Run all aggregators of the AggregatorList by order.

type Result

type Result[T any] struct {
	Value T
	Error error
}

Result of Aggregator query. Including Value and Error.

func (Result[T]) Get

func (r Result[T]) Get() (T, error)

Value and Error in multiple return value

func (Result[T]) IsNoResult

func (r Result[T]) IsNoResult() bool

Use to check Result[T].Error equals NoResult. If fn of aggregator does not set the key of result map or returns an NoResult error, the Result[T].Error will be NoResult.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL