batch

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2023 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FlagAboutToClose = iota + 1
	FlagClosing
	FlagClosed
	ConsumerInterval = 10
)

Variables

View Source
var (
	DefaultTaskWaitDuration int64 = 50
	DefaultQueueConsumeRate       = 100
	ErrorTimedOut                 = fmt.Errorf("task timed out in queue")
)

Functions

This section is empty.

Types

type BatchSizeProvider

type BatchSizeProvider interface {
	Get(string) int
	Set(string, int)
}

BatchSizeProvider provides a reasonable batch size for queued tasks for specified partition name

type MemoryBatchQueue

type MemoryBatchQueue struct {
	// contains filtered or unexported fields
}

MemoryBatchQueue implements Queue. All tasks are stored in memory

func NewMemoryBatchQueue

func NewMemoryBatchQueue(bsp BatchSizeProvider, hdl QueueTaskHandler, poolSize int) *MemoryBatchQueue

NewMemoryBatchQueue initializes a MemoryBatchQueue. poolSize is the size of goroutine pool

func (*MemoryBatchQueue) Close

func (q *MemoryBatchQueue) Close() error

func (*MemoryBatchQueue) Closed

func (q *MemoryBatchQueue) Closed() bool

func (*MemoryBatchQueue) Push

func (q *MemoryBatchQueue) Push(tasks ...QueueTask) chan int64

Push pushes QueueTasks in queue, returns a int64 channel to listen on, once all tasks are processed, a byte map number equals 1<<n-1 is sent to the channel. This byte map number can be used to find out which tasks are finished. When the queue is closing, or tasks is empty, a buffered channel is returned so that caller can go ahead without blocking.

type Queue

type Queue interface {
	// Push pushes QueueTasks in queue, returns a int64 channel to listen on. Once all
	// tasks are processed, a byte map number equals 1<<n-1 is sent to the channel.
	// This byte map number can be used to find out which tasks are finished.
	// When the queue is closing, or tasks is empty, a buffered channel is returned
	// so that listener can go ahead without blocking.
	Push(...QueueTask) chan int64
	Close() error
	Closed() bool
}

Queue consistently accepts and buffers incoming tasks in underlying queue, reorganizes buffered tasks in batches, finally processes task batches in parallel. There are 2 circumstances in which tasks are processed - when the number of buffered tasks is greater than batch size, or the very first queued task has been waiting for a time longer than DefaultTaskWaitDuration. The size of a batch is provided by BatchSizeProvider and queued tasks are partitioned accordingly, e.g. by model id.

type QueueTask

type QueueTask interface {
	// GetPartition returns QueueTask's partition name
	GetPartition() string

	// GetPayload returns QueueTask's payload, can be used for debugging purpose
	GetPayload() interface{}

	// IsTimeout determines whether the QueueTask has timed out
	IsTimeout() bool

	// SetResult sets task execution result back
	SetResult(interface{})

	// SetError sets task execution error
	SetError(error)

	// WithFinishFunc sets a callback for the task, must be called after SetResult or SetError
	WithFinishFunc(func())

	// GetResult returns task result, should return nil when no result can be found
	GetResult() interface{}

	// GetError returns task error
	GetError() error
}

QueueTask defines required methods for a request that can be processed by Queue. The QueueTask must be able to know when all tasks that are originally queued together are finished, e.g. keeping a finished task counter. See TestQueueTask.SetResult in tests for sample usage.

type QueueTaskHandler

type QueueTaskHandler func(string, []QueueTask)

QueueTaskHandler is used to handle a types.QueueTask batch. This is often where user logic should be placed.

type QueueTasks

type QueueTasks []QueueTask

QueueTasks is an array of QueueTasks, this provides several convenient methods

func (QueueTasks) Payload

func (qt QueueTasks) Payload() []interface{}

Payload returns QueueTasks' payload as an array

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL