Documentation ¶
Index ¶
Constants ¶
const (
	FlagAboutToClose = iota + 1
	FlagClosing
	FlagClosed

	ConsumerInterval = 10
)
Variables ¶
var (
	DefaultTaskWaitDuration int64 = 50
	DefaultQueueConsumeRate       = 100
	ErrorTimedOut                 = fmt.Errorf("task timed out in queue")
)
Functions ¶
This section is empty.
Types ¶
type BatchSizeProvider ¶
BatchSizeProvider provides a reasonable batch size for queued tasks for a specified partition name.
type MemoryBatchQueue ¶
type MemoryBatchQueue struct {
// contains filtered or unexported fields
}
MemoryBatchQueue implements Queue. All tasks are stored in memory.
func NewMemoryBatchQueue ¶
func NewMemoryBatchQueue(bsp BatchSizeProvider, hdl QueueTaskHandler, poolSize int) *MemoryBatchQueue
NewMemoryBatchQueue initializes a MemoryBatchQueue. poolSize is the size of the goroutine pool.
func (*MemoryBatchQueue) Close ¶
func (q *MemoryBatchQueue) Close() error
func (*MemoryBatchQueue) Closed ¶
func (q *MemoryBatchQueue) Closed() bool
func (*MemoryBatchQueue) Push ¶
func (q *MemoryBatchQueue) Push(tasks ...QueueTask) chan int64
Push pushes QueueTasks into the queue and returns an int64 channel to listen on. Once all tasks are processed, a bitmap value equal to 1<<n - 1 is sent to the channel; this bitmap can be used to find out which tasks have finished. When the queue is closing, or tasks is empty, a buffered channel is returned so that the caller can proceed without blocking.
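The bitmap convention can be illustrated in isolation. This is a minimal sketch: the value is hard-coded to what the channel would carry after three tasks finish, rather than received from a real queue.

```go
package main

import "fmt"

func main() {
	// Suppose 3 tasks were pushed. When all of them are processed,
	// the channel receives 1<<3 - 1 == 7 (binary 111): one set bit
	// per finished task.
	var done int64 = 1<<3 - 1

	for i := 0; i < 3; i++ {
		// Bit i corresponds to the i-th task in the Push call.
		finished := done&(1<<i) != 0
		fmt.Printf("task %d finished: %v\n", i, finished)
	}
}
```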
type Queue ¶
type Queue interface {
	// Push pushes QueueTasks into the queue and returns an int64 channel to
	// listen on. Once all tasks are processed, a bitmap value equal to
	// 1<<n - 1 is sent to the channel; this bitmap can be used to find out
	// which tasks have finished. When the queue is closing, or tasks is
	// empty, a buffered channel is returned so that the listener can
	// proceed without blocking.
	Push(...QueueTask) chan int64
	Close() error
	Closed() bool
}
Queue continually accepts and buffers incoming tasks in an underlying queue, reorganizes the buffered tasks into batches, and finally processes those batches in parallel. Tasks are processed in two circumstances: when the number of buffered tasks is greater than the batch size, or when the very first queued task has been waiting longer than DefaultTaskWaitDuration. The batch size is provided by BatchSizeProvider, and queued tasks are partitioned accordingly, e.g. by model id.
type QueueTask ¶
type QueueTask interface {
	// GetPartition returns the QueueTask's partition name
	GetPartition() string
	// GetPayload returns the QueueTask's payload; can be used for debugging purposes
	GetPayload() interface{}
	// IsTimeout reports whether the QueueTask has timed out
	IsTimeout() bool
	// SetResult sets the task's execution result
	SetResult(interface{})
	// SetError sets the task's execution error
	SetError(error)
	// WithFinishFunc sets a callback for the task; must be called after SetResult or SetError
	WithFinishFunc(func())
	// GetResult returns the task result; returns nil when no result can be found
	GetResult() interface{}
	// GetError returns the task error
	GetError() error
}
QueueTask defines the methods required of a request that can be processed by Queue. A QueueTask must be able to tell when all tasks that were originally queued together have finished, e.g. by keeping a finished-task counter. See TestQueueTask.SetResult in the tests for sample usage.
type QueueTaskHandler ¶
QueueTaskHandler is used to handle a batch of types.QueueTask. This is often where user logic should be placed.
type QueueTasks ¶
type QueueTasks []QueueTask
QueueTasks is a slice of QueueTask values; it provides several convenience methods.
func (QueueTasks) Payload ¶
func (qt QueueTasks) Payload() []interface{}
Payload returns the QueueTasks' payloads as a slice.