engine

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: Jan 13, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	EmptyBatchResult      = BatchResult{}
	EmptyBatchResultSlice = make([]BatchResult, 0)
)
var (
	NUM_OF_WORKER       = 256
	NUM_OF_ARGS_TO_WAIT = 1024
)
var (
	PARTITION_4    = 4
	PARTITION_8    = 8
	PARTITION_16   = 16
	SLEEP_DURATION = time.Duration(5 * time.Millisecond)
)
var ErrArgsBiggerThanHardLimit = errors.New(
	"number of args passed is bigger than the given hard limit")

ErrArgsBiggerThanHardLimit is returned when the number of args passed exceeds the hard limit.

var ErrEngineBrokenSizes = errors.New(
	"Please check EngineConfig docs, and ensure your config match the requirements")

ErrEngineBrokenSizes is returned when the soft and hard limits are inconsistent with each other.

Please check the EngineConfig docs for the exact requirements.

var ErrNilWorkerFn = errors.New("workerFn can't be nil")

ErrNilWorkerFn is returned when `workerFn` is nil.

var ErrResultNotFound = errors.New(
	"A result is not found on the resulting map. " +
		"Please check your code to ensure all ids are returned with their corresponding results.")

ErrResultNotFound is returned when an ID is not found in the results map, which usually means the worker function did not return a result for every ID it was given.
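The variables above are sentinel errors, so callers would normally compare against them with `errors.Is` rather than by message text. The sketch below is standalone: `errNilWorkerFn`, `build`, and `isNilFnErr` are hypothetical stand-ins declared locally, not part of this package, and the wrapping with `%w` is an assumption made to show that the check still works through wrapped errors.

```go
package main

import (
	"errors"
	"fmt"
)

// errNilWorkerFn is a local stand-in for a sentinel such as ErrNilWorkerFn.
var errNilWorkerFn = errors.New("workerFn can't be nil")

// build is a hypothetical constructor that rejects a nil function and
// wraps the sentinel with extra context.
func build(fn func()) error {
	if fn == nil {
		return fmt.Errorf("build: %w", errNilWorkerFn)
	}
	return nil
}

// isNilFnErr reports whether err is, or wraps, the sentinel.
func isNilFnErr(err error) bool {
	return errors.Is(err, errNilWorkerFn)
}

func main() {
	fmt.Println(isNilFnErr(build(nil)))       // sentinel found through the wrapper
	fmt.Println(isNilFnErr(build(func() {}))) // no error at all
}
```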

Functions

func BatchFunc

func BatchFunc(m map[uint64]interface{}) (
	map[uint64]interface{}, error)

Types

type Batch

type Batch struct {
	ID uint64
	// contains filtered or unexported fields
}

Batch wraps multiple results and exposes an API to retrieve them.

This object is *NOT* goroutine-safe; internally, it is only ever used under a mutex or by a single background worker.

Although there are both `GetResult()` and `GetResultWithContext()`, the Batch object itself has no context variant, because it cannot know which context it should be based on.

func NewBatch

func NewBatch(id uint64, wp *WP.WorkerPool) *Batch

NewBatch creates a new batch

Once a batch has been taken to be worked on, nothing more should be put into it.

func (*Batch) Put

func (b *Batch) Put(
	id uint64, arg interface{}) BatchResult

Put adds arg to Batch.args under the given id.

type BatchResult

type BatchResult struct {
	// contains filtered or unexported fields
}

func (*BatchResult) GetResult

func (br *BatchResult) GetResult() (interface{}, error)

GetResult waits until the batch is done, then returns the result that matches this caller.

This is not called automatically inside Engine's `Submit()`. That is by design: it lets the user decide when to wait, and makes the result easier to reuse inside other utility functions.
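The "submit now, wait later" idea can be pictured with a small standalone sketch. The `pending` type, `submit`, and `get` below are hypothetical stand-ins built on a channel; they only illustrate the calling pattern and are not how BatchResult is implemented.

```go
package main

import "fmt"

// pending is a hypothetical handle, loosely analogous to a BatchResult:
// it is returned immediately and resolved later by a background goroutine.
type pending struct {
	done chan struct{}
	val  int
}

// submit starts the work in the background and returns without waiting.
func submit(n int) *pending {
	p := &pending{done: make(chan struct{})}
	go func() {
		p.val = n * n // written before close, so get() observes it safely
		close(p.done)
	}()
	return p
}

// get blocks until the background work has finished.
func (p *pending) get() int {
	<-p.done
	return p.val
}

func main() {
	// Submit several items first, then decide when to wait for each one.
	a, b := submit(3), submit(4)
	fmt.Println(a.get() + b.get())
}
```

Separating submission from waiting lets a caller queue several items before blocking on any of them, which is what gives a batcher the chance to group them.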

func (*BatchResult) GetResultWithContext

func (br *BatchResult) GetResultWithContext(
	ctx context.Context) (interface{}, error)

GetResultWithContext waits until either the batch or ctx is done, then returns the result that matches this caller. You need a WorkerPool object for this function to work.

Unless you need the context idiom, prefer `GetResult()`, which allocates far less (only the interface{} conversion). This API needs to create another goroutine (if the WorkerPool is nil) and 2 channels to do its job. (And of course, using context's `WithCancel` or `WithTimeout` can also start goroutines.)
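Waiting on "either the batch or ctx" is typically a `select` over two channels. The sketch below shows that general pattern only; `waitOrCancel` and `demo` are hypothetical names, and the `done` channel stands in for whatever internal signal the batch uses.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitOrCancel returns nil once done is closed, or ctx.Err() if the
// context ends first.
func waitOrCancel(ctx context.Context, done <-chan struct{}) error {
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo runs both outcomes: a batch that is already finished, and one that
// never finishes before a short timeout.
func demo() (finished, expired error) {
	closed := make(chan struct{})
	close(closed)
	finished = waitOrCancel(context.Background(), closed)

	never := make(chan struct{})
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	expired = waitOrCancel(ctx, never)
	return finished, expired
}

func main() {
	finished, expired := demo()
	fmt.Println(finished, expired)
}
```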

Beware that, because of how Go handles local variables, if you call `GetResultWithContext` several times in a single function, you should assign each result to a different local variable. See https://stackoverflow.com/questions/25919213/why-does-go-handle-closures-differently-in-goroutines
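The source does not spell out the internal cause, but the general Go behaviour behind this warning is that a closure captures a variable, not its value at the time the closure was created. The standalone sketch below (the `capture` function is hypothetical) contrasts one reused variable with two distinct ones.

```go
package main

import "fmt"

// capture returns what closures observe when they share one variable
// versus when each has its own.
func capture() (shared, separate []int) {
	// One variable reused: both closures read v when they run, so both see
	// the last value assigned to it.
	v := 1
	f1 := func() int { return v }
	v = 2
	f2 := func() int { return v }
	shared = []int{f1(), f2()}

	// Two variables: each closure keeps reading its own variable.
	a, b := 1, 2
	g1 := func() int { return a }
	g2 := func() int { return b }
	separate = []int{g1(), g2()}

	return shared, separate
}

func main() {
	shared, separate := capture()
	fmt.Println(shared, separate) // [2 2] [1 2]
}
```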

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is our batch controller, loosely adapted from https://github.com/grab/async/blob/master/batch.go and designed specifically for business-logic use cases.

The user only needs to specify the config in the `NewEngine` call, and then call `Submit()` from logic code.

Each worker and each `Submit` (or `many` variant) call signals at the end of its code that a slot is available. We do this unconditionally, to make it clear in the code that another call should try. This does not degrade performance: only 1 goroutine is woken up, so there is no contention.

Internally, this implementation uses a slice to hold in-flight data. If the need arises, a map-based batch may be added in the future; that would typically be useful when multiple keys could be the same.

This implementation is goroutine-safe.

func GetEngineForBenchmarks

func GetEngineForBenchmarks(n int, wpb *WP.WorkerPool) *Engine

func GetEnginesForBenchmarks

func GetEnginesForBenchmarks(n int, wpb *WP.WorkerPool) []*Engine

func NewEngine

func NewEngine(
	ec EngineConfig,
	fn WorkerFn,
	wp *WP.WorkerPool) (*Engine, error)

NewEngine creates the engine based on the given EngineConfig

The given `fn` should be goroutine-safe

func (*Engine) Submit

func (e *Engine) Submit(arg interface{}) BatchResult

Submit puts arg into the current batch, to be worked on by a background goroutine.

func (*Engine) SubmitMany

func (e *Engine) SubmitMany(args []interface{}) []BatchResult

SubmitMany puts args into one or more batches.

Atomicity should not be assumed: this implementation packs as many args as possible into each batch to get the best possible savings and efficiency.

It allocates a slice to accommodate the results, then internally calls `SubmitManyInto`.
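To make the lack of atomicity concrete, here is a simplified model of greedy packing. It is not the package's code: `splitSizes` and its parameters are hypothetical, and it assumes that the current batch is topped up first and that each later batch is filled up to the hard limit.

```go
package main

import "fmt"

// splitSizes models how n args might be spread over batches when the
// current batch has `free` slots left and every new batch holds at most
// hardLimit args. It returns the number of args placed in each batch.
func splitSizes(free, hardLimit, n int) []int {
	if hardLimit <= 0 {
		return nil // nothing can ever be placed
	}
	var sizes []int
	for n > 0 {
		take := free
		if take > n {
			take = n
		}
		if take > 0 {
			sizes = append(sizes, take)
			n -= take
		}
		free = hardLimit // every following batch starts empty
	}
	return sizes
}

func main() {
	// 15 args, 3 free slots in the current batch, hard limit of 10.
	fmt.Println(splitSizes(3, 10, 15)) // [3 10 2]
}
```

Under this model one call's args end up in three different batches, so some results may be ready before others.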

func (*Engine) SubmitManyInto

func (e *Engine) SubmitManyInto(args []interface{}, result *[]BatchResult)

SubmitManyInto puts args into one or more batches and stores the resulting BatchResults into result.

Atomicity should not be assumed: this implementation packs as many args as possible into each batch to get the best possible savings and efficiency.

Mainly used to control the allocation of the result slice.

Note that results are appended to result, so the slice can (and will) be re-allocated when more space is needed.
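Passing `*[]BatchResult` rather than `[]BatchResult` matters because `append` may move the data to a new backing array, and the caller only sees that through the pointer. The sketch below shows the same pattern with plain ints; `appendSquares` is a hypothetical stand-in, not a function of this package.

```go
package main

import "fmt"

// appendSquares appends to the caller's slice through a pointer, so the
// caller observes the new length (and the new backing array, if append
// had to grow it).
func appendSquares(nums []int, result *[]int) {
	for _, n := range nums {
		*result = append(*result, n*n)
	}
}

func main() {
	// Pre-allocating capacity is how a caller controls allocation; here the
	// capacity is deliberately too small to force one re-allocation.
	result := make([]int, 0, 2)
	appendSquares([]int{1, 2, 3}, &result)
	fmt.Println(result, len(result))
}
```

A caller that knows roughly how many results to expect can size the slice up front and reuse it across calls after resetting it with `result = result[:0]`.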

type EngineConfig

type EngineConfig struct {
	NumOfWorker  int
	SoftLimit    int
	HardLimit    int
	WaitDuration time.Duration
}

EngineConfig is the config object for our engine

SoftLimit is the limit at which a batch may already be taken by a worker, but is not yet forcefully sent to one. A worker will take such a batch by itself after a timeout (triggered by timeoutWatchdog).

HardLimit is the limit that can't be exceeded. This is useful for APIs such as AWS SQS, where a single batch call can carry at most 10 messages.

If SoftLimit is empty, it is set to HardLimit / 2.

If HardLimit is empty, it is set to SoftLimit * 2.

After those conversions, both should still be > 1, and SoftLimit <= HardLimit.
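The defaulting and validation rules above can be sketched as follows. This is an illustration under assumptions, not the package's implementation: `config` mirrors the documented fields, `normalize` and `errBrokenSizes` are hypothetical names, and "empty" is taken to mean the zero value.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config mirrors the documented EngineConfig fields so the example
// compiles on its own.
type config struct {
	NumOfWorker  int
	SoftLimit    int
	HardLimit    int
	WaitDuration time.Duration
}

// errBrokenSizes stands in for an error such as ErrEngineBrokenSizes.
var errBrokenSizes = errors.New("soft/hard limits do not satisfy the documented rules")

// normalize fills in a missing limit from the other one, then checks that
// both are > 1 and SoftLimit <= HardLimit.
func normalize(c config) (config, error) {
	soft, hard := c.SoftLimit, c.HardLimit
	if soft == 0 {
		c.SoftLimit = hard / 2
	}
	if hard == 0 {
		c.HardLimit = soft * 2
	}
	if c.SoftLimit <= 1 || c.HardLimit <= 1 || c.SoftLimit > c.HardLimit {
		return c, errBrokenSizes
	}
	return c, nil
}

func main() {
	c, err := normalize(config{HardLimit: 10})
	fmt.Println(c.SoftLimit, c.HardLimit, err)

	_, err = normalize(config{SoftLimit: 20, HardLimit: 10})
	fmt.Println(err)
}
```

Under these rules a HardLimit of 2 with no SoftLimit is rejected, because the derived SoftLimit of 1 is not greater than 1.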

func GetEngineConfigForBenchmarks

func GetEngineConfigForBenchmarks(n int) EngineConfig

type WorkerFn

type WorkerFn func(map[uint64]interface{}) (map[uint64]interface{}, error)
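As a rough illustration of the WorkerFn shape, the sketch below redeclares the type locally (mirroring the signature above rather than importing this package) and implements a hypothetical `doubleAll` worker. The name and the int payload are assumptions for the example; the point is that the returned map carries one entry per input ID, since a missing ID is what ErrResultNotFound reports.

```go
package main

import (
	"errors"
	"fmt"
)

// WorkerFn mirrors the documented signature; it is redeclared here so the
// example is self-contained.
type WorkerFn func(map[uint64]interface{}) (map[uint64]interface{}, error)

// doubleAll is a hypothetical worker: it expects every argument to be an
// int and returns the doubled value under the same ID.
func doubleAll(args map[uint64]interface{}) (map[uint64]interface{}, error) {
	out := make(map[uint64]interface{}, len(args))
	for id, arg := range args {
		n, ok := arg.(int)
		if !ok {
			return nil, errors.New("doubleAll: argument is not an int")
		}
		out[id] = n * 2
	}
	return out, nil
}

func main() {
	var fn WorkerFn = doubleAll
	res, err := fn(map[uint64]interface{}{1: 10, 2: 21})
	fmt.Println(res[1], res[2], err)
}
```

Because the function only reads its input and writes to a map it creates itself, it is safe to call from several goroutines at once, which is what `NewEngine` asks of `fn`.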
