bokchoy

Version: v0.2.1
Published: Sep 21, 2020 License: MIT Imports: 17 Imported by: 27

README

bokchoy

Introduction

Bokchoy is a simple Go library for queueing tasks and processing them in the background with workers. It should integrate easily into your web stack and is designed to have a low barrier to entry for newcomers.

It currently only supports Redis (client, sentinel and cluster) with some Lua magic, but internally it relies on a generic broker implementation so that it can be extended.

Motivation

It's relatively easy to build a producer/receiver system in Go, since the language has built-in features to build one from scratch, but we keep rewriting the same system everywhere instead of making it reusable.

Bokchoy is a plug-and-play component: it does its job, and does it well, so that you can focus on your business logic.

Features

  • Lightweight
  • A simple API close to net/http - if you already use net/http then you can learn it pretty quickly
  • Designed with modular/composable APIs - middlewares, queue middlewares
  • Context control - built on context package, providing value chaining, cancelations and timeouts
  • Highly configurable - tons of options to swap internal parts (broker, logger, timeouts, etc.); if you cannot customize something then an option is missing
  • Extensions - RPC server powered by gRPC, Sentry, etc.

Getting started

First, run a Redis server, of course:

redis-server

Define your producer which will send tasks:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/thoas/bokchoy"
)

func main() {
	ctx := context.Background()

	// define the main engine which will manage queues
	engine, err := bokchoy.New(ctx, bokchoy.Config{
		Broker: bokchoy.BrokerConfig{
			Type: "redis",
			Redis: bokchoy.RedisConfig{
				Type: "client",
				Client: bokchoy.RedisClientConfig{
					Addr: "localhost:6379",
				},
			},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	payload := map[string]string{
		"data": "hello world",
	}

	task, err := engine.Queue("tasks.message").Publish(ctx, payload)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(task, "has been published")
}

See the producer directory for more information and to run it.

Now that we have a producer which can send tasks to our engine, we need a worker to process them in the background:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/thoas/bokchoy"
)

func main() {
	ctx := context.Background()

	engine, err := bokchoy.New(ctx, bokchoy.Config{
		Broker: bokchoy.BrokerConfig{
			Type: "redis",
			Redis: bokchoy.RedisConfig{
				Type: "client",
				Client: bokchoy.RedisClientConfig{
					Addr: "localhost:6379",
				},
			},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	engine.Queue("tasks.message").HandleFunc(func(r *bokchoy.Request) error {
		fmt.Println("Receive request", r)
		fmt.Println("Payload:", r.Task.Payload)

		return nil
	})

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	go func() {
		for range c {
			log.Print("Received signal, gracefully stopping")
			engine.Stop(ctx)
		}
	}()

	engine.Run(ctx)
}

A worker is defined by handlers. To define a Handler, you have to implement this interface:

type Handler interface {
	Handle(*Request) error
}

You can create your own struct which implements this interface or use the HandlerFunc to generate a Handler from your function.
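The two approaches can be compared side by side in a minimal sketch. Request, Task, Handler and HandlerFunc are redeclared locally as simplified stand-ins so the program compiles without the library, and EmailHandler is a hypothetical name used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Task and Request are simplified stand-ins for bokchoy.Task and
// bokchoy.Request, declared locally so the sketch compiles on its own.
type Task struct {
	Payload interface{}
}

type Request struct {
	Task *Task
}

// Handler mirrors the interface shown above.
type Handler interface {
	Handle(*Request) error
}

// HandlerFunc adapts a plain function to the Handler interface.
type HandlerFunc func(*Request) error

// Handle calls the underlying function.
func (f HandlerFunc) Handle(r *Request) error { return f(r) }

// EmailHandler is a hypothetical struct-based handler that carries its own
// configuration.
type EmailHandler struct {
	From string
}

// Handle implements Handler; it rejects payloads that are not strings.
func (h EmailHandler) Handle(r *Request) error {
	to, ok := r.Task.Payload.(string)
	if !ok {
		return errors.New("payload must be a string")
	}
	fmt.Printf("sending mail from %s to %s\n", h.From, to)
	return nil
}

func main() {
	handlers := []Handler{
		EmailHandler{From: "noreply@example.com"},
		HandlerFunc(func(r *Request) error {
			fmt.Println("payload:", r.Task.Payload)
			return nil
		}),
	}

	req := &Request{Task: &Task{Payload: "user@example.com"}}
	for _, h := range handlers {
		if err := h.Handle(req); err != nil {
			fmt.Println("error:", err)
		}
	}
}
```

A struct is convenient when the handler needs configuration or dependencies; a function wrapped in HandlerFunc is enough for stateless handlers.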

See the worker directory for more information and to run it.

If you want a complete application example, you can read A Tour of Bokchoy, which explains how to use its main features.

Installation

Using Go Modules

go get github.com/thoas/bokchoy

Advanced topics

Delayed tasks

A published task is processed immediately if a worker is free. To delay a task instead, use the bokchoy.WithCountdown option:

payload := map[string]string{
    "data": "hello world",
}

queue.Publish(ctx, payload, bokchoy.WithCountdown(5*time.Second))

This task will be executed in 5 seconds.

Priority tasks

A task can be published ahead of the others by providing a negative countdown.

payload := map[string]string{
    "data": "hello world",
}

queue.Publish(ctx, payload, bokchoy.WithCountdown(-1))

This task will be published and processed immediately.

Custom serializer

By default the task serializer is JSON. You can replace it when initializing the Bokchoy engine; the replacement must implement the Serializer interface.

bokchoy.New(ctx, bokchoy.Config{
    Broker: bokchoy.BrokerConfig{
        Type: "redis",
        Redis: bokchoy.RedisConfig{
            Type: "client",
            Client: bokchoy.RedisClientConfig{
                Addr: "localhost:6379",
            },
        },
    },
}, bokchoy.WithSerializer(MySerializer{}))

You can define a msgpack or YAML serializer this way if you want.
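As an illustration, here is a minimal sketch of a type with the two methods of the documented Serializer interface (Dumps and Loads). The interface is redeclared locally so the program is self-contained, and GobSerializer is a hypothetical name; it uses encoding/gob from the standard library rather than msgpack or YAML, only to avoid a third-party dependency.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// Serializer mirrors the two-method interface documented by bokchoy.
type Serializer interface {
	Dumps(interface{}) ([]byte, error)
	Loads([]byte, interface{}) error
}

// GobSerializer is a hypothetical serializer backed by encoding/gob.
type GobSerializer struct{}

// Dumps encodes v into gob bytes.
func (GobSerializer) Dumps(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Loads decodes gob bytes into v, which must be a pointer.
func (GobSerializer) Loads(data []byte, v interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

func main() {
	var s Serializer = GobSerializer{}

	raw, err := s.Dumps(map[string]string{"data": "hello world"})
	if err != nil {
		log.Fatal(err)
	}

	var out map[string]string
	if err := s.Loads(raw, &out); err != nil {
		log.Fatal(err)
	}
	fmt.Println(out["data"])
}
```

Whether a given format suits your payloads depends on the types you publish: gob, for instance, needs concrete types registered with gob.Register before it can decode into an interface{} destination.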

Custom logger

By default the internal logger is disabled. You can provide a more verbose logger with options:

package main

import (
	"context"
	"log"

	"github.com/thoas/bokchoy"
	"github.com/thoas/bokchoy/logging"
)

func main() {
	ctx := context.Background()

	logger, err := logging.NewDevelopmentLogger()
	if err != nil {
		log.Fatal(err)
	}

	// flush buffered log entries before exiting
	defer logger.Sync()

	bokchoy.New(ctx, bokchoy.Config{
		Broker: bokchoy.BrokerConfig{
			Type: "redis",
			Redis: bokchoy.RedisConfig{
				Type: "client",
				Client: bokchoy.RedisClientConfig{
					Addr: "localhost:6379",
				},
			},
		},
	}, bokchoy.WithLogger(logger))
}

The built-in logger is based on zap, but you can easily provide your own implementation if you have a central component.

If you don't need that much information, you can enable the Logger middleware.

Worker Concurrency

By default the worker concurrency is set to 1. You can override it based on your server capability, and Bokchoy will spawn multiple goroutines to handle your tasks.

engine.Queue("tasks.message").HandleFunc(func(r *bokchoy.Request) error {
    fmt.Println("Receive request", r)
    fmt.Println("Payload:", r.Task.Payload)

    return nil
}, bokchoy.WithConcurrency(5))

You can also set it globally with the bokchoy.WithConcurrency option when initializing the engine.
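To make the effect of a concurrency setting concrete, the sketch below fans a list of tasks out to n goroutines using only the standard library. It is not Bokchoy's consumer code; the process function and its parameters are hypothetical and only illustrate the general worker-pool idea.

```go
package main

import (
	"fmt"
	"sync"
)

// process fans tasks out to n worker goroutines (n must be at least 1) and
// returns how many tasks were handled. It stands in for what a pool of
// queue consumers might do.
func process(tasks []string, n int, handle func(string)) int {
	ch := make(chan string)

	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		handled int
	)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// each worker pulls tasks until the channel is closed
			for t := range ch {
				handle(t)
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}

	for _, t := range tasks {
		ch <- t
	}
	close(ch)
	wg.Wait()

	return handled
}

func main() {
	tasks := []string{"a", "b", "c", "d", "e", "f"}
	n := process(tasks, 5, func(t string) { fmt.Println("handling", t) })
	fmt.Println("handled", n, "tasks")
}
```

With more than one worker the order in which tasks are handled is not deterministic, so handlers should not rely on ordering.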

Retries

If your task handler returns an error, the task is marked as failed and retried 3 times, at intervals of 60 seconds, 120 seconds and 180 seconds.
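The schedule described above can be pictured with a small standalone sketch. retryDelay and defaultIntervals are hypothetical helpers, and the policy of reusing the last interval when there are fewer intervals than retries is an assumption of this sketch, not documented Bokchoy behavior.

```go
package main

import (
	"fmt"
	"time"
)

// defaultIntervals returns the intervals quoted in the text above.
func defaultIntervals() []time.Duration {
	return []time.Duration{60 * time.Second, 120 * time.Second, 180 * time.Second}
}

// retryDelay returns the delay before the given retry attempt (1-based) and
// whether that attempt is allowed at all. When there are fewer intervals
// than retries, the last interval is reused (an assumption of this sketch).
func retryDelay(attempt, maxRetries int, intervals []time.Duration) (time.Duration, bool) {
	if attempt < 1 || attempt > maxRetries || len(intervals) == 0 {
		return 0, false
	}

	i := attempt - 1
	if i >= len(intervals) {
		i = len(intervals) - 1
	}

	return intervals[i], true
}

func main() {
	for attempt := 1; attempt <= 4; attempt++ {
		delay, ok := retryDelay(attempt, 3, defaultIntervals())
		fmt.Println("attempt", attempt, "delay", delay, "allowed", ok)
	}
}
```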

You can customize this globally on the engine or when publishing a new task by using the bokchoy.WithMaxRetries and bokchoy.WithRetryIntervals options.

bokchoy.WithMaxRetries(1)
bokchoy.WithRetryIntervals([]time.Duration{
	180 * time.Second,
})

Timeout

By default a task will be forced to time out and marked as canceled if its running time exceeds 180 seconds.

You can customize this globally or when publishing a new task by using the bokchoy.WithTimeout option:

bokchoy.WithTimeout(5*time.Second)

The worker will regain control and process the next task, but be careful: each task runs in its own goroutine, so your handler has to stop at some point or the goroutine will leak.
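One common way to avoid such a leak is to make long-running work watch a context and return as soon as it is done. The sketch below uses only the standard library; slowWork, runWithTimeout and timedOut are hypothetical helpers. In a real handler the context would presumably come from r.Context(), assuming it is cancelled when the deadline is reached (for instance through the Timeout middleware).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowWork simulates a long-running step that stops early when ctx is done.
func slowWork(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return nil // the work finished in time
	case <-ctx.Done():
		return ctx.Err() // give up instead of leaking the goroutine
	}
}

// runWithTimeout runs slowWork under a deadline. Both arguments are in
// milliseconds to keep call sites short.
func runWithTimeout(workMs, timeoutMs int) error {
	timeout := time.Duration(timeoutMs) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	return slowWork(ctx, time.Duration(workMs)*time.Millisecond)
}

// timedOut reports whether err was caused by an exceeded deadline.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("fast task timed out:", timedOut(runWithTimeout(10, 1000)))
	fmt.Println("slow task timed out:", timedOut(runWithTimeout(1000, 10)))
}
```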

Catch events

You can catch events by registering handlers on your queue when your tasks are starting, succeeding, completing or failing.

queue := engine.Queue("tasks.message")
queue.OnStartFunc(func(r *bokchoy.Request) error {
    // we update the context by adding a value
    *r = *r.WithContext(context.WithValue(r.Context(), "foo", "bar"))

    return nil
})

queue.OnCompleteFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

queue.OnSuccessFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

queue.OnFailureFunc(func(r *bokchoy.Request) error {
    fmt.Println(r.Context().Value("foo"))

    return nil
})

Store results

By default, if you don't mutate the task in the handler, its result will always be nil.

You can store a result in your task to keep it for later. For example, you might need statistics from a Twitter profile to save them later.

queue.HandleFunc(func(r *bokchoy.Request) error {
	r.Task.Result = map[string]string{"result": "wow!"}

	return nil
})

You can store anything as long as your serializer can serialize it.

Keep in mind that the default task TTL is 180 seconds; you can override it with the bokchoy.WithTTL option.

Helpers

Let's define our previous queue:

queue := engine.Queue("tasks.message")

Empty the queue

queue.Empty(ctx)

It will remove all waiting tasks from your queue.

Cancel a waiting task

We produce a task without running the worker:

payload := map[string]string{
    "data": "hello world",
}

task, err := queue.Publish(ctx, payload)
if err != nil {
    log.Fatal(err)
}

Then we can cancel it by using its ID:

queue.Cancel(ctx, task.ID)

Retrieve a published task from the queue

queue.Get(ctx, task.ID)

Retrieve statistics from a queue

stats, err := queue.Count(ctx)
if err != nil {
    log.Fatal(err)
}

fmt.Println("Number of waiting tasks:", stats.Direct)
fmt.Println("Number of delayed tasks:", stats.Delayed)
fmt.Println("Number of total tasks:", stats.Total)

Middleware handlers

Bokchoy comes equipped with an optional middleware package, providing a suite of standard middlewares. Middlewares have the same API as handlers. They are easy to implement; think of them like net/http middlewares, since they share the same purpose: following the lifecycle of a Bokchoy request.

Core middlewares

bokchoy/middleware   Description
Logger               Logs the start and end of each request with the elapsed processing time
Recoverer            Gracefully absorbs panics and prints the stack trace
RequestID            Injects a request ID into the context of each request
Timeout              Signals to the request context when the timeout deadline is reached

See middleware directory for more information.
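The func(Handler) Handler shape accepted by Use can be sketched as follows. Request, Handler and HandlerFunc are local stand-ins, and Recover, Timing and Chain are hypothetical names written for this example; they are not the implementations shipped in the middleware package.

```go
package main

import (
	"fmt"
	"time"
)

// Request, Handler and HandlerFunc are simplified local stand-ins.
type Request struct{ Name string }

type Handler interface{ Handle(*Request) error }

type HandlerFunc func(*Request) error

func (f HandlerFunc) Handle(r *Request) error { return f(r) }

// Recover is a hypothetical middleware that turns a panic in the wrapped
// handler into an ordinary error.
func Recover(next Handler) Handler {
	return HandlerFunc(func(r *Request) (err error) {
		defer func() {
			if rec := recover(); rec != nil {
				err = fmt.Errorf("recovered: %v", rec)
			}
		}()
		return next.Handle(r)
	})
}

// Timing is a hypothetical middleware that prints how long the wrapped
// handler took.
func Timing(next Handler) Handler {
	return HandlerFunc(func(r *Request) error {
		start := time.Now()
		err := next.Handle(r)
		fmt.Printf("%s took %s (err=%v)\n", r.Name, time.Since(start), err)
		return err
	})
}

// Chain wraps h so that the first middleware listed runs outermost.
func Chain(h Handler, mws ...func(Handler) Handler) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

func main() {
	h := Chain(HandlerFunc(func(r *Request) error {
		panic("boom")
	}), Timing, Recover)

	fmt.Println(h.Handle(&Request{Name: "tasks.message"}))
}
```

Because each middleware only sees the next Handler, the order in which they are chained decides which one observes the others' effects: here Timing also measures the time spent recovering.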

FAQs

Are Task IDs unique?

Yes! They are based on ulid.

Is exactly-once execution of tasks guaranteed?

It's guaranteed by the underlying broker, which uses BRPOP/BLPOP from Redis.

If multiple clients are blocked for the same key, the first client to be served is the one that was waiting for more time (the first that blocked for the key).

Contributing

Don't hesitate ;)

Project history

Bokchoy is highly influenced by the great rq and celery.

Both are great, well-maintained projects, but they are only used in the Python ecosystem.

Some parts (middlewares mostly) of Bokchoy are heavily inspired by or taken from go-chi.

Documentation

Index

Constants

const (
	Version = "v0.2.0"
)

Variables

var (
	// ErrorCtxKey is the context.Context key to store
	// the recovered error from the middleware
	ErrorCtxKey = &contextKey{"Error"}

	// TaskCtxKey is the context.Context key to store
	// the task from the middleware
	TaskCtxKey = &contextKey{"Task"}

	// AfterRequestCtxKey is the context.Context key to store
	// functions to execute after the request
	AfterRequestCtxKey = &contextKey{"AfterRequest"}
)

var (
	// ErrAttributeError is returned when an attribute is not found.
	ErrAttributeError = fmt.Errorf("Attribute error")

	// ErrTaskCanceled is returned when a task is canceled.
	ErrTaskCanceled = fmt.Errorf("Task canceled")

	// ErrTaskNotFound is returned when a task is not found.
	ErrTaskNotFound = fmt.Errorf("Task not found")

	// ErrNoQueueToRun is returned when no queue has been found to run.
	ErrNoQueueToRun = fmt.Errorf("No queue to run")
)

var (
	// Normal colors
	ColorBlack   = Color{'\033', '[', '3', '0', 'm'}
	ColorRed     = Color{'\033', '[', '3', '1', 'm'}
	ColorGreen   = Color{'\033', '[', '3', '2', 'm'}
	ColorYellow  = Color{'\033', '[', '3', '3', 'm'}
	ColorBlue    = Color{'\033', '[', '3', '4', 'm'}
	ColorMagenta = Color{'\033', '[', '3', '5', 'm'}
	ColorCyan    = Color{'\033', '[', '3', '6', 'm'}
	ColorWhite   = Color{'\033', '[', '3', '7', 'm'}

	// Bright colors
	ColorBrightBlack   = Color{'\033', '[', '3', '0', ';', '1', 'm'}
	ColorBrightRed     = Color{'\033', '[', '3', '1', ';', '1', 'm'}
	ColorBrightGreen   = Color{'\033', '[', '3', '2', ';', '1', 'm'}
	ColorBrightYellow  = Color{'\033', '[', '3', '3', ';', '1', 'm'}
	ColorBrightBlue    = Color{'\033', '[', '3', '4', ';', '1', 'm'}
	ColorBrightMagenta = Color{'\033', '[', '3', '5', ';', '1', 'm'}
	ColorBrightCyan    = Color{'\033', '[', '3', '6', ';', '1', 'm'}
	ColorBrightWhite   = Color{'\033', '[', '3', '7', ';', '1', 'm'}

	ColorReset = Color{'\033', '[', '0', 'm'}
)

DefaultTracer is the default tracer.

Functions

func GetContextError

func GetContextError(ctx context.Context) error

GetContextError returns the in-context recovered error for a request.

func ID added in v0.2.0

func ID() string

func WithContextAfterRequestFunc

func WithContextAfterRequestFunc(ctx context.Context, f AfterRequestFunc) context.Context

WithContextAfterRequestFunc registers a new function to be executed after the request

func WithContextError

func WithContextError(ctx context.Context, err error) context.Context

WithContextError sets the in-context error for a request.

func WithContextTask

func WithContextTask(ctx context.Context, task *Task) context.Context

WithContextTask sets the in-context task for a request.
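The context helpers above follow the usual Go pattern of storing values under a private key type. The sketch below shows that pattern in isolation; contextKey is modeled on the variable declarations shown earlier, while errorKey, withError, errorFrom and roundTrip are hypothetical names, not the library's code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// contextKey is unexported, so keys created here cannot collide with keys
// defined by other packages.
type contextKey struct{ name string }

// errorKey is a pointer, so lookups compare it by identity.
var errorKey = &contextKey{"Error"}

// errBoom is a sample error used by the demonstration below.
var errBoom = errors.New("boom")

// withError returns a copy of ctx carrying err.
func withError(ctx context.Context, err error) context.Context {
	return context.WithValue(ctx, errorKey, err)
}

// errorFrom returns the error stored in ctx, or nil when there is none.
func errorFrom(ctx context.Context) error {
	err, _ := ctx.Value(errorKey).(error)
	return err
}

// roundTrip stores err in a fresh context and reads it back.
func roundTrip(err error) error {
	return errorFrom(withError(context.Background(), err))
}

func main() {
	fmt.Println("stored:", roundTrip(errBoom))
	fmt.Println("empty context:", errorFrom(context.Background()))
}
```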

Types

type AfterRequestFunc

type AfterRequestFunc func()

AfterRequestFunc is a function which will execute after the request

func GetContextAfterRequestFuncs

func GetContextAfterRequestFuncs(ctx context.Context) []AfterRequestFunc

GetContextAfterRequestFuncs returns the registered functions which will execute after the request

type Bokchoy

type Bokchoy struct {
	Serializer Serializer
	Logger     logging.Logger
	Tracer     Tracer
	// contains filtered or unexported fields
}

Bokchoy is the main object which stores all configuration, queues and broker.

func New

func New(ctx context.Context, cfg Config, options ...Option) (*Bokchoy, error)

New initializes a new Bokchoy instance.

func (*Bokchoy) Empty

func (b *Bokchoy) Empty(ctx context.Context) error

Empty empties initialized queues.

func (*Bokchoy) Flush

func (b *Bokchoy) Flush() error

Flush flushes data of the entire system.

func (*Bokchoy) Handle

func (b *Bokchoy) Handle(queueName string, sub Handler, options ...Option)

Handle registers a new handler to consume tasks for a queue.

func (*Bokchoy) HandleFunc

func (b *Bokchoy) HandleFunc(queueName string, f HandlerFunc, options ...Option)

HandleFunc registers a new handler function to consume tasks for a queue.

func (*Bokchoy) Publish

func (b *Bokchoy) Publish(ctx context.Context, queueName string, payload interface{}, options ...Option) (*Task, error)

Publish publishes a new payload to a queue.

func (*Bokchoy) Queue

func (b *Bokchoy) Queue(name string) *Queue

Queue gets or creates a new queue.

func (*Bokchoy) QueueNames

func (b *Bokchoy) QueueNames() []string

QueueNames returns the managed queue names.

func (*Bokchoy) Run

func (b *Bokchoy) Run(ctx context.Context, options ...Option) error

Run runs the system and blocks the current goroutine.

func (*Bokchoy) ServerNames added in v0.2.0

func (b *Bokchoy) ServerNames() []string

ServerNames returns the managed server names.

func (*Bokchoy) Stop

func (b *Bokchoy) Stop(ctx context.Context)

Stop stops all queues and consumers.

func (*Bokchoy) Use

func (b *Bokchoy) Use(sub ...func(Handler) Handler) *Bokchoy

Use appends a new middleware to the system.

type Broker

type Broker interface {
	// Initialize initializes the broker.
	Initialize(context.Context) error

	// Ping pings the broker to ensure it's well connected.
	Ping() error

	// Get returns raw data stored in broker.
	Get(string) (map[string]interface{}, error)

	// Delete deletes raw data in broker based on key.
	Delete(string, string) error

	// List returns raw data stored in broker.
	List(string) ([]map[string]interface{}, error)

	// Empty empties a queue.
	Empty(string) error

	// Flush flushes the entire broker.
	Flush() error

	// Count returns number of items from a queue name.
	Count(string) (BrokerStats, error)

	// Set synchronizes the stored item.
	Set(string, map[string]interface{}, time.Duration) error

	// Publish publishes raw data.
	Publish(string, string, map[string]interface{}, time.Time) error

	// Consume returns an array of raw data.
	Consume(context.Context, string, time.Time) ([]map[string]interface{}, error)
}

Broker is the common interface to define a Broker.

type BrokerConfig

type BrokerConfig struct {
	Type  string
	Redis RedisConfig
}

BrokerConfig contains the broker configuration.

type BrokerStats added in v0.2.0

type BrokerStats struct {
	Total   int
	Direct  int
	Delayed int
}

BrokerStats is the statistics returned by a Queue.

type Color added in v0.2.0

type Color []byte

Color is a terminal color representation.

type ColorWriter added in v0.2.0

type ColorWriter struct {
	*bytes.Buffer
	// contains filtered or unexported fields
}

ColorWriter is a bytes buffer with color.

func NewColorWriter added in v0.2.0

func NewColorWriter(color Color) *ColorWriter

NewColorWriter initializes a new ColorWriter.

func (ColorWriter) WithColor added in v0.2.0

func (c ColorWriter) WithColor(color Color) *ColorWriter

WithColor returns a new ColorWriter with a new color.

func (*ColorWriter) Write added in v0.2.0

func (c *ColorWriter) Write(s string, args ...interface{})

Write writes an output to stdout.

type Config

type Config struct {
	Queues     []QueueConfig
	Broker     BrokerConfig
	Serializer SerializerConfig
}

Config contains the main configuration to initialize Bokchoy.

type Handler

type Handler interface {
	Handle(*Request) error
}

Handler is an interface to implement a task handler.

type HandlerFunc

type HandlerFunc func(*Request) error

HandlerFunc is a handler to handle incoming tasks.

func (HandlerFunc) Handle

func (s HandlerFunc) Handle(r *Request) error

Handle consumes the request.

type JSONSerializer

type JSONSerializer struct {
}

func (JSONSerializer) Dumps

func (s JSONSerializer) Dumps(v interface{}) ([]byte, error)

func (JSONSerializer) Loads

func (s JSONSerializer) Loads(data []byte, v interface{}) error

func (JSONSerializer) String

func (s JSONSerializer) String() string

type Option

type Option func(opts *Options)

Option is an option unit.
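Option follows the functional options pattern: each With... function returns a closure that mutates an Options value. The sketch below reproduces the idea with a trimmed-down local Options struct. newOptions is a hypothetical helper, and the defaults it sets are the ones quoted in the README (concurrency 1, 3 retries, 180 second timeout), not values read from the library's source.

```go
package main

import (
	"fmt"
	"time"
)

// Options is a trimmed-down local copy of a few documented fields.
type Options struct {
	Concurrency int
	MaxRetries  int
	Timeout     time.Duration
	Countdown   *time.Duration
}

// Option mutates an Options value.
type Option func(opts *Options)

// WithConcurrency sets the number of concurrent consumers.
func WithConcurrency(n int) Option {
	return func(o *Options) { o.Concurrency = n }
}

// WithCountdown sets the delay before a task should run.
func WithCountdown(d time.Duration) Option {
	return func(o *Options) { o.Countdown = &d }
}

// newOptions starts from defaults and applies each option in order, so later
// options override earlier ones.
func newOptions(options ...Option) *Options {
	opts := &Options{
		Concurrency: 1,
		MaxRetries:  3,
		Timeout:     180 * time.Second,
	}
	for _, apply := range options {
		apply(opts)
	}
	return opts
}

func main() {
	opts := newOptions(WithConcurrency(5), WithCountdown(5*time.Second))
	fmt.Println(opts.Concurrency, opts.MaxRetries, opts.Timeout, *opts.Countdown)
}
```

Keeping Countdown as a pointer lets the caller distinguish "no countdown given" from an explicit zero or negative value.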

func WithBroker added in v0.2.0

func WithBroker(broker Broker) Option

WithBroker registers new broker.

func WithConcurrency

func WithConcurrency(concurrency int) Option

WithConcurrency defines the number of concurrent consumers.

func WithCountdown

func WithCountdown(countdown time.Duration) Option

WithCountdown defines the countdown to launch a delayed task.

func WithDisableOutput

func WithDisableOutput(disableOutput bool) Option

WithDisableOutput defines if the output (logo, queues information) should be disabled.

func WithInitialize

func WithInitialize(initialize bool) Option

WithInitialize defines if the broker needs to be initialized.

func WithLogger

func WithLogger(logger logging.Logger) Option

WithLogger defines the Logger.

func WithMaxRetries

func WithMaxRetries(maxRetries int) Option

WithMaxRetries defines the number of maximum retries for a failed task.

func WithQueues

func WithQueues(queues []string) Option

WithQueues allows to override queues to run.

func WithRetryIntervals

func WithRetryIntervals(retryIntervals []time.Duration) Option

WithRetryIntervals defines the retry intervals for a failed task.

func WithSerializer

func WithSerializer(serializer Serializer) Option

WithSerializer defines the Serializer.

func WithServers added in v0.2.0

func WithServers(servers []Server) Option

WithServers registers new servers to be run.

func WithTTL

func WithTTL(ttl time.Duration) Option

WithTTL defines the duration to keep the task in the broker.

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout defines the timeout used to execute a task.

func WithTracer

func WithTracer(tracer Tracer) Option

WithTracer defines the Tracer.

type Options

type Options struct {
	Tracer         Tracer
	Logger         logging.Logger
	Concurrency    int
	MaxRetries     int
	TTL            time.Duration
	Countdown      *time.Duration
	Timeout        time.Duration
	RetryIntervals []time.Duration
	Serializer     Serializer
	Initialize     bool
	Queues         []string
	DisableOutput  bool
	Servers        []Server
	Broker         Broker
}

Options is the bokchoy options.

func (Options) RetryIntervalsDisplay

func (o Options) RetryIntervalsDisplay() string

RetryIntervalsDisplay returns a string representation of the retry intervals.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue contains consumers to enqueue.

func (*Queue) Cancel

func (q *Queue) Cancel(ctx context.Context, taskID string) (*Task, error)

Cancel cancels a task using its ID.

func (*Queue) Consume

func (q *Queue) Consume(ctx context.Context) ([]*Task, error)

Consume returns an array of tasks.

func (*Queue) Consumer

func (q *Queue) Consumer() *consumer

Consumer returns a random consumer.

func (*Queue) Count

func (q *Queue) Count(ctx context.Context) (BrokerStats, error)

Count returns statistics from queue:

  • direct: number of waiting tasks
  • delayed: number of waiting delayed tasks
  • total: number of total tasks

func (*Queue) Empty

func (q *Queue) Empty(ctx context.Context) error

Empty empties queue.

func (*Queue) Get

func (q *Queue) Get(ctx context.Context, taskID string) (*Task, error)

Get returns a task instance from the broker with its id.

func (*Queue) Handle

func (q *Queue) Handle(sub Handler, options ...Option) *Queue

Handle registers a new handler to consume tasks.

func (*Queue) HandleFunc

func (q *Queue) HandleFunc(f HandlerFunc, options ...Option) *Queue

HandleFunc registers a new handler function to consume tasks.

func (*Queue) HandleRequest

func (q *Queue) HandleRequest(ctx context.Context, r *Request) error

HandleRequest handles a request synchronously with a consumer.

func (*Queue) List added in v0.2.0

func (q *Queue) List(ctx context.Context) ([]*Task, error)

List returns tasks from the broker.

func (Queue) MarshalLogObject

func (q Queue) MarshalLogObject(enc logging.ObjectEncoder) error

MarshalLogObject returns the log representation for the queue.

func (Queue) Name

func (q Queue) Name() string

Name returns the queue name.

func (*Queue) NewTask

func (q *Queue) NewTask(payload interface{}, options ...Option) *Task

NewTask returns a new task instance from payload and options.

func (*Queue) OnComplete

func (q *Queue) OnComplete(sub Handler) *Queue

OnComplete registers a new handler to be executed when a task is completed.

func (*Queue) OnCompleteFunc

func (q *Queue) OnCompleteFunc(f HandlerFunc) *Queue

OnCompleteFunc registers a new handler function to be executed when a task is completed.

func (*Queue) OnFailure

func (q *Queue) OnFailure(sub Handler) *Queue

OnFailure registers a new handler to be executed when a task has failed.

func (*Queue) OnFailureFunc

func (q *Queue) OnFailureFunc(f HandlerFunc) *Queue

OnFailureFunc registers a new handler function to be executed when a task has failed.

func (*Queue) OnStart

func (q *Queue) OnStart(sub Handler) *Queue

OnStart registers a new handler to be executed when a task is started.

func (*Queue) OnStartFunc

func (q *Queue) OnStartFunc(f HandlerFunc) *Queue

OnStartFunc registers a new handler function to be executed when a task is started.

func (*Queue) OnSuccess

func (q *Queue) OnSuccess(sub Handler) *Queue

OnSuccess registers a new handler to be executed when a task has succeeded.

func (*Queue) OnSuccessFunc

func (q *Queue) OnSuccessFunc(f HandlerFunc) *Queue

OnSuccessFunc registers a new handler function to be executed when a task has succeeded.

func (*Queue) Publish

func (q *Queue) Publish(ctx context.Context, payload interface{}, options ...Option) (*Task, error)

Publish publishes a new payload to the queue.

func (*Queue) PublishTask

func (q *Queue) PublishTask(ctx context.Context, task *Task) error

PublishTask publishes a new task to the queue.

func (*Queue) Save

func (q *Queue) Save(ctx context.Context, task *Task) error

Save saves a task to the queue.

func (*Queue) Use

func (q *Queue) Use(sub ...func(Handler) Handler) *Queue

Use appends a new handler middleware to the queue.

type QueueConfig

type QueueConfig struct {
	Name string
}

QueueConfig contains queue information that should be initialized.

type RedisBroker added in v0.2.0

type RedisBroker struct {
	ClientType string
	Client     redis.UniversalClient
	Prefix     string
	Logger     logging.Logger
	// contains filtered or unexported fields
}

RedisBroker is the redis broker.

func NewRedisBroker added in v0.2.1

func NewRedisBroker(clt redis.UniversalClient, clientType string, prefix string, logger logging.Logger) *RedisBroker

NewRedisBroker initializes a new redis broker instance.

func (*RedisBroker) Consume added in v0.2.0

func (p *RedisBroker) Consume(ctx context.Context, name string, eta time.Time) ([]map[string]interface{}, error)

Consume returns an array of raw data.

func (*RedisBroker) Count added in v0.2.0

func (p *RedisBroker) Count(queueName string) (BrokerStats, error)

Count returns number of items from a queue name.

func (*RedisBroker) Delete added in v0.2.0

func (p *RedisBroker) Delete(name string, taskID string) error

Delete deletes raw data in broker based on key.

func (*RedisBroker) Empty added in v0.2.0

func (p *RedisBroker) Empty(name string) error

Empty removes the redis key for a queue.

func (*RedisBroker) Flush added in v0.2.0

func (p *RedisBroker) Flush() error

Flush flushes the entire redis database.

func (*RedisBroker) Get added in v0.2.0

func (p *RedisBroker) Get(taskKey string) (map[string]interface{}, error)

Get returns stored raw data from task key.

func (*RedisBroker) Initialize added in v0.2.0

func (p *RedisBroker) Initialize(ctx context.Context) error

Initialize initializes the redis broker.

func (*RedisBroker) List added in v0.2.0

func (p *RedisBroker) List(name string) ([]map[string]interface{}, error)

func (RedisBroker) Ping added in v0.2.0

func (p RedisBroker) Ping() error

Ping pings the redis broker to ensure it's well connected.

func (*RedisBroker) Publish added in v0.2.0

func (p *RedisBroker) Publish(queueName string,
	taskID string, data map[string]interface{}, eta time.Time) error

Publish publishes raw data. It uses a hash to store the task itself and pushes the task ID to the list, or to a zset if the task is delayed.

func (*RedisBroker) Set added in v0.2.0

func (p *RedisBroker) Set(taskKey string, data map[string]interface{}, expiration time.Duration) error

Set synchronizes the stored item in redis.

func (RedisBroker) String added in v0.2.0

func (p RedisBroker) String() string

type RedisClientConfig

type RedisClientConfig redis.Options

RedisClientConfig contains the redis client configuration.

type RedisClusterConfig

type RedisClusterConfig redis.ClusterOptions

RedisClusterConfig contains the redis cluster configuration.

type RedisConfig

type RedisConfig struct {
	Type     string
	Prefix   string
	Client   RedisClientConfig
	Cluster  RedisClusterConfig
	Sentinel RedisSentinelConfig
}

RedisConfig contains all redis configuration: client, sentinel (failover), cluster.

type RedisSentinelConfig

type RedisSentinelConfig redis.FailoverOptions

RedisSentinelConfig contains the redis sentinel configuration.

type Request

type Request struct {
	Task *Task
	// contains filtered or unexported fields
}

Request is the bokchoy Request which will be handled by a subscriber handler.

func (*Request) Context

func (r *Request) Context() context.Context

Context returns the context attached to the Request.

func (Request) String

func (r Request) String() string

String returns a string representation of a Request

func (*Request) WithContext

func (r *Request) WithContext(ctx context.Context) *Request

WithContext creates a new Request with a context

type Serializer

type Serializer interface {
	Dumps(interface{}) ([]byte, error)
	Loads([]byte, interface{}) error
}

Serializer defines an interface to implement a serializer.

type SerializerConfig

type SerializerConfig struct {
	Type string
}

SerializerConfig contains a serializer configuration to store tasks.

type Server added in v0.2.0

type Server interface {
	Start(context.Context) error
	Stop(context.Context)
}

type Task

type Task struct {
	ID             string
	Name           string
	PublishedAt    time.Time
	StartedAt      time.Time
	ProcessedAt    time.Time
	Status         int
	OldStatus      int
	MaxRetries     int
	Payload        interface{}
	Result         interface{}
	Error          interface{}
	ExecTime       float64
	TTL            time.Duration
	Timeout        time.Duration
	ETA            time.Time
	RetryIntervals []time.Duration
}

Task is the model stored in a Queue.

func GetContextTask

func GetContextTask(ctx context.Context) *Task

GetContextTask returns the in-context task for a request.

func NewTask

func NewTask(name string, payload interface{}, options ...Option) *Task

NewTask initializes a new Task.

func TaskFromPayload

func TaskFromPayload(data map[string]interface{}, serializer Serializer) (*Task, error)

TaskFromPayload returns a Task instance from raw data.

func (Task) ETADisplay

func (t Task) ETADisplay() string

ETADisplay returns the string representation of the ETA.

func (*Task) Finished

func (t *Task) Finished() bool

Finished returns if a task is finished or not.

func (*Task) IsStatusCanceled

func (t *Task) IsStatusCanceled() bool

IsStatusCanceled returns if the task status is canceled.

func (*Task) IsStatusFailed

func (t *Task) IsStatusFailed() bool

IsStatusFailed returns if the task status is failed.

func (*Task) IsStatusProcessing

func (t *Task) IsStatusProcessing() bool

IsStatusProcessing returns if the task status is processing.

func (*Task) IsStatusSucceeded

func (t *Task) IsStatusSucceeded() bool

IsStatusSucceeded returns if the task status is succeeded.

func (*Task) IsStatusWaiting

func (t *Task) IsStatusWaiting() bool

IsStatusWaiting returns if the task status is waiting.

func (Task) Key

func (t Task) Key() string

Key returns the task key.

func (*Task) MarkAsCanceled

func (t *Task) MarkAsCanceled()

MarkAsCanceled marks a task as canceled.

func (*Task) MarkAsFailed

func (t *Task) MarkAsFailed(err error)

MarkAsFailed marks a task as failed.

func (*Task) MarkAsProcessing

func (t *Task) MarkAsProcessing()

MarkAsProcessing marks a task as processing.

func (*Task) MarkAsSucceeded

func (t *Task) MarkAsSucceeded()

MarkAsSucceeded marks a task as succeeded.

func (Task) MarshalLogObject

func (t Task) MarshalLogObject(enc logging.ObjectEncoder) error

MarshalLogObject returns the log representation for the task.

func (Task) RetryETA

func (t Task) RetryETA() time.Time

RetryETA returns the next ETA.

func (Task) RetryIntervalsDisplay

func (t Task) RetryIntervalsDisplay() string

RetryIntervalsDisplay returns the string representation of the retry intervals.

func (Task) Serialize

func (t Task) Serialize(serializer Serializer) (map[string]interface{}, error)

Serialize serializes a Task to raw data.

func (Task) StatusDisplay

func (t Task) StatusDisplay() string

StatusDisplay returns the status in human representation.

func (Task) String

func (t Task) String() string

String returns the string representation of Task.

type Tracer

type Tracer interface {
	Log(context.Context, string, error)
}

Tracer is a component used to trace errors.
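Since Tracer has a single method, a custom implementation is short. The sketch below redeclares the interface locally and defines MemoryTracer, a hypothetical tracer that keeps formatted entries in memory, which can be handy in tests. Presumably such a value would be passed to the engine with WithTracer.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Tracer mirrors the documented single-method interface.
type Tracer interface {
	Log(context.Context, string, error)
}

// MemoryTracer is a hypothetical Tracer that records entries in memory.
type MemoryTracer struct {
	mu      sync.Mutex
	entries []string
}

// compile-time check that *MemoryTracer satisfies Tracer
var _ Tracer = (*MemoryTracer)(nil)

// Log records the message together with the error. The context is unused in
// this sketch.
func (t *MemoryTracer) Log(ctx context.Context, msg string, err error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.entries = append(t.entries, fmt.Sprintf("%s: %v", msg, err))
}

// Entries returns a copy of the recorded entries.
func (t *MemoryTracer) Entries() []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	return append([]string(nil), t.entries...)
}

func main() {
	tracer := &MemoryTracer{}
	tracer.Log(context.Background(), "task failed", errors.New("boom"))
	fmt.Println(tracer.Entries())
}
```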

func NewLoggerTracer

func NewLoggerTracer(logger logging.Logger) Tracer

NewLoggerTracer initializes a new Tracer instance.
