queue

package
v2.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrDuplicateAddressedPush = errors.New("Duplicate address")

Functions

func ContextWithExpectedRecursion

func ContextWithExpectedRecursion(ctx context.Context) context.Context

func ContextWithRecursion

func ContextWithRecursion(ctx context.Context, workType uint64, recurseFunc RecurseFunc) context.Context

Types

type AddressableWork

type AddressableWork interface {
	Type() uint64
	Address() string
}

type BaseRunner

type BaseRunner struct{}

Implements the WorkRunner's `Stop` function to avoid boilerplate since most runners don't need special stop logic

func (*BaseRunner) Stop

func (*BaseRunner) Stop(timeout time.Duration) error

type CtxRecurseData

type CtxRecurseData struct {
	Recurse  RecurseFunc
	WorkType uint64
}

type CtxRecurseKey

type CtxRecurseKey string
const (
	CtxRecurse         CtxRecurseKey = "context_recurse_data"
	CtxAllowsRecursion CtxRecurseKey = "context_recurse_allowed"
)

type DefaultQueueSupportedTypes

type DefaultQueueSupportedTypes struct {
	// contains filtered or unexported fields
}

func (*DefaultQueueSupportedTypes) DisableAll

func (d *DefaultQueueSupportedTypes) DisableAll()

func (*DefaultQueueSupportedTypes) Enabled

func (d *DefaultQueueSupportedTypes) Enabled() []uint64

func (*DefaultQueueSupportedTypes) SetEnabled

func (d *DefaultQueueSupportedTypes) SetEnabled(typeId uint64, enabled bool)

func (*DefaultQueueSupportedTypes) SetEnabledConditional

func (d *DefaultQueueSupportedTypes) SetEnabledConditional(typeId uint64, enabled func() bool)

type Enabled

type Enabled struct {
	Always      bool
	Conditional func() bool
}

type OptionalRecurser

type OptionalRecurser struct {
	// contains filtered or unexported fields
}

func NewOptionalRecurser

func NewOptionalRecurser(cfg OptionalRecurserConfig) *OptionalRecurser

func (*OptionalRecurser) OptionallyRecurse

func (a *OptionalRecurser) OptionallyRecurse(ctx context.Context, run func())

type OptionalRecurserConfig

type OptionalRecurserConfig struct {
	FatalRecurseCheck bool
}

type Queue

type Queue interface {
	// WithDbTx returns a queue that operates in the context of a database
	// transaction.
	WithDbTx(tx interface{}) Queue

	// Peek at work in the queue
	// * types the types of work to peek at
	Peek(filter func(work *QueueWork) (bool, error), types ...uint64) ([]QueueWork, error)

	// Push new work into the queue
	//  * priority the priority for this work. Lower number are higher priority
	//  * groupId the group id for this work. Use zero (0) if not grouped
	//  * work the work to push into the queue. Must be JSON-serializable
	Push(priority uint64, groupId int64, work Work) error

	// AddressedPush pushes uniquely addressed new work into the queue
	//  * priority the priority for this work. Lower number are higher priority
	//  * groupId the group id for this work. Use zero (0) if not grouped
	//  * address the address for this work. Must be unique. Address can be reused,
	//      but only one occurrence of any address may be in the queue at any time.
	//  * work the work to push into the queue. Must be JSON-serializable
	AddressedPush(priority uint64, groupId int64, address string, work Work) error

	// RecordFailure records a failure of addressed work in the queue
	//  * address the address of the work the failed
	//  * failure the error that occurred. Overwrites any previous error information
	//    from earlier runs of the same work. If `failure==nil`, the error is cleared.
	RecordFailure(address string, failure error) error

	// PollAddress polls addressed work in the queue, and an `errs` channels
	// to report when the work is done and/or an error has occurred. We pass
	// `nil` over the errs channel when the poll has completed without errors
	PollAddress(address string) (errs <-chan error)

	// IsAddressInQueue checks to see if work with the provided address is in the queue
	IsAddressInQueue(address string) (bool, error)

	// Get attempts to get a job from the queue. Blocks until a job is found and returned
	// Parameters:
	//  * maxPriority uint64 - get only jobs with priority <= this value.
	//  * maxPriorityChan chan uint64 - while blocking (waiting for a job) pass a new
	//     maxPriority value to this channel if the capacity changes. For example, we
	//     may be blocking (waiting for work) with a maximum priority of 2; when other
	//     in-progress work completes, we may suddenly have capacity for work with a
	//     maximum priority of 4. In this case, we'd pass a `4` on this channel to notify
	//     the store that we can change our polling query to allow a new maximum priority.
	//  * types QueueSupportedTypes - used to indicate what types of work we are willing
	//     to grab from the queue. During shutdown, we may be willing to grab certain
	//     types of work necessary to drain the queue while ignoring other types of work.
	//  * stop - notified when terminating
	// Returns:
	//  * *QueueWork - A pointer to a QueueWork struct
	//  * error - An error (if error occurs); nil if successful
	Get(maxPriority uint64, maxPriorityChan chan uint64, types QueueSupportedTypes, stop chan bool) (*QueueWork, error)

	// Extend (heartbeat) a queue permit while work is in progress.
	Extend(permit.Permit) error

	// Delete a queue permit and its associated work. This is typically called when
	// the work is complete.
	Delete(permit.Permit) error

	// Name returns the name of the queue.
	Name() string
}

type QueueError

type QueueError struct {
	// HTTP error code to use if this error is returned by a service
	Code    int    `json:"code,omitempty"`
	Message string `json:"message"`
}

Used for queue work with an address. If your queue work has an address, and if the work's `Run` method returns a *QueueError pointer, then the queue `Agent` will record the message and code in the database. If the work's `Run` method returns a generic error, then the message will be recorded, but no code will be recorded. Used by `PollAddress` to return more detailed error information when polling for an address to complete. For example, see `/services/package/current.go`. This service checks for a non-zero error code, which most likely results in a 404 to the browser vs. a 500.

func (*QueueError) Error

func (q *QueueError) Error() string

type QueueSupportedTypes

type QueueSupportedTypes interface {
	Enabled() []uint64
	SetEnabled(typeId uint64, enabled bool)
	SetEnabledConditional(typeId uint64, enabled func() bool)
	DisableAll()
}

type QueueWork

type QueueWork struct {
	// A permit (uint64) for doing the work and heartbeating
	Permit permit.Permit

	// The work's address, if addressed. Blank if not addressed.
	Address string

	// The work type
	WorkType uint64

	// A byte array representing the work that can be unmarshaled to JSON
	Work []byte

	// A context for tracking recursive work
	Context context.Context

	// Byte array for persisting tracing data across the work lifecycle.
	Carrier []byte
}

func (*QueueWork) Type

func (w *QueueWork) Type() uint64

type RecordingProducer

type RecordingProducer struct {
	AddParams     []addParams
	PushError     error
	Extended      int
	PollErrs      chan error
	RecordErr     error
	HasAddress    bool
	HasAddressErr error
	// Notified when the first Push or AddressedPush is received
	Received chan bool
	Lock     sync.Mutex
	Peeked   []uint64
	PeekRes  []QueueWork
	PeekErr  error
}

RecordingProducer is an implementation of queue.Queue which records arguments passed to its methods. RecordingProducer is meant to be used in tests.

func (*RecordingProducer) AddressedPush

func (q *RecordingProducer) AddressedPush(priority uint64, groupId int64, address string, work Work) error

func (*RecordingProducer) Delete

func (q *RecordingProducer) Delete(permit.Permit) error

func (*RecordingProducer) Extend

func (q *RecordingProducer) Extend(permit.Permit) error

func (*RecordingProducer) Get

func (q *RecordingProducer) Get(maxPriority uint64, maxPriorityChan chan uint64, types QueueSupportedTypes, stop chan bool) (*QueueWork, error)

func (*RecordingProducer) IsAddressInQueue

func (q *RecordingProducer) IsAddressInQueue(address string) (bool, error)

func (*RecordingProducer) Name

func (q *RecordingProducer) Name() string

func (*RecordingProducer) Peek

func (q *RecordingProducer) Peek(filter func(work *QueueWork) (bool, error), types ...uint64) ([]QueueWork, error)

func (*RecordingProducer) PollAddress

func (q *RecordingProducer) PollAddress(address string) (errs <-chan error)

func (*RecordingProducer) Push

func (q *RecordingProducer) Push(priority uint64, groupId int64, work Work) error

func (*RecordingProducer) RecordFailure

func (q *RecordingProducer) RecordFailure(address string, failure error) error

func (*RecordingProducer) WithDbTx

func (q *RecordingProducer) WithDbTx(tx interface{}) Queue

type RecursableWork

type RecursableWork struct {
	Work     []byte
	WorkType uint64
	Context  context.Context
}

type RecurseFunc

type RecurseFunc func(run func())

type Work

type Work interface {
	Type() uint64
}

type WorkRunner

type WorkRunner interface {
	// Run the work
	Run(work RecursableWork) error

	// Stop the runner. For most implementations, this method should be
	// empty and return `nil`. If special logic is required to stop
	// a runner (for example, keeping a group runner running until all
	// the active groups are done), then it should go here.
	Stop(timeout time.Duration) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL