Documentation ¶
Constants ¶
const (
	QueueGroupFlagStart  = "START"  // Job that starts a group of work.
	QueueGroupFlagEnd    = "END"    // Job that finalizes a completed group of work.
	QueueGroupFlagCancel = "CANCEL" // Job that cancels an in-progress group of work.
	QueueGroupFlagAbort  = "ABORT"  // Job that finalizes a cancelled group of work.
)
Flags are submitted as part of a GroupQueueJob. When work of the GroupQueueJob type is handled, the job's flag determines how the job is processed.
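As a minimal sketch of how a handler might branch on these flags: the constants below are copied from this package, while describeFlag is a hypothetical helper invented for illustration and is not part of the package API. The package's actual handling logic is not shown in this documentation.

```go
package main

import "fmt"

// Flag values copied from the package constants.
const (
	QueueGroupFlagStart  = "START"
	QueueGroupFlagEnd    = "END"
	QueueGroupFlagCancel = "CANCEL"
	QueueGroupFlagAbort  = "ABORT"
)

// describeFlag is a hypothetical helper (not part of the package) that maps
// a flag to the lifecycle step it represents.
func describeFlag(flag string) (string, error) {
	switch flag {
	case QueueGroupFlagStart:
		return "start a group of work", nil
	case QueueGroupFlagEnd:
		return "finalize a completed group", nil
	case QueueGroupFlagCancel:
		return "cancel an in-progress group", nil
	case QueueGroupFlagAbort:
		return "finalize a cancelled group", nil
	default:
		// Unknown flags are rejected rather than silently ignored.
		return "", fmt.Errorf("unknown group flag %q", flag)
	}
}

func main() {
	for _, flag := range []string{QueueGroupFlagStart, QueueGroupFlagCancel, "BOGUS"} {
		step, err := describeFlag(flag)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s: %s\n", flag, step)
	}
}
```

Returning an error for an unrecognized flag is a design choice of this sketch; it keeps a malformed job from being treated as a no-op.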
Variables ¶
var ErrQueueGroupStopTimeout = errors.New("timeout stopping queue group runner")
var MissingTypeError = errors.New("MissingType error")
Functions ¶
This section is empty.
Types ¶
type DefaultGroupQueueEndRunnerFactory ¶
type DefaultGroupQueueEndRunnerFactory struct {
	// contains filtered or unexported fields
}
func NewGroupQueueEndRunnerFactory ¶
func NewGroupQueueEndRunnerFactory() *DefaultGroupQueueEndRunnerFactory
func (*DefaultGroupQueueEndRunnerFactory) AddRunner ¶
func (r *DefaultGroupQueueEndRunnerFactory) AddRunner(t uint8, runner GroupQueueEndRunner)
func (*DefaultGroupQueueEndRunnerFactory) GetRunner ¶
func (r *DefaultGroupQueueEndRunnerFactory) GetRunner(t uint8) (GroupQueueEndRunner, error)
type GenericMatcher ¶
type GenericMatcher struct {
	// contains filtered or unexported fields
}
func NewMatcher ¶
func NewMatcher(field string) *GenericMatcher
func (*GenericMatcher) Field ¶
func (m *GenericMatcher) Field() string
func (*GenericMatcher) Register ¶
func (m *GenericMatcher) Register(workType uint64, dataType interface{})
func (*GenericMatcher) Type ¶
func (m *GenericMatcher) Type(workType uint64) (interface{}, error)
type GroupQueue ¶
type GroupQueue interface {
	// Push pushes work into the base queue
	Push(ctx context.Context, priority uint64, work queue.Work) error
	// SetEndWork sets the work to run when the group ends
	SetEndWork(ctx context.Context, work interface{}, endWorkType uint8) error
	// Start starts monitoring the base queue for completion
	// of all queued work.
	Start(ctx context.Context) error
	// Group returns the group info
	Group() GroupQueueJob
	// BaseQueueName returns the base queue name
	BaseQueueName() string
}
GroupQueue is the outer wrapper of the collection of other queues. It behaves
more or less like a DatabaseQueue. If you're looking for the raw queue underneath, see the QueueGroup struct.
type GroupQueueEndRunner ¶
type GroupQueueEndRunnerFactory ¶
type GroupQueueEndRunnerFactory interface {
	AddRunner(t uint8, runner GroupQueueEndRunner)
	GetRunner(t uint8) (GroupQueueEndRunner, error)
}
type GroupQueueJob ¶
type GroupQueueJob interface {
	Type() uint64
	GroupId() int64
	Name() string
	Flag() string
	EndWorkType() uint8
	EndWork() GroupQueueJob
	EndWorkJob() []byte
	AbortWork() GroupQueueJob
	CancelWork() GroupQueueJob
	SetEndWork(endWorkType uint8, work []byte)
}
GroupQueueJob defines the lifecycle for a group of work in the queue.
type GroupQueueProvider ¶
type GroupQueueProvider interface {
	// IsReady is called to determine when it is time to start a group of work.
	// It returns nil when it is time to start the work in a group.
	IsReady(job GroupQueueJob) error
	// IsComplete is called to determine whether all the work in a group has
	// been successfully completed.
	IsComplete(job GroupQueueJob) (cancelled bool, err error)
	// Begin is called when the group begins handling its work.
	Begin(job GroupQueueJob) error
	// Cancel is called when a group of work is cancelled.
	Cancel(job GroupQueueJob) error
	// Abort is called for finalization after handling a group cancellation.
	Abort(job GroupQueueJob) error
	// Clear is called to clear the work from a group. This is called upon
	// failure or cancellation.
	Clear(job GroupQueueJob) error
	// Fail is called when a group of work fails.
	Fail(job GroupQueueJob, err error) error
}
GroupQueueProvider links the group queue runner to a queue implementation.
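To make the provider contract more concrete, here is a sketch of an in-memory implementation. It rests on several assumptions not confirmed by this documentation: that IsComplete returns a nil error once no work is pending, that a cancelled group reports cancelled=true with a nil error, and that Fail simply clears the group. The groupJob interface is a trimmed stand-in for GroupQueueJob, and memoryProvider, job, Enqueue, errNotReady, and errPending are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// groupJob is a trimmed stand-in for GroupQueueJob; the sketch only needs the ID.
type groupJob interface{ GroupId() int64 }

// job is a hypothetical job type used only for this example.
type job int64

func (j job) GroupId() int64 { return int64(j) }

var (
	errNotReady = errors.New("group is not ready")
	errPending  = errors.New("group still has pending work")
)

type groupState struct {
	ready, started, cancelled bool
	pending                   int
}

// memoryProvider keeps group state in a map guarded by a mutex.
type memoryProvider struct {
	mu     sync.Mutex
	groups map[int64]*groupState
}

func newMemoryProvider() *memoryProvider {
	return &memoryProvider{groups: make(map[int64]*groupState)}
}

// update runs fn on the group's state while holding the lock,
// creating an empty state the first time a group is seen.
func (p *memoryProvider) update(j groupJob, fn func(s *groupState) error) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	s, ok := p.groups[j.GroupId()]
	if !ok {
		s = &groupState{}
		p.groups[j.GroupId()] = s
	}
	return fn(s)
}

// Enqueue is a hypothetical setup helper: it marks a group ready with n pending items.
func (p *memoryProvider) Enqueue(j groupJob, n int) {
	p.update(j, func(s *groupState) error { s.ready, s.pending = true, n; return nil })
}

func (p *memoryProvider) IsReady(j groupJob) error {
	return p.update(j, func(s *groupState) error {
		if !s.ready {
			return errNotReady
		}
		return nil
	})
}

func (p *memoryProvider) IsComplete(j groupJob) (cancelled bool, err error) {
	err = p.update(j, func(s *groupState) error {
		cancelled = s.cancelled
		if !s.cancelled && s.pending > 0 {
			return errPending
		}
		return nil
	})
	return cancelled, err
}

func (p *memoryProvider) Begin(j groupJob) error {
	return p.update(j, func(s *groupState) error { s.started = true; return nil })
}

func (p *memoryProvider) Cancel(j groupJob) error {
	return p.update(j, func(s *groupState) error { s.cancelled = true; return nil })
}

// Abort finalizes a cancelled group by dropping its state entirely.
func (p *memoryProvider) Abort(j groupJob) error {
	return p.update(j, func(*groupState) error { delete(p.groups, j.GroupId()); return nil })
}

func (p *memoryProvider) Clear(j groupJob) error {
	return p.update(j, func(s *groupState) error { s.pending = 0; return nil })
}

// Fail ignores the cause in this sketch and only clears the group's work.
func (p *memoryProvider) Fail(j groupJob, cause error) error { return p.Clear(j) }

func main() {
	p := newMemoryProvider()
	g := job(7)
	fmt.Println("ready before enqueue:", p.IsReady(g) == nil)
	p.Enqueue(g, 1)
	fmt.Println("ready after enqueue:", p.IsReady(g) == nil)
	_ = p.Begin(g)
	_, err := p.IsComplete(g)
	fmt.Println("complete while pending:", err == nil)
	_ = p.Cancel(g)
	cancelled, err := p.IsComplete(g)
	fmt.Println("cancelled:", cancelled, "err:", err)
}
```

A real provider would back these calls with the queue's own storage; the map here only illustrates which state each method is expected to read or change.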
type QueueGroupRunner ¶
type QueueGroupRunner struct {
	// contains filtered or unexported fields
}
func NewQueueGroupRunner ¶
func NewQueueGroupRunner(cfg QueueGroupRunnerConfig) *QueueGroupRunner
func (*QueueGroupRunner) Run ¶
func (r *QueueGroupRunner) Run(ctx context.Context, work queue.RecursableWork) error
type QueueGroupRunnerConfig ¶
type QueueGroupRunnerConfig struct {
	Queue            queue.Queue
	Provider         GroupQueueProvider
	TypeMatcher      TypeMatcher
	EndRunnerFactory GroupQueueEndRunnerFactory
	Recurser         *queue.OptionalRecurser
}