curlyq

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2020 License: MIT Imports: 16 Imported by: 9

README

CurlyQ

Build Status GoDoc GoCover

CurlyQ provides a simple, easy-to-use interface for performing background processing in Go. It supports scheduled jobs, job deduplication, and configurable concurrent execution out of the box.

Quickstart

package main

import (
	"context"
	"log"

	cq "github.com/mcmathja/curlyq"
)

// main pushes a single job onto the "testq" queue and then consumes jobs
// from that same queue, logging each job's payload.
func main() {
	// Create a new producer
	producer := cq.NewProducer(&cq.ProducerOpts{
		Address: "localhost:6379",
		Queue:   "testq",
	})

	// Use the producer to push a job to the queue.
	// Perform returns the job ID and an error; check the error so that a
	// failed enqueue is not silently dropped.
	if _, err := producer.Perform(cq.Job{
		Data: []byte("Some data!"),
	}); err != nil {
		log.Fatalf("enqueue job: %v", err)
	}

	// Create a new consumer
	consumer := cq.NewConsumer(&cq.ConsumerOpts{
		Address: "localhost:6379",
		Queue:   "testq",
	})

	// Consume jobs from the queue with the consumer.
	// Consume returns an error if the consumer cannot shut down gracefully,
	// so surface it instead of discarding it.
	err := consumer.Consume(func(ctx context.Context, job cq.Job) error {
		log.Println(string(job.Data))
		return nil
	})
	if err != nil {
		log.Fatalf("consume jobs: %v", err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

// Consumer executes jobs and manages the state of the queue.
// Instances are created with NewConsumer and started with Consume or ConsumeCtx.
type Consumer struct {
	// contains filtered or unexported fields
}

Consumers execute jobs and manage the state of the queue.

func NewConsumer

func NewConsumer(opts *ConsumerOpts) *Consumer

NewConsumer instantiates a new Consumer.

func (*Consumer) Consume

func (c *Consumer) Consume(handler HandlerFunc, signals ...os.Signal) error

Consume starts the consumer with a default context. The Consumer runs until the process receives one of the specified signals. An error is returned if the Consumer cannot shut down gracefully.

func (*Consumer) ConsumeCtx

func (c *Consumer) ConsumeCtx(ctx context.Context, handler HandlerFunc) (err error)

ConsumeCtx starts the consumer with a user-supplied context. The Consumer runs indefinitely until the provided context is canceled. An error is returned if the Consumer cannot shut down gracefully.

type ConsumerOpts

// ConsumerOpts exposes options used when creating a new Consumer.
type ConsumerOpts struct {
	// Address specifies the address of the Redis backing your queue.
	// CurlyQ will generate a go-redis instance based on this address.
	Address string
	// Client is a custom go-redis instance used to communicate with Redis.
	// If provided, this option overrides the value set in Address.
	Client *redis.Client
	// Logger provides a concrete implementation of the Logger interface.
	// If not provided, it will default to using the stdlib's log package.
	Logger Logger
	// Queue specifies the name of the queue that this consumer will consume from.
	Queue string

	// How long to wait for executors to finish before exiting forcibly.
	// A zero value indicates that we should wait indefinitely.
	// Default: 0
	ShutdownGracePeriod time.Duration

	// How frequently the custodian should clean up jobs.
	// Default: 1 minute
	CustodianPollInterval time.Duration
	// Max number of jobs to clean up during a single check.
	// Default: 50
	CustodianMaxJobs uint
	// How long to wait after a missed heartbeat before a consumer is considered dead.
	// Default: 1 minute
	// Minimum: 5 seconds
	CustodianConsumerTimeout time.Duration

	// How many job executors to run simultaneously.
	// Default: 10
	ExecutorsConcurrency uint
	// How frequently we should poll for jobs.
	// Default: 3 seconds
	ExecutorsPollInterval time.Duration
	// How many jobs to buffer locally.
	// Default: Same as ExecutorsConcurrency
	ExecutorsBufferSize uint
	// The number of times to attempt a job before killing it.
	// Default: 5
	ExecutorsMaxAttempts uint

	// How frequently we should heartbeat.
	// Default: 1 minute
	// Minimum: 15 seconds
	HeartbeatInterval time.Duration

	// How frequently the scheduler should check for scheduled jobs.
	// Default: 15 seconds
	SchedulerPollInterval time.Duration
	// Max number of jobs to schedule during each check.
	// Default: 50
	SchedulerMaxJobs uint
}

ConsumerOpts exposes options used when creating a new Consumer.

type DefaultLogger added in v0.2.0

// DefaultLogger is a field-less type whose pointer receiver exposes Debug,
// Info, Warn and Error methods, i.e. the method set of the Logger interface.
// NOTE(review): presumably this is the stdlib-log-backed fallback used when
// no Logger option is supplied — confirm against the implementation.
type DefaultLogger struct{}

func (*DefaultLogger) Debug added in v0.2.0

func (l *DefaultLogger) Debug(args ...interface{})

func (*DefaultLogger) Error added in v0.2.0

func (l *DefaultLogger) Error(args ...interface{})

func (*DefaultLogger) Info added in v0.2.0

func (l *DefaultLogger) Info(args ...interface{})

func (*DefaultLogger) Warn added in v0.2.0

func (l *DefaultLogger) Warn(args ...interface{})

type ErrFailedToAckJob added in v0.2.0

// ErrFailedToAckJob is an error type (it has an Error() string method) that
// carries a Job together with an accompanying error.
// NOTE(review): the name suggests it is reported when acknowledging a job
// fails — confirm against the implementation.
type ErrFailedToAckJob struct {
	// Job is the job this error refers to.
	Job Job
	// Err is the accompanying error value.
	Err error
}

func (ErrFailedToAckJob) Error added in v0.2.0

func (e ErrFailedToAckJob) Error() string

type ErrFailedToKillJob added in v0.2.0

// ErrFailedToKillJob is an error type (it has an Error() string method) that
// carries a Job together with an accompanying error.
// NOTE(review): the name suggests it is reported when killing a job fails —
// confirm against the implementation.
type ErrFailedToKillJob struct {
	// Job is the job this error refers to.
	Job Job
	// Err is the accompanying error value.
	Err error
}

func (ErrFailedToKillJob) Error added in v0.2.0

func (e ErrFailedToKillJob) Error() string

type ErrFailedToRetryJob added in v0.2.0

// ErrFailedToRetryJob is an error type (it has an Error() string method) that
// carries a Job together with an accompanying error.
// NOTE(review): the name suggests it is reported when scheduling a retry for
// a job fails — confirm against the implementation.
type ErrFailedToRetryJob struct {
	// Job is the job this error refers to.
	Job Job
	// Err is the accompanying error value.
	Err error
}

func (ErrFailedToRetryJob) Error added in v0.2.0

func (e ErrFailedToRetryJob) Error() string

type HandlerFunc

// HandlerFunc is the signature of a function used to process a job: it
// receives a context and the Job, and returns an error.
type HandlerFunc func(context.Context, Job) error

HandlerFunc is a convenience type. It represents a function used to process a job.

type Job

// Job is the value passed to a Producer's Perform* methods and handed to a
// HandlerFunc when it is consumed.
type Job struct {
	// ID identifies the job. The Producer's Perform* methods return the ID of
	// the enqueued job.
	// NOTE(review): whether a caller-supplied ID is honored or generated when
	// empty is not visible here — confirm.
	ID      string
	// Data is the job's payload as raw bytes.
	Data    []byte
	// Attempt is presumably the number of times this job has been attempted
	// (see ExecutorsMaxAttempts) — TODO confirm.
	Attempt uint
}

type Logger added in v0.2.0

// Logger is the logging interface accepted by the Logger field of
// ConsumerOpts and ProducerOpts. It declares one variadic method per
// severity level.
type Logger interface {
	// Debug logs the given arguments at debug level.
	Debug(...interface{})
	// Info logs the given arguments at info level.
	Info(...interface{})
	// Warn logs the given arguments at warning level.
	Warn(...interface{})
	// Error logs the given arguments at error level.
	Error(...interface{})
}

type Producer

// Producer provides logic for pushing jobs onto a queue.
// Instances are created with NewProducer.
type Producer struct {
	// contains filtered or unexported fields
}

Producers provide logic for pushing jobs onto a queue.

func NewProducer

func NewProducer(opts *ProducerOpts) *Producer

NewProducer instantiates a new Producer.

func (*Producer) Perform

func (p *Producer) Perform(job Job) (string, error)

Perform calls PerformCtx with a default background context, enqueuing a job to be performed as soon as possible. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformAfter

func (p *Producer) PerformAfter(duration time.Duration, job Job) (string, error)

PerformAfter enqueues a job to be performed after a certain amount of time. It calls to Redis using a default background context. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformAfterCtx

func (p *Producer) PerformAfterCtx(ctx context.Context, duration time.Duration, job Job) (string, error)

PerformAfterCtx enqueues a job to be performed after a certain amount of time. It calls to Redis using a user-supplied context. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformAt

func (p *Producer) PerformAt(at time.Time, job Job) (string, error)

PerformAt calls PerformAtCtx with a default background context, scheduling a job to be performed at a particular point in time. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformAtCtx

func (p *Producer) PerformAtCtx(ctx context.Context, at time.Time, job Job) (string, error)

PerformAtCtx schedules a job to be performed at a particular point in time. It calls to Redis using a user-supplied context. It returns the ID of the enqueued job when successful or an error otherwise.

func (*Producer) PerformCtx

func (p *Producer) PerformCtx(ctx context.Context, job Job) (string, error)

PerformCtx enqueues a job to be performed as soon as possible. It calls to Redis using a user-supplied context. It returns the ID of the enqueued job when successful or an error otherwise.

type ProducerOpts

// ProducerOpts exposes options used when creating a new Producer.
type ProducerOpts struct {
	// Address specifies the address of the Redis backing your queue.
	// CurlyQ will generate a go-redis instance based on this address.
	Address string
	// Client is a custom go-redis instance used to communicate with Redis.
	// If provided, this option overrides the value set in Address.
	Client *redis.Client
	// Logger provides a concrete implementation of the Logger interface.
	// If not provided, it will default to using the stdlib's log package.
	Logger Logger
	// Queue specifies the name of the queue that this producer will push to.
	Queue string
}

ProducerOpts exposes options used when creating a new Producer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL