taskq

README

Golang asynchronous task/job queue with Redis, SQS, IronMQ, and in-memory backends

Uptrace.dev - distributed traces, logs, and errors in one place

Installation

taskq requires a Go version with module support and uses import versioning, so make sure to initialize a Go module before installing taskq:

go get github.com/vickxxx/taskq/v3

Features

  • Redis, SQS, IronMQ, and in-memory backends.
  • Automatic scaling of the number of goroutines used to fetch (fetcher) and process (worker) messages.
  • Global rate limiting.
  • Global limit of workers.
  • Call once - deduplicating messages with the same name.
  • Automatic retries with exponential backoff.
  • Automatic pausing when all messages in the queue fail.
  • Fallback handler for processing failed messages.
  • Message batching. It is used in the SQS and IronMQ backends to add/delete messages in batches.
  • Automatic message compression using snappy / s2.

Quickstart

I recommend that you split your app into two parts:

  • An API that accepts requests from customers and adds tasks to the queues.
  • A Worker that fetches tasks from the queues and processes them.

This way you can:

  • Isolate API and worker from each other.
  • Scale API and worker separately.
  • Have different configs for API and worker (like timeouts).

There is an api_worker example that demonstrates this approach using Redis as a backend:

cd example/api_worker
go run worker/worker.go
go run api/api.go

You start by choosing a backend to use - in our case Redis:

package api_worker

var QueueFactory = redisq.NewFactory()

Using that factory you create a queue that contains tasks:

var MainQueue = QueueFactory.RegisterQueue(&taskq.QueueOptions{
    Name:  "api-worker",
    Redis: Redis, // go-redis client
})
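
Here Redis is a go-redis client. A minimal sketch of creating one (assuming go-redis v8 and a local Redis server; the address is a placeholder):

import "github.com/go-redis/redis/v8"

// Redis is the go-redis client the queue uses for metadata and rate limiting.
var Redis = redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
})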

Using the queue you create a task with a handler that does some useful work:

var CountTask = taskq.RegisterTask(&taskq.TaskOptions{
    Name: "counter",
    Handler: func() error {
        IncrLocalCounter()
        return nil
    },
})

Then in an API binary you use tasks to add messages/jobs to queues:

ctx := context.Background()
for {
    // call task handler without any args
    err := api_worker.MainQueue.Add(api_worker.CountTask.WithArgs(ctx))
    if err != nil {
        log.Fatal(err)
    }
}

And in a worker binary you start processing queues:

err := api_worker.MainQueue.Start(context.Background())
if err != nil {
    log.Fatal(err)
}
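
In a long-running worker you typically keep the process alive and shut the queue down cleanly on exit. A minimal sketch (the signal handling below is an illustration, not part of the example):

quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
<-quit

// Close stops the consumer and waits (up to a timeout) for in-flight messages.
if err := api_worker.MainQueue.Close(); err != nil {
    log.Fatal(err)
}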

API overview

t := taskq.RegisterTask(&taskq.TaskOptions{
    Name:    "greeting",
    Handler: func(name string) error {
        fmt.Println("Hello", name)
        return nil
    },
})

// Say "Hello World".
err := myQueue.Add(t.WithArgs(context.Background(), "World"))
if err != nil {
    panic(err)
}

// Say "Hello World" with 1 hour delay.
msg := t.WithArgs(ctx, "World")
msg.Delay = time.Hour
_ = myQueue.Add(msg)

// Say "Hello World" once.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world" // unique
    _ = myQueue.Add(msg)
}

// Say "Hello World" once with 1 hour delay.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World")
    msg.Name = "hello-world"
    msg.Delay = time.Hour
    _ = myQueue.Add(msg)
}

// Say "Hello World" once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour)
    _ = myQueue.Add(msg)
}

// Say "Hello World" for Europe region once in an hour.
for i := 0; i < 100; i++ {
    msg := t.WithArgs(ctx, "World").OnceInPeriod(time.Hour, "World", "europe")
    _ = myQueue.Add(msg)
}

Message deduplication

If a Message has a Name, it is used as a unique identifier, and messages with the same name are deduplicated (i.e. not processed again) within a 24 hour period (or possibly longer, if not yet evicted from the local cache). If Name is omitted, no deduplication occurs and every message is processed. A task's WithMessage and WithArgs both produce messages with no Name, so those messages are not deduplicated. Message.OnceInPeriod sets a name based on a consistent hash of the arguments and a quantised period of time (e.g. 'this hour', 'today'). This guarantees that the handler will not be called with the same arguments more than once during the period.
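
For example, adding two messages with the same Name within the deduplication period means only the first is processed; the second Add may return ErrDuplicate. A minimal sketch using the greeting task t and myQueue from the API overview above:

msg1 := t.WithArgs(ctx, "World")
msg1.Name = "hello-world"
_ = myQueue.Add(msg1)

msg2 := t.WithArgs(ctx, "World")
msg2.Name = "hello-world" // same name => deduplicated
if err := myQueue.Add(msg2); err == taskq.ErrDuplicate {
    // The message was not queued again.
}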

Handlers

A Handler and FallbackHandler are supplied to RegisterTask in the TaskOptions.

There are three permitted types of signature (sketched after the list):

  1. A zero-argument function
  2. A function whose arguments are assignable in type from those which are passed in the message
  3. A function which takes a single *Message argument
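
A minimal sketch of the three shapes (the task names are placeholders):

// 1. A zero-argument function.
taskq.RegisterTask(&taskq.TaskOptions{
    Name:    "zero-args",
    Handler: func() error { return nil },
})

// 2. A function whose arguments are decoded from the message args.
taskq.RegisterTask(&taskq.TaskOptions{
    Name: "typed-args",
    Handler: func(userID int64, email string) error {
        fmt.Println(userID, email)
        return nil
    },
})

// 3. A function that takes a single *Message argument.
taskq.RegisterTask(&taskq.TaskOptions{
    Name: "raw-message",
    Handler: func(msg *taskq.Message) error {
        fmt.Println(msg.TaskName)
        return nil
    },
})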

If a task is registered with a handler that takes a Go context.Context as its first argument, then when that handler is invoked it will be passed the same Context that was passed to Consumer.Start(ctx). This can be used to send an abort signal to all tasks being processed:

var AbortableTask = MainQueue.RegisterTask(&taskq.TaskOptions{
    Name: "SomethingLongwinded",
    Handler: func(ctx context.Context) error {
        for range time.Tick(time.Second) {
            select {
                case <-ctx.Done():
                    return ctx.Err()
                default:
                    fmt.Println("Wee!")
            }
        }
        return nil
    },
})

Custom message delay

If the error returned by a handler implements the Delayer interface (Delay() time.Duration), that delay is used to postpone message processing.

type RateLimitError string

func (e RateLimitError) Error() string {
    return string(e)
}

func (RateLimitError) Delay() time.Duration {
    return time.Hour
}

func handler() error {
    return RateLimitError("calm down")
}

Tracing

taskq supports tracing out-of-the-box using the OpenTelemetry API. To instrument a queue, use the following code:

import "github.com/vickxxx/taskq/v3/taskqext"

consumer := queue.Consumer()
consumer.AddHook(&taskqext.OpenTelemetryHook{})

or using a taskq.Factory:

factory.Range(func(q taskq.Queue) bool {
    consumer := q.Consumer()
    consumer.AddHook(&taskqext.OpenTelemetryHook{})

    return true
})

We recommend using Uptrace.dev as a tracing backend.

Documentation

Overview

Package taskq implements a task/job queue with Redis, SQS, IronMQ, and in-memory backends.

Example (CustomRateLimit)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/vickxxx/taskq/v3"
	"github.com/vickxxx/taskq/v3/memqueue"
)

type RateLimitError string

func (e RateLimitError) Error() string {
	return string(e)
}

func (RateLimitError) Delay() time.Duration {
	return 3 * time.Second
}

func main() {
	start := time.Now()
	q := memqueue.NewQueue(&taskq.QueueOptions{
		Name: "test",
	})
	task := taskq.RegisterTask(&taskq.TaskOptions{
		Name: "Example_customRateLimit",
		Handler: func() error {
			fmt.Println("retried in", timeSince(start))
			return RateLimitError("calm down")
		},
		RetryLimit: 2,
		MinBackoff: time.Millisecond,
	})

	ctx := context.Background()
	q.Add(task.WithArgs(ctx))

	// Wait for all messages to be processed.
	_ = q.Close()

}
Output:

retried in 0s
retried in 3s
Example (MessageDelay)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_messageDelay",
	Handler: func() {
		fmt.Println("processed with delay", timeSince(start))
	},
})

ctx := context.Background()
msg := task.WithArgs(ctx)
msg.Delay = time.Second
_ = q.Add(msg)

// Wait for all messages to be processed.
_ = q.Close()
Output:

processed with delay 1s
Example (Once)
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: redis_rate.PerSecond(1),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_once",
	Handler: func(name string) {
		fmt.Println("hello", name)
	},
})

ctx := context.Background()
for i := 0; i < 10; i++ {
	msg := task.WithArgs(ctx, "world")
	// Call once in a second.
	msg.OnceInPeriod(time.Second)

	_ = q.Add(msg)
}

// Wait for all messages to be processed.
_ = q.Close()
Output:

hello world
Example (RateLimit)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name:      "test",
	Redis:     redisRing(),
	RateLimit: redis_rate.PerSecond(1),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name:    "Example_rateLimit",
	Handler: func() {},
})

const n = 5

ctx := context.Background()
for i := 0; i < n; i++ {
	_ = q.Add(task.WithArgs(ctx))
}

// Wait for all messages to be processed.
_ = q.Close()

fmt.Printf("%d msg/s", timeSinceCeil(start)/time.Second/n)
Output:

1 msg/s
Example (RetryOnError)
start := time.Now()
q := memqueue.NewQueue(&taskq.QueueOptions{
	Name: "test",
})
task := taskq.RegisterTask(&taskq.TaskOptions{
	Name: "Example_retryOnError",
	Handler: func() error {
		fmt.Println("retried in", timeSince(start))
		return errors.New("fake error")
	},
	RetryLimit: 3,
	MinBackoff: time.Second,
})

ctx := context.Background()
q.Add(task.WithArgs(ctx))

// Wait for all messages to be processed.
_ = q.Close()
Output:

retried in 0s
retried in 1s
retried in 3s

Constants

This section is empty.

Variables

View Source
var ErrAsyncTask = errors.New("taskq: async task")
View Source
var ErrDuplicate = errors.New("taskq: message with such name already exists")

ErrDuplicate is returned when adding a duplicate message to the queue.

Functions

func SetLogger

func SetLogger(logger *log.Logger)

func SetUnknownTaskOptions

func SetUnknownTaskOptions(opt *TaskOptions)

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer reserves messages from the queue, processes them, and then either releases or deletes messages from the queue.

func NewConsumer

func NewConsumer(q Queue) *Consumer

NewConsumer creates a new Consumer for the queue.

func StartConsumer

func StartConsumer(ctx context.Context, q Queue) *Consumer

StartConsumer creates a new Consumer and starts it.

func (*Consumer) Add

func (c *Consumer) Add(msg *Message) error

func (*Consumer) AddHook

func (c *Consumer) AddHook(hook ConsumerHook)

AddHook adds a hook into message processing.

func (*Consumer) Len

func (c *Consumer) Len() int

func (*Consumer) Options

func (c *Consumer) Options() *QueueOptions

func (*Consumer) Process

func (c *Consumer) Process(msg *Message) error

Process is a low-level API for processing a message, bypassing the internal queue.

func (*Consumer) ProcessAll

func (c *Consumer) ProcessAll(ctx context.Context) error

ProcessAll starts workers to process messages in the queue and then stops them when all messages are processed.

func (*Consumer) ProcessOne

func (c *Consumer) ProcessOne(ctx context.Context) error

ProcessOne processes at most one message in the queue.

func (*Consumer) Purge

func (c *Consumer) Purge() error

Purge discards messages from the internal queue.

func (*Consumer) Put

func (c *Consumer) Put(msg *Message)

func (*Consumer) Queue

func (c *Consumer) Queue() Queue

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

Start starts consuming messages in the queue.

func (*Consumer) Stats

func (c *Consumer) Stats() *ConsumerStats

Stats returns processor stats.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop is StopTimeout with a 30 second timeout.

func (*Consumer) StopTimeout

func (c *Consumer) StopTimeout(timeout time.Duration) error

StopTimeout waits up to timeout for workers to finish processing their current messages and then stops them.

func (*Consumer) String

func (c *Consumer) String() string

type ConsumerHook

type ConsumerHook interface {
	BeforeProcessMessage(*ProcessMessageEvent) error
	AfterProcessMessage(*ProcessMessageEvent) error
}
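
A hypothetical hook that logs processing time, illustrating the interface (names are placeholders):

type timingHook struct{}

var _ taskq.ConsumerHook = timingHook{}

func (timingHook) BeforeProcessMessage(evt *taskq.ProcessMessageEvent) error {
	return nil
}

func (timingHook) AfterProcessMessage(evt *taskq.ProcessMessageEvent) error {
	log.Printf("%s took %s", evt.Message.TaskName, time.Since(evt.StartTime))
	return nil
}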

type ConsumerStats

type ConsumerStats struct {
	NumWorker  uint32
	NumFetcher uint32

	BufferSize uint32
	Buffered   uint32

	InFlight  uint32
	Processed uint32
	Retries   uint32
	Fails     uint32
	Timing    time.Duration
}

type Delayer

type Delayer interface {
	Delay() time.Duration
}

type Factory

type Factory interface {
	RegisterQueue(*QueueOptions) Queue
	Range(func(Queue) bool)
	StartConsumers(context.Context) error
	StopConsumers() error
	Close() error
}

Factory is an interface that abstracts the creation of new queues. It is implemented in the memqueue, redisq, azsqs, and ironmq subpackages.

type Handler

type Handler interface {
	HandleMessage(msg *Message) error
}

Handler is an interface for processing messages.

func NewHandler

func NewHandler(fn interface{}) Handler

type HandlerFunc

type HandlerFunc func(*Message) error

func (HandlerFunc) HandleMessage

func (fn HandlerFunc) HandleMessage(msg *Message) error
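
HandlerFunc adapts an ordinary function to the Handler interface, for example:

var h taskq.Handler = taskq.HandlerFunc(func(msg *taskq.Message) error {
	fmt.Println("processing", msg.TaskName)
	return nil
})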

type Message

type Message struct {
	Ctx context.Context `msgpack:"-"`

	// SQS/IronMQ message id.
	ID string `msgpack:"1,omitempty,alias:ID"`

	// Optional name for the message. Messages with the same name
	// are processed only once.
	Name string `msgpack:"-"`

	// Delay specifies the duration the queue must wait
	// before executing the message.
	Delay time.Duration `msgpack:"-"`

	// Args passed to the handler.
	Args []interface{} `msgpack:"-"`

	// Binary representation of the args.
	ArgsCompression string `msgpack:"2,omitempty,alias:ArgsCompression"`
	ArgsBin         []byte `msgpack:"3,alias:ArgsBin"`

	// SQS/IronMQ reservation id that is used to release/delete the message.
	ReservationID string `msgpack:"-"`

	// The number of times the message has been reserved or released.
	ReservedCount int `msgpack:"4,omitempty,alias:ReservedCount"`

	TaskName string `msgpack:"5,alias:TaskName"`
	Err      error  `msgpack:"-"`
	// contains filtered or unexported fields
}

Message is used to create and retrieve messages from a queue.

func NewMessage

func NewMessage(ctx context.Context, args ...interface{}) *Message

func (*Message) MarshalArgs

func (m *Message) MarshalArgs() ([]byte, error)

func (*Message) MarshalBinary

func (m *Message) MarshalBinary() ([]byte, error)

func (*Message) OnceInPeriod

func (m *Message) OnceInPeriod(period time.Duration, args ...interface{})

OnceInPeriod uses the period and args to generate a message name such that a message with the same args is added to the queue at most once per period. If args are not provided, the message args are used instead.

func (*Message) OnceWithDelay

func (m *Message) OnceWithDelay(delay time.Duration)

func (*Message) OnceWithSchedule

func (m *Message) OnceWithSchedule(tm time.Time)

func (*Message) SetDelay

func (m *Message) SetDelay(delay time.Duration)

SetDelay sets the message delay.

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalBinary

func (m *Message) UnmarshalBinary(b []byte) error

type ProcessMessageEvent

type ProcessMessageEvent struct {
	Message   *Message
	StartTime time.Time

	Stash map[interface{}]interface{}
}

type Queue

type Queue interface {
	fmt.Stringer
	Name() string
	Options() *QueueOptions
	Consumer() *Consumer

	Len() (int, error)
	Add(msg *Message) error
	ReserveN(ctx context.Context, n int, waitTimeout time.Duration) ([]Message, error)
	Release(msg *Message) error
	Delete(msg *Message) error
	Purge() error
	Close() error
	CloseTimeout(timeout time.Duration) error
}

type QueueOptions

type QueueOptions struct {
	// Queue name.
	Name string

	// Minimum number of goroutines processing messages.
	// Default is 1.
	MinNumWorker int32
	// Maximum number of goroutines processing messages.
	// Default is 32 * number of CPUs.
	MaxNumWorker int32
	// Global limit of concurrently running workers across all servers.
	// Overrides MaxNumWorker.
	WorkerLimit int32
	// Maximum number of goroutines fetching messages.
	// Default is 16 * number of CPUs.
	MaxNumFetcher int32

	// Number of messages reserved by a fetcher in the queue in one request.
	// Default is 10 messages.
	ReservationSize int
	// Time after which the reserved message is returned to the queue.
	// Default is 5 minutes.
	ReservationTimeout time.Duration
	// Time that a long polling receive call waits for a message to become
	// available before returning an empty response.
	// Default is 10 seconds.
	WaitTimeout time.Duration
	// Size of the buffer where reserved messages are stored.
	// Default is the same as ReservationSize.
	BufferSize int

	// Number of consecutive failures after which queue processing is paused.
	// Default is 100 failures.
	PauseErrorsThreshold int

	// Processing rate limit.
	RateLimit redis_rate.Limit

	// Optional rate limiter. The default is to use Redis.
	RateLimiter *redis_rate.Limiter

	// Redis client that is used for storing metadata.
	Redis Redis

	// Optional storage interface. The default is to use Redis.
	Storage Storage

	// Optional message handler. The default is the global Tasks registry.
	Handler Handler
	// contains filtered or unexported fields
}
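
A minimal sketch of registering a queue with a few of these options (values are placeholders; Redis is a go-redis client and QueueFactory a factory such as redisq.NewFactory(), as in the README):

q := QueueFactory.RegisterQueue(&taskq.QueueOptions{
	Name:         "emails",
	Redis:        Redis,
	MinNumWorker: 2,
	MaxNumWorker: 64,
	RateLimit:    redis_rate.PerSecond(10),
})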

func (*QueueOptions) Init

func (opt *QueueOptions) Init()

type Redis

type Redis interface {
	Del(ctx context.Context, keys ...string) *redis.IntCmd
	SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Pipelined(ctx context.Context, fn func(pipe redis.Pipeliner) error) ([]redis.Cmder, error)

	// Required by redislock
	Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd
	EvalSha(ctx context.Context, sha1 string, keys []string, args ...interface{}) *redis.Cmd
	ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
	ScriptLoad(ctx context.Context, script string) *redis.StringCmd
}

type Storage

type Storage interface {
	Exists(ctx context.Context, key string) bool
}

type Task

type Task struct {
	// contains filtered or unexported fields
}

func RegisterTask

func RegisterTask(opt *TaskOptions) *Task

func (*Task) HandleMessage

func (t *Task) HandleMessage(msg *Message) error

func (*Task) Name

func (t *Task) Name() string

func (*Task) Options

func (t *Task) Options() *TaskOptions

func (*Task) String

func (t *Task) String() string

func (*Task) WithArgs

func (t *Task) WithArgs(ctx context.Context, args ...interface{}) *Message

type TaskMap

type TaskMap struct {
	// contains filtered or unexported fields
}
var Tasks TaskMap

func (*TaskMap) Get

func (r *TaskMap) Get(name string) *Task

func (*TaskMap) HandleMessage

func (r *TaskMap) HandleMessage(msg *Message) error

func (*TaskMap) Range

func (r *TaskMap) Range(fn func(name string, task *Task) bool)

func (*TaskMap) Register

func (r *TaskMap) Register(opt *TaskOptions) (*Task, error)

func (*TaskMap) Reset

func (r *TaskMap) Reset()

func (*TaskMap) Unregister

func (r *TaskMap) Unregister(task *Task)

type TaskOptions

type TaskOptions struct {
	// Task name.
	Name string

	// Function called to process a message.
	// There are three permitted types of signature:
	// 1. A zero-argument function
	// 2. A function whose arguments are assignable in type from those which are passed in the message
	// 3. A function which takes a single `*Message` argument
	// The handler may optionally take a Context as its first argument and may
	// optionally return an error. If the handler takes a Context, it will be passed
	// the same Context that was passed to `StartConsumer`. If the handler returns a
	// non-nil error, processing of the message fails and it will be retried.
	Handler interface{}
	// Function called to process a failed message after all retries have failed.
	// The FallbackHandler accepts the same types of function as the Handler.
	FallbackHandler interface{}

	// Optional function used by Consumer with defer statement
	// to recover from panics.
	DeferFunc func()

	// Number of tries/releases after which the message fails permanently
	// and is deleted.
	// Default is 64 retries.
	RetryLimit int
	// Minimum backoff time between retries.
	// Default is 30 seconds.
	MinBackoff time.Duration
	// Maximum backoff time between retries.
	// Default is 30 minutes.
	MaxBackoff time.Duration
	// contains filtered or unexported fields
}
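
A sketch of a task that uses a fallback handler and custom retry settings (the task name and handler bodies are placeholders; sendEmail is a hypothetical helper):

var EmailTask = taskq.RegisterTask(&taskq.TaskOptions{
	Name: "send-email",
	Handler: func(addr string) error {
		return sendEmail(addr) // hypothetical helper
	},
	// Called once all retries have failed.
	FallbackHandler: func(addr string) error {
		log.Printf("giving up on %s", addr)
		return nil
	},
	RetryLimit: 10,
	MinBackoff: time.Second,
	MaxBackoff: 10 * time.Minute,
})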
