queue

package
v3.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrDuplicateAddressedPush is a sentinel error whose message is "Duplicate address".
// NOTE(review): presumably returned when addressed work is pushed with an
// address that is already in the queue — confirm against the AddressedPush
// implementation.
var ErrDuplicateAddressedPush = errors.New("Duplicate address")

Functions

func ContextWithExpectedRecursion

func ContextWithExpectedRecursion(ctx context.Context) context.Context

func ContextWithRecursion

func ContextWithRecursion(ctx context.Context, workType uint64, recurseFunc RecurseFunc) context.Context

Types

type AddressableWork

// AddressableWork describes work that reports both a numeric work type and a
// string address.
type AddressableWork interface {
	// Type returns the work type identifier.
	Type() uint64
	// Address returns the address of the work.
	Address() string
}

type BaseRunner

// BaseRunner is an empty struct that supplies a Stop method, so that runner
// types which need no special stop logic do not have to write their own.
type BaseRunner struct{}

BaseRunner implements the WorkRunner's `Stop` function to avoid boilerplate, since most runners don't need special stop logic.

func (*BaseRunner) Stop

func (*BaseRunner) Stop(timeout time.Duration) error

type CtxRecurseData

// CtxRecurseData pairs a RecurseFunc (Recurse) with a work type identifier
// (WorkType).
// NOTE(review): presumably the value that ContextWithRecursion stores in a
// context under the CtxRecurse key — confirm against the implementation.
type CtxRecurseData struct {
	Recurse  RecurseFunc
	WorkType uint64
}

type CtxRecurseKey

// CtxRecurseKey is the string-based type of the two keys declared below,
// CtxRecurse and CtxAllowsRecursion.
// NOTE(review): presumably these are context value keys — CtxRecurse for the
// CtxRecurseData set by ContextWithRecursion, and CtxAllowsRecursion for the
// marker set by ContextWithExpectedRecursion — confirm against the
// implementation.
type CtxRecurseKey string
const (
	CtxRecurse         CtxRecurseKey = "context_recurse_data"
	CtxAllowsRecursion CtxRecurseKey = "context_recurse_allowed"
)

type DatabaseQueueChunkMatcher

// DatabaseQueueChunkMatcher is implemented by types that can test a listener
// notification against an address.
type DatabaseQueueChunkMatcher interface {
	// Match takes a notification and an address and returns a bool.
	// NOTE(review): presumably true means the notification pertains to the
	// address; the matching criteria are implementation-defined — confirm.
	Match(n listener.Notification, address string) bool
}

type DefaultQueueSupportedTypes

// DefaultQueueSupportedTypes has pointer-receiver methods DisableAll, Enabled,
// SetEnabled, and SetEnabledConditional, which is the same method set declared
// by the QueueSupportedTypes interface. Its fields are unexported.
type DefaultQueueSupportedTypes struct {
	// contains filtered or unexported fields
}

func (*DefaultQueueSupportedTypes) DisableAll

func (d *DefaultQueueSupportedTypes) DisableAll()

func (*DefaultQueueSupportedTypes) Enabled

func (d *DefaultQueueSupportedTypes) Enabled() []uint64

func (*DefaultQueueSupportedTypes) SetEnabled

func (d *DefaultQueueSupportedTypes) SetEnabled(typeId uint64, enabled bool)

func (*DefaultQueueSupportedTypes) SetEnabledConditional

func (d *DefaultQueueSupportedTypes) SetEnabledConditional(typeId uint64, enabled func() bool)

type Enabled

// Enabled holds two ways of expressing an enabled state: the Always flag and
// the Conditional callback, which returns a bool.
// NOTE(review): presumably the per-type state behind SetEnabled and
// SetEnabledConditional; how the two fields are combined is not visible
// here — confirm against the implementation.
type Enabled struct {
	Always      bool
	Conditional func() bool
}

type OptionalRecurser

// OptionalRecurser is created with NewOptionalRecurser and exposes the
// OptionallyRecurse method. Its fields are unexported.
type OptionalRecurser struct {
	// contains filtered or unexported fields
}

func NewOptionalRecurser

func NewOptionalRecurser(cfg OptionalRecurserConfig) *OptionalRecurser

func (*OptionalRecurser) OptionallyRecurse

func (a *OptionalRecurser) OptionallyRecurse(ctx context.Context, run func())

type OptionalRecurserConfig

// OptionalRecurserConfig is the configuration passed to NewOptionalRecurser.
type OptionalRecurserConfig struct {
	// NOTE(review): presumably makes a failed recursion check fatal rather
	// than recoverable — confirm against the OptionalRecurser implementation.
	FatalRecurseCheck bool
}

type Queue

// Queue is the interface for working with a named queue: pushing work
// (optionally addressed), peeking, polling addresses, claiming work, and
// extending or deleting the permit held for claimed work.
type Queue interface {
	// WithDbTx returns a queue that operates in the context of a database
	// transaction.
	WithDbTx(ctx context.Context, tx QueueStore) Queue

	// Peek at work in the queue
	//  * filter a callback invoked with queued work; it returns a bool and an
	//      error. NOTE(review): presumably a true result keeps the item in the
	//      returned slice — confirm against the implementation.
	//  * types the types of work to peek at
	Peek(ctx context.Context, filter func(work *QueueWork) (bool, error), types ...uint64) ([]QueueWork, error)

	// Push new work into the queue
	//  * priority the priority for this work. Lower numbers are higher priority
	//  * groupId the group id for this work. Use zero (0) if not grouped
	//  * work the work to push into the queue. Must be JSON-serializable
	Push(ctx context.Context, priority uint64, groupId int64, work Work) error

	// AddressedPush pushes uniquely addressed new work into the queue
	//  * priority the priority for this work. Lower numbers are higher priority
	//  * groupId the group id for this work. Use zero (0) if not grouped
	//  * address the address for this work. Must be unique. Address can be reused,
	//      but only one occurrence of any address may be in the queue at any time.
	//  * work the work to push into the queue. Must be JSON-serializable
	AddressedPush(ctx context.Context, priority uint64, groupId int64, address string, work Work) error

	// RecordFailure records a failure of addressed work in the queue
	//  * address the address of the work that failed
	//  * failure the error that occurred. Overwrites any previous error information
	//    from earlier runs of the same work. If `failure==nil`, the error is cleared.
	RecordFailure(ctx context.Context, address string, failure error) error

	// PollAddress polls addressed work in the queue and returns an `errs`
	// channel that reports when the work is done and/or an error has occurred.
	// `nil` is passed over the errs channel when the poll has completed
	// without errors.
	PollAddress(ctx context.Context, address string) (errs <-chan error)

	// IsAddressInQueue checks to see if work with the provided address is in the queue
	IsAddressInQueue(ctx context.Context, address string) (bool, error)

	// Get attempts to get a job from the queue. Blocks until a job is found and returned
	// Parameters:
	//  * maxPriority uint64 - get only jobs with priority <= this value.
	//  * maxPriorityChan chan uint64 - while blocking (waiting for a job) pass a new
	//     maxPriority value to this channel if the capacity changes. For example, we
	//     may be blocking (waiting for work) with a maximum priority of 2; when other
	//     in-progress work completes, we may suddenly have capacity for work with a
	//     maximum priority of 4. In this case, we'd pass a `4` on this channel to notify
	//     the store that we can change our polling query to allow a new maximum priority.
	//  * types QueueSupportedTypes - used to indicate what types of work we are willing
	//     to grab from the queue. During shutdown, we may be willing to grab certain
	//     types of work necessary to drain the queue while ignoring other types of work.
	//  * stop - notified when terminating
	// Returns:
	//  * *QueueWork - A pointer to a QueueWork struct
	//  * error - An error (if error occurs); nil if successful
	Get(ctx context.Context, maxPriority uint64, maxPriorityChan chan uint64, types QueueSupportedTypes, stop chan bool) (*QueueWork, error)

	// Extend (heartbeat) a queue permit while work is in progress.
	Extend(ctx context.Context, permit permit.Permit) error

	// Delete a queue permit and its associated work. This is typically called when
	// the work is complete.
	Delete(ctx context.Context, permit permit.Permit) error

	// Name returns the name of the queue.
	Name() string
}

type QueueError

// QueueError is an error type for addressed queue work that carries a message
// and an optional code. It is used through a pointer (*QueueError), which has
// an Error method. Both fields are tagged for JSON encoding; Code is omitted
// from the JSON when it is zero.
type QueueError struct {
	// HTTP error code to use if this error is returned by a service
	Code    int    `json:"code,omitempty"`
	Message string `json:"message"`
}

QueueError is used for queue work with an address. If your queue work has an address, and if the work's `Run` method returns a *QueueError pointer, then the queue `Agent` will record the message and code in the database. If the work's `Run` method returns a generic error, then the message will be recorded, but no code will be recorded. Used by `PollAddress` to return more detailed error information when polling for an address to complete. For example, see `/services/package/current.go`. This service checks for a non-zero error code, which most likely results in a 404 to the browser vs. a 500.

func (*QueueError) Error

func (q *QueueError) Error() string

type QueueGroupRecord

// QueueGroupRecord is implemented by records that expose a queue group id.
type QueueGroupRecord interface {
	// GroupId returns the queue group id.
	GroupId() int64
}

type QueueGroupStore

// QueueGroupStore declares the operations for managing queue groups: start,
// completion check, cancel, and clear. It embeds TransactionCompleter.
type QueueGroupStore interface {
	TransactionCompleter

	// QueueGroupStart marks a queue group as started. Work for this queue
	// group will not be retrieved by `QueuePop` until the group has been
	// marked as started.
	QueueGroupStart(ctx context.Context, id int64) error

	// QueueGroupComplete checks to see if a queue group is complete/empty
	// Expects:
	//  * id - the queue group id
	// Returns:
	//  * bool - is the group complete/empty?
	//  * bool - was the group cancelled?
	//  * error - errors
	QueueGroupComplete(ctx context.Context, id int64) (bool, bool, error)

	// QueueGroupCancel cancels a queue group
	QueueGroupCancel(ctx context.Context, id int64) error

	// QueueGroupClear cancels/deletes the work in a queue group
	QueueGroupClear(ctx context.Context, id int64) error
}

type QueuePermit

// QueuePermit is implemented by values that expose a permit identifier and
// the time associated with the permit's creation.
type QueuePermit interface {
	// PermitId returns the permit identifier.
	PermitId() permit.Permit
	// PermitCreated returns a time.Time.
	// NOTE(review): presumably the moment the permit was created — confirm.
	PermitCreated() time.Time
}

type QueuePermitExtendNotification

// QueuePermitExtendNotification is a notification that indicates a queue work
// permit extension. It carries a permit id (PermitID), a GUID string
// (GuidVal), and a numeric message type (MessageType).
// NOTE(review): NewQueuePermitExtendNotification takes only a permit id and a
// notify type, so GuidVal is presumably generated by the constructor — confirm.
type QueuePermitExtendNotification struct {
	PermitID    uint64
	GuidVal     string
	MessageType uint8
}

QueuePermitExtendNotification is a notification that indicates a queue work permit extension.

func NewQueuePermitExtendNotification

func NewQueuePermitExtendNotification(permitID uint64, notifyType uint8) *QueuePermitExtendNotification

func (*QueuePermitExtendNotification) Guid

func (*QueuePermitExtendNotification) Type

type QueueStore

// QueueStore declares the storage-level queue operations: pushing and popping
// work, deleting claimed work, managing permits, peeking, and checking or
// recording the status of addressed work. It embeds TransactionCompleter.
type QueueStore interface {
	TransactionCompleter
	// BeginTransactionQueue takes a description and returns a QueueStore.
	// NOTE(review): presumably the returned store is scoped to a newly begun
	// transaction that is finished via CompleteTransaction — confirm.
	BeginTransactionQueue(ctx context.Context, description string) (QueueStore, error)

	// NotifyExtend takes a permit id.
	// NOTE(review): presumably emits a notification that the permit was
	// extended (compare QueuePermitExtendNotification) — confirm.
	NotifyExtend(ctx context.Context, permit uint64) error

	// QueuePush pushes work into a queue. Pass a queue name, a priority (0 is the highest),
	// and some work.
	QueuePush(ctx context.Context, name string, groupId sql.NullInt64, priority, workType uint64, work interface{}, carrier []byte) error

	// QueuePushAddressed pushes work into a queue. Pass a queue name, a priority (0 is the highest),
	// a type, a unique address, and some work.
	QueuePushAddressed(ctx context.Context, name string, groupId sql.NullInt64, priority, workType uint64, address string, work interface{}, carrier []byte) error

	// QueuePop pops work from the queue. Pass a queue name and a maximum priority, along with
	// a []uint64 slice to indicate what types of work you're willing to grab.
	//
	// Returns a QueueWork pointer
	QueuePop(ctx context.Context, name string, maxPriority uint64, types []uint64) (*QueueWork, error)

	// QueueDelete deletes claimed work from the queue. Called after the work is completed.
	QueueDelete(ctx context.Context, permitId permit.Permit) error

	// QueuePermits returns permits for a given queue
	QueuePermits(ctx context.Context, name string) ([]QueuePermit, error)

	// QueuePermitDelete clears a permit
	QueuePermitDelete(ctx context.Context, permitId permit.Permit) error

	// QueuePeek checks to see whether a queue has any outstanding work
	// for a particular list of types.
	// Expects:
	//  * types - the work types
	// Returns:
	//  * results - the work
	//  * error - errors
	QueuePeek(ctx context.Context, types ...uint64) (results []QueueWork, err error)

	// IsQueueAddressComplete checks to see if an address is done/gone
	// Expects:
	//  * address - the queue item address
	// Returns:
	//  * bool - is the item done/gone?
	//  * error - errors
	IsQueueAddressComplete(ctx context.Context, address string) (bool, error)

	// IsQueueAddressInProgress checks to see if an address is still in progress
	// Expects:
	//  * address - the queue item address
	// Returns:
	//  * bool - is the item still in progress?
	//  * error - errors
	IsQueueAddressInProgress(ctx context.Context, address string) (bool, error)

	// QueueAddressedComplete saves or clears failure information for an addressed item
	QueueAddressedComplete(ctx context.Context, address string, failure error) error
}

type QueueSupportedTypes

// QueueSupportedTypes is passed to Queue.Get to indicate which types of work
// the caller is willing to grab from the queue.
type QueueSupportedTypes interface {
	// Enabled returns a slice of work type ids.
	// NOTE(review): presumably the ids that are currently enabled — confirm.
	Enabled() []uint64
	// SetEnabled takes a work type id and an enabled flag.
	SetEnabled(typeId uint64, enabled bool)
	// SetEnabledConditional takes a work type id and a callback returning a
	// bool.
	// NOTE(review): presumably the callback is consulted whenever the enabled
	// set is computed — confirm against the implementation.
	SetEnabledConditional(typeId uint64, enabled func() bool)
	// DisableAll takes no arguments.
	// NOTE(review): presumably disables every work type — confirm.
	DisableAll()
}

type QueueWork

// QueueWork is the representation of a unit of work taken from, or peeked at
// in, the queue: a permit, an optional address, a work type, the serialized
// work, and a tracing carrier.
type QueueWork struct {
	// A permit for doing the work and heartbeating
	Permit permit.Permit

	// The work's address, if addressed. Blank if not addressed.
	Address string

	// The work type
	WorkType uint64

	// A byte slice holding the serialized work.
	// NOTE(review): presumably JSON, since pushed work must be
	// JSON-serializable — confirm.
	Work []byte

	// Byte slice for persisting tracing data across the work lifecycle.
	Carrier []byte
}

func (*QueueWork) Type

func (w *QueueWork) Type() uint64

type RecordingProducer

// RecordingProducer is an implementation of queue.Queue which records
// arguments passed to its methods. It is meant to be used in tests.
//
// It contains a sync.Mutex (Lock), so it should be used through a pointer and
// not copied; all of its methods have pointer receivers.
// NOTE(review): the error and result fields (PushError, PollErrs, RecordErr,
// HasAddress, HasAddressErr, PeekRes, PeekErr) are presumably the canned
// values returned by the corresponding methods, and AddParams, Extended, and
// Peeked presumably record calls — confirm against the method bodies.
type RecordingProducer struct {
	AddParams     []addParams
	PushError     error
	Extended      int
	PollErrs      chan error
	RecordErr     error
	HasAddress    bool
	HasAddressErr error
	// Notified when the first Push or AddressedPush is received
	Received chan bool
	Lock     sync.Mutex
	Peeked   []uint64
	PeekRes  []QueueWork
	PeekErr  error
}

RecordingProducer is an implementation of queue.Queue which records arguments passed to its methods. RecordingProducer is meant to be used in tests.

func (*RecordingProducer) AddressedPush

func (q *RecordingProducer) AddressedPush(ctx context.Context, priority uint64, groupId int64, address string, work Work) error

func (*RecordingProducer) Delete

func (q *RecordingProducer) Delete(ctx context.Context, permit permit.Permit) error

func (*RecordingProducer) Extend

func (q *RecordingProducer) Extend(ctx context.Context, permit permit.Permit) error

func (*RecordingProducer) Get

func (q *RecordingProducer) Get(ctx context.Context, maxPriority uint64, maxPriorityChan chan uint64, types QueueSupportedTypes, stop chan bool) (*QueueWork, error)

func (*RecordingProducer) IsAddressInQueue

func (q *RecordingProducer) IsAddressInQueue(ctx context.Context, address string) (bool, error)

func (*RecordingProducer) Name

func (q *RecordingProducer) Name() string

func (*RecordingProducer) Peek

func (q *RecordingProducer) Peek(ctx context.Context, filter func(work *QueueWork) (bool, error), types ...uint64) ([]QueueWork, error)

func (*RecordingProducer) PollAddress

func (q *RecordingProducer) PollAddress(ctx context.Context, address string) (errs <-chan error)

func (*RecordingProducer) Push

func (q *RecordingProducer) Push(ctx context.Context, priority uint64, groupId int64, work Work) error

func (*RecordingProducer) RecordFailure

func (q *RecordingProducer) RecordFailure(ctx context.Context, address string, failure error) error

func (*RecordingProducer) WithDbTx

func (q *RecordingProducer) WithDbTx(ctx context.Context, tx QueueStore) Queue

type RecursableWork

// RecursableWork pairs a byte slice of work (Work) with a work type
// identifier (WorkType). It is the work argument passed to WorkRunner.Run.
type RecursableWork struct {
	Work     []byte
	WorkType uint64
}

type RecurseFunc

// RecurseFunc is a function that accepts a run callback.
// NOTE(review): whether and when run is invoked is decided by the
// implementation supplied to ContextWithRecursion — confirm.
type RecurseFunc func(run func())

type TransactionCompleter

// TransactionCompleter is implemented by stores that can complete a
// transaction. It is embedded in QueueStore and QueueGroupStore.
type TransactionCompleter interface {
	// CompleteTransaction takes a pointer to an error.
	// NOTE(review): presumably commits when *err is nil and rolls back
	// otherwise, with the pointer allowing use from a defer — confirm.
	CompleteTransaction(err *error)
}

type Work

// Work is the interface for work pushed into a Queue via Push or
// AddressedPush.
type Work interface {
	// Type returns the work type identifier.
	Type() uint64
}

type WorkRunner

// WorkRunner is the interface for types that run queued work and can be
// stopped.
type WorkRunner interface {
	// Run the work
	Run(ctx context.Context, work RecursableWork) error

	// Stop the runner. For most implementations, this method should be
	// empty and return `nil`. If special logic is required to stop
	// a runner (for example, keeping a group runner running until all
	// the active groups are done), then it should go here.
	Stop(timeout time.Duration) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL