groups

package
v3.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
// Flag values carried by a GroupQueueJob (returned by GroupQueueJob.Flag).
// When work of the GroupQueueJob type is handled, the flag determines how
// the job is handled.
const (
	QueueGroupFlagStart  = "START"  // Job that starts a group of work.
	QueueGroupFlagEnd    = "END"    // Job that finalizes a completed group of work.
	QueueGroupFlagCancel = "CANCEL" // Job that cancels an in-progress group of work.
	QueueGroupFlagAbort  = "ABORT"  // Job that finalizes a cancelled group of work.
)

Flags are submitted as part of the GroupQueueJob. When we handle work of the GroupQueueJob type, we use these flags to determine how to handle the job.

Variables

View Source
// ErrQueueGroupStopTimeout is the sentinel error for a timeout while stopping
// a queue group runner.
// NOTE(review): presumably returned by QueueGroupRunner.Stop when its timeout
// elapses — confirm against the implementation. Compare with errors.Is.
var ErrQueueGroupStopTimeout = errors.New("timeout stopping queue group runner")
View Source
// MissingTypeError is a sentinel error value; compare with errors.Is.
// NOTE(review): presumably returned by TypeMatcher.Type when no data type is
// registered for the requested work type — confirm.
// NOTE(review): the name and message do not follow the usual Go conventions
// (ErrXxx for error variables, lowercase error strings); they are kept as-is
// because changing either would break existing callers.
var MissingTypeError = errors.New("MissingType error")

Functions

This section is empty.

Types

type DefaultGroupQueueEndRunnerFactory

// DefaultGroupQueueEndRunnerFactory is created by NewGroupQueueEndRunnerFactory
// and exposes AddRunner and GetRunner methods on its pointer receiver.
// NOTE(review): presumably the stock GroupQueueEndRunnerFactory implementation;
// the method signatures are not shown on this page — confirm.
type DefaultGroupQueueEndRunnerFactory struct {
	// contains filtered or unexported fields
}

func NewGroupQueueEndRunnerFactory

func NewGroupQueueEndRunnerFactory() *DefaultGroupQueueEndRunnerFactory

func (*DefaultGroupQueueEndRunnerFactory) AddRunner

func (*DefaultGroupQueueEndRunnerFactory) GetRunner

type GenericMatcher

// GenericMatcher is created by NewMatcher from a field name. Its Field,
// Register and Type methods have the same signatures as the TypeMatcher
// interface, so a *GenericMatcher can be used wherever a TypeMatcher is
// required.
type GenericMatcher struct {
	// contains filtered or unexported fields
}

func NewMatcher

func NewMatcher(field string) *GenericMatcher

func (*GenericMatcher) Field

func (m *GenericMatcher) Field() string

func (*GenericMatcher) Register

func (m *GenericMatcher) Register(workType uint64, dataType interface{})

func (*GenericMatcher) Type

func (m *GenericMatcher) Type(workType uint64) (interface{}, error)

type GroupQueue

// GroupQueue is the outer wrapper of a collection of other queues: work is
// pushed into a base queue, optional end work is attached, and Start begins
// monitoring the base queue for completion of all queued work.
type GroupQueue interface {
	// Push pushes work into the base queue
	Push(ctx context.Context, priority uint64, work queue.Work) error

	// SetEndWork sets the work to run when the group ends
	// NOTE(review): endWorkType has the same type as the key used by
	// GroupQueueEndRunnerFactory — presumably it selects the end runner; confirm.
	SetEndWork(ctx context.Context, work interface{}, endWorkType uint8) error

	// Start starts monitoring the base queue for completion
	// of all queued work.
	Start(ctx context.Context) error

	// Group returns the group info
	Group() GroupQueueJob

	// BaseQueueName returns the base queue name
	BaseQueueName() string
}

GroupQueue is the outer wrapper of the collection of other queues. It behaves more or less like a DatabaseQueue. If you're looking for the raw queue underneath, see the QueueGroup struct.

type GroupQueueEndRunner

// GroupQueueEndRunner has a single method, Run, which receives a work payload
// as raw bytes and reports failure through its error result.
// NOTE(review): presumably Run executes a group's end work and the bytes are
// those stored via GroupQueueJob.SetEndWork; the encoding is not specified
// here — confirm.
type GroupQueueEndRunner interface {
	Run(work []byte) error
}

type GroupQueueEndRunnerFactory

// GroupQueueEndRunnerFactory stores and looks up GroupQueueEndRunner values
// by a uint8 type code.
type GroupQueueEndRunnerFactory interface {
	// AddRunner adds runner under the type code t.
	AddRunner(t uint8, runner GroupQueueEndRunner)
	// GetRunner returns the runner for the type code t, or an error.
	// NOTE(review): presumably the error means no runner was added for t — confirm.
	GetRunner(t uint8) (GroupQueueEndRunner, error)
}

type GroupQueueJob

// GroupQueueJob defines the lifecycle for a group of work in the queue.
type GroupQueueJob interface {
	// Type returns the job's work type as a uint64 (the same integer type
	// TypeMatcher uses for work types).
	Type() uint64
	// GroupId returns the group's numeric identifier.
	GroupId() int64
	// Name returns the job's name.
	// NOTE(review): whether this is the group name or a queue name is not
	// visible here — confirm.
	Name() string
	// Flag returns the job's flag; see the QueueGroupFlag* constants.
	Flag() string
	// EndWorkType returns the uint8 type code set through SetEndWork.
	EndWorkType() uint8
	// EndWork, AbortWork and CancelWork each return another GroupQueueJob.
	// NOTE(review): presumably these are the END, ABORT and CANCEL jobs
	// derived from this one — confirm against an implementation.
	EndWork() GroupQueueJob
	// EndWorkJob returns the end work payload as raw bytes.
	EndWorkJob() []byte
	AbortWork() GroupQueueJob
	CancelWork() GroupQueueJob
	// SetEndWork records the end work payload and its type code on the job.
	SetEndWork(endWorkType uint8, work []byte)
}

GroupQueueJob defines the lifecycle for a group of work in the queue.

type GroupQueueProvider

// GroupQueueProvider links the group queue runner to a queue implementation.
// Every method receives the GroupQueueJob that identifies the group.
type GroupQueueProvider interface {
	// IsReady is called to determine when it is time to start a group of work.
	// It returns nil when it is time to start the work in a group.
	IsReady(job GroupQueueJob) error

	// IsComplete is called to determine whether all the work in a group has
	// been successfully completed.
	// NOTE(review): the meaning of the cancelled result and of a nil versus
	// non-nil err is not documented here — presumably cancelled reports that
	// the group was cancelled rather than completed; confirm.
	IsComplete(job GroupQueueJob) (cancelled bool, err error)

	// Begin is called when the group begins handling its work.
	Begin(job GroupQueueJob) error

	// Cancel is called when a group of work is cancelled.
	Cancel(job GroupQueueJob) error

	// Abort is called for finalization after handling a group cancellation.
	Abort(job GroupQueueJob) error

	// Clear is called to clear the work from a group. This is called upon
	// failure or cancellation.
	Clear(job GroupQueueJob) error

	// Fail is called when a group of work fails. The err argument carries
	// the failure being reported.
	Fail(job GroupQueueJob, err error) error
}

GroupQueueProvider links the group queue runner to a queue implementation.

type QueueGroupRunner

// QueueGroupRunner is built by NewQueueGroupRunner from a
// QueueGroupRunnerConfig. It has a Run method and a Stop method; Stop takes
// a timeout and returns an error.
// NOTE(review): presumably Stop returns ErrQueueGroupStopTimeout when the
// runner does not stop within the timeout — confirm.
type QueueGroupRunner struct {
	// contains filtered or unexported fields
}

func NewQueueGroupRunner

func NewQueueGroupRunner(cfg QueueGroupRunnerConfig) *QueueGroupRunner

func (*QueueGroupRunner) Run

func (*QueueGroupRunner) Stop

func (r *QueueGroupRunner) Stop(timeout time.Duration) error

type QueueGroupRunnerConfig

// QueueGroupRunnerConfig is the argument to NewQueueGroupRunner. It bundles
// the runner's collaborators: the queue, the GroupQueueProvider, a
// TypeMatcher, a GroupQueueEndRunnerFactory and an optional recurser.
// NOTE(review): which fields are required and how nil values are treated is
// not visible here (Recurser is a pointer to queue.OptionalRecurser, so it
// is presumably optional) — confirm.
type QueueGroupRunnerConfig struct {
	Queue            queue.Queue
	Provider         GroupQueueProvider
	TypeMatcher      TypeMatcher
	EndRunnerFactory GroupQueueEndRunnerFactory
	Recurser         *queue.OptionalRecurser
}

type TypeMatcher

// TypeMatcher associates uint64 work types with data types.
type TypeMatcher interface {
	// Field returns a field name.
	// NOTE(review): presumably the name of the payload field that holds the
	// work type — confirm.
	Field() string
	// Register associates dataType with workType.
	Register(workType uint64, dataType interface{})
	// Type returns the data type for workType, or an error.
	// NOTE(review): presumably the error is MissingTypeError when nothing was
	// registered for workType — confirm.
	Type(workType uint64) (interface{}, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL