curlyq

v0.4.0
Published: Feb 10, 2020 License: MIT Imports: 16 Imported by: 9

README

CurlyQ

CurlyQ provides a simple, easy-to-use interface for performing background processing in Go. It supports scheduled jobs, job deduplication, and configurable concurrent execution out of the box.

Quickstart

package main

import (
	"context"
	"log"

	cq "github.com/mcmathja/curlyq"
)

func main() {
	// Create a new producer
	producer := cq.NewProducer(&cq.ProducerOpts{
		Address: "localhost:6379",
		Queue: "testq",
	})

	// Use the producer to push a job to the queue
	if _, err := producer.Perform(cq.Job{
		Data: []byte("Some data!"),
	}); err != nil {
		log.Fatal(err)
	}

	// Create a new consumer
	consumer := cq.NewConsumer(&cq.ConsumerOpts{
		Address: "localhost:6379",
		Queue: "testq",
	})

	// Consume jobs from the queue with the consumer
	err := consumer.Consume(func(ctx context.Context, job cq.Job) error {
		log.Println(string(job.Data))
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}

The Basics

CurlyQ exposes three key types: Jobs, Producers, and Consumers.

Jobs

A Job wraps your data. In most cases, that's all you'll ever need to know about it:

job := cq.Job{
	Data: []byte("Some data."),
}

Every Job also exposes an ID field that uniquely identifies it among all jobs in the queue, and an Attempt field representing how many times it has been attempted so far.
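Because Data is a plain byte slice, structured payloads have to be serialized before they are enqueued and decoded again inside the handler. The following is a minimal sketch of one way to do that with encoding/json. The Job type here is a local copy of the documented fields so the example compiles on its own, and EmailPayload, NewEmailJob, and DecodeEmailJob are hypothetical names that are not part of CurlyQ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Job mirrors the documented cq.Job fields so this sketch is self-contained.
type Job struct {
	ID      string
	Data    []byte
	Attempt int
}

// EmailPayload is a hypothetical payload type used only for illustration.
type EmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// NewEmailJob serializes the payload into the job's Data field.
func NewEmailJob(p EmailPayload) (Job, error) {
	data, err := json.Marshal(p)
	if err != nil {
		return Job{}, err
	}
	return Job{Data: data}, nil
}

// DecodeEmailJob recovers the payload, for example inside a handler.
func DecodeEmailJob(job Job) (EmailPayload, error) {
	var p EmailPayload
	err := json.Unmarshal(job.Data, &p)
	return p, err
}

func main() {
	job, err := NewEmailJob(EmailPayload{To: "a@example.com", Subject: "Hi"})
	if err != nil {
		panic(err)
	}
	p, err := DecodeEmailJob(job)
	if err != nil {
		panic(err)
	}
	fmt.Println(p.To, p.Subject)
}
```

With the real library, the same helpers would return and accept cq.Job instead of the local stand-in.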

Producers

A Producer pushes jobs on to the queue. Create one by providing it with the address of your Redis instance and a queue name:

producer := cq.NewProducer(&cq.ProducerOpts{
	Address: "my.redis.addr:6379",
	Queue: "queue_name",
})

You can also provide an existing go-redis instance if you would like to configure the queue to run on a more advanced Redis configuration or set up your own retry and timeout logic for network calls:

import "github.com/go-redis/redis/v7"

client := redis.NewClient(&redis.Options{
	Password: "p@55vvoRd",
	DB: 3,
	MaxRetries: 2,
})

producer := cq.NewProducer(&cq.ProducerOpts{
	Client: client,
	Queue: "queue_name",
})

Running producer.Perform(job) will add a job to the queue to be run asynchronously. You can also schedule a job to be enqueued at a particular time by running producer.PerformAt(time, job), or after a certain wait period by running producer.PerformAfter(duration, job). All of the Perform methods return the ID assigned to the job and an error if one occurred.

You can deduplicate jobs by pre-assigning them IDs:

job := cq.Job{
	ID: "todays_job",
}

// Enqueue the job
producer.PerformAfter(10*time.Second, job)

// Does nothing, because a job with the same ID is already on the queue
producer.Perform(job)

Once a job has been acknowledged, its ID becomes available for reuse.
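Deduplication depends on the caller choosing the same ID for jobs that should count as duplicates. One possible approach, sketched below, is to derive the ID from the job's content so that identical work always maps to the same ID. DedupeID is a hypothetical helper, not part of CurlyQ, and the choice of SHA-256 and a 16-character prefix is an assumption made for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// DedupeID derives a stable ID from a job kind and its payload.
// Identical inputs always produce the same ID.
func DedupeID(kind string, payload []byte) string {
	h := sha256.New()
	h.Write([]byte(kind))
	h.Write([]byte{0}) // separator so ("ab", "c") and ("a", "bc") hash differently
	h.Write(payload)
	// Keep a 16-character hex prefix to keep IDs short (an arbitrary choice).
	return kind + ":" + hex.EncodeToString(h.Sum(nil))[:16]
}

func main() {
	a := DedupeID("report", []byte("2020-02-10"))
	b := DedupeID("report", []byte("2020-02-10"))
	c := DedupeID("report", []byte("2020-02-11"))
	fmt.Println(a == b, a == c)
}
```

The resulting string would be assigned to the job's ID field before calling one of the Perform methods.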

See the documentation for ProducerOpts for more details about available configuration options.

Consumers

A Consumer pulls jobs off the queue and executes them using a provided handler function. Create one with the same basic options as a Producer:

consumer := cq.NewConsumer(&cq.ConsumerOpts{
	Queue: "queue_name",

	// Either with an address:
	Address: "my.redis.addr:6379",
	// Or with a preconfigured go-redis client (overrides Address):
	Client: redisClient,
})

You start a consumer by running its Consume method with a handler function:

consumer.Consume(func(ctx context.Context, job cq.Job) error {
	log.Printf("Job %s has been processed!", job.ID)
	return nil
})

If the provided handler function returns nil, the job is considered to have been processed successfully and is removed from the queue. If the handler returns an error or panics, the job is considered to have failed and will be retried or killed based on how many times it has been attempted.

Consume will continue to process jobs until your application receives an interrupt signal or the consumer encounters a fatal error. Fatal errors only occur when the consumer is unable to communicate with Redis for an essential operation, such as updating the status of a job in flight.

See the documentation for ConsumerOpts for more details about available configuration options.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

A Consumer executes jobs and manages the state of the queue.

func NewConsumer

func NewConsumer(opts *ConsumerOpts) *Consumer

NewConsumer instantiates a new Consumer.

func (*Consumer) Consume

func (c *Consumer) Consume(handler HandlerFunc, signals ...os.Signal) error

Consume starts the consumer with a default context. The Consumer runs until the process receives one of the specified signals. An error is returned if the Consumer cannot shut down gracefully.

func (*Consumer) ConsumeCtx

func (c *Consumer) ConsumeCtx(ctx context.Context, handler HandlerFunc) (err error)

ConsumeCtx starts the consumer with a user-supplied context. The Consumer runs indefinitely until the provided context is canceled. An error is returned if the Consumer cannot shut down gracefully.

type ConsumerOpts

type ConsumerOpts struct {
	// Address specifies the address of the Redis backing your queue.
	// CurlyQ will generate a go-redis instance based on this address.
	Address string
	// Client is a custom go-redis instance used to communicate with Redis.
	// If provided, this option overrides the value set in Address.
	Client *redis.Client
	// Queue specifies the name of the queue that this consumer will consume from.
	Queue string

	// Logger provides a concrete implementation of the Logger interface.
	// If not provided, it will default to using the stdlib's log package.
	Logger Logger
	// The maximum number of times to retry a job before killing it.
	// Default: 20
	JobMaxAttempts int
	// The maximum delay between retry attempts.
	// Default: 1 week
	JobMaxBackoff time.Duration
	// How long to wait for executors to finish before exiting forcibly.
	// A zero value indicates that we should wait indefinitely.
	// Default: 0
	ShutdownGracePeriod time.Duration

	// How long to wait after a missed heartbeat before a consumer is considered dead.
	// Default: 1 minute
	// Minimum: 5 seconds
	CustodianConsumerTimeout time.Duration
	// The maximum number of failed attempts before aborting.
	// A zero value indicates the custodian should never abort.
	// Default: 0
	CustodianMaxAttempts int
	// The longest amount of time to wait between failed attempts.
	// Default: 30 seconds
	CustodianMaxBackoff time.Duration
	// Max number of jobs to clean up during a single check.
	// Default: 50
	CustodianMaxJobs int
	// How frequently the custodian should clean up jobs.
	// Default: 1 minute
	CustodianPollInterval time.Duration

	// The maximum number of failed attempts before aborting.
	// A zero value indicates the heartbeat should never abort.
	// Default: 0
	HeartbeatMaxAttempts int
	// The longest amount of time to wait between failed attempts.
	// Default: 30 seconds
	HeartbeatMaxBackoff time.Duration
	// How frequently we should heartbeat.
	// Default: 1 minute
	// Minimum: 15 seconds
	HeartbeatPollInterval time.Duration

	// How many jobs to buffer locally.
	// Default: 10
	PollerBufferSize int
	// The maximum number of failed attempts before aborting.
	// A zero value indicates the poller should never abort.
	// Default: 0
	PollerMaxAttempts int
	// The longest amount of time to wait between failed attempts.
	// Default: 30 seconds
	PollerMaxBackoff time.Duration
	// How long we should block on Redis for new jobs on each call.
	// Default: 5 seconds
	// Minimum: 1 second
	PollerPollDuration time.Duration

	// How many jobs to process simultaneously.
	// Default: 5
	ProcessorConcurrency int

	// The maximum number of failed attempts before aborting.
	// A zero value indicates the scheduler should never abort.
	// Default: 0
	SchedulerMaxAttempts int
	// The longest amount of time to wait between failed attempts.
	// Default: 30 seconds
	SchedulerMaxBackoff time.Duration
	// Max number of jobs to schedule during each check.
	// Default: 50
	SchedulerMaxJobs int
	// How frequently the scheduler should check for scheduled jobs.
	// Default: 5 seconds
	SchedulerPollInterval time.Duration
}

ConsumerOpts exposes options used when creating a new Consumer.

type DefaultLogger added in v0.2.0

type DefaultLogger struct{}

DefaultLogger is a Logger that sends all non-debug logs to stdout.

func (*DefaultLogger) Debug added in v0.2.0

func (l *DefaultLogger) Debug(args ...interface{})

Debug does nothing.

func (*DefaultLogger) Error added in v0.2.0

func (l *DefaultLogger) Error(args ...interface{})

Error logs error level information to stdout.

func (*DefaultLogger) Info added in v0.2.0

func (l *DefaultLogger) Info(args ...interface{})

Info logs info level information to stdout.

func (*DefaultLogger) Warn added in v0.2.0

func (l *DefaultLogger) Warn(args ...interface{})

Warn logs warn level information to stdout.

type EmptyLogger added in v0.3.0

type EmptyLogger struct{}

EmptyLogger is a Logger that logs nothing.

func (*EmptyLogger) Debug added in v0.3.0

func (l *EmptyLogger) Debug(args ...interface{})

Debug does nothing.

func (*EmptyLogger) Error added in v0.3.0

func (l *EmptyLogger) Error(args ...interface{})

Error does nothing.

func (*EmptyLogger) Info added in v0.3.0

func (l *EmptyLogger) Info(args ...interface{})

Info does nothing.

func (*EmptyLogger) Warn added in v0.3.0

func (l *EmptyLogger) Warn(args ...interface{})

Warn does nothing.

type ErrExceededMaxBackoff added in v0.4.0

type ErrExceededMaxBackoff struct {
	Attempt int
	Process string
}

ErrExceededMaxBackoff indicates a polling loop exceeded a maximum number of backoffs. It is considered a fatal error that should shut down the consumer.

func (ErrExceededMaxBackoff) Error added in v0.4.0

func (e ErrExceededMaxBackoff) Error() string

type ErrFailedToAckJob added in v0.2.0

type ErrFailedToAckJob struct {
	Job Job
	Err error
}

ErrFailedToAckJob indicates an error when acknowledging a completed job. It is considered a fatal error that should shut down the consumer.

func (ErrFailedToAckJob) Error added in v0.2.0

func (e ErrFailedToAckJob) Error() string

type ErrFailedToKillJob added in v0.2.0

type ErrFailedToKillJob struct {
	Job Job
	Err error
}

ErrFailedToKillJob indicates an error when marking a job as dead. It is considered a fatal error that should shut down the consumer.

func (ErrFailedToKillJob) Error added in v0.2.0

func (e ErrFailedToKillJob) Error() string

type ErrFailedToRetryJob added in v0.2.0

type ErrFailedToRetryJob struct {
	Job Job
	Err error
}

ErrFailedToRetryJob indicates an error when scheduling a retry. It is considered a fatal error that should shut down the consumer.

func (ErrFailedToRetryJob) Error added in v0.2.0

func (e ErrFailedToRetryJob) Error() string

type HandlerFunc

type HandlerFunc func(context.Context, Job) error

HandlerFunc is a convenience alias. It represents a function used to process a job.

type Job

type Job struct {
	ID      string
	Data    []byte
	Attempt int
}

A Job provides a wrapper for your job data.

type Logger added in v0.2.0

type Logger interface {
	// Debug logs fine-grained information,
	// such as when a given process starts and ends.
	Debug(...interface{})

	// Info logs useful information,
	// such as which job is currently being processed.
	Info(...interface{})

	// Warn logs non-critical errors,
	// such as network issues that are treated as transient errors.
	Warn(...interface{})

	// Error logs critical errors,
	// such as Redis issues which might affect the consistency of the queue.
	Error(...interface{})
}

Logger exposes an interface for a leveled logger. You can provide a Logger to a Consumer and a Producer to modify CurlyQ's default logging behavior.

type LoudLogger added in v0.4.0

type LoudLogger struct{}

LoudLogger is a Logger that sends all logs to stdout.

func (*LoudLogger) Debug added in v0.4.0

func (l *LoudLogger) Debug(args ...interface{})

Debug logs debug level information to stdout.

func (*LoudLogger) Error added in v0.4.0

func (l *LoudLogger) Error(args ...interface{})

Error logs error level information to stdout.

func (*LoudLogger) Info added in v0.4.0

func (l *LoudLogger) Info(args ...interface{})

Info logs info level information to stdout.

func (*LoudLogger) Warn added in v0.4.0

func (l *LoudLogger) Warn(args ...interface{})

Warn logs warn level information to stdout.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

A Producer pushes jobs onto a queue.

func NewProducer

func NewProducer(opts *ProducerOpts) *Producer

NewProducer instantiates a new Producer.

func (*Producer) Perform

func (p *Producer) Perform(job Job) (string, error)

Perform calls PerformCtx with a default background context. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformAfter

func (p *Producer) PerformAfter(duration time.Duration, job Job) (string, error)

PerformAfter enqueues a job to be performed after a certain amount of time. It calls to Redis using a default background context. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformAfterCtx

func (p *Producer) PerformAfterCtx(ctx context.Context, duration time.Duration, job Job) (string, error)

PerformAfterCtx enqueues a job to be performed after a certain amount of time. It calls to Redis using a user-supplied context. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformAt

func (p *Producer) PerformAt(at time.Time, job Job) (string, error)

PerformAt calls PerformAtCtx with a default background context. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformAtCtx

func (p *Producer) PerformAtCtx(ctx context.Context, at time.Time, job Job) (string, error)

PerformAtCtx schedules a job to be performed at a particular point in time. It calls to Redis using a user-supplied context. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformCtx

func (p *Producer) PerformCtx(ctx context.Context, job Job) (string, error)

PerformCtx enqueues a job to be performed as soon as possible. It calls to Redis using a user-supplied context. It returns the ID of the enqueued job when successful or an error otherwise.

type ProducerOpts

type ProducerOpts struct {
	// Address specifies the address of the Redis backing your queue.
	// CurlyQ will generate a go-redis instance based on this address.
	Address string
	// Client is a custom go-redis instance used to communicate with Redis.
	// If provided, this option overrides the value set in Address.
	Client *redis.Client
	// Logger provides a concrete implementation of the Logger interface.
	// If not provided, it will default to using the stdlib's log package.
	Logger Logger
	// Queue specifies the name of the queue that this producer will push to.
	Queue string
}

ProducerOpts exposes options used when creating a new Producer.
