goncordia

package module
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2026 License: MIT Imports: 9 Imported by: 0

README

goncordia

CI Go Reference GitHub release

Changelog

A job queue engine for Go that works with the database you already have.

One Driver[TTx] interface parameterized by your library's native transaction type covers Postgres, MySQL, SQLite, MongoDB, Redis, Cassandra, ClickHouse, DynamoDB, Firestore, and in-memory — without forcing you to adopt a new dependency.

tx, _ := pool.Begin(ctx)
_, _ = queries.CreateOrder(ctx, tx, order)
_, _ = client.EnqueueTx(ctx, tx, SendConfirmationArgs{OrderID: order.ID}, nil)
tx.Commit(ctx)  // job and order appear atomically

Features

  • Transactional insertsEnqueueTx shares your existing transaction; the job appears if and only if that transaction commits
  • Scheduled jobsInsertOpts.RunAt for future execution
  • Priority queues — higher priority processed first within a queue
  • Unique jobs — deduplicate by kind, args, queue, or time window
  • Retry with backoff — exponential (default), fixed, or custom RetryPolicy
  • Queue pause/resume — drain a queue without stopping workers
  • Push notifications — LISTEN/NOTIFY (Postgres), Change Streams (MongoDB), Pub/Sub (Redis); polling fallback elsewhere
  • SKIP LOCKED — lock-free concurrent fetching on Postgres and MySQL
  • Periodic / cron jobsCronScheduler with Every(d) or custom ScheduleFunc
  • MockClock — deterministic time control for tests; no time.Sleep

Backends

Driver Package Tx type Atomic insert Notes
PostgreSQL (pgx v5) driver/pgxv5 pgx.Tx LISTEN/NOTIFY, advisory locks, SKIP LOCKED
PostgreSQL / MySQL / SQLite driver/stdlib *sql.Tx pgx stdlib, go-sql-driver/mysql, modernc sqlite
gorm driver/gorm *gorm.DB thin adapter over stdlib
bun driver/bun bun.Tx thin adapter over stdlib
MongoDB 4.0+ driver/mongodb mongo.SessionContext replica set required
Redis driver/redis NoTx at-least-once; Pub/Sub notifications
Cassandra 3.11+ driver/cassandra NoTx LWT claiming; ScyllaDB / DSE compatible
ClickHouse 23+ driver/clickhouse NoTx ReplacingMergeTree; at-least-once
Amazon DynamoDB driver/dynamodb NoTx conditional writes; at-least-once
Cloud Firestore driver/firestore *firestore.Transaction RunTransaction; composite index required
In-memory driver/memory memory.NoTx no persistence; for tests

Installation

go get github.com/kirimatt/goncordia

Pick a driver:

# PostgreSQL via pgx v5
go get github.com/kirimatt/goncordia/driver/pgxv5 github.com/jackc/pgx/v5

# PostgreSQL / MySQL / SQLite via database/sql
go get github.com/kirimatt/goncordia/driver/stdlib

# gorm adapter
go get github.com/kirimatt/goncordia/driver/gorm gorm.io/gorm

# bun adapter
go get github.com/kirimatt/goncordia/driver/bun github.com/uptrace/bun

# MongoDB (replica set required)
go get github.com/kirimatt/goncordia/driver/mongodb go.mongodb.org/mongo-driver/mongo

# Redis
go get github.com/kirimatt/goncordia/driver/redis github.com/redis/go-redis/v9

# Cassandra / ScyllaDB
go get github.com/kirimatt/goncordia/driver/cassandra github.com/gocql/gocql

# ClickHouse
go get github.com/kirimatt/goncordia/driver/clickhouse github.com/ClickHouse/clickhouse-go/v2

# Amazon DynamoDB
go get github.com/kirimatt/goncordia/driver/dynamodb github.com/aws/aws-sdk-go-v2/service/dynamodb

# Cloud Firestore
go get github.com/kirimatt/goncordia/driver/firestore cloud.google.com/go/firestore

Quick start

PostgreSQL (pgx v5)
import (
    "github.com/kirimatt/goncordia"
    "github.com/kirimatt/goncordia/core"
    pgxdriver "github.com/kirimatt/goncordia/driver/pgxv5"
    "github.com/jackc/pgx/v5/pgxpool"
)

pool, _ := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
d := pgxdriver.New(pool)
d.Migrate(ctx)

type SendEmailArgs struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}
func (SendEmailArgs) Kind() string { return "send_email" }

registry := core.NewRegistry()
core.RegisterWorker(registry, core.WorkerFunc[SendEmailArgs](
    func(ctx context.Context, job *core.Job[SendEmailArgs]) error {
        return sendEmail(job.Args.To, job.Args.Subject)
    },
), core.WorkerOpts{MaxRetry: 5})

client := pgxdriver.NewClient(d, goncordia.ClientConfig{})
client.Enqueue(ctx, SendEmailArgs{To: "user@example.com", Subject: "Welcome"}, nil)

wp := pgxdriver.NewWorkerPool(d, registry, goncordia.WorkerConfig{
    Queues:      []string{"default"},
    Concurrency: 10,
})
wp.Start(ctx)  // blocks; call wp.Stop() to drain gracefully
Transactional insert (PostgreSQL)
tx, _ := pool.Begin(ctx)
_, _ = queries.CreateOrder(ctx, tx, orderParams)
_, _ = client.EnqueueTx(ctx, tx, SendConfirmationArgs{OrderID: id}, nil)
tx.Commit(ctx)  // job and order are atomic
MongoDB
import (
    mongodriver "github.com/kirimatt/goncordia/driver/mongodb"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

client, _ := mongo.Connect(ctx, options.Client().ApplyURI(os.Getenv("MONGO_URI")))
d, err := mongodriver.New(ctx, client, "myapp")  // fails if not a replica set
d.Migrate(ctx)

mqClient := mongodriver.NewClient(d, goncordia.ClientConfig{})

// Transactional insert via mongo.SessionContext
mongoClient.UseSession(ctx, func(sc mongo.SessionContext) error {
    sc.StartTransaction()
    db.Collection("orders").InsertOne(sc, order)
    mqClient.EnqueueTx(sc, sc, SendConfirmationArgs{OrderID: order.ID}, nil)
    return sc.CommitTransaction(sc)
})
gorm
import (
    gormdriver "github.com/kirimatt/goncordia/driver/gorm"
    "gorm.io/gorm"
)

d, _ := gormdriver.New(db)  // db is *gorm.DB
d.Migrate(ctx)

client := gormdriver.NewClient(d, goncordia.ClientConfig{})

db.Transaction(func(tx *gorm.DB) error {
    tx.Create(&order)
    client.EnqueueTx(ctx, tx, SendConfirmationArgs{OrderID: order.ID}, nil)
    return nil  // commit — job appears atomically with the order
})
bun
import (
    bundriver "github.com/kirimatt/goncordia/driver/bun"
    "github.com/uptrace/bun"
)

d := bundriver.New(db)  // db is *bun.DB
d.Migrate(ctx)

client := bundriver.NewClient(d, goncordia.ClientConfig{})

tx, _ := db.BeginTx(ctx, nil)
tx.NewInsert().Model(&order).Exec(ctx)
client.EnqueueTx(ctx, tx, SendConfirmationArgs{OrderID: order.ID}, nil)
tx.Commit()
Redis
import (
    redisdriver "github.com/kirimatt/goncordia/driver/redis"
    "github.com/redis/go-redis/v9"
)

rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
d := redisdriver.New(rdb)
d.Migrate(ctx)  // pings Redis to verify connectivity

client := redisdriver.NewClient(d, goncordia.ClientConfig{})
client.Enqueue(ctx, SendEmailArgs{To: "user@example.com", Subject: "Welcome"}, nil)

// EnqueueTx is not supported on the Redis driver:
// there is no rollback guarantee. Use Enqueue (post-commit pattern) instead.
Cassandra / ScyllaDB
import (
    cassandradriver "github.com/kirimatt/goncordia/driver/cassandra"
    "github.com/gocql/gocql"
)

cluster := gocql.NewCluster("localhost")
cluster.Keyspace = "myapp"  // keyspace must already exist
session, _ := cluster.CreateSession()
defer session.Close()

d := cassandradriver.New(session)
d.Migrate(ctx)  // creates tables (idempotent)

client := cassandradriver.NewClient(d, goncordia.ClientConfig{})
client.Enqueue(ctx, SendEmailArgs{To: "user@example.com", Subject: "Welcome"}, nil)

// EnqueueTx is identical to Enqueue on Cassandra — no rollback guarantee.
// Use idempotent workers and unique job options for deduplication.
ClickHouse
import (
    clickhousedriver "github.com/kirimatt/goncordia/driver/clickhouse"
    "github.com/ClickHouse/clickhouse-go/v2"
)

conn, _ := clickhouse.Open(&clickhouse.Options{
    Addr: []string{"localhost:9000"},
    Auth: clickhouse.Auth{Database: "myapp"},
})

d := clickhousedriver.New(conn)
d.Migrate(ctx)  // creates ReplacingMergeTree tables (idempotent)

client := clickhousedriver.NewClient(d, goncordia.ClientConfig{})
client.Enqueue(ctx, SendEmailArgs{To: "user@example.com", Subject: "Welcome"}, nil)

// ClickHouse has no transactions. Jobs use at-least-once delivery — workers
// should be idempotent. Best suited for high-throughput analytics pipelines.
Amazon DynamoDB
import (
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    dynamodbdriver "github.com/kirimatt/goncordia/driver/dynamodb"
)

cfg, _ := config.LoadDefaultConfig(ctx)
svc := dynamodb.NewFromConfig(cfg)

d := dynamodbdriver.New(svc)
d.Migrate(ctx)  // creates goncordia_jobs + goncordia_uniq + goncordia_queues + goncordia_leaders (idempotent)

client := dynamodbdriver.NewClient(d, goncordia.ClientConfig{})
client.Enqueue(ctx, SendEmailArgs{To: "user@example.com", Subject: "Welcome"}, nil)

// DynamoDB has no cross-table transactions. EnqueueTx behaves like Enqueue.
// Unique-key deduplication uses PutItem with attribute_not_exists condition.
// Jobs are claimed with conditional UpdateItem — safe for concurrent workers.
Cloud Firestore
import (
    "cloud.google.com/go/firestore"
    firestoredriver "github.com/kirimatt/goncordia/driver/firestore"
)

fsClient, _ := firestore.NewClient(ctx, "my-gcp-project")
d := firestoredriver.New(fsClient)
// Migrate is a no-op; create composite index in Firebase console:
//   collection: goncordia_jobs, fields: queue (ASC), state (ASC), run_at (ASC)
d.Migrate(ctx)

client := firestoredriver.NewClient(d, goncordia.ClientConfig{})

// Transactional — job is enqueued atomically with business writes:
fsClient.RunTransaction(ctx, func(ctx context.Context, tx *firestore.Transaction) error {
    tx.Create(orders.Doc(id), orderData)
    _, err := client.EnqueueTx(ctx, tx, SendConfirmationArgs{OrderID: id}, nil)
    return err
})
SQLite (no Docker, good for tests)
import (
    _ "modernc.org/sqlite"
    stdlibdriver "github.com/kirimatt/goncordia/driver/stdlib"
)

db, _ := sql.Open("sqlite", "./jobs.db")
db.SetMaxOpenConns(1)  // SQLite: single writer

d := stdlibdriver.New(db, stdlibdriver.SQLite)
d.Migrate(ctx)

Job lifecycle

available ──► running ──► completed
                │
                ├──► retryable ──► available  (scheduled retry)
                └──► discarded               (max retries exhausted)

available ──► cancelled   (via JobCancel)
scheduled ──► available   (when run_at is reached)

InsertOpts

maxRetry := 3
client.Enqueue(ctx, SendEmailArgs{To: "user@example.com", Subject: "Welcome"}, &core.InsertOpts{
    Queue:    "critical",                    // override default queue
    Priority: 10,                            // higher = processed first
    RunAt:    time.Now().Add(time.Hour),     // schedule for later

    UniqueOpts: &core.UniqueOpts{            // deduplicate
        ByArgs:  true,
        ByQueue: true,
    },

    MaxRetry: &maxRetry,
    Tags:     []string{"user:42"},
})

WorkerConfig

goncordia.WorkerConfig{
    Queues:          []string{"default", "critical"},
    Concurrency:     20,
    PollInterval:    500 * time.Millisecond,         // fallback when no push notifications
    RetryPolicy:     core.ExponentialRetry{Base: time.Second, Max: time.Hour},
    ShutdownTimeout: 30 * time.Second,
    Clock:           clock.NewMock(time.Now()),       // omit in production; inject for tests
}

Periodic / cron jobs

CronScheduler enqueues jobs on a schedule. Pair it with a WorkerPool that processes them.

import "github.com/kirimatt/goncordia/core"

cs := goncordia.NewCronScheduler(d, []goncordia.PeriodicJob{
    {
        Schedule: core.Every(time.Hour),
        Args:     CleanupArgs{},
    },
    {
        Schedule: core.Every(24 * time.Hour),
        Args:     ReportArgs{},
        Opts:     &core.InsertOpts{Queue: "low-priority"},
    },
}, goncordia.CronConfig{
    TickInterval: time.Second, // how often to check for due jobs
})

go cs.Start(ctx)   // blocks; cancel ctx to stop
go wp.Start(ctx)   // worker pool processes the enqueued jobs
Custom schedule
// core.ScheduleFunc adapts any function to the Schedule interface.
sched := core.ScheduleFunc(func(last time.Time) time.Time {
    if last.IsZero() {
        return time.Time{} // run immediately on first tick
    }
    // Business-hours only: next run at 09:00 the following day
    next := last.Add(24 * time.Hour)
    next = time.Date(next.Year(), next.Month(), next.Day(), 9, 0, 0, 0, next.Location())
    return next
})
Notes
  • The scheduler fires each job on the first tick after Start, then respects the interval.
  • CronScheduler only enqueues — workers run via WorkerPool.
  • Add UniqueOpts to PeriodicJob.Opts to prevent duplicate jobs if multiple scheduler instances run.

Retry policies

// Exponential backoff (default): 1s, 2s, 4s, … capped at Max
core.ExponentialRetry{Base: time.Second, Max: 24 * time.Hour}

// Fixed delay
core.FixedRetry{Delay: 30 * time.Second}

// No retry — discard immediately
core.NoRetry{}

// Custom
import "github.com/kirimatt/goncordia/internal/clock"

type MyPolicy struct{}
func (MyPolicy) NextRetryAt(attempt int, err error, clk clock.Clock) time.Time {
    return clk.Now().Add(time.Duration(attempt) * time.Minute)
}

Testing

Use the in-memory driver — no database, no Docker, deterministic time:

import (
    "github.com/kirimatt/goncordia/driver/memory"
    "github.com/kirimatt/goncordia/internal/clock"
)

clk := clock.NewMock(time.Now())
d   := memory.New(memory.WithClock(clk))

client := goncordia.NewClient[memory.NoTx](d, goncordia.ClientConfig{})
wp     := goncordia.NewWorkerPool[memory.NoTx](d, registry, goncordia.WorkerConfig{Clock: clk})

go wp.Start(ctx)
clk.Advance(time.Hour)  // trigger scheduled jobs instantly

jobs := d.AllJobs()  // inspect state without a real database

Implementing a custom driver

Implement driver.Driver[TTx] where TTx is your transaction type:

type MyDriver struct{}

func (d *MyDriver) Name() string                        { return "mydb" }
func (d *MyDriver) Capabilities() driver.Capabilities  { return driver.Capabilities{NativeTx: true} }
func (d *MyDriver) Executor() driver.Executor          { return &myExecutor{} }
func (d *MyDriver) UnwrapTx(tx MyTx) driver.ExecutorTx { return &myTxExecutor{tx: tx} }
func (d *MyDriver) Listener() driver.Listener          { return nil } // nil = polling fallback
func (d *MyDriver) Close() error                       { return nil }

driver.Executor has 14 methods covering job CRUD, queue management, and leader election. See driver/driver.go for the full interface and driver/memory/memory.go for a minimal reference implementation.


Benchmarks

go test ./bench/... -bench=. -benchmem -benchtime=5s -timeout=15m

Apple M5, single process. Memory/SQLite are in-process (no network); Postgres/MongoDB/Redis run in Docker on localhost.

Enqueue — single job

Backend ns/op Notes
Memory 0.57 µs in-process mutex, no I/O
SQLite 27 µs WAL mode, single connection
Redis 109 µs ZADD over localhost
Postgres (pgx v5) 129 µs INSERT over localhost
MongoDB 338 µs insertOne over localhost
DynamoDB 632 µs PutItem over localhost
Firestore 1 195 µs RunTransaction + Create over emulator
ClickHouse 1 378 µs INSERT + new data part over localhost
Cassandra 7 216 µs LWT requires Paxos quorum (3 round trips)

EnqueueBatch(100) — 100 jobs per call

Backend ms/batch jobs/s
Memory 0.06 ms ~1 775 000
SQLite 2.8 ms ~35 300
Redis 10.9 ms ~9 200
Postgres (pgx v5) 12.8 ms ~7 800
MongoDB 34.9 ms ~2 900
DynamoDB 62.9 ms ~1 590
Firestore 93.5 ms ~1 070
ClickHouse 150 ms ~665
Cassandra 708 ms ~141

FetchAndComplete — hot worker loop path

Backend µs/op Notes
SQLite 53 µs indexed; faster than memory at scale
Memory 520 µs O(N) linear scan
Redis 729 µs Lua ZPOPMIN + HSET
DynamoDB 1 731 µs Query GSI + conditional UpdateItem
MongoDB 2 475 µs findAndModify + updateOne
Postgres (pgx v5) 12 190 µs SELECT SKIP LOCKED + UPDATE
ClickHouse 14 416 µs SELECT FINAL + INSERT new version
Cassandra 18 813 µs SELECT avail + LWT UPDATE per job
Firestore 140 943 µs Query + per-job RunTransaction on emulator

End-to-end — full WorkerPool

Backend workload concurrency jobs/s Notes
Memory 1 000 c=10 ~2 020
Redis 1 000 c=4 ~1 084 Pub/Sub notifications
SQLite 1 000 c=4 ~800
DynamoDB 1 000 c=4 ~780 polling; GSI query overhead
MongoDB 1 000 c=4 ~452 Change Streams
Postgres (pgx v5) 1 000 c=4 ~179 LISTEN/NOTIFY
Cassandra 1 000 c=4 ~153 polling; LWT overhead
ClickHouse 1 000 c=4 ~148 polling; SELECT FINAL overhead
Firestore 200 c=4 ~14 polling; per-job RunTransaction on emulator

End-to-end throughput is bounded by the 5 ms poll interval used in the benchmark. In production the pgxv5 driver uses LISTEN/NOTIFY and the Redis driver uses Pub/Sub, eliminating poll latency entirely — real throughput matches the FetchAndComplete numbers above.

Cassandra's high per-operation latency comes from Lightweight Transaction consensus (Paxos, ~3 network round trips per claim). ClickHouse's overhead comes from SELECT … FINAL deduplication at query time. DynamoDB's per-operation cost is dominated by HTTP/JSON round trips to the service — measured against DynamoDB Local on localhost, so real AWS numbers include additional network latency. Firestore's per-job RunTransaction for claiming is the primary bottleneck on the emulator; production GCP Firestore will be faster due to lower round-trip latency, but the sequential-per-job claiming model inherently limits throughput. All four backends are best suited for workloads where horizontal scale matters more than raw per-job latency.


Testing

gontest makes it easy to test workers and enqueue assertions without a real database.

go get github.com/kirimatt/goncordia/gontest

Assert that business logic enqueues the right jobs:

func TestPlaceOrder_EnqueuesConfirmationEmail(t *testing.T) {
    ctx := context.Background()
    client, tracker := gontest.NewClient(t)

    _ = PlaceOrder(ctx, client, "order-123") // calls client.Enqueue internally

    jobs := gontest.RequireEnqueued[SendEmailArgs](t, tracker, 1)
    if jobs[0].Args.OrderID != "order-123" {
        t.Errorf("unexpected order ID: %s", jobs[0].Args.OrderID)
    }
}

Unit-test a worker function without a database or pool:

func TestEmailWorker_SendsEmail(t *testing.T) {
    h := gontest.NewWorkerHelper[SendEmailArgs](emailWorker)
    if err := h.Work(ctx, SendEmailArgs{To: "user@example.com"}); err != nil {
        t.Fatal(err)
    }
}

// Or with a one-liner:
gontest.RequireWork(t, ctx, emailWorker, SendEmailArgs{To: "user@example.com"})

Test scheduled jobs with a controllable clock:

clk := gontest.NewMockClock()
client, tracker := gontest.NewClientWithClock(t, clk)

client.Enqueue(ctx, ReminderArgs{UserID: "u1"}, &core.InsertOpts{
    RunAt: clk.Now().Add(24 * time.Hour),
})

// Job exists but is not yet available:
gontest.RequireEnqueued[ReminderArgs](t, tracker, 1)

// Advance past the scheduled time:
clk.Advance(25 * time.Hour)
// now start the pool — the job will be picked up immediately

Run an end-to-end flow in memory:

registry := core.NewRegistry()
core.RegisterWorker(registry, emailWorker, core.WorkerOpts{Queue: "default"})

client, tracker := gontest.NewClient(t)
pool := tracker.NewWorkerPool(registry, goncordia.WorkerConfig{
    Queues: []string{"default"}, Concurrency: 2,
})

runCtx, cancel := context.WithCancel(ctx)
defer cancel()
go pool.Start(runCtx)
// enqueue and wait for processed.Load() >= 1 ...
pool.Stop()

Observability (OpenTelemetry)

go get github.com/kirimatt/goncordia/otel
import otelgoncordia "github.com/kirimatt/goncordia/otel"

wp := pgxdriver.NewWorkerPool(d, registry, goncordia.WorkerConfig{
    Queues:      []string{"default"},
    Concurrency: 10,
    Middleware: []goncordia.JobMiddleware{
        otelgoncordia.NewMiddleware(
            // optional — defaults to otel.GetTracerProvider() / otel.GetMeterProvider()
            otelgoncordia.WithTracerProvider(tp),
            otelgoncordia.WithMeterProvider(mp),
        ),
    },
})

Each job execution produces:

  • Span goncordia.process with attributes goncordia.job.kind, goncordia.job.queue, goncordia.job.id, goncordia.job.attempt
  • Histogram goncordia.job.duration (seconds) — labelled by kind, queue, status
  • Counter goncordia.job.count — labelled by kind, queue, status (ok / error)

Panics are recovered, converted to errors, and recorded on the span before re-triggering the retry policy — the worker pool always stays alive.

You can also add your own middleware for logging or custom metrics:

func loggingMiddleware(ctx context.Context, job *core.RawJob, next func(context.Context, *core.RawJob) error) error {
    slog.InfoContext(ctx, "job started", "kind", job.Kind, "id", job.ID)
    err := next(ctx, job)
    slog.InfoContext(ctx, "job finished", "kind", job.Kind, "err", err)
    return err
}

goncordia.WorkerConfig{
    Middleware: []goncordia.JobMiddleware{
        otelgoncordia.NewMiddleware(),
        loggingMiddleware,
    },
}

Project layout

goncordia/
├── client.go              # Client[TTx] — Enqueue, EnqueueTx, Cancel
├── worker.go              # WorkerPool[TTx] — Start, Stop, JobMiddleware
├── cron.go                # CronScheduler[TTx] — periodic/cron job scheduling
├── core/
│   ├── job.go             # JobArgs, Worker, InsertOpts, WorkerOpts
│   ├── registry.go        # type-erased worker dispatch
│   ├── retry.go           # RetryPolicy, ExponentialRetry, FixedRetry, NoRetry
│   └── schedule.go        # Schedule interface, Every, ScheduleFunc
├── driver/
│   ├── driver.go          # Driver[TTx], Executor, ExecutorTx, Listener interfaces
│   ├── memory/            # in-memory (no persistence; for tests)
│   ├── pgxv5/             # PostgreSQL via pgx v5 (LISTEN/NOTIFY, advisory locks)
│   ├── stdlib/            # PostgreSQL + MySQL + SQLite via database/sql
│   ├── gorm/              # gorm adapter (wraps stdlib)
│   ├── bun/               # bun adapter (wraps stdlib)
│   ├── mongodb/           # MongoDB 4.0+ replica set
│   ├── redis/             # Redis (at-least-once; Pub/Sub notifications)
│   ├── cassandra/         # Cassandra 3.11+ / ScyllaDB (LWT claiming; at-least-once)
│   ├── clickhouse/        # ClickHouse 23+ (ReplacingMergeTree; at-least-once)
│   ├── dynamodb/          # Amazon DynamoDB (conditional writes; at-least-once)
│   └── firestore/         # Cloud Firestore (RunTransaction; ACID inserts)
├── gontest/               # test helpers (Tracker, WorkerHelper, MockClock)
├── otel/                  # OpenTelemetry middleware (spans + metrics)
└── internal/clock/        # Clock interface + MockClock

Transaction guarantees by backend

Backend Guarantee Mechanism
Postgres / MySQL / SQLite Atomic with business tx Same DB connection, same BEGIN/COMMIT
gorm / bun Atomic with business tx Extracts underlying *sql.Tx
MongoDB Atomic with business tx Multi-document transaction on replica set
Redis None — at-least-once Pub/Sub + idempotent workers
Cassandra None — at-least-once LWT for claiming; no cross-statement tx
ClickHouse None — at-least-once ReplacingMergeTree + FINAL; no transactions
DynamoDB None — at-least-once Conditional writes; no cross-table tx
Firestore Atomic with business tx RunTransaction + EnqueueTx
In-memory Atomic (in-process) Single mutex

Documentation

Overview

Package goncordia provides a transactional job queue engine for Go. It supports multiple storage backends (Postgres, MySQL, SQLite, MongoDB, Redis, in-memory) through a driver interface parameterized by the native transaction type of each backend.

Transactional usage (shared transaction with business logic):

tx, _ := pool.Begin(ctx)
_, _ = queries.CreateOrder(ctx, tx, orderParams)
_, _ = client.EnqueueTx(ctx, tx, SendConfirmationEmailArgs{OrderID: id}, nil)
tx.Commit(ctx) // both operations are atomic

Non-transactional usage (at-least-once semantics):

client.Enqueue(ctx, SendConfirmationEmailArgs{OrderID: id}, nil)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client[TTx any] struct {
	// contains filtered or unexported fields
}

Client enqueues jobs into the job queue. TTx is the transaction type of the chosen backend driver (e.g. *pgx.Tx for pgxv5, *sql.Tx for stdlib, mongo.SessionContext for mongodb).

func NewClient

func NewClient[TTx any](d driver.Driver[TTx], cfg ClientConfig) *Client[TTx]

NewClient creates a Client backed by the given driver.

func (*Client[TTx]) Cancel

func (c *Client[TTx]) Cancel(ctx context.Context, id string) error

Cancel marks a job as cancelled. The job must be in available or scheduled state.

func (*Client[TTx]) Enqueue

func (c *Client[TTx]) Enqueue(ctx context.Context, args core.JobArgs, opts *core.InsertOpts) (*driver.JobInsertResult, error)

Enqueue inserts a single job without a transaction (at-least-once semantics). Safe to call for all backends; for SQL/MongoDB backends prefer EnqueueTx for atomicity.

func (*Client[TTx]) EnqueueMany

func (c *Client[TTx]) EnqueueMany(ctx context.Context, args []core.JobArgs, opts *core.InsertOpts) ([]driver.JobInsertResult, error)

EnqueueMany inserts multiple jobs in a single batch (non-transactional).

func (*Client[TTx]) EnqueueManyTx

func (c *Client[TTx]) EnqueueManyTx(ctx context.Context, tx TTx, args []core.JobArgs, opts *core.InsertOpts) ([]driver.JobInsertResult, error)

EnqueueManyTx inserts multiple jobs within an existing transaction.

func (*Client[TTx]) EnqueueTx

func (c *Client[TTx]) EnqueueTx(ctx context.Context, tx TTx, args core.JobArgs, opts *core.InsertOpts) (*driver.JobInsertResult, error)

EnqueueTx inserts a job within an existing transaction. The job becomes visible to workers only when tx is committed. Only available on backends with Capabilities.NativeTx == true.

type ClientConfig

type ClientConfig struct {
	// DefaultQueue is used when InsertOpts.Queue is empty. Default: "default".
	DefaultQueue string
}

ClientConfig controls optional Client behavior.

type CronConfig

type CronConfig struct {
	// TickInterval controls how often the scheduler checks for due jobs.
	// Default: 1 second.
	TickInterval time.Duration
	// Clock overrides the time source. Defaults to clock.Real{}.
	Clock clock.Clock
}

CronConfig configures a CronScheduler.

type CronScheduler

type CronScheduler[TTx any] struct {
	// contains filtered or unexported fields
}

CronScheduler enqueues periodic jobs on a configurable tick. It does not process jobs — pair it with a WorkerPool.

Usage:

cs := goncordia.NewCronScheduler(d, []goncordia.PeriodicJob{
    {Schedule: core.Every(time.Hour), Args: CleanupArgs{}},
}, goncordia.CronConfig{})
go cs.Start(ctx)

func NewCronScheduler

func NewCronScheduler[TTx any](d driver.Driver[TTx], jobs []PeriodicJob, cfg CronConfig) *CronScheduler[TTx]

NewCronScheduler creates a CronScheduler backed by d. jobs is the list of periodic jobs to manage.

func (*CronScheduler[TTx]) Start

func (s *CronScheduler[TTx]) Start(ctx context.Context) error

Start begins the scheduling loop. It blocks until ctx is cancelled.

type JobMiddleware

type JobMiddleware func(ctx context.Context, job *core.RawJob, next func(context.Context, *core.RawJob) error) error

JobMiddleware wraps job execution. Call next to continue the chain. Middleware is applied around the actual job handler, inside panic recovery, so err is never nil when the handler panicked — panics are converted to errors.

type PeriodicJob

type PeriodicJob struct {
	// Schedule determines when the job runs.
	Schedule core.Schedule
	// Args is the job to enqueue.
	Args core.JobArgs
	// Opts are passed through to Enqueue on each tick (optional).
	Opts *core.InsertOpts
}

PeriodicJob pairs a Schedule with the job args to enqueue on each tick.

type WorkerConfig

type WorkerConfig struct {
	// Queues lists the queues this worker pool processes.
	// If empty, only "default" is polled.
	Queues []string
	// Concurrency is the maximum number of jobs running simultaneously.
	// Default: 10.
	Concurrency int
	// PollInterval is how long to wait between polls when the queue is empty.
	// Only used when the backend has no push notification support.
	// Default: 1 second.
	PollInterval time.Duration
	// RetryPolicy controls retry timing. Default: ExponentialRetry.
	RetryPolicy core.RetryPolicy
	// ShutdownTimeout is the max duration to wait for in-flight jobs during shutdown.
	// Default: 30 seconds.
	ShutdownTimeout time.Duration
	// Clock overrides the time source. Defaults to clock.Real{}.
	// Inject clock.NewMock() in tests to control time.
	Clock clock.Clock
	// Middleware is applied around each job execution in order (outermost first).
	// Use it to add tracing, logging, or metrics without modifying job handlers.
	Middleware []JobMiddleware
}

WorkerConfig configures the worker pool.

type WorkerPool

type WorkerPool[TTx any] struct {
	// contains filtered or unexported fields
}

WorkerPool processes jobs from the queue using a pool of goroutines. TTx is the driver's transaction type (needed only for type parameter inference; the pool itself does not open user-visible transactions).

func NewWorkerPool

func NewWorkerPool[TTx any](d driver.Driver[TTx], registry *core.Registry, cfg WorkerConfig) *WorkerPool[TTx]

NewWorkerPool creates a WorkerPool. Register workers using core.RegisterWorker before calling Start.

func (*WorkerPool[TTx]) Start

func (p *WorkerPool[TTx]) Start(ctx context.Context) error

Start launches the fetch-and-process loops. Blocks until ctx is cancelled or Stop is called.

func (*WorkerPool[TTx]) Stop

func (p *WorkerPool[TTx]) Stop()

Stop initiates a graceful shutdown, waiting up to ShutdownTimeout for in-flight jobs.

Directories

Path Synopsis
Package core contains the backend-agnostic engine logic: job/worker registration, retry policies, scheduling, and middleware.
Package core contains the backend-agnostic engine logic: job/worker registration, retry policies, scheduling, and middleware.
Package driver defines the interfaces that all storage backend drivers must implement.
Package driver defines the interfaces that all storage backend drivers must implement.
bun
Package bundriver provides a goncordia driver that accepts *bun.Tx transactions.
Package bundriver provides a goncordia driver that accepts *bun.Tx transactions.
cassandra
Package cassandradriver provides a goncordia driver backed by Apache Cassandra.
Package cassandradriver provides a goncordia driver backed by Apache Cassandra.
clickhouse
Package clickhousedriver provides a goncordia driver backed by ClickHouse.
Package clickhousedriver provides a goncordia driver backed by ClickHouse.
dynamodb
Package dynamodbdriver provides a goncordia driver backed by Amazon DynamoDB.
Package dynamodbdriver provides a goncordia driver backed by Amazon DynamoDB.
firestore
Package firestoredriver provides a goncordia driver backed by Google Cloud Firestore.
Package firestoredriver provides a goncordia driver backed by Google Cloud Firestore.
gorm
Package gormdriver provides a goncordia driver that accepts *gorm.DB transactions.
Package gormdriver provides a goncordia driver that accepts *gorm.DB transactions.
memory
Package memory provides an in-memory driver for testing and development.
Package memory provides an in-memory driver for testing and development.
mongodb
Package mongodriver provides a goncordia driver backed by MongoDB.
Package mongodriver provides a goncordia driver backed by MongoDB.
pgxv5
Package pgxv5 provides a goncordia driver backed by PostgreSQL via pgx/v5.
Package pgxv5 provides a goncordia driver backed by PostgreSQL via pgx/v5.
redis
Package redisdriver provides a goncordia driver backed by Redis.
Package redisdriver provides a goncordia driver backed by Redis.
stdlib
Package stdlib provides a goncordia driver backed by database/sql.
Package stdlib provides a goncordia driver backed by database/sql.
Package gontest provides test helpers for goncordia.
Package gontest provides test helpers for goncordia.
internal
clock
Package clock provides a Clock interface and implementations for real and mock time.
Package clock provides a Clock interface and implementations for real and mock time.
Package otelgoncordia provides OpenTelemetry instrumentation for goncordia.
Package otelgoncordia provides OpenTelemetry instrumentation for goncordia.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL