backlite

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2025 License: MIT Imports: 11 Imported by: 15

README

Backlite: Type-safe, persistent, embedded task queues and background job runner w/ SQLite

Go Report Card Test Go Reference GoT Mentioned in Awesome Go

Logo

Table of Contents

Introduction

Overview

Backlite provides type-safe, persistent and embedded task queues meant to run within your application as opposed to an external message broker. A task can be of any type and each type declares its own queue along with the configuration for the queue. Tasks are automatically executed in the background via a configurable worker pool.

Origin

This project started shortly after migrating Pagoda to SQLite from Postgres and Redis. Redis was previously used to handle task queues and I wanted to leverage SQLite instead. Originally, goqite was chosen, which is a great library and one that I took inspiration from, but I had a lot of different ideas for the overall approach and it lacked a number of features that I felt were needed.

River, an excellent, similar library built for Postgres, was also a major source of inspiration and ideas.

Screenshots

Failed tasks Task details Task failed

Status

This project is under active development, though all features outlined below are available and complete. No significant API or schema changes are expected at this time, but it is certainly possible.

Installation

Install by simply running: go get github.com/mikestefanello/backlite

Features

Type-safety

No need to deal with serialization and byte slices. By leveraging generics, tasks and queues are completely type-safe which means that you pass in your task type into a queue and your task processor callbacks will only receive that given type.

Persistence with SQLite

When tasks are added to a queue, they are inserted into a SQLite database table to ensure persistence.

Optional retention

Each queue can have completed tasks retained in a separate table for archiving, auditing, monitoring, etc. Options exist to retain all completed tasks or only those that failed all attempts. An option also exists to retain the task data for all tasks or only those that failed.

Retry & Backoff

Each queue can be configured to retry tasks a certain number of times and to backoff a given amount of time between each attempt.

Scheduled execution

When adding a task to a queue, you can specify a duration or specific time to wait until executing the task.

Logging

Optionally log queue operations with a logger of your choice, as long as it implements the simple Logger interface, which log/slog does.

2024/07/21 14:08:13 INFO task processed id=0190d67a-d8da-76d4-8fb8-ded870d69151 queue=example duration=85.101µs attempt=1

Nested tasks

While processing a given task, it's easy to create another task in the same or a different queue, which allows you to nest tasks to create workflows. Use FromContext() with the provided context to get your initialized client from within the task processor, and add one or many tasks.

Graceful shutdown

The task dispatcher, which handles sending tasks to the worker pool for execution, can be shutdown gracefully by calling Stop() on the client. That will wait for all workers to finish for as long as the passed in context is not cancelled. The hard-stop the dispatcher, cancel the context passed in when calling Start(). See usage below.

Transactions

Task creation can be added to a given database transaction. If you are using SQLite as your primary database, this provides a simple, robust way to ensure data integrity. For example, using the eCommerce app example, when inserting a new order into your database, the same transaction to be used to add a task to send an order notification email, and they either both succeed or both fail. Use the chained method Tx() to provide your transaction when adding one or multiple tasks.

No database polling

Since SQLite only supports one writer, no continuous database polling is required. The task dispatcher is able to remain aware of new tasks and keep track of when future tasks are scheduled for, and thus only queries the database when it needs to.

Driver flexibility

Use any SQLite driver that you'd like. This library only includes go-sqlite3 since it is used in tests.

Bulk inserts

Insert one or many tasks across one or many queues in a single operation.

Execution timeout

Each queue can be configured with an execution timeout for processing a given task. The provided context will cancel after the time elapses. If you want to respect that timeout, your processor code will have to listen for the context cancellation.

Background worker pool

When creating a client, you can specify the amount of goroutines to use to build a worker pool. This pool is created and shutdown via the dispatcher by calling Start() and Stop() on the client. The worker pool is the only way to process tasks; they cannot be pulled manually.

Web UI

A simple web UI to monitor running, upcoming, and completed tasks is provided.

To run, pass your *sql.DB to ui.NewHandler() and register that to an HTTP handler, for example:

mux := http.DefaultServeMux
h, err := ui.NewHandler(ui.Config{
    DB: db,
})
h.Register(mux)
err := http.ListenAndServe(":9000", mux)

Then visit the given port and/or domain in your browser (ie, localhost:9000).

The web CSS is provided by tabler.

Panic recovery

If any of your task processors panics, the application with automatically recover, mark the task as failed, and store the panic message as the error message.

Usage

Client initialization

First, open a connection to your SQLite database using the driver of your choice:

db, err := sql.Open("sqlite3", "data.db?_journal=WAL&_timeout=5000")

Second, initialize a client:

client, err := backlite.NewClient(backlite.ClientConfig{
    DB:              db,
    Logger:          slog.Default(),
    ReleaseAfter:    10 * time.Minute,
    NumWorkers:      10,
    CleanupInterval: time.Hour,
})

The configuration options are:

  • DB: The database connection.
  • Logger: A logger that implements the Logger interface. Omit if you do not want to log.
  • ReleaseAfter: The duration after which tasks claimed and passed for execution should be added back to the queue if a response was never received.
  • NumWorkers: The amount of goroutines to open which will process queued tasks.
  • CleanupInterval: How often the completed tasks database table will attempt to remove expired rows.

Schema installation

Until a more robust system is provided, to install the database schema, call client.Install(). This must be done prior to using the client. It is safe to call this if the schema was previously installed. The schema is currently defined in internal/query/schema.sql.

Declaring a Task type

Any type can be a task as long as it implements the Task interface, which requires only the Config() QueueConfig method, used to provide information about the queue that these tasks will be added to. All fields should be exported. As an example, this is a task used to send new order email notifications:

type NewOrderEmailTask struct {
    OrderID string
    EmailAddress string
}

Then implement the Task interface by providing the queue configuration:

func (t NewOrderEmailTask) Config() backlite.QueueConfig {
    return backlite.QueueConfig{
        Name:        "NewOrderEmail",
        MaxAttempts: 5,
        Backoff:     5 * time.Second,
        Timeout:     10 * time.Second,
        Retention: &backlite.Retention{
            Duration:   6 * time.Hour,
            OnlyFailed: false,
            Data: &backlite.RetainData{
                OnlyFailed: true,
            },
        },
    }
}

The configuration options are:

  • Name: The name of the queue. This must be unique otherwise registering the queue will fail.
  • MaxAttempts: The maximum number of times to try executing this task before it's consider failed and marked as complete.
  • Backoff: The amount of time to wait before retrying after a failed attempt at processing.
  • Retention: If provided, completed tasks will be retained in the database in a separate table according to the included options.
    • Duration: How long to retain completed tasks in the database for. Omit to never expire.
    • OnlyFailed: If true, only failed tasks will be retained.
    • Data: If provided, the task data (the serialized task itself) will be retained.
      • OnlyFailed: If true, the task data will only be retained for failed tasks.

Queue processor

The easiest way to implement a queue and define the processor is to use backlite.NewQueue(). This leverages generics in order to provide type-safety with a given task type. Using the example above:

processor := func(ctx context.Context, task NewOrderEmailTask) error {
    return email.Send(ctx, task.EmailAddress, fmt.Sprintf("Order %s received", task.OrderID))
}

queue := backlite.NewQueue[NewOrderEmailTask](processor)

The parameter is the processor callback which is what will be called by the dispatcher worker pool to execute the task. If no error is returned, the task is considered successfully executed. If the task fails all attempts and the queue has retention enabled, the value of the error will be stored in the database.

The provided context will be set to timeout at the duration set in the queue settings, if provided. To get the client from the context, you can call client := backlite.FromContext(ctx).

Registering a queue

You must register all queues with the client by calling client.Register(queue). This will panic if duplicate queue names are registered.

Adding tasks

To add a task to the queue, simply pass one or many into client.Add(). You can provide tasks of different types. This returns a chainable operation which contains many options, that can be used as follows:

ids, err := client.
    Add(task1, task2).
    Ctx(ctx).
    Tx(tx).
    At(time.Date(2024, 1, 5, 12, 30, 00)).
    Wait(15 * time.Minute).
    Save()

Only Add() and Save() are required. Don't use At() and Wait() together as they override each other.

The options are:

  • Ctx: Provide a context to use for the operation.
  • Tx: Provide a database transaction to add the tasks to. You must commit this yourself then call client.Notify() to tell the dispatcher that the new task(s) were added. This may be improved in the future but for now it is required.
  • At: Don't execute this task until at least the given date and time.
  • Wait: Wait at least the given duration before executing the task.

Task status

You can use the client to get the status of a given task. If the task type does not retain completed tasks, the status returned for completed tasks will be TaskStatusNotFound.

status, err := client.Status(taskID)

Starting the dispatcher

To start the dispatcher, which will spin up the worker pool and begin executing tasks in the background, call client.Start(). The context you pass in must persist for as long as you want the dispatcher to continue working. If that is ever cancelled, the dispatcher will shutdown. See the next section for more details.

Shutting down the dispatcher

To gracefully shutdown the dispatcher, which will wait until all tasks currently being executed are finished, call client.Stop(). You can provide a context with a given timeout in order to give the shutdown process a set amount of time to gracefully shutdown. For example:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client.Stop(ctx)

This will wait up to 5 seconds for all workers to complete the task they are currently working on.

If you want to hard-stop the dispatcher, cancel the context that was provided when calling client.Start().

Example

To see a working example, check out the example provided in Pagoda. When the app starts, a queue is defined and the dispatcher is started. There is a web route that includes a form which creates a task in the queue when it is submitted.

Roadmap

  • Expand testing
  • Hooks
  • Expand processor context to store attempt number, other data
  • Avoid needing to call Notify() when using transaction
  • Queue priority
  • Better handling of database schema, migrations
  • Store queue stats in a separate table?
  • Pause/resume queues
  • Benchmarks

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a client used to register queues and add tasks to them for execution.

func FromContext

func FromContext(ctx context.Context) *Client

FromContext returns a Client from a context which is set for queue processor callbacks, so they can access the client in order to create additional tasks.

func NewClient

func NewClient(cfg ClientConfig) (*Client, error)

NewClient initializes a new Client

func (*Client) Add

func (c *Client) Add(tasks ...Task) *TaskAddOp

Add starts an operation to add one or many tasks.

func (*Client) Install

func (c *Client) Install() error

Install installs the provided schema in the database. TODO provide migrations

func (*Client) Notify

func (c *Client) Notify()

Notify notifies the dispatcher that a new task has been added. This is only needed and required if you supply a database transaction when adding a task. See TaskAddOp.Tx().

func (*Client) Register

func (c *Client) Register(queue Queue)

Register registers a new Queue so tasks can be added to it. This will panic if the name of the queue provided has already been registered.

func (*Client) Start

func (c *Client) Start(ctx context.Context)

Start starts the dispatcher so queued tasks can automatically be executed in the background. To gracefully shut down the dispatcher, call Stop(), or to hard-stop, cancel the provided context.

func (*Client) Status added in v0.6.0

func (c *Client) Status(ctx context.Context, taskID string) (TaskStatus, error)

Status returns the status of a task with a given ID. If the queue does not retain completed tasks, TaskStatusNotFound will be returned for completed tasks rather than TaskStatusSuccess or TaskStatusFailure.

func (*Client) Stop

func (c *Client) Stop(ctx context.Context) bool

Stop attempts to gracefully shut down the dispatcher before the provided context is cancelled. True is returned if all workers were able to complete their tasks prior to shutting down.

type ClientConfig

type ClientConfig struct {
	// DB is the open database connection used for storing tasks.
	DB *sql.DB

	// Logger is the logger used to log task execution.
	Logger Logger

	// NumWorkers is the number of goroutines to open to use for executing queued tasks concurrently.
	NumWorkers int

	// ReleaseAfter is the duration after which a task is released back to a queue if it has not finished executing.
	// This value should be much higher than the timeout setting used for each queue and exists as a fail-safe
	// just in case tasks become stuck.
	ReleaseAfter time.Duration

	// CleanupInterval is how often to run cleanup operations on the database in order to remove expired completed
	// tasks. If omitted, no cleanup operations will be performed and the task retention duration will be ignored.
	CleanupInterval time.Duration
}

ClientConfig contains configuration for the Client.

type Dispatcher

type Dispatcher interface {
	// Start starts the dispatcher.
	Start(context.Context)

	// Stop stops the dispatcher.
	Stop(context.Context) bool

	// Notify notifies the dispatcher that a new task has been added.
	Notify()
}

Dispatcher handles automatically pulling queued tasks and executing them via queue processors.

type Logger

type Logger interface {
	// Info logs info messages.
	Info(message string, params ...any)

	// Error logs error messages.
	Error(message string, params ...any)
}

Logger is used to log operations.

type Queue

type Queue interface {
	// Config returns the configuration for the queue.
	Config() *QueueConfig

	// Process processes the Task.
	Process(ctx context.Context, payload []byte) error
}

Queue represents a queue which contains tasks to be executed.

func NewQueue

func NewQueue[T Task](processor QueueProcessor[T]) Queue

NewQueue creates a new type-safe Queue of a given Task type

type QueueConfig

type QueueConfig struct {
	// Name is the name of the queue and must be unique.
	Name string

	// MaxAttempts are the maximum number of attempts to execute this task before it's marked as completed.
	MaxAttempts int

	// Timeout is the duration set on the context while executing a given task.
	Timeout time.Duration

	// Backoff is the duration a failed task will be held in the queue until being retried.
	Backoff time.Duration

	// Retention dictates if and how completed tasks will be retained in the database.
	// If nil, no completed tasks will be retained.
	Retention *Retention
}

QueueConfig is the configuration options for a queue.

type QueueProcessor

type QueueProcessor[T Task] func(context.Context, T) error

QueueProcessor is a generic processor callback for a given queue to process Tasks

type RetainData

type RetainData struct {
	// OnlyFailed indicates if Task payload data should only be retained for failed tasks.
	OnlyFailed bool
}

RetainData is the policy for how Task payload data will be retained in the database after the task is complete.

type Retention

type Retention struct {
	// Duration is the amount of time to retain a task for after completion.
	// If omitted, the task will be retained forever.
	Duration time.Duration

	// OnlyFailed indicates if only failed tasks should be retained.
	OnlyFailed bool

	// Data provides options for retaining Task payload data.
	// If nil, no task payload data will be retained.
	Data *RetainData
}

Retention is the policy for how completed tasks will be retained in the database.

type Task

type Task interface {
	// Config returns the configuration options for the queue that this Task will be placed in.
	Config() QueueConfig
}

Task represents a task that will be placed in to a queue for execution.

type TaskAddOp

type TaskAddOp struct {
	// contains filtered or unexported fields
}

TaskAddOp facilitates adding Tasks to the queue.

func (*TaskAddOp) At

func (t *TaskAddOp) At(processAt time.Time) *TaskAddOp

At sets the time the task should not be executed until.

func (*TaskAddOp) Ctx

func (t *TaskAddOp) Ctx(ctx context.Context) *TaskAddOp

Ctx sets the request context.

func (*TaskAddOp) Save

func (t *TaskAddOp) Save() ([]string, error)

Save saves the task, so it can be queued for execution, and returns the task IDs.

func (*TaskAddOp) Tx

func (t *TaskAddOp) Tx(tx *sql.Tx) *TaskAddOp

Tx will include the task as part of a given database transaction. When using this, it is critical that after you commit the transaction that you call Notify() on the client so the dispatcher is aware that a new task has been created, otherwise it may not be executed. This is necessary because there is, unfortunately, no way for outsiders to know if or when a transaction is committed and since the dispatcher avoids continuous polling, it needs to know when tasks are added.

func (*TaskAddOp) Wait

func (t *TaskAddOp) Wait(duration time.Duration) *TaskAddOp

Wait instructs the task to wait a given duration before it is executed.

type TaskStatus added in v0.6.0

type TaskStatus int
const (
	// TaskStatusPending indicates the task is awaiting execution.
	TaskStatusPending TaskStatus = iota

	// TaskStatusRunning indicates the task is being executed.
	TaskStatusRunning

	// TaskStatusSuccess indicates the task completed successfully.
	TaskStatusSuccess

	// TaskStatusFailure indicates the task execution failed.
	TaskStatusFailure

	// TaskStatusNotFound indicates the task was not found in the database.
	TaskStatusNotFound
)

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL