conditiaonalbatchexecutor

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2023 License: MIT Imports: 3 Imported by: 0

README

Conditional Batch Executor

Go Reference Go Report Card Go codecov

A batch worker that collects tasks and executes them when conditions are met.

Caller can get the results asynchronously.

Install

go get github.com/plzzzzg/conditional-batch-executor

Examples

// init
worker := conditiaonalbatchexecutor.New(func(tasks []interface{}) map[string]interface{} {
		m := map[string]interface{}{}
		// do something ...
		return m // key is the id from Submit
	}, conditiaonalbatchexecutor.Size(3), conditiaonalbatchexecutor.Interval(time.Second*2)) // execute if size of tasks >= 3 OR after 2 seconds since last execution  

// submit
resultReciever, err := worker.Submit(idStr, i)

// receive
result := <-resultReciever

Supported Conditions

Interval
worker := conditiaonalbatchexecutor.New(func(tasks []interface{}) map[string]interface{} {
		m := map[string]interface{}{}
		return m 
	}, conditiaonalbatchexecutor.Interval(time.Second*2)) // execute every 2 seconds 
Size
worker := conditiaonalbatchexecutor.New(func(tasks []interface{}) map[string]interface{} {
		m := map[string]interface{}{}
		return m 
	}, conditiaonalbatchexecutor.Size(2)) // execute then buffer size reaches 2
And
worker := conditiaonalbatchexecutor.New(func(tasks []interface{}) map[string]interface{} {
		m := map[string]interface{}{}
		return m 
	}, conditiaonalbatchexecutor.And(Size(2), conditiaonalbatchexecutor.Interval(time.Second*2))) // execute then buffer size reaches 2 AND last execution happened more than 2 min ago  

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Condition

type Condition func(executor *Executor) bool

func And

func And(first Condition, others ...Condition) Condition

func Interval

func Interval(duration time.Duration) Condition

func Size

func Size(i int) Condition

type Executor

type Executor struct {
	// contains filtered or unexported fields
}

func New

func New(doFnc func([]*Item) (map[string]interface{}, error), conditions ...Condition) *Executor

func (*Executor) Close

func (w *Executor) Close()

func (*Executor) Size

func (w *Executor) Size() int

func (*Executor) Submit

func (w *Executor) Submit(taskID string, item interface{}) (<-chan interface{}, error)

type Item

type Item struct {
	Key     string
	Content interface{}
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL