queue

package
v0.0.0-...-4e15770
Published: Jul 30, 2020 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package queue provides several implementations of the amboy.Queue interface capable of processing amboy.Job implementations.

Local Shuffled Queue

The shuffled queue is functionally similar to the LocalUnordered queue (which is, in fact, a FIFO queue as a result of its implementation); however, the shuffled queue dispatches tasks in randomized order, relying on the iteration order of Go's map type, which does not depend on insertion order.

Additionally, this implementation does not use locking, which may improve performance for some workloads. By design, the implementation retains pointers to all completed tasks and does not cap the number of pending tasks.
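The map-based randomization described above can be illustrated with a minimal, standard-library-only sketch. The pendingJobs type and its drain method are hypothetical names invented for this example; they are not part of this package and do not reproduce its internals.

```go
package main

import "fmt"

// pendingJobs is a hypothetical stand-in for the queue's pending set,
// keyed by job ID. It is not amboy's actual data structure.
type pendingJobs map[string]func()

// drain runs every pending job exactly once, in whatever order the map
// yields, and returns the IDs in the order they were dispatched.
func (p pendingJobs) drain() []string {
	order := make([]string, 0, len(p))
	for id, run := range p {
		run()
		order = append(order, id)
		delete(p, id) // removing the current key while ranging is safe in Go
	}
	return order
}

func main() {
	pending := pendingJobs{}
	for _, id := range []string{"a", "b", "c", "d"} {
		pending[id] = func() {}
	}
	// The order is unspecified and may differ between runs.
	fmt.Println(pending.drain())
}
```

Go leaves map iteration order unspecified, so the sketch gets a varying dispatch order without any explicit shuffling step.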

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewAdaptiveOrderedLocalQueue

func NewAdaptiveOrderedLocalQueue(workers, capacity int) amboy.Queue

NewAdaptiveOrderedLocalQueue provides a queue implementation that stores jobs in memory and dispatches them according to their dependency information.

Use this implementation rather than LocalOrderedQueue when you need to add jobs *after* starting the queue, and when you want to avoid the higher potential overhead of the remote-backed queues.

Like other ordered in-memory queues, this implementation does not support scoped locks.
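To make "dispatches tasks based on the dependency information" concrete, here is a rough sketch of dependency-aware ordering. The job struct, dispatchOrder, and ready are hypothetical simplifications for illustration; the real amboy.Job interface and this queue's scheduling logic are richer and may work differently.

```go
package main

import "fmt"

// job is a hypothetical, simplified stand-in for amboy.Job: an ID plus
// the IDs of the jobs that must finish first.
type job struct {
	id   string
	deps []string
}

// ready reports whether all of j's dependencies are already done.
func ready(j job, done map[string]bool) bool {
	for _, d := range j.deps {
		if !done[d] {
			return false
		}
	}
	return true
}

// dispatchOrder returns an order in which every job follows its
// dependencies. It returns an error if some jobs can never become ready
// (for example, because of a cycle or a missing dependency).
func dispatchOrder(jobs []job) ([]string, error) {
	done := make(map[string]bool, len(jobs))
	order := make([]string, 0, len(jobs))
	for len(order) < len(jobs) {
		progressed := false
		for _, j := range jobs {
			if done[j.id] || !ready(j, done) {
				continue
			}
			done[j.id] = true
			order = append(order, j.id)
			progressed = true
		}
		if !progressed {
			return order, fmt.Errorf("%d job(s) blocked by unresolved dependencies", len(jobs)-len(order))
		}
	}
	return order, nil
}

func main() {
	order, err := dispatchOrder([]job{
		{id: "deploy", deps: []string{"build", "test"}},
		{id: "test", deps: []string{"build"}},
		{id: "build"},
	})
	fmt.Println(order, err) // prints "[build test deploy] <nil>"
}
```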

func NewLocalLimitedSize

func NewLocalLimitedSize(workers, capacity int) amboy.Queue

NewLocalLimitedSize constructs a LocalLimitedSize queue instance with the specified number of workers and capacity.
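As an illustration of how a worker count and a capacity can interact, the following sketch uses a buffered channel as the bounded pending set. The limitedQueue type and its methods are hypothetical, and rejecting a job when the buffer is full is an assumption made for this example rather than documented behavior of LocalLimitedSize.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// limitedQueue is a hypothetical bounded queue: at most "capacity" jobs
// wait in the buffer while "workers" goroutines drain it.
type limitedQueue struct {
	pending chan func()
	wg      sync.WaitGroup
}

func newLimitedQueue(workers, capacity int) *limitedQueue {
	q := &limitedQueue{pending: make(chan func(), capacity)}
	for i := 0; i < workers; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for run := range q.pending {
				run()
			}
		}()
	}
	return q
}

// Put enqueues a job, or fails immediately if the buffer is full
// (an assumed policy for this sketch).
func (q *limitedQueue) Put(run func()) error {
	select {
	case q.pending <- run:
		return nil
	default:
		return errors.New("queue is full")
	}
}

// Close stops accepting jobs and waits for the workers to finish the
// jobs that are already buffered.
func (q *limitedQueue) Close() {
	close(q.pending)
	q.wg.Wait()
}

func main() {
	q := newLimitedQueue(0, 2) // no workers, so nothing drains the buffer
	fmt.Println(q.Put(func() {}), q.Put(func() {}), q.Put(func() {}))
	// prints "<nil> <nil> queue is full"
	q.Close()
}
```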

func NewLocalOrdered

func NewLocalOrdered(workers int) amboy.Queue

NewLocalOrdered constructs a LocalOrdered object. The "workers" argument is passed to a default pool.SimplePool object.

The ordered queue requires that users add all tasks to the queue before starting it, and does not accept tasks after starting.

Like other ordered in-memory queues, this implementation does not support scoped locks.

func NewLocalPriorityQueue

func NewLocalPriorityQueue(workers, capacity int) amboy.Queue

NewLocalPriorityQueue constructs a new priority queue instance and initializes a local worker queue with the specified number of worker processes.
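Priority dispatch of this kind is commonly built on a heap. The sketch below uses the standard library's container/heap package; the item and jobHeap types are hypothetical, and treating a larger number as higher priority is an assumption of the example, not a statement about this package's internals.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item is a hypothetical job record: an ID and a priority.
type item struct {
	id       string
	priority int
}

// jobHeap implements heap.Interface so that the highest priority value
// is popped first (assumed convention for this sketch).
type jobHeap []item

func (h jobHeap) Len() int            { return len(h) }
func (h jobHeap) Less(i, j int) bool  { return h[i].priority > h[j].priority }
func (h jobHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *jobHeap) Pop() interface{} {
	old := *h
	n := len(old)
	last := old[n-1]
	*h = old[:n-1]
	return last
}

// drainByPriority returns the job IDs in the order a priority-aware
// dispatcher would hand them to workers.
func drainByPriority(items []item) []string {
	h := &jobHeap{}
	for _, it := range items {
		heap.Push(h, it)
	}
	order := make([]string, 0, len(items))
	for h.Len() > 0 {
		order = append(order, heap.Pop(h).(item).id)
	}
	return order
}

func main() {
	fmt.Println(drainByPriority([]item{
		{id: "cleanup", priority: 1},
		{id: "alert", priority: 10},
		{id: "report", priority: 5},
	})) // prints "[alert report cleanup]"
}
```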

func NewLocalQueueGroup

func NewLocalQueueGroup(ctx context.Context, opts LocalQueueGroupOptions) (amboy.QueueGroup, error)

NewLocalQueueGroup constructs a new local queue group. If the TTL is 0, queues do not expire automatically and are removed only when the client explicitly calls Prune.

func NewMongoDBQueue

func NewMongoDBQueue(ctx context.Context, opts MongoDBQueueCreationOptions) (amboy.Queue, error)

NewMongoDBQueue builds a new queue that persists jobs to a MongoDB instance. These queues allow workers running in multiple processes to service shared workloads.

func NewMongoDBQueueGroup

func NewMongoDBQueueGroup(ctx context.Context, opts MongoDBQueueGroupOptions, client *mongo.Client, mdbopts MongoDBOptions) (amboy.QueueGroup, error)

NewMongoDBQueueGroup constructs a new remote queue group. If the TTL is 0, queues do not expire automatically and are removed only when the client explicitly calls Prune.

The MongoDBRemoteQueue group creates a new collection for every queue, unlike the other remote queue group implementations. This is probably most viable for lower-volume workloads; however, the caching mechanism may be more responsive in some situations.

func NewMongoDBSingleQueueGroup

func NewMongoDBSingleQueueGroup(ctx context.Context, opts MongoDBQueueGroupOptions, client *mongo.Client, mdbopts MongoDBOptions) (amboy.QueueGroup, error)

NewMongoDBSingleQueueGroup constructs a new remote queue group. If the TTL is 0, queues do not expire automatically and are removed only when the client explicitly calls Prune.

func NewSQSFifoQueue

func NewSQSFifoQueue(queueName string, workers int) (amboy.Queue, error)

NewSQSFifoQueue constructs an AWS SQS-backed Queue implementation. This queue is generally ephemeral: tasks are removed from the queue, so it may not handle jobs across restarts.

func NewShuffledLocal

func NewShuffledLocal(workers, capacity int) amboy.Queue

NewShuffledLocal provides a queue implementation that shuffles the order of jobs relative to the insertion order.

Types

type Dispatcher

type Dispatcher interface {
	Dispatch(context.Context, amboy.Job) error
	Release(context.Context, amboy.Job)
	Complete(context.Context, amboy.Job)
}

Dispatcher provides a common mechanism shared between queue implementations to handle job locking to prevent multiple workers from running the same job.

func NewDispatcher

func NewDispatcher(q amboy.Queue) Dispatcher

NewDispatcher constructs a default dispatching implementation.

type GroupCache

type GroupCache interface {
	Set(string, amboy.Queue, time.Duration) error
	Get(string) amboy.Queue
	Remove(context.Context, string) error
	Prune(context.Context) error
	Close(context.Context) error
	Names() []string
	Len() int
}

GroupCache provides a common mechanism for managing collections of queues, for use in specific group cache situations.

func NewCacheWithCleanupHook

func NewCacheWithCleanupHook(ttl time.Duration, hook func(ctx context.Context, id string) error) GroupCache

NewCacheWithCleanupHook defines a cache but allows implementations to add additional cleanup logic to the Prune and Close operations.

func NewGroupCache

func NewGroupCache(ttl time.Duration) GroupCache

NewGroupCache produces a GroupCache implementation that supports a default TTL setting, and supports cloning and closing operations.

type LocalQueueGroupOptions

type LocalQueueGroupOptions struct {
	Constructor func(ctx context.Context) (amboy.Queue, error)
	TTL         time.Duration
}

LocalQueueGroupOptions describes options passed to NewLocalQueueGroup.

type MongoDBOptions

type MongoDBOptions struct {
	URI                      string
	DB                       string
	GroupName                string
	UseGroups                bool
	Priority                 bool
	CheckWaitUntil           bool
	CheckDispatchBy          bool
	SkipQueueIndexBuilds     bool
	SkipReportingIndexBuilds bool
	Format                   amboy.Format
	WaitInterval             time.Duration
	// TTL sets the number of seconds for a TTL index on the "info.created"
	// field. If set to zero, the TTL index will not be created
	// and documents may live forever in the database.
	TTL time.Duration
	// LockTimeout overrides the default job lock timeout if set.
	LockTimeout time.Duration
}

MongoDBOptions is a struct passed to the NewMongo constructor to communicate mgoDriver specific settings about the driver's behavior and operation.

func DefaultMongoDBOptions

func DefaultMongoDBOptions() MongoDBOptions

DefaultMongoDBOptions constructs a new options object with default values: connecting to a MongoDB instance on localhost, using the "amboy" database, and *not* using priority ordering of jobs.

func (*MongoDBOptions) Validate

func (opts *MongoDBOptions) Validate() error

Validate validates that the required options are given and sets fields that are unspecified and have a default value.

type MongoDBQueueCreationOptions

type MongoDBQueueCreationOptions struct {
	Size    int
	Name    string
	Ordered bool
	MDB     MongoDBOptions
	Client  *mongo.Client
}

MongoDBQueueCreationOptions describes the options passed to remote queues, which store jobs in a remote persistence layer to support distributed systems of workers.

func (*MongoDBQueueCreationOptions) Validate

func (opts *MongoDBQueueCreationOptions) Validate() error

Validate ensures that the arguments defined are valid.

type MongoDBQueueGroupOptions

type MongoDBQueueGroupOptions struct {
	// Prefix is a string prepended to the queue collections.
	Prefix string

	// Abortable controls if the queue will use an abortable pool
	// implementation. The Ordered option controls if an
	// order-respecting queue will be created, while default
	// workers sets the default number of workers new queues will
	// have if the WorkerPoolSize function is not set.
	Abortable      bool
	Ordered        bool
	DefaultWorkers int

	// WorkerPoolSize determines how many workers will be allocated
	// to each queue, based on the queue ID passed to it.
	WorkerPoolSize func(string) int

	// PruneFrequency is how often Prune runs by default.
	PruneFrequency time.Duration

	// BackgroundCreateFrequency is how often the background queue
	// creation runs, in the case that queues may be created in
	// the background without
	BackgroundCreateFrequency time.Duration

	// TTL is how old the oldest task in the queue must be for the collection to be pruned.
	TTL time.Duration
}

MongoDBQueueGroupOptions describes options passed to NewMongoDBQueueGroup and NewMongoDBSingleQueueGroup.
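The relationship between DefaultWorkers and WorkerPoolSize described in the field comments can be sketched as a simple fallback. The groupOptions struct below mirrors only those two fields, and the workersFor helper is a hypothetical name for this illustration; the package may resolve the worker count differently.

```go
package main

import "fmt"

// groupOptions is a hypothetical subset of MongoDBQueueGroupOptions.
type groupOptions struct {
	DefaultWorkers int
	WorkerPoolSize func(string) int
}

// workersFor picks the worker count for a queue ID: the per-queue
// function wins when it is set, otherwise the default applies.
func (o groupOptions) workersFor(queueID string) int {
	if o.WorkerPoolSize != nil {
		return o.WorkerPoolSize(queueID)
	}
	return o.DefaultWorkers
}

func main() {
	fixed := groupOptions{DefaultWorkers: 4}
	perQueue := groupOptions{
		DefaultWorkers: 4,
		WorkerPoolSize: func(id string) int {
			if id == "bulk" {
				return 16
			}
			return 2
		},
	}
	fmt.Println(fixed.workersFor("bulk"), perQueue.workersFor("bulk"), perQueue.workersFor("other"))
	// prints "4 16 2"
}
```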

type ScopeManager

type ScopeManager interface {
	Acquire(string, []string) error
	Release(string, []string) error
}

ScopeManager provides a service to queue implementations to support additional locking semantics for queues that cannot push that into their backing storage.

func NewLocalScopeManager

func NewLocalScopeManager() ScopeManager

NewLocalScopeManager constructs a ScopeManager implementation suitable for use in most local (in memory) queue implementations.
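An in-memory scope manager could be sketched as a mutex-guarded map from scope name to its current holder. The documentation above does not name the interface's parameters, so this example assumes the first argument identifies the owner (for instance, a job ID) and the second lists the scopes; the localScopes type and its all-or-nothing behavior are likewise hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// localScopes is a hypothetical in-memory scope manager.
type localScopes struct {
	mu      sync.Mutex
	holders map[string]string // scope name -> owner ID
}

func newLocalScopes() *localScopes {
	return &localScopes{holders: map[string]string{}}
}

// Acquire claims all scopes for owner, or none of them if any scope is
// already held by a different owner.
func (s *localScopes) Acquire(owner string, scopes []string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, sc := range scopes {
		if holder, ok := s.holders[sc]; ok && holder != owner {
			return fmt.Errorf("scope %q is held by %q", sc, holder)
		}
	}
	for _, sc := range scopes {
		s.holders[sc] = owner
	}
	return nil
}

// Release frees the scopes, refusing if any of them belongs to a
// different owner.
func (s *localScopes) Release(owner string, scopes []string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, sc := range scopes {
		if holder, ok := s.holders[sc]; ok && holder != owner {
			return fmt.Errorf("scope %q is held by %q, not %q", sc, holder, owner)
		}
	}
	for _, sc := range scopes {
		delete(s.holders, sc)
	}
	return nil
}

func main() {
	m := newLocalScopes()
	fmt.Println(m.Acquire("job-1", []string{"db", "cache"})) // succeeds
	fmt.Println(m.Acquire("job-2", []string{"cache"}))       // rejected: held by job-1
	fmt.Println(m.Release("job-1", []string{"db", "cache"})) // succeeds
	fmt.Println(m.Acquire("job-2", []string{"cache"}))       // succeeds now
}
```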
