taskq

package module
v2.0.0-beta.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 15, 2019 License: BSD-2-Clause Imports: 20 Imported by: 4

README

Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends

Build Status GoDoc

Installation

go get github.com/vmihailenco/taskq/v2

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatically scaling number of goroutines used to fetch (fetcher) and process messages (worker).
  • Global rate limiting.
  • Global limit of workers.
  • Call once - deduplicating messages with same name.
  • Automatic retries with exponential backoffs.
  • Automatic pausing when all messages in queue fail.
  • Fallback handler for processing failed messages.
  • Message batching. It is used in SQS and IronMQ backends to add/delete messages in batches.
  • Automatic message compression using zstd.

Quickstart

I recommend to split your app into 2 parts:

  • API that accepts requests from customers and adds tasks to the queues.
  • Worker that fetches tasks from the queues and processes them.

This way you can:

  • isolate API and worker from each other;
  • scale API and worker separately;
  • have different configs for API and worker (like timeouts).

There is an api_worker example that demonstrates this approach using Redis as backend:

cd examples/api_worker
go run worker/main.go
go run api/main.go

You start by choosing backend to use - in our case Redis:

var QueueFactory = redisq.NewFactory()

Using that factory you create queue that contains task(s):

var MainQueue = QueueFactory.NewQueue(&taskq.QueueOptions{
	Name:  "api-worker",
	Redis: Redis, // go-redis client
})

Using the queue you create task with handler that does some useful work:

var CountTask = MainQueue.NewTask(&taskq.TaskOptions{
	Name: "counter",
	Handler: func() error {
		IncrLocalCounter()
		return nil
	},
})

Then in API you use the task to add messages/jobs to the queues:

for {
	err := api_worker.CountTask.Call() // call task handler without any args
	if err != nil {
		log.Fatal(err)
	}
}

And in worker you start processing the queue:

err := api_worker.MainQueue.Consumer().Start()
if err != nil {
	log.Fatal(err)
}

API overview

t := myQueue.NewTask(&taskq.TaskOptions{
	Name:    "greeting",
	Handler: func(name string) error {
		fmt.Println("Hello", name)
		return nil
	},
})

// Say "Hello World".
t.Call("World")

// Same using Message API.
t.AddMessage(taskq.NewMessage("World"))

// Say "Hello World" with 1 hour delay.
msg := taskq.NewMessage("World")
msg.Delay = time.Hour
t.AddMessage(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := taskq.NewMessage("hello")
    msg.Name = "hello-world" // unique
    t.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := taskq.NewMessage("hello")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    t.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    t.CallOnce(time.Hour, "hello")
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := taskq.NewMessage("hello")
    msg.OnceWithArgs(time.Hour, "europe") // set delay and autogenerate message name
    t.Add(msg)
}

Custom message delay

If error returned by handler implements Delay() time.Duration interface then that delay is used to postpone message processing.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}

Documentation

Overview

Package taskq implements task/job queue with Redis, SQS, IronMQ, and in-memory backends.

Example (CustomRateLimit)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/vmihailenco/taskq/v2"
	"github.com/vmihailenco/taskq/v2/memqueue"
)

type RateLimitError string

func (e RateLimitError) Error() string {
	return string(e)
}

func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&taskq.QueueOptions{
		Name: "test",
	})
	task := taskq.RegisterTask(&taskq.TaskOptions{
		Name: "Example_customRateLimit",
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})

	ctx := context.Background()
	q.Add(task.WithArgs(ctx))

	// Wait for all messages to be processed.
	_ = q.Close()

}
Output:

retried in 0s
retried in 3s
Example (MessageDelay)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_messageDelay",
	Handler: func() {
		fmt.Println("processed with delay", timeSince(start))
	},
})

ctx := context.Background()
msg := task.WithArgs(ctx)
msg.Delay = time.Second
_ = q.Add(msg)

// Wait for all messages to be processed.
_ = q.Close()
Output:

processed with delay 1s
Example (Once)
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_once",
	Handler: func(name string) {
		fmt.Println("hello", name)
	},
})

ctx := context.Background()
for i := 0; i < 10; i++ {
	// Call once in a second.
	_ = q.Add(task.OnceWithArgs(ctx, time.Second, "world"))
}

// Wait for all messages to be processed.
_ = q.Close()
Output:

hello world
Example (RateLimit)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: rate.Every(time.Second),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name:    "Example_rateLimit",
	Handler: func() {},
})

const n = 5

ctx := context.Background()
for i := 0; i < n; i++ {
	_ = q.Add(task.WithArgs(ctx))
}

// Wait for all messages to be processed.
_ = q.Close()

fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output:

1 msg/s
Example (RetryOnError)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_retryOnError",
	Handler: func() error {
		fmt.Println("retried in", timeSince(start))
		return errors.New("fake error")
	},
	RetryLimit: 3,
	MinBackoff: time.Second,
})

ctx := context.Background()
q.Add(task.WithArgs(ctx))

// Wait for all messages to be processed.
_ = q.Close()
Output:

retried in 0s
retried in 1s
retried in 3s

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrAsyncTask = errors.New("taskq: async task")
View Source
var ErrDuplicate = errors.New("taskq: message with such name already exists")

ErrDuplicate is returned when adding duplicate message to the queue.

View Source
var Tasks taskRegistry

Functions

func SetLogger

func SetLogger(logger *log.Logger)

func SetUnknownTaskOptions

func SetUnknownTaskOptions(opt *TaskOptions)

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

func NewConsumer

func NewConsumer(q Queue) *Consumer

New creates new Consumer for the queue using provided processing options.

func StartConsumer

func StartConsumer(ctx context.Context, q Queue) *Consumer

Starts creates new Consumer and starts it.

func (*Consumer) Add

func (c *Consumer) Add(msg *Message) error

func (*Consumer) AddHook

func (c *Consumer) AddHook(hook ConsumerHook)

AddHook adds a hook into message processing.

func (*Consumer) Len

func (c *Consumer) Len() int

func (*Consumer) Options

func (c *Consumer) Options() *QueueOptions

func (*Consumer) Process

func (c *Consumer) Process(msg *Message) error

Process is low-level API to process message bypassing the internal queue.

func (*Consumer) ProcessAll

func (c *Consumer) ProcessAll(ctx context.Context) error

ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.

func (*Consumer) ProcessOne

func (c *Consumer) ProcessOne(ctx context.Context) error

ProcessOne processes at most one message in the queue.

func (*Consumer) Purge

func (c *Consumer) Purge() error

Purge discards messages from the internal queue.

func (*Consumer) Put

func (c *Consumer) Put(msg *Message)

func (*Consumer) Queue

func (c *Consumer) Queue() Queue

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start starts consuming messages in the queue.

func (*Consumer) Stats

func (c *Consumer) Stats() *ConsumerStats

Stats returns processor stats.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop is StopTimeout with 30 seconds timeout.

func (*Consumer) StopTimeout

func (c *Consumer) StopTimeout(timeout time.Duration) error

StopTimeout waits workers for timeout duration to finish processing current messages and stops workers.

func (*Consumer) String

func (c *Consumer) String() string

type ConsumerHook

type ConsumerHook interface {
	BeforeProcessMessage(*ProcessMessageEvent) error
	AfterProcessMessage(*ProcessMessageEvent) error
}

type ConsumerStats

type ConsumerStats struct {
	WorkerNumber  uint32
	FetcherNumber uint32
	BufferSize    uint32
	Buffered      uint32
	InFlight      uint32
	Processed     uint32
	Retries       uint32
	Fails         uint32
}

type Delayer

type Delayer interface {
	Delay() time.Duration
}

type Factory

type Factory interface {
	RegisterQueue(*QueueOptions) Queue
	Range(func(Queue) bool)
	StartConsumers(context.Context) error
	StopConsumers() error
	Close() error
}

Factory is an interface that abstracts creation of new queues. It is implemented in subpackages memqueue, azsqs, and ironmq.

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error

type Message

type Message struct {
	Ctx context.Context `msgpack:"-"`

	// SQS/IronMQ message id.
	ID string `msgpack:",omitempty"`

	// Optional name for the message. Messages with the same name
	// are processed only once.
	Name string `msgpack:"-"`

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration `msgpack:"-"`

	// Function args passed to the handler.
	Args []interface{} `msgpack:"-"`

	// Binary representation of the args.
	ArgsCompression string `msgpack:",omitempty"`
	ArgsBin         []byte

	// SQS/IronMQ reservation id that is used to release/delete the message.
	ReservationID string `msgpack:"-"`

	// The number of times the message has been reserved or released.
	ReservedCount int `msgpack:",omitempty"`

	TaskName  string
	StickyErr error `msgpack:"-"`
	// contains filtered or unexported fields
}

Message is used to create and retrieve messages from a queue.

func NewMessage

func NewMessage(ctx context.Context, args ...interface{}) *Message

func (*Message) MarshalArgs

func (m *Message) MarshalArgs() ([]byte, error)

func (*Message) MarshalBinary

func (m *Message) MarshalBinary() ([]byte, error)

func (*Message) OnceWithArgs

func (m *Message) OnceWithArgs(period time.Duration, args ...interface{})

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalBinary

func (m *Message) UnmarshalBinary(b []byte) error

type ProcessMessageEvent

type ProcessMessageEvent struct {
	Message   *Message
	StartTime time.Time
	Error     error

	Stash map[interface{}]interface{}
}

type Queue

type Queue interface {
	fmt.Stringer
	Name() string
	Options() *QueueOptions
	Consumer() *Consumer

	Len() (int, error)
	Add(msg *Message) error
	ReserveN(n int, waitTimeout time.Duration) ([]Message, error)
	Release(msg *Message) error
	Delete(msg *Message) error
	Purge() error
	Close() error
	CloseTimeout(timeout time.Duration) error
}

type QueueOptions

type QueueOptions struct {
	// Queue name.
	Name string

	// Minimum number of goroutines processing messages.
	// Default is 1.
	MinWorkers int
	// Maximum number of goroutines processing messages.
	// Default is 32 * number of CPUs.
	MaxWorkers int
	// Global limit of concurrently running workers across all servers.
	// Overrides MaxWorkers.
	WorkerLimit int
	// Maximum number of goroutines fetching messages.
	// Default is 16 * number of CPUs.
	MaxFetchers int

	// Number of messages reserved by a fetcher in the queue in one request.
	// Default is 10 messages.
	ReservationSize int
	// Time after which the reserved message is returned to the queue.
	// Default is 5 minutes.
	ReservationTimeout time.Duration
	// Time that a long polling receive call waits for a message to become
	// available before returning an empty response.
	// Default is 10 seconds.
	WaitTimeout time.Duration
	// Size of the buffer where reserved messages are stored.
	// Default is the same as ReservationSize.
	BufferSize int

	// Number of consecutive failures after which queue processing is paused.
	// Default is 100 failures.
	PauseErrorsThreshold int

	// Processing rate limit.
	RateLimit rate.Limit

	// Optional rate limiter interface. The default is to use Redis.
	RateLimiter RateLimiter

	// Redis client that is used for storing metadata.
	Redis Redis

	// Optional storage interface. The default is to use Redis.
	Storage Storage
	// contains filtered or unexported fields
}

func (*QueueOptions) Init

func (opt *QueueOptions) Init()

type RateLimiter

type RateLimiter interface {
	AllowRate(name string, limit rate.Limit) (delay time.Duration, allow bool)
}

type Redis

type Redis interface {
	Del(keys ...string) *redis.IntCmd
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

	// Required by redislock
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(script string) *redis.StringCmd
}

type Storage

type Storage interface {
	Exists(key string) bool
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func RegisterTask

func RegisterTask(opt *TaskOptions) *Task

func (*Task) HandleMessage

func (t *Task) HandleMessage(msg *Message) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) OnceWithArgs

func (t *Task) OnceWithArgs(ctx context.Context, period time.Duration, args ...interface{}) *Message

func (*Task) Options

func (t *Task) Options() *TaskOptions

func (*Task) String

func (t *Task) String() string

func (*Task) WithArgs

func (t *Task) WithArgs(ctx context.Context, args ...interface{}) *Message

func (*Task) WithMessage

func (t *Task) WithMessage(msg *Message) *Message

type TaskOptions

type TaskOptions struct {
	// Task name.
	Name string

	// Function called to process a message.
	Handler interface{}
	// Function called to process failed message.
	FallbackHandler interface{}

	// Optional function used by Consumer with defer statement
	// to recover from panics.
	DeferFunc func()

	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	// Default is 64 retries.
	RetryLimit int
	// Minimum backoff time between retries.
	// Default is 30 seconds.
	MinBackoff time.Duration
	// Maximum backoff time between retries.
	// Default is 30 minutes.
	MaxBackoff time.Duration
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL