queue

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 30, 2021 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package queue provides the Queue interface for the queueing and processing of background jobs.

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrNilProducer should be returned when a Queue implementation is not
	// setup as a producer, and a call to Produce is made.
	ErrNilProducer = errors.New("nil queue producer")

	// ErrNilConsumer should be returned when a Queue implementation is not
	// setup as a consumer, and a call to Consume is made.
	ErrNilConsumer = errors.New("nil queue consumer")

	// ErrQueueNotExist is returned when a Job is dispatched to a non-existent
	// queue in a Set.
	ErrQueueNotExist = errors.New("queue does not exist")
)

Functions

This section is empty.

Types

type InitFunc

type InitFunc func(Job)

InitFunc is a callback for initializing a Job for it to be performed. This would typically be used for setting up dependencies for a Job that could not otherwise be reliably stored on the Queue, such as database connections.

type InitRegistry

type InitRegistry struct {
	// contains filtered or unexported fields
}

InitRegistry provides a thread-safe mechanism of registering an InitFunc against a Job name.

func NewInitRegistry

func NewInitRegistry() *InitRegistry

NewInitRegistry returns a new registry for registered initialization functions against a given name.

func (*InitRegistry) Get

func (r *InitRegistry) Get(name string) (InitFunc, bool)

Get returns the InitFunc for the given name, along with whether a function has been registered against that name.

func (*InitRegistry) Register

func (r *InitRegistry) Register(name string, fn InitFunc)

Register registers the given InitFunc against the given name. If the given name has already been registered, then this panics.

type Job

type Job interface {
	// Name returns the name of the Job being performed. This is used to lookup
	// the resptive InitFunc for the Job, if any.
	Name() string

	// Perform performs the Job on the Queue. This should return any errors
	// if the Job failed in a fatal away that cannot be recovered from.
	Perform() error
}

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

Memory offers an in-memory implementation of the Queue interface. This will queue up jobs in memory, and process them. This is ideal for jobs that aren't that consequential.

func NewMemory

func NewMemory(n int, errh func(Job, error)) *Memory

NewMemory returns a new in-memory Queue for Job processing with the given parallelism as defined by n, and the given error handler. The error handler is invoked whenever an underlying Job being processed on the queue returns an error from a Perform call.

func (*Memory) Consume

func (m *Memory) Consume(ctx context.Context) error

Consume implements the Queue interface.

func (*Memory) InitFunc

func (m *Memory) InitFunc(name string, fn InitFunc)

InitFunc implementas the Queue interface.

func (*Memory) Produce

func (m *Memory) Produce(ctx context.Context, j Job) (string, error)

Produce implements the Queue interface. This does not return anything for the ID of the Job.

type Queue

type Queue interface {
	// InitFunc registers the given init callback for the given Job name. The
	// callback is invoked when the Job is retrieved from the queue to be
	// performed. This would be used to initialize things such as database
	// connections.
	InitFunc(string, InitFunc)

	// Consume begins consuming jobs that have been submitted onto the queue.
	// This should only stop when the given Context is cancelled.
	Consume(context.Context) error

	// Produce places the given Job onto the end of the queue. This should
	// return the ID of the Job, if possible, this will vary depending on the
	// implementation being used.
	Produce(context.Context, Job) (string, error)
}

type Redis

type Redis struct {
	// contains filtered or unexported fields
}

Redis offers an implementation of the Queue interface using curlyq for producing/consuming from/to Redis.

func NewRedisConsumer

func NewRedisConsumer(log *log.Logger, opts *curlyq.ConsumerOpts) *Redis

func NewRedisProducer

func NewRedisProducer(log *log.Logger, opts *curlyq.ProducerOpts) *Redis

func (*Redis) Consume

func (r *Redis) Consume(ctx context.Context) error

Consume implements the Queue interface. If Redis has not been configured as a consumer, then ErrNilConsumer is returned.

func (*Redis) InitFunc

func (c *Redis) InitFunc(name string, fn InitFunc)

InitFunc implements the Queue interface.

func (*Redis) Produce

func (r *Redis) Produce(ctx context.Context, j Job) (string, error)

Produce implements the Queue interface. The given Job is encoded into bytes using gob encoding. This returns the underlying Job ID from curlyq itself. If Redis has not been configured as a producer, then ErrNilProducer is returned.

type Set

type Set struct {
	// contains filtered or unexported fields
}

func NewSet

func NewSet() *Set

func (*Set) Add

func (s *Set) Add(name string, q Queue)

Add adds the given Queue to the Set with the given name. This will panic if the given name has already been set.

func (*Set) InitFunc

func (s *Set) InitFunc(name string, fn InitFunc)

InitFunc will register the given initialization function with the given name against all of the queues in the Set.

func (*Set) Produce

func (s *Set) Produce(ctx context.Context, name string, j Job) (string, error)

Produce will place the given Job onto the Queue with the given name in the Set. If the Queue cannot be found, then ErrQueueNotExist is returned as the error.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL