Documentation
Constants ¶
This section is empty.
Variables ¶
var (
	EmptyBatchResult      = BatchResult{}
	EmptyBatchResultSlice = make([]BatchResult, 0)
)
var (
	NUM_OF_WORKER       = 256
	NUM_OF_ARGS_TO_WAIT = 1024
)
var (
	PARTITION_4    = 4
	PARTITION_8    = 8
	PARTITION_16   = 16
	SLEEP_DURATION = time.Duration(5 * time.Millisecond)
)
var ErrArgsBiggerThanHardLimit = errors.New(
"number of args passed is bigger than the given hard limit")
ErrArgsBiggerThanHardLimit is returned when the number of given args is bigger than the hard limit.
var ErrEngineBrokenSizes = errors.New(
"Please check EngineConfig docs, and ensure your config match the requirements")
ErrEngineBrokenSizes is returned when the soft/hard limit configuration is invalid.
Please check the EngineConfig docs for more.
var ErrNilWorkerFn = errors.New("workerFn can't be nil")
ErrNilWorkerFn is returned when `workerFn` is nil.
var ErrResultNotFound = errors.New(
"A result is not found on the resulting map. " +
"Please check your code to ensure all ids are returned with their corresponding results.")
ErrResultNotFound is returned when an ID is not found in the results map
Functions ¶
Types ¶
type Batch ¶
type Batch struct {
ID uint64
// contains filtered or unexported fields
}
Batch wraps multiple results and exposes an API to get each caller's result from it.
This object is *NOT* goroutine-safe, but internally it is only ever used inside a mutex or by a single background worker.
While `BatchResult` has both `GetResult()` and `GetResultWithContext()`, the Batch object itself has no context variant, as we can't know which caller's context it should be based on.
func NewBatch ¶
func NewBatch(id uint64, wp *WP.WorkerPool) *Batch
NewBatch creates a new batch
Once a batch is taken to be worked on, nothing more should be put into it.
func (*Batch) Put ¶
func (b *Batch) Put(id uint64, arg interface{}) BatchResult
Put puts the given arg into Batch.args.
type BatchResult ¶
type BatchResult struct {
// contains filtered or unexported fields
}
func (*BatchResult) GetResult ¶
func (br *BatchResult) GetResult() (interface{}, error)
GetResult waits until the batch is done, then matches the result for each caller.
This is not automatically called inside Engine's `Submit()` call. This is by design, to let the user decide when to wait, and to make it easier to reuse inside other utility functions.
func (*BatchResult) GetResultWithContext ¶
func (br *BatchResult) GetResultWithContext(ctx context.Context) (interface{}, error)
GetResultWithContext waits until either the batch or ctx is done, then matches the result for each caller. You need to have a WorkerPool object for this function to work.
Note that unless you need the context idiom, it is recommended to use `GetResult()` instead, as it allocates much less (only an interface{} type assertion). This API needs to create another goroutine (if the WorkerPool is nil) and 2 channels to manage its functionality. (And of course, context's `WithCancel` or `WithTimeout` also creates goroutines.)
Beware that because of how Go handles closure variables, if you are using multiple `GetResultWithContext` calls in a single function, be sure to assign each to a different local variable. See https://stackoverflow.com/questions/25919213/why-does-go-handle-closures-differently-in-goroutines
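The underlying pitfall is that a Go closure captures the variable itself, not its value at creation time. A minimal self-contained illustration (no batching involved):

```go
package main

import "fmt"

func main() {
	// A closure captures the variable, not a copy of its value,
	// so a later write to x is visible inside f.
	x := 1
	f := func() int { return x }
	x = 2
	fmt.Println(f()) // prints 2, not 1

	// Reusing one variable for two waits therefore risks both closures
	// observing the same (last) value; give each wait its own variable.
	res1, res2 := "first", "second"
	wait1 := func() string { return res1 }
	wait2 := func() string { return res2 }
	fmt.Println(wait1(), wait2())
}
```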
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is our batch controller, loosely adapted from https://github.com/grab/async/blob/master/batch.go, designed specifically for business-logic use cases.
Users just need to specify the config in the `NewEngine` call, and then use the `Submit()` call in logic code.
Each worker and each submit (or `many` variant) call signals at the end of its code that a slot is available. We do this blindly, to make it clearer in the code that another call should try. This does not degrade performance, as only 1 G is woken up, so there is no contention.
Internally, this implementation uses a slice to hold in-flight data. In the future, if the need arises, a map-based batch will be added, useful typically for cases where multiple keys could be the same.
This implementation is goroutine-safe
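The overall flow (collect args under a lock, hand a full batch to one background worker, route each result back to its caller) can be sketched with toy stand-in types. `toyEngine`, `result`, and the `int` payloads below are illustrative only, not this package's API:

```go
package main

import (
	"fmt"
	"sync"
)

// result is a toy stand-in for BatchResult: a future the caller waits on.
type result struct{ ch chan int }

func (r result) Get() int { return <-r.ch }

// toyEngine collects submitted args and hands full batches to workerFn.
type toyEngine struct {
	mu        sync.Mutex
	softLimit int
	workerFn  func([]int) []int
	args      []int
	waiters   []result
}

func (e *toyEngine) Submit(arg int) result {
	e.mu.Lock()
	defer e.mu.Unlock()
	r := result{ch: make(chan int, 1)}
	e.args = append(e.args, arg)
	e.waiters = append(e.waiters, r)
	if len(e.args) >= e.softLimit { // real engine also flushes on timeout
		e.flushLocked()
	}
	return r
}

// flushLocked hands the current batch to workerFn in the background
// and routes each result back to its caller.
func (e *toyEngine) flushLocked() {
	args, waiters := e.args, e.waiters
	e.args, e.waiters = nil, nil
	go func() {
		outs := e.workerFn(args)
		for i, w := range waiters {
			w.ch <- outs[i]
		}
	}()
}

func main() {
	e := &toyEngine{
		softLimit: 2,
		workerFn: func(batch []int) []int {
			out := make([]int, len(batch))
			for i, v := range batch {
				out[i] = v * 10 // one "remote call" for the whole batch
			}
			return out
		},
	}
	a := e.Submit(1)
	b := e.Submit(2) // hits softLimit, batch is flushed
	fmt.Println(a.Get(), b.Get())
}
```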
func GetEngineForBenchmarks ¶
func GetEngineForBenchmarks(n int, wpb *WP.WorkerPool) *Engine
func GetEnginesForBenchmarks ¶
func GetEnginesForBenchmarks(n int, wpb *WP.WorkerPool) []*Engine
func NewEngine ¶
func NewEngine(ec EngineConfig, fn WorkerFn, wp *WP.WorkerPool) (*Engine, error)
NewEngine creates the engine based on the given EngineConfig
The given `fn` should be goroutine-safe
func (*Engine) Submit ¶
func (e *Engine) Submit(arg interface{}) BatchResult
Submit puts arg into the current batch, to be worked on by a background goroutine.
func (*Engine) SubmitMany ¶
func (e *Engine) SubmitMany(args []interface{}) []BatchResult
SubmitMany puts args into batches, which may be one or more.
Atomicity should not be assumed, as this implementation will try to pack as much as possible to ensure we get the best possible savings + efficiency.
It allocates a slice to accommodate the results, and internally calls `SubmitManyInto`.
func (*Engine) SubmitManyInto ¶
func (e *Engine) SubmitManyInto(args []interface{}, result *[]BatchResult)
SubmitManyInto puts args into batches, which may be one or more, and store them into result
Atomicity should not be assumed, as this implementation will try to pack as much as possible to ensure we get the best possible savings + efficiency.
Mainly used to control the result slice's allocation.
Note that the result will be appended to, so it can (and will) be re-allocated for more space.
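The append contract can be illustrated with a self-contained stand-in (`appendInto` and the `int` results below are hypothetical, not this package's API):

```go
package main

import "fmt"

// appendInto sketches SubmitManyInto's contract: results are appended
// to the caller-provided slice through a pointer, so the backing array
// may be re-allocated when capacity runs out.
func appendInto(args []int, result *[]int) {
	for _, a := range args {
		*result = append(*result, a*10)
	}
}

func main() {
	// Pre-sizing the slice avoids re-allocation, which is the point
	// of exposing the *Into variant.
	result := make([]int, 0, 4)
	appendInto([]int{1, 2, 3}, &result)
	fmt.Println(result, cap(result))

	// An undersized slice still works; append simply grows it.
	small := make([]int, 0, 1)
	appendInto([]int{1, 2, 3}, &small)
	fmt.Println(small, cap(small) >= 3)
}
```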
type EngineConfig ¶
EngineConfig is the config object for our engine
SoftLimit is the limit at which a batch can already be taken by a worker, but is not forcefully handed over yet. A worker will take the batch by itself after a timeout (triggered by timeoutWatchdog).
HardLimit is the limit that can't be exceeded. Useful for backends like AWS SQS, where a single call can hold at most 10 messages.
If SoftLimit is empty, it is set to HardLimit / 2.
If HardLimit is empty, it is set to SoftLimit * 2.
After those conversions, both must still be > 1, and SoftLimit <= HardLimit.
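The defaulting and validation rules above can be sketched as one small self-contained function. `normalizeLimits` is illustrative only (zero stands for "empty"); the real constructor presumably enforces the same rules via `ErrEngineBrokenSizes`:

```go
package main

import (
	"errors"
	"fmt"
)

// normalizeLimits applies the documented rules:
// empty SoftLimit -> HardLimit / 2, empty HardLimit -> SoftLimit * 2,
// then both must be > 1 and SoftLimit <= HardLimit.
func normalizeLimits(soft, hard int) (int, int, error) {
	if soft == 0 {
		soft = hard / 2
	}
	if hard == 0 {
		hard = soft * 2
	}
	if soft <= 1 || hard <= 1 || soft > hard {
		return 0, 0, errors.New("broken sizes: see EngineConfig docs")
	}
	return soft, hard, nil
}

func main() {
	s, h, _ := normalizeLimits(0, 100) // SoftLimit defaulted from HardLimit
	fmt.Println(s, h)

	_, _, err := normalizeLimits(80, 40) // SoftLimit > HardLimit: rejected
	fmt.Println(err != nil)
}
```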
func GetEngineConfigForBenchmarks ¶
func GetEngineConfigForBenchmarks(n int) EngineConfig