neoq

package module
v0.68.0
Published: Jun 13, 2024 License: MIT Imports: 6 Imported by: 0

README

Neoq

Queue-agnostic background job library for Go, with a pleasant API and powerful features.

Getting Started

See the Getting Started wiki to get started.

About

Neoq is a queue-agnostic background job library for Go, with a pleasant API and powerful features.

Queue-agnostic means that whether you're using an in-memory queue for development and testing, or a Postgres or Redis queue in production, your job processing code doesn't change. Job handlers are agnostic to the queue providing jobs. It also means that you can mix queue types within a single application. If you have ephemeral or periodic tasks, you may want to process them in an in-memory queue, and use Postgres or Redis queues for jobs requiring queue durability.

Neoq aims to be simple, reliable, easy to integrate, and demand a minimal infrastructure footprint by providing queue backends that match your existing tech stack.

What it does

  • Multiple Backends: In-memory, Postgres, Redis, or user-supplied custom backends.
  • Retries: Jobs may be retried a configurable number of times with exponential backoff and jitter to prevent thundering herds
  • Job Uniqueness: Jobs are fingerprinted based on their payload and status to prevent job duplication (a job with the same payload as an already-queued job is not queued again)
  • Job Timeouts: Queue handlers can be configured with per-job timeouts with millisecond accuracy
  • Periodic Jobs: Jobs can be scheduled periodically using standard cron syntax
  • Future Jobs: Jobs can be scheduled in the future
  • Concurrency: Concurrency is configurable for every queue
  • Job Deadlines: If a job doesn't complete before a specific time.Time, the job expires

Getting Started

Getting started is as simple as declaring queue handlers and adding jobs. You can create multiple neoq instances with different backends to meet your application's needs, e.g. an in-memory backend instance for ephemeral jobs and a Postgres backend instance for queue durability between application restarts.

Additional documentation can be found in the wiki: https://github.com/acaloiaro/neoq/wiki

Error handling in this section is excluded for simplicity.

Add queue handlers

Queue handlers listen for Jobs on queues. Jobs may consist of any payload that is JSON-serializable.

Queue Handlers are simple Go functions that accept a Context parameter.

Example: Add a listener on the greetings queue using the default in-memory backend

ctx := context.Background()
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))

Enqueue jobs

Enqueuing adds jobs to the specified queue to be processed asynchronously.

Example: Add a "Hello World" job to the greetings queue using the default in-memory backend.

ctx := context.Background()
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Enqueue(ctx, &jobs.Job{
  Queue: "greetings",
  Payload: map[string]any{
    "message": "hello world",
  },
})
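
The feature list notes that jobs are fingerprinted to prevent duplicates, which matters when the same payload is enqueued twice. Here is a rough sketch of how payload-based fingerprinting can work in general. The fingerprint helper, the choice of SHA-256, and hashing the queue name together with the payload are assumptions; neoq's real fingerprint inputs and hash function may differ.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// fingerprint returns a hex digest of a queue name and a JSON-encoded payload.
// encoding/json writes map keys in sorted order, so equal payloads always
// produce the same digest.
// NOTE: hypothetical helper, not neoq's implementation.
func fingerprint(queue string, payload map[string]any) (string, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("encoding payload: %w", err)
	}
	sum := sha256.Sum256(append([]byte(queue+"\x00"), body...))
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	payloads := []map[string]any{
		{"message": "hello world"},
		{"message": "hello world"}, // duplicate of the first payload
		{"message": "goodbye"},
	}
	seen := map[string]bool{}
	for _, p := range payloads {
		fp, err := fingerprint("greetings", p)
		if err != nil {
			fmt.Println("skipping job:", err)
			continue
		}
		if seen[fp] {
			fmt.Println("duplicate, not enqueued:", p["message"])
			continue
		}
		seen[fp] = true
		fmt.Println("enqueued:", p["message"])
	}
}
```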

Redis

Example: Process jobs on the "greetings" queue and add a job to it using the Redis backend

ctx := context.Background()
nq, _ := neoq.New(ctx,
  neoq.WithBackend(redis.Backend),
  redis.WithAddr("localhost:6379"),
  redis.WithPassword(""))

nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))

nq.Enqueue(ctx, &jobs.Job{
  Queue: "greetings",
  Payload: map[string]interface{}{
    "message": "hello world",
  },
})

Postgres

Example: Process jobs on the "greetings" queue and add a job to it using the Postgres backend

ctx := context.Background()
nq, _ := neoq.New(ctx,
  neoq.WithBackend(postgres.Backend),
  postgres.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq"),
)

nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
  j, _ := jobs.FromContext(ctx)
  log.Println("got job id:", j.ID, "message:", j.Payload["message"])
  return
}))

nq.Enqueue(ctx, &jobs.Job{
  Queue: "greetings",
  Payload: map[string]interface{}{
    "message": "hello world",
  },
})

Example Code

Additional example integration code can be found at https://github.com/acaloiaro/neoq/tree/main/examples

Developing

Neoq development is largely based on Nix and devenv.

Once nix is installed, this repository contains everything else you need to develop and run tests.

Automatic setup and teardown

See installing direnv if you want the dev environment setup to be automated.

Running direnv allow lets direnv automatically set up all tooling and dependencies in a development shell when you enter the neoq directory.

Manual setup and teardown

Neoq uses devenv to manage development environments and services.

To enter the development shell, run nix develop --impure. If direnv is installed, this step is not necessary; simply enter the neoq directory after having run direnv allow.

Running services for tests

The neoq development shell gives you the devenv executable.

To run postgres and redis for tests and development, run

devenv up

This runs Postgres and Redis in the foreground. In a separate terminal, run make test to run the test suite.

Running tests

Before submitting pull requests, always run the tests locally after having run devenv up.

Run make test to run the test suite.

Status

This project is currently pre-1.0. Future releases may change the API.

Documentation

Overview

Neoq does not aim to be the _fastest_ background job processor. It aims to be _fast_, _reliable_, and demand a _minimal infrastructure footprint_.

Constants

const (
	DefaultIdleTxTimeout = 30000
	// the window of time between time.Now() and when a job's RunAfter comes due that neoq will schedule a goroutine to
	// schedule the job for execution.
	// E.g. right now is 16:00 and a job's RunAfter is 16:30 of the same date. This job will get a dedicated goroutine
	// to wait until the job's RunAfter, scheduling the job to be run exactly at RunAfter
	DefaultFutureJobWindow  = 30 * time.Second
	DefaultJobCheckInterval = 1 * time.Second
)

Variables

var ErrBackendNotSpecified = errors.New("a backend must be specified")

Functions

This section is empty.

Types

type BackendInitializer added in v0.24.0

type BackendInitializer func(ctx context.Context, opts ...ConfigOption) (backend Neoq, err error)

BackendInitializer is a function that initializes a backend

type Config added in v0.24.0

type Config struct {
	BackendInitializer     BackendInitializer
	BackendAuthPassword    string                   // password with which to authenticate to the backend
	BackendConcurrency     int                      // total number of backend processes available to process jobs
	ConnectionString       string                   // a string containing connection details for the backend
	JobCheckInterval       time.Duration            // the interval of time between checking for new future/retry jobs
	FutureJobWindow        time.Duration            // time duration between current time and job.RunAfter that future jobs get scheduled
	IdleTransactionTimeout int                      // number of milliseconds PgBackend transaction may idle before the connection is killed
	ShutdownTimeout        time.Duration            // duration to wait for jobs to finish during shutdown
	SynchronousCommit      bool                     // Postgres: Enable synchronous commits (increases durability, decreases performance)
	LogLevel               logging.LogLevel         // the log level of the default logger
	PGConnectionTimeout    time.Duration            // the amount of time to wait for a connection to become available before timing out
	RecoveryCallback       handler.RecoveryCallback // the recovery handler applied to all Handlers executed by the associated Neoq instance
}

Config configures neoq and its backends

This configuration struct includes options for all backends. As such, some of its options are not applicable to all backends. [BackendConcurrency], for example, is only used by the redis backend. Other backends manage concurrency on a per-handler basis.

func NewConfig added in v0.24.0

func NewConfig() *Config

NewConfig initializes a new Config with defaults

type ConfigOption added in v0.24.0

type ConfigOption func(c *Config)

ConfigOption is a function that sets optional backend configuration

func WithBackend

func WithBackend(initializer BackendInitializer) ConfigOption

WithBackend configures neoq to initialize a specific backend for job processing.

Neoq provides [config.BackendInitializer] values that may be used with WithBackend, such as memory.Backend and postgres.Backend in the examples below.

Example
package main

import (
	"context"
	"fmt"

	"github.com/acaloiaro/neoq"
	"github.com/acaloiaro/neoq/backends/memory"
)

func main() {
	ctx := context.Background()
	nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
	if err != nil {
		fmt.Println("initializing a new Neoq with no params should not return an error:", err)
		return
	}
	defer nq.Shutdown(ctx)

	fmt.Println("neoq initialized with memory backend")
}
Output:

neoq initialized with memory backend
Example (Postgres)
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/acaloiaro/neoq"
	"github.com/acaloiaro/neoq/backends/postgres"
)

func main() {
	ctx := context.Background()
	var pgURL string
	var ok bool
	if pgURL, ok = os.LookupEnv("TEST_DATABASE_URL"); !ok {
		fmt.Println("Please set TEST_DATABASE_URL environment variable")
		return
	}

	nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL))
	if err != nil {
		fmt.Println("neoq's postgres backend failed to initialize:", err)
		return
	}
	defer nq.Shutdown(ctx)

	fmt.Println("neoq initialized with postgres backend")
}
Output:

neoq initialized with postgres backend

func WithJobCheckInterval

func WithJobCheckInterval(interval time.Duration) ConfigOption

WithJobCheckInterval configures the duration of time between checking for future jobs

func WithLogLevel added in v0.13.0

func WithLogLevel(level logging.LogLevel) ConfigOption

WithLogLevel configures the log level for neoq's default logger. By default, the log level is "INFO". If SetLogger is used, WithLogLevel has no effect on the logger that was set.

func WithRecoveryCallback added in v0.65.0

func WithRecoveryCallback(cb handler.RecoveryCallback) ConfigOption

WithRecoveryCallback configures neoq with a function to be called when fatal errors occur in job Handlers.

Recovery callbacks are useful for reporting errors to error loggers and collecting error metrics.

type Neoq added in v0.24.0

type Neoq interface {
	// Enqueue queues jobs to be executed asynchronously
	Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error)

	// Start starts processing jobs on the queue specified in the Handler
	Start(ctx context.Context, h handler.Handler) (err error)

	// StartCron starts processing jobs with the specified cron schedule and handler
	//
	// See: https://pkg.go.dev/github.com/robfig/cron?#hdr-CRON_Expression_Format for details on the cron spec format
	StartCron(ctx context.Context, cron string, h handler.Handler) (err error)

	// SetLogger sets the backend logger
	SetLogger(logger logging.Logger)

	// Shutdown halts job processing and releases resources
	Shutdown(ctx context.Context)
}

Neoq interface is Neoq's primary API

Neoq is implemented by the backends in package backends (see Directories).

func New

func New(ctx context.Context, opts ...ConfigOption) (b Neoq, err error)

New creates a new backend instance for job processing.

By default, neoq initializes [memory.Backend] if New() is called without a backend configuration option.

Use neoq.WithBackend to initialize different backends.

For available configuration options see neoq.ConfigOption.

Example
package main

import (
	"context"
	"fmt"

	"github.com/acaloiaro/neoq"
	"github.com/acaloiaro/neoq/backends/memory"
)

func main() {
	ctx := context.Background()
	nq, err := neoq.New(ctx, neoq.WithBackend(memory.Backend))
	if err != nil {
		fmt.Println("initializing a new Neoq with no params should not return an error:", err)
		return
	}
	defer nq.Shutdown(ctx)

	fmt.Println("neoq initialized with default memory backend")
}
Output:

neoq initialized with default memory backend
Example (Postgres)
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/acaloiaro/neoq"
	"github.com/acaloiaro/neoq/backends/postgres"
)

func main() {
	ctx := context.Background()
	var pgURL string
	var ok bool
	if pgURL, ok = os.LookupEnv("TEST_DATABASE_URL"); !ok {
		fmt.Println("Please set TEST_DATABASE_URL environment variable")
		return
	}

	nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(pgURL))
	if err != nil {
		fmt.Println("neoq's postgres backend failed to initialize:", err)
		return
	}
	defer nq.Shutdown(ctx)

	fmt.Println("neoq initialized with postgres backend")
}
Output:

neoq initialized with postgres backend

Directories

Path Synopsis
Package backends provides concrete implementations of pkg/github.com/acaloiaro/neoq/neoq.Neoq
examples