flywheel

package module
v0.5.1 Latest
Published: Jun 23, 2026 License: MIT Imports: 19 Imported by: 0

README

🎡  go-flywheel

Durable, Postgres- and SQLite-backed job runtime for Go



Project Navigation
🚀 Installation 🧪 Examples & Tests 📚 Documentation
🤝 Contributing 🛠️ Code Standards ⚡ Benchmarks
🤖 AI Usage ⚖️ License 👥 Maintainers

🧩 About

go-flywheel is a durable, database-backed job runtime for Go. It turns an ordinary PostgreSQL or SQLite database into a reliable work queue with typed workers, a periodic scheduler, automatic retries, and a complete per-run audit trail — no Redis, no broker, no external job server to operate. Your jobs live in the same database as your data, so enqueuing work can be transactional with the rest of your application.

Use it two ways: embed it in your app (define Worker[A] types, wire a Node, and let it run the runner + scheduler + health server in ~10 lines), or run it locally as a daemon with the flywheel CLI — a drop-in cron replacement that runs your shell scripts, Python scripts, magex/mage build tasks, and HTTP calls durably, with retries, backfill, and a full audit trail.

The runtime is built from focused, composable pieces:

  • Typed workers — generic Worker[A] interface, registered by Kind() (registry.go)
  • One-call lifecycle: Node runs N runners + the scheduler + an optional health/metrics server and drains cleanly on shutdown (node.go)
  • Scheduler — periodic / cron job enqueuing plus stuck-lease recovery; declare schedules in code with UpsertPeriodic (scheduler.go, schedule.go)
  • Retries with backoff — exponential backoff with jitter, overridable per worker (runner.go)
  • Worker timeouts — per-job or per-kind execution deadlines that classify as a retryable timeout (runner.go)
  • Lease-based recovery — orphaned, crashed jobs reclaimed via leased_until sweeps (scheduler.go)
  • Per-run audit — append-only job_runs table records every attempt, outcome, timing, and cost (read.go)
  • Observability built in — a dependency-free Observer seam with ready-made metrics, slog, and Prometheus adapters, queue-health/lag inspection, a /metrics endpoint, and flywheel status (observer.go, observers/, health.go)
  • Postgres + SQLite: FOR UPDATE SKIP LOCKED and BEGIN IMMEDIATE drivers (driver_postgres.go, driver_sqlite.go)
  • Free-form routing — an ExecutorClass label routes jobs to executor pools; empty is the wildcard (types.go)
  • Idempotent enqueue: jobs_unique_key partial unique index dedupes work (client.go)
  • Follow-up jobs (DAG) — workers return child jobs that are enqueued atomically (types.go)
  • Outbox pattern — enqueue on the caller's own *gorm.DB transaction for exactly-once side effects (client.go)
  • Generic workers — ready-made ExecWorker, ShellWorker, PythonWorker, MageWorker (magex/mage), and HTTPWorker so local scripts and build tasks need no custom Go (workers/)
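
The retry bullet above mentions exponential backoff with jitter. A minimal sketch of that idea, using only the standard library, might look like the following. The backoff function, its parameters, and the 25% jitter ceiling are assumptions made for illustration; they are not flywheel's actual policy or defaults.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff returns the wait before the next try after `attempt` failed tries.
// The delay doubles per attempt starting from base, is capped at ceiling, and
// then grows by up to 25% jitter. frac is a random draw in [0, 1) supplied by
// the caller, which keeps the function deterministic and easy to test.
// The 25% figure and the parameter names are illustrative assumptions.
func backoff(attempt int, base, ceiling time.Duration, frac float64) time.Duration {
	d := base
	for i := 1; i < attempt && d < ceiling; i++ {
		d *= 2
	}
	if d > ceiling {
		d = ceiling
	}
	return d + time.Duration(frac*float64(d)/4)
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		wait := backoff(attempt, time.Second, 30*time.Second, rand.Float64())
		fmt.Printf("attempt %d failed, retry in %v\n", attempt, wait)
	}
}
```

The jitter spreads retries out so that many jobs failing at the same moment (for example, on a rate limit) do not all come back at the same instant.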

Schema setup

go-flywheel owns its three tables (jobs, job_runs, job_periodics) and ships them via a single exported entry point:

import "github.com/mrz1836/go-flywheel"

if err := flywheel.Migrate(db); err != nil { // db is a *gorm.DB
    return err
}

Migrate runs AutoMigrate over the row structs and then reconciles the partial/unique indexes GORM cannot express from struct tags — including the correctness-bearing jobs_unique_key partial unique index that enforces enqueue idempotency. It is idempotent (AutoMigrate no-op + CREATE INDEX IF NOT EXISTS), so repeated calls are safe.

It supports two consumption modes:

  • Standalone — call Migrate(db) against a bare SQLite or PostgreSQL database and the runtime stands up its own schema with no external migration tooling.
  • Embedded — call Migrate(db) as one step of a host project's install/migration process.

Only PostgreSQL and SQLite are supported, because both express the partial indexes the runtime relies on; Migrate returns an error for any other dialect rather than silently dropping idempotency. A host that prefers versioned SQL — e.g. an Atlas / atlas-provider-gorm flow — can point its loader at flywheel.Models() (the runtime's row structs as a single source of truth) and generate migrations from there instead of calling Migrate. The module takes no hard dependency on Atlas or any external migration tool.


Quick start (embedded)

A job runtime earns its keep when work is slow, flaky, costly, or must-not-be-lost — which in 2026 describes almost every LLM and third-party API call your app makes. Blocking a web request on a 30-second model call that might rate-limit is fragile; enqueuing that call and letting flywheel run it in the background is durable. Each job is retried on failure, recovered if the process crashes mid-run, and audited down to its per-attempt cost.

It's three moving parts — ① define the work, ② enqueue it, ③ run a Node that processes it:

package main

import (
	"context"
	"os/signal"
	"syscall"

	"github.com/glebarez/sqlite" // pure-Go SQLite: no cgo, no C compiler
	flywheel "github.com/mrz1836/go-flywheel"
	"gorm.io/gorm"
)

// ① Define the work: typed args + a worker that handles them.
//    The job here is "summarize a document with an LLM" — slow, metered, and
//    occasionally rate-limited, so you never want to run it inline or lose it.

type SummarizeDoc struct {
	DocID string `json:"doc_id"`
}

func (SummarizeDoc) Kind() string { return "summarize_doc" } // args name the kind they want

// Summarizer holds whatever the worker needs: a model client, DB handle, etc.
type Summarizer struct{}

func (Summarizer) Kind() string { return "summarize_doc" } // worker names the kind it handles

func (Summarizer) Work(ctx context.Context, job *flywheel.Job[SummarizeDoc]) (flywheel.Result, error) {
	summary, costMicros, err := callLLM(ctx, job.Args.DocID)
	if err != nil {
		return flywheel.Result{}, err // returning an error → automatic retry with backoff
	}
	return flywheel.Result{
		Output:     summary,    // recorded on this attempt's audit row
		CostMicros: costMicros, // track spend per attempt, no extra plumbing
	}, nil
}

func main() {
	db, _ := gorm.Open(sqlite.Open("flywheel.db"), &gorm.Config{})
	_ = flywheel.Migrate(db) // creates the jobs / job_runs / job_periodics tables

	reg := flywheel.NewRegistry()
	flywheel.Register(reg, Summarizer{})

	// ② Enqueue work. Returns instantly — the caller never waits on the LLM.
	//    (Pass InsertOpts.Tx to enqueue inside your own DB transaction.)
	_, _ = flywheel.Insert(context.Background(), flywheel.NewClient(db),
		SummarizeDoc{DocID: "42"}, flywheel.InsertOpts{})

	// ③ Run a Node: it claims jobs, runs your worker, retries failures, and
	//    drains cleanly on Ctrl+C. Concurrency: 4 → four summaries at once.
	node, _ := flywheel.NewNode(flywheel.NodeConfig{
		Runners: []flywheel.RunnerConfig{{
			DB: db, Driver: flywheel.NewSQLiteDriver(db), Registry: reg,
			Queues: []string{"default"}, Concurrency: 4, ClaimAnyClass: true,
		}},
	})

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
	_ = node.Run(ctx) // blocks until Ctrl+C, then drains in-flight jobs
}

// callLLM stands in for your real model call (Anthropic, OpenAI, a local model…).
func callLLM(ctx context.Context, docID string) (summary string, costMicros int64, err error) {
	return "TL;DR of doc " + docID, 1_200, nil
}

That's a durable AI pipeline: enqueue returns instantly, the Node summarizes four documents at a time, a failed model call retries itself with backoff, and every attempt — including what it cost — lands in the job_runs audit table. Need periodic or cron-style runs too? Add a Scheduler to the Node (see examples/ for the full set).


Local daemon & cron replacement

The flywheel CLI runs the runtime as a local daemon over a SQLite file (zero-ops) or Postgres, and replaces cron with durable scheduled jobs — no custom Go required:

go install github.com/mrz1836/go-flywheel/cmd/flywheel@latest

flywheel migrate   # stand up the schema
flywheel serve     # run runner + scheduler until Ctrl+C
flywheel jobs ls   # inspect the queue

Declare your jobs in flywheel.yaml — each run is retried, audited, and overlap-protected, strictly better than a crontab line. Pick the worker that matches what you run locally: shell (a .sh file or inline snippet), python (a script, -m module, or -c snippet), mage (magex/mage build targets), exec (any binary), or http (call a URL):

schedules:
  - slug: nightly-maintenance      # a shell script — file or inline, no +x needed
    every: 24h
    worker: shell
    shell:
      script: /usr/local/bin/maintenance.sh
      args: ["--verbose"]
      timeout_seconds: 600

  - slug: hourly-sync              # a Python script — resolves python3, then python
    cron: "0 * * * *"
    worker: python
    python:
      script: /opt/hermes/sync.py
      args: ["--since=1h"]

  - slug: repo-deps-update         # magex/mage targets — the Go-native task runner
    every: 24h
    worker: mage
    mage:
      targets: ["deps:update"]     # e.g. ["test"], ["lint"], ["version:bump", "push=true"]
      dir: /Users/me/projects/my-repo

  - slug: gateway-healthcheck      # call a URL
    cron: "*/5 * * * *"
    worker: http
    http:
      url: https://gateway.internal/healthz

Every run's stdout, stderr, and exit code are captured to the job_runs audit trail — inspect them with flywheel jobs inspect <id>. Prefer to wire it from Go? The examples/local-tasks program registers the shell, python, and mage workers and schedules one of each. See the CLI README for every command, the config reference, and the macOS launchd setup.


Observability

The runtime is self-diagnosing. The Observer seam (observer.go) reports every attempt's lifecycle — claim, start, finish, retry — with no metrics dependency in the core, and the observers/ package ships ready adapters that plug straight in:

  • observers.NewMetrics(rec) translates events into a MetricsRecorder — a one-method sink you back with Prometheus, OpenTelemetry, statsd, or CloudWatch (the core imports none of them).
  • observers.NewSlog(logger) logs each event at debug level; observers.NewMulti(...) fans events out to several observers at once.

SampleQueueHealth (health.go) reads a point-in-time gauge snapshot — depth by state, ready / in-flight counts, and the oldest-ready age (lag), the canonical "are the runners falling behind?" signal — and RecentFailures lists what was discarded recently and why. Give a Node a metrics handler and its health server also serves Prometheus text at /metrics (recorder counters plus the queue-health gauges, sampled fresh per scrape) alongside /healthz and /readyz:

mem := observers.NewMemRecorder()
node, _ := flywheel.NewNode(flywheel.NodeConfig{
    Runners: []flywheel.RunnerConfig{{
        DB: db, Driver: flywheel.NewSQLiteDriver(db), Registry: reg,
        Queues: []string{"default"}, ClaimAnyClass: true,
        Observer: observers.NewMulti(observers.NewSlog(logger), observers.NewMetrics(mem)),
    }},
    Health: flywheel.HealthConfig{
        Addr: ":9090",
        MetricsHandler: observers.MetricsHandler(mem, func(ctx context.Context) (flywheel.QueueHealth, error) {
            return flywheel.SampleQueueHealth(ctx, db)
        }),
    },
})

The flywheel CLI turns all of this on by default and adds flywheel status for an at-a-glance report of queue health, schedules, and recent failures.


📦 Installation

go-flywheel requires a supported release of Go.

go get -u github.com/mrz1836/go-flywheel

Get the MAGE-X build tool for development:

go install github.com/mrz1836/mage-x/cmd/magex@latest

📚 Documentation


Repository Features

This repository includes 25+ built-in features covering CI/CD, security, code quality, developer experience, and community tooling.

View the full Repository Features list →

Library Deployment

This project uses goreleaser for streamlined binary and library deployment to GitHub. To get started, install it via:

brew install goreleaser

The release process is defined in the .goreleaser.yml configuration file.

Then create and push a new Git tag using:

magex version:bump push=true bump=patch branch=master

This process ensures consistent, repeatable releases with properly versioned artifacts and metadata.

Pre-commit Hooks

Set up the Go-Pre-commit System to run the same formatting, linting, and tests defined in AGENTS.md before every commit:

go install github.com/mrz1836/go-pre-commit/cmd/go-pre-commit@latest
go-pre-commit install

The system is configured via modular env files in .github/env/ and provides 17x faster execution than traditional Python-based pre-commit hooks. See the complete documentation for details.

GitHub Workflows

All workflows are driven by modular configuration in .github/env/ — no YAML editing required.

View all workflows and the control center →

Updating Dependencies

To update all dependencies (Go modules, linters, and related tools), run:

magex deps:update

This command ensures all dependencies are brought up to date in a single step, including Go modules and any tools managed by MAGE-X. It is the recommended way to keep your development environment and CI in sync with the latest versions.

Build Commands

View all build commands

magex help

🧪 Examples & Tests

All unit tests run via GitHub Actions and use Go version 1.25.x. View the configuration file.

Run all tests (fast):

magex test

Run all tests with race detector (slower):

magex test:race

⚡ Benchmarks

Run the Go benchmarks:

magex bench

Benchmarks for the runtime's hot paths (claim, finalize, sweep) are added as those paths are tuned.


🛠️ Code Standards

Read more about this Go project's code standards.


🤖 AI Usage & Assistant Guidelines

Read the AI Usage & Assistant Guidelines for details on how AI is used in this project and how to interact with the AI assistants.


👥 Maintainers

MrZ

🤝 Contributing

View the contributing guidelines and please follow the code of conduct.

How can I help?

All kinds of contributions are welcome 🙌! The most basic way to show your support is to star 🌟 the project, or to raise issues 💬. You can also support this project by becoming a sponsor on GitHub 👏 or by making a bitcoin donation to ensure this journey continues indefinitely! 🚀


📝 License

License

Documentation

Overview

Package flywheel is a durable, Postgres- and SQLite-backed background-work runtime: it enqueues, dispatches, retries, audits, schedules, and recovers typed jobs.

The package is self-contained: it owns its own row and typed structs (RawJob, Job[A]) and reaches the database through a two-implementation Driver seam, so the runtime code never sees the SQL dialect. It depends only on gorm, the cron parser, and the standard library — no host application packages — which is what lets it ship as a standalone module.

Variables

var (
	// ErrNotFound indicates the requested record was not found.
	ErrNotFound = errors.New("record not found")

	// ErrDuplicateKey indicates a unique constraint violation occurred. The
	// runtime relies on this to make unique_key enqueue idempotent.
	ErrDuplicateKey = errors.New("duplicate key violation")

	// ErrForeignKey indicates a foreign key constraint violation occurred.
	ErrForeignKey = errors.New("foreign key constraint violated")

	// ErrDatabaseError indicates a general database operation failure.
	ErrDatabaseError = errors.New("database operation failed")
)

Sentinel errors for database operations. Use errors.Is() to check for these in callers. These are self-contained: the runtime classifies driver errors through WrapDBError without importing any external foundation package.
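
To illustrate the errors.Is pattern described above, here is a small self-contained sketch. The wrapDBError function, the driverError type, and the message matching are hypothetical stand-ins written for this example; the real WrapDBError may classify driver errors differently (for instance by error code rather than by message text).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Local stand-ins for the package sentinels, so the sketch compiles without
// importing flywheel.
var (
	errDuplicateKey  = errors.New("duplicate key violation")
	errDatabaseError = errors.New("database operation failed")
)

// driverError is a hypothetical raw error as a database driver might return it.
type driverError string

func (e driverError) Error() string { return string(e) }

// wrapDBError maps a raw driver error onto a sentinel while keeping the
// original message. Matching on message text is an assumption of this sketch.
func wrapDBError(err error) error {
	if err == nil {
		return nil
	}
	msg := strings.ToLower(err.Error())
	if strings.Contains(msg, "unique constraint") || strings.Contains(msg, "duplicate key") {
		return fmt.Errorf("%w: %v", errDuplicateKey, err)
	}
	return fmt.Errorf("%w: %v", errDatabaseError, err)
}

// isDuplicate is how a caller branches on the sentinel, however deeply wrapped.
func isDuplicate(err error) bool { return errors.Is(err, errDuplicateKey) }

func main() {
	raw := driverError("UNIQUE constraint failed: jobs.unique_key")
	err := fmt.Errorf("insert job: %w", wrapDBError(raw))
	fmt.Println("duplicate:", isDuplicate(err))
	fmt.Println("message:  ", err)
}
```

Because each layer wraps with %w, the caller can test for the sentinel without knowing which dialect produced the original error.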

var ErrAlreadyEnqueued = errors.New("jobs: already enqueued")

ErrAlreadyEnqueued is returned by Insert when a job with the same unique_key already exists. Callers compare it with errors.Is and treat the work as already submitted.
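
A caller that wants "submit once" semantics can treat the duplicate as success. The sketch below models that with an in-memory map in place of the jobs table; the queue type, submit helper, and errAlreadyEnqueued variable are illustrative stand-ins, not flywheel's API.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyEnqueued stands in for flywheel.ErrAlreadyEnqueued.
var errAlreadyEnqueued = errors.New("jobs: already enqueued")

// queue is an in-memory stand-in for the jobs table and its unique_key index.
type queue struct{ byKey map[string]string }

func newQueue() *queue { return &queue{byKey: map[string]string{}} }

// insert stores id under uniqueKey, or reports a collision.
func (q *queue) insert(uniqueKey, id string) (string, error) {
	if _, taken := q.byKey[uniqueKey]; taken {
		return "", errAlreadyEnqueued
	}
	q.byKey[uniqueKey] = id
	return id, nil
}

// submit enqueues the work and treats "already enqueued" as done.
func submit(q *queue, uniqueKey, id string) error {
	_, err := q.insert(uniqueKey, id)
	if errors.Is(err, errAlreadyEnqueued) {
		return nil // the work is already in the queue; nothing more to do
	}
	return err
}

func main() {
	q := newQueue()
	fmt.Println(submit(q, "invoice-42", "job-1"))
	fmt.Println(submit(q, "invoice-42", "job-2"))
	fmt.Println(len(q.byKey), q.byKey["invoice-42"])
}
```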

var ErrJobNotFound = errors.New("flywheel: job not found")

ErrJobNotFound is returned by FindJob when no job matches the requested id. A host maps it to a 404 without depending on gorm's record-not-found sentinel.

var ErrMissingKind = errors.New("jobs: args value must implement Kind() string")

ErrMissingKind is returned by Insert when the args value does not name its job kind. An args type used with Insert must implement Kind() string.

var ErrPeriodicNotFound = errors.New("flywheel: periodic not found")

ErrPeriodicNotFound is returned by SetPeriodicActive and DeletePeriodic when no periodic definition has the requested slug.

var ErrSQLiteConcurrency = errors.New("jobs: sqlite driver requires concurrency 1")

ErrSQLiteConcurrency is returned by NewRunner when a SQLite driver is wired with a concurrency greater than 1 — SQLite serializes writers and a second concurrent dequeue would deadlock.

var ErrUnknownKind = errors.New("jobs: unknown job kind")

ErrUnknownKind is returned by the registry when a job's kind has no registered worker.

var ErrValidation = errors.New("flywheel: validation failed")

ErrValidation is the sentinel every lifecycle validation failure wraps, so a caller can branch on errors.Is(err, ErrValidation) without depending on a host validation package. The runtime owns its own validation seam — it never imports a foundation/base-model error type.

Functions

func CancelJob added in v0.2.0

func CancelJob(ctx context.Context, db *gorm.DB, id string) error

CancelJob moves a job to the terminal cancelled state. An attempt already in flight is not interrupted, but the job will not be retried or re-claimed. It returns ErrJobNotFound when no live job has the id.

func CountActiveJobs

func CountActiveJobs(ctx context.Context, db *gorm.DB) (int64, error)

CountActiveJobs returns how many jobs are still in a non-terminal state, reading through db (soft-deleted excluded). It is the inspection seam for "pending work remaining" telemetry.

func CountRuns

func CountRuns(ctx context.Context, db *gorm.DB) (int64, error)

CountRuns returns the total number of recorded job attempts (job_runs rows), reading through db. It is the inspection seam for run-throughput telemetry.

func DeleteFinishedJobs added in v0.3.0

func DeleteFinishedJobs(ctx context.Context, db *gorm.DB, olderThan time.Time) (int64, error)

DeleteFinishedJobs hard-deletes terminal jobs (succeeded, cancelled, discarded) finalized before olderThan, together with their job_runs audit rows, and reports how many jobs were removed. It is the retention primitive behind `flywheel prune` and the Scheduler's optional retention sweep: a forever-running daemon needs a way to keep jobs and job_runs from growing unbounded.

The delete is hard, not soft: jobs is soft-deletable, but retention reclaims storage, so it bypasses the soft-delete scope (Unscoped). flywheel has no foreign-key cascade between jobs and job_runs, so the runs are deleted by job_id first and the jobs second, both inside one transaction so a failure leaves neither table half-pruned. Soft-deleted terminal jobs are purged too, so their audit rows do not orphan.

func DeletePeriodic added in v0.3.0

func DeletePeriodic(ctx context.Context, db *gorm.DB, slug string) error

DeletePeriodic removes a periodic definition by slug. Jobs it already enqueued are untouched; only the schedule that would produce new ones is removed. It is the writer behind `flywheel schedule rm` and the counterpart to UpsertPeriodic, and it returns ErrPeriodicNotFound when no definition has the slug.

func Enqueue

func Enqueue(ctx context.Context, c *Client, kind string, args []byte, opts InsertOpts) (string, error)

Enqueue writes one available job of an arbitrary kind from a pre-marshaled JSON payload, with no registered worker required. It is the host seed seam: fixtures and inspection hosts create real jobs through the same insert core as Insert — honoring opts (UniqueKey/Queue/Priority/…) and the row's lifecycle defaults — without touching flywheel's unexported row structs. A unique_key collision returns ErrAlreadyEnqueued.

func Insert

func Insert[A Args](ctx context.Context, c *Client, args A, opts InsertOpts) (string, error)

Insert enqueues one job with typed args. The job kind is read from the args value, which must implement Kind() string. When opts.Tx is set the row is written on that transaction (outbox, FR-003). A unique_key collision returns ErrAlreadyEnqueued, never a raw driver error (FR-004).

Example

ExampleInsert enqueues a typed job onto a SQLite-backed queue.

package main

import (
	"context"
	"fmt"

	"github.com/glebarez/sqlite"
	flywheel "github.com/mrz1836/go-flywheel"
	"gorm.io/gorm"
)

// EmailArgs is a job's typed arguments. Its Kind method names the worker that
// handles it, and the struct is JSON-serialized as the job payload.
type EmailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// Kind names the worker for these args.
func (EmailArgs) Kind() string { return "send_email" }

func main() {
	db, _ := gorm.Open(sqlite.Open("file:example-insert?mode=memory&cache=shared"), &gorm.Config{})
	_ = flywheel.Migrate(db)

	id, err := flywheel.Insert(context.Background(), flywheel.NewClient(db),
		EmailArgs{To: "a@example.com", Subject: "hi"}, flywheel.InsertOpts{})
	if err != nil {
		panic(err)
	}
	fmt.Println(id != "")
}
Output:
true

func Migrate

func Migrate(db *gorm.DB) error

Migrate is the single source of truth for the job schema: it brings up the three job tables (jobs, job_runs, job_periodics) — with their NOT-NULL constraints, column defaults, and the jobs soft-delete column — plus the partial/unique indexes GORM AutoMigrate cannot express. A host installs the schema by calling Migrate(db) and nothing else. It supports both consumption modes:

  • standalone: call it against a bare SQLite or PostgreSQL database and the runtime stands up its own schema with no external migration tooling.
  • embedded: call it as one step of a host project's install/migration process. The module takes no hard Atlas dependency; a host that wants versioned SQL can generate it from Models instead.

Migrate is idempotent: AutoMigrate is a no-op against an up-to-date schema and every reconciled index uses IF NOT EXISTS, so repeated calls are safe.

func Models

func Models() []any

Models returns the runtime's row structs so a consumer can drive schema generation from a single source of truth — the same structs Migrate uses.

This is the seam for the "embedded" install mode: a host that prefers versioned SQL (e.g. an Atlas/atlas-provider-gorm flow) points its loader at these models instead of re-declaring the columns. The runtime keeps the row structs unexported on purpose; Models exposes them as a stable []any without widening the package's typed API surface.

func NewID

func NewID() string

NewID returns a freshly minted UUID v7 string. v7 IDs embed a millisecond timestamp, so they sort lexicographically in creation order — which lets callers paginate by id alone instead of composite cursors.

If the underlying RNG fails (a process-level fault, not a runtime condition on supported platforms), NewID panics.
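
The ordering property comes from the UUID v7 layout, which puts a 48-bit Unix millisecond timestamp in the leading bytes. The sketch below builds such an ID by hand to show why string sorting follows creation time. The newV7 helper is written for this example and is not how NewID is implemented; note that IDs minted within the same millisecond are ordered only by their random bits here.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"sort"
	"time"
)

// newV7 builds a UUID v7 string for the given Unix-millisecond timestamp:
// 48 timestamp bits first, then the version and variant bits, with the
// remaining bits random.
func newV7(ms int64) string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err) // mirrors the documented behavior: an RNG failure is fatal
	}
	b[0] = byte(ms >> 40)
	b[1] = byte(ms >> 32)
	b[2] = byte(ms >> 24)
	b[3] = byte(ms >> 16)
	b[4] = byte(ms >> 8)
	b[5] = byte(ms)
	b[6] = (b[6] & 0x0f) | 0x70 // version 7
	b[8] = (b[8] & 0x3f) | 0x80 // RFC variant
	h := hex.EncodeToString(b[:])
	return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32]
}

func main() {
	now := time.Now().UnixMilli()
	// Create the IDs out of order, then let a plain string sort restore
	// creation order.
	ids := []string{newV7(now + 2), newV7(now), newV7(now + 1)}
	sort.Strings(ids)
	for _, id := range ids {
		fmt.Println(id)
	}
}
```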

func Register

func Register[A Args](reg *Registry, w Worker[A])

Register builds the typed dispatch closure for w and stores it keyed by w.Kind(). It panics if the kind is already registered — a duplicate registration is a programming error that must fail at startup (FR-037).

Example

ExampleRegister registers a typed worker so the runtime can dispatch its kind.

package main

import (
	"context"
	"fmt"

	flywheel "github.com/mrz1836/go-flywheel"
)

// EmailArgs is a job's typed arguments. Its Kind method names the worker that
// handles it, and the struct is JSON-serialized as the job payload.
type EmailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// Kind names the worker for these args.
func (EmailArgs) Kind() string { return "send_email" }

// EmailWorker handles EmailArgs jobs.
type EmailWorker struct{}

// Kind is the stable worker name, matching EmailArgs.Kind.
func (EmailWorker) Kind() string { return "send_email" }

// Work runs one job. It receives the decoded args and a logger pre-tagged with
// the job id, kind, and run id.
func (EmailWorker) Work(ctx context.Context, job *flywheel.Job[EmailArgs]) (flywheel.Result, error) {
	job.Logger.InfoContext(ctx, "sending email", "to", job.Args.To, "subject", job.Args.Subject)
	return flywheel.Result{}, nil
}

func main() {
	reg := flywheel.NewRegistry()
	flywheel.Register(reg, EmailWorker{})
	fmt.Println("registered")
}
Output:
registered
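
For readers curious what a "typed dispatch closure" can look like, here is a simplified sketch using only the standard library. The registry, worker, register, and run names below are local to this example and deliberately smaller than flywheel's real types; the actual Register works with Job[A] and Result rather than bare args and an error.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// worker is a simplified stand-in for a typed worker interface.
type worker[A any] interface {
	Kind() string
	Work(ctx context.Context, args A) error
}

// registry maps a job kind to an untyped closure that knows how to decode
// the payload into the right args type.
type registry struct {
	dispatch map[string]func(ctx context.Context, payload []byte) error
}

func newRegistry() *registry {
	return &registry{dispatch: map[string]func(context.Context, []byte) error{}}
}

// register captures the type parameter A inside a closure, so the registry
// itself can stay non-generic. A duplicate kind panics at startup.
func register[A any](r *registry, w worker[A]) {
	kind := w.Kind()
	if _, dup := r.dispatch[kind]; dup {
		panic("duplicate worker kind: " + kind)
	}
	r.dispatch[kind] = func(ctx context.Context, payload []byte) error {
		var args A
		if err := json.Unmarshal(payload, &args); err != nil {
			return fmt.Errorf("decode %s args: %w", kind, err)
		}
		return w.Work(ctx, args)
	}
}

// run looks up the closure for kind and invokes it with the raw payload.
func run(r *registry, kind, payload string) error {
	fn, ok := r.dispatch[kind]
	if !ok {
		return fmt.Errorf("unknown job kind %q", kind)
	}
	return fn(context.Background(), []byte(payload))
}

type emailArgs struct {
	To string `json:"to"`
}

// emailWorker records recipients instead of sending mail.
type emailWorker struct{ sent *[]string }

func (emailWorker) Kind() string { return "send_email" }

func (w emailWorker) Work(_ context.Context, a emailArgs) error {
	*w.sent = append(*w.sent, a.To)
	return nil
}

func main() {
	var sent []string
	reg := newRegistry()
	register[emailArgs](reg, emailWorker{sent: &sent})
	fmt.Println(run(reg, "send_email", `{"to":"a@example.com"}`), sent)
	fmt.Println(run(reg, "resize_image", `{}`))
}
```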

func RequestIDFrom

func RequestIDFrom(ctx context.Context) string

RequestIDFrom returns the request_id stamped on ctx, or "" when none is present. Workers and services use this to read the id without taking a dependency on the HTTP layer.

func RetryJob added in v0.2.0

func RetryJob(ctx context.Context, db *gorm.DB, id string) error

RetryJob forces a job back to available so a runner re-claims it on the next poll, clearing any lease and finalization. It is the operator action behind a "retry now" — it works on a terminal (discarded/cancelled/succeeded) job as well as a stuck one. It returns ErrJobNotFound when no live job has the id.

func SetPeriodicActive added in v0.3.0

func SetPeriodicActive(ctx context.Context, db *gorm.DB, slug string, active bool) error

SetPeriodicActive toggles a periodic definition's active flag by slug without touching its schedule or next_run_at cursor. Deactivating preserves the row — it stays inspectable — but stops it firing; reactivating resumes it on the existing cadence. It is the writer behind a declarative reconcile's orphan-disable and the CLI's enable/disable, and returns ErrPeriodicNotFound when no definition has the slug.

func UpsertPeriodic added in v0.2.0

func UpsertPeriodic(ctx context.Context, db *gorm.DB, spec PeriodicSpec) error

UpsertPeriodic inserts or updates a periodic definition by slug. On insert it seeds next_run_at to the next fire after now (so a fresh schedule does not fire immediately). On update it preserves the existing next_run_at cursor unless the schedule itself changed, so reconciling an unchanged config on restart does not reset the cadence. It is the exported writer for job_periodics, which the CLI and a host's startup reconciliation use to declare schedules in code.

Example

ExampleUpsertPeriodic declares a periodic job in code: run "send_email" every day at 02:00. Reconciling the same slug on startup is idempotent, so it is safe to call on every boot.

package main

import (
	"context"

	"github.com/glebarez/sqlite"
	flywheel "github.com/mrz1836/go-flywheel"
	"gorm.io/gorm"
)

func main() {
	db, _ := gorm.Open(sqlite.Open("flywheel.db"), &gorm.Config{})
	_ = flywheel.Migrate(db)

	err := flywheel.UpsertPeriodic(context.Background(), db, flywheel.PeriodicSpec{
		Slug:   "nightly-report",
		Kind:   "send_email",
		Cron:   "0 2 * * *",
		Active: true,
	})
	if err != nil {
		panic(err)
	}
}
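
The cursor rule described for UpsertPeriodic (seed on insert, preserve on an unchanged update, reseed when the schedule changes) can be modeled in a few lines. This sketch uses an in-memory map and a fixed interval in place of a cron expression; the periodic struct, upsert function, and epoch variable are assumptions made for illustration and do not mirror flywheel's row structs.

```go
package main

import (
	"fmt"
	"time"
)

// epoch is a fixed reference time so the demo is deterministic.
var epoch = time.Unix(0, 0).UTC()

// periodic is a stand-in for one schedule row.
type periodic struct {
	Every     time.Duration
	NextRunAt time.Time
}

// upsert inserts or updates the schedule stored under slug.
//   - new slug:           seed NextRunAt to the next fire after now
//   - unchanged schedule: keep the existing cursor
//   - changed schedule:   reseed the cursor from now
func upsert(store map[string]periodic, slug string, every time.Duration, now time.Time) {
	if old, ok := store[slug]; ok && old.Every == every {
		return
	}
	store[slug] = periodic{Every: every, NextRunAt: now.Add(every)}
}

func main() {
	store := map[string]periodic{}

	upsert(store, "nightly-report", 24*time.Hour, epoch)
	fmt.Println("after insert:        ", store["nightly-report"].NextRunAt)

	// A restart six hours later reconciles the same config.
	upsert(store, "nightly-report", 24*time.Hour, epoch.Add(6*time.Hour))
	fmt.Println("after no-op upsert:  ", store["nightly-report"].NextRunAt)

	// Changing the schedule reseeds the cursor.
	upsert(store, "nightly-report", 12*time.Hour, epoch.Add(6*time.Hour))
	fmt.Println("after schedule change:", store["nightly-report"].NextRunAt)
}
```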

func WithClock

func WithClock(ctx context.Context, clk Clock) context.Context

WithClock returns a child context carrying clk.

func WithRequestID

func WithRequestID(ctx context.Context, id string) context.Context

WithRequestID returns ctx tagged with id. An empty id is a no-op so callers can pass through without branching.
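
The pair of request-ID helpers follows a common context-value pattern. A minimal sketch of that pattern, with lowercase names to make clear it is not flywheel's code, could look like this; the chain helper exists only to make the behavior easy to demonstrate.

```go
package main

import (
	"context"
	"fmt"
)

// requestIDKey is an unexported key type, so no other package can collide
// with or overwrite the value.
type requestIDKey struct{}

// withRequestID tags ctx with id. An empty id returns ctx unchanged, so
// callers can pass a possibly-empty value through without branching.
func withRequestID(ctx context.Context, id string) context.Context {
	if id == "" {
		return ctx
	}
	return context.WithValue(ctx, requestIDKey{}, id)
}

// requestIDFrom returns the id stamped on ctx, or "" when none is present.
func requestIDFrom(ctx context.Context) string {
	id, _ := ctx.Value(requestIDKey{}).(string)
	return id
}

// chain applies withRequestID for each id in order on a fresh context and
// returns what a downstream reader would see.
func chain(ids ...string) string {
	ctx := context.Background()
	for _, id := range ids {
		ctx = withRequestID(ctx, id)
	}
	return requestIDFrom(ctx)
}

func main() {
	fmt.Printf("%q\n", chain())
	fmt.Printf("%q\n", chain("req-123"))
	fmt.Printf("%q\n", chain("req-123", ""))
}
```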

func WrapDBError

func WrapDBError(err error) error

WrapDBError normalizes a raw driver error into one of the package sentinels so callers can branch on errors.Is(err, ErrDuplicateKey) regardless of dialect. It returns nil for a nil error.

Types

type Args

type Args any

Args is any JSON-serializable struct passed to a worker.

type ClaimEvent added in v0.2.0

type ClaimEvent struct {
	ExecutorClass ExecutorClass
	Queues        []string
	Claimed       int
}

ClaimEvent describes one claimed batch.

type Classifier

type Classifier interface {
	Classify(err error) ErrorClass
}

Classifier is an optional worker interface. When implemented, the Runner uses Classify to decide whether an error is retried.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the producer-side handle for enqueuing jobs. It is built lazily by the service container from the write database connection.

func NewClient

func NewClient(writeDB *gorm.DB) *Client

NewClient returns a Client that enqueues onto writeDB.

func (*Client) WriteDB

func (c *Client) WriteDB() *gorm.DB

WriteDB returns the client's write connection. The Scheduler reuses it.

type Clock

type Clock interface {
	Now(ctx context.Context) time.Time
}

Clock abstracts "now" so callers can be given deterministic time. The runtime reads time exclusively through a Clock pulled from the context, which lets tests drive the scheduler and lease logic with a fixed clock.

func ClockFrom

func ClockFrom(ctx context.Context) Clock

ClockFrom returns the Clock attached to ctx, or a real-time default clock when none is attached.
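
As an illustration of why reading time through the context helps testing, the sketch below pins "now" with a fixed clock and checks a time-dependent decision. All names here (clock, fixedClock, systemClock, isDue, dueAt) are local to the example and only assumed to resemble the package's approach.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// clock mirrors the shape of the documented Clock interface.
type clock interface {
	Now(ctx context.Context) time.Time
}

// systemClock reads the real wall clock.
type systemClock struct{}

func (systemClock) Now(context.Context) time.Time { return time.Now() }

// fixedClock always reports the same instant.
type fixedClock struct{ t time.Time }

func (c fixedClock) Now(context.Context) time.Time { return c.t }

type clockKey struct{}

func withClock(ctx context.Context, c clock) context.Context {
	return context.WithValue(ctx, clockKey{}, c)
}

// clockFrom returns the clock on ctx, or the system clock when none is set.
func clockFrom(ctx context.Context) clock {
	if c, ok := ctx.Value(clockKey{}).(clock); ok {
		return c
	}
	return systemClock{}
}

// isDue reads "now" only through the context clock, never time.Now directly.
func isDue(ctx context.Context, nextRunAt time.Time) bool {
	return !clockFrom(ctx).Now(ctx).Before(nextRunAt)
}

// dueAt pins the clock to nowUnix and asks whether a run scheduled for
// runUnix is due.
func dueAt(nowUnix, runUnix int64) bool {
	ctx := withClock(context.Background(), fixedClock{t: time.Unix(nowUnix, 0)})
	return isDue(ctx, time.Unix(runUnix, 0))
}

func main() {
	fmt.Println(dueAt(100, 50), dueAt(50, 100))
	// With no clock attached, the real clock is used.
	fmt.Println(isDue(context.Background(), time.Now().Add(time.Hour)))
}
```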

type Defaults

type Defaults interface {
	Defaults() InsertOpts
}

Defaults is an optional worker interface. When implemented, the returned InsertOpts seed a producer's Insert for this kind.

type Driver

type Driver interface {
	// Dequeue atomically claims up to limit ready jobs from the given queues for
	// the given executor class, leasing each for the lease duration. Unless
	// claimAny is set, it claims only jobs whose executor_class equals class or is
	// the empty wildcard. A claimed job has its state advanced to running and its
	// attempt incremented.
	Dequeue(ctx context.Context, queues []string, class ExecutorClass, claimAny bool, limit int, lease time.Duration) ([]RawJob, error)

	// InsertRunStub commits a job_runs row with outcome started before the
	// worker runs, so a side-effect FK to runID resolves through a crash. class
	// is recorded as the run's executor_class.
	InsertRunStub(ctx context.Context, runID string, raw RawJob, startedAt time.Time, class ExecutorClass, execID string) error

	// Finalize runs, in one transaction, the run-row outcome update, the
	// jobs.state advance, and any follow-up inserts. A follow-up colliding with
	// an existing unique_key is skipped, not fatal.
	Finalize(ctx context.Context, raw RawJob, runID string, result Result, workErr error, finishedAt time.Time) error

	// InsertChild writes one follow-up job on tx, skipping a unique_key
	// collision without error.
	InsertChild(ctx context.Context, tx *gorm.DB, fu FollowUp, parentID string) error

	// Sweep reclaims jobs whose lease has expired (state running, leased_until
	// in the past), returning them to available and marking each stale run stub
	// crashed. It reports how many jobs were reclaimed.
	Sweep(ctx context.Context, now time.Time) (reclaimed int, err error)
}

Driver is the database seam the Runner and Scheduler reach through. It has two implementations — driver_postgres.go (FOR UPDATE SKIP LOCKED) and driver_sqlite.go (BEGIN IMMEDIATE + serialized claim) — so the runtime code above it never sees the SQL dialect.
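
The Sweep contract (reclaim running jobs whose lease is in the past) can be pictured with an in-memory model. The sketch below is only that: the job struct, the state strings, and the at helper are assumptions for illustration, and it omits the run-stub bookkeeping and the SQL that a real driver would need.

```go
package main

import (
	"fmt"
	"time"
)

// job is a pared-down stand-in for a jobs row.
type job struct {
	ID          string
	State       string
	LeasedUntil time.Time
}

// at builds a time from Unix seconds, to keep the demo compact.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// sweep returns to "available" every running job whose lease ended before
// now, clears its lease, and reports how many jobs were reclaimed.
func sweep(jobs []job, now time.Time) (reclaimed int) {
	for i := range jobs {
		if jobs[i].State == "running" && jobs[i].LeasedUntil.Before(now) {
			jobs[i].State = "available"
			jobs[i].LeasedUntil = time.Time{}
			reclaimed++
		}
	}
	return reclaimed
}

func main() {
	jobs := []job{
		{ID: "crashed", State: "running", LeasedUntil: at(10)},
		{ID: "healthy", State: "running", LeasedUntil: at(100)},
		{ID: "waiting", State: "available"},
	}
	fmt.Println("reclaimed:", sweep(jobs, at(50)))
	for _, j := range jobs {
		fmt.Println(j.ID, j.State)
	}
}
```

A worker that is still alive keeps its lease in the future, so only jobs whose runner stopped renewing (or crashed) are picked up again.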

func NewPostgresDriver

func NewPostgresDriver(db *gorm.DB) Driver

NewPostgresDriver returns a Driver backed by a PostgreSQL connection.

func NewSQLiteDriver

func NewSQLiteDriver(db *gorm.DB) Driver

NewSQLiteDriver returns a Driver backed by a SQLite connection. The connection should be opened with the _txlock=immediate DSN parameter so the write lock is taken up front (research §3).

type ErrorClass

type ErrorClass string

ErrorClass classifies a worker error. Permanent and validation errors stop retrying; transient and timeout errors are retried.

const (
	ErrorTransient  ErrorClass = "transient"
	ErrorPermanent  ErrorClass = "permanent"
	ErrorValidation ErrorClass = "validation"
	ErrorTimeout    ErrorClass = "timeout"
)

Recognized ErrorClass values.

func (ErrorClass) Valid

func (c ErrorClass) Valid() bool

Valid reports whether c is a recognized ErrorClass.
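
The retry rule above can be sketched as a small decision function. This is an illustration only: the ErrorClass type is copied locally so the snippet compiles on its own, and shouldRetry is a hypothetical helper, not part of the package API. The attempt-exhaustion check assumes attempts are counted from 1.

```go
package main

import "fmt"

// ErrorClass is a local copy of the documented string type so this sketch
// compiles without importing the package.
type ErrorClass string

const (
	ErrorTransient  ErrorClass = "transient"
	ErrorPermanent  ErrorClass = "permanent"
	ErrorValidation ErrorClass = "validation"
	ErrorTimeout    ErrorClass = "timeout"
)

// shouldRetry is a hypothetical helper. It retries transient and timeout
// errors while attempts remain and stops on everything else, including
// unrecognized classes.
func shouldRetry(c ErrorClass, attempt, maxAttempts int) bool {
	if attempt >= maxAttempts {
		return false // retries exhausted
	}
	switch c {
	case ErrorTransient, ErrorTimeout:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldRetry(ErrorTransient, 1, 3))
	fmt.Println(shouldRetry(ErrorValidation, 1, 3))
	fmt.Println(shouldRetry(ErrorTimeout, 3, 3))
}
```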

type ExecutorClass added in v0.2.0

type ExecutorClass string

ExecutorClass is a free-form routing label that pairs a job with the executor pool eligible to run it. It is a convention, not a closed enum: a local or SQLite deployment uses "local" (or the empty wildcard), an AWS deployment uses "lambda" or "ecs", and a specialized pool uses any string you like ("gpu", "high-mem", ...). Its string value is the stable wire form persisted on jobs.executor_class and job_runs.executor_class.

The empty class — AnyClass — is the wildcard: a job carrying it may be claimed by any runner, and a runner configured with ClaimAnyClass claims jobs of every class. Routing a job to a dedicated pool is just a matter of giving the job and that pool's runner the same non-empty class.

const AnyClass ExecutorClass = ""

AnyClass is the empty wildcard executor class. A job inserted with AnyClass (the default) is claimable by every runner regardless of the runner's own class; it is the right choice whenever a job is not pinned to a specific executor pool.
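
The routing rule described for ExecutorClass, AnyClass, and ClaimAnyClass reduces to a three-way predicate. The sketch below models it in memory; claimable is a hypothetical name, and the real check presumably lives in the drivers' claim SQL rather than in Go code like this.

```go
package main

import "fmt"

// ExecutorClass and AnyClass are local copies of the documented declarations.
type ExecutorClass string

const AnyClass ExecutorClass = ""

// claimable reports whether a runner may claim a job (hypothetical helper).
// A job is eligible when the runner claims every class, when the job carries
// the empty wildcard, or when the two classes match exactly.
func claimable(jobClass, runnerClass ExecutorClass, claimAnyClass bool) bool {
	return claimAnyClass || jobClass == AnyClass || jobClass == runnerClass
}

func main() {
	fmt.Println(claimable("gpu", "gpu", false))    // same pool
	fmt.Println(claimable(AnyClass, "gpu", false)) // wildcard job
	fmt.Println(claimable("gpu", "local", false))  // pinned elsewhere
	fmt.Println(claimable("gpu", "local", true))   // runner claims everything
}
```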

type FailureView added in v0.4.0

type FailureView struct {
	JobID        string    `json:"job_id"`
	Kind         string    `json:"kind"`
	Queue        string    `json:"queue"`
	Attempt      int       `json:"attempt"`
	FinalizedAt  time.Time `json:"finalized_at"`
	ErrorClass   string    `json:"error_class"`
	ErrorMessage string    `json:"error_message"`
}

FailureView is one recently discarded job paired with the error from its final recorded attempt. It is the "what is failing, and why" diagnosis surface behind `flywheel status` and an operator dashboard: a discarded job is one that exhausted its retries (or hit a permanent error), so its last job_runs row carries the classification and message that ended it.

func RecentFailures added in v0.4.0

func RecentFailures(ctx context.Context, db *gorm.DB, p RecentFailuresParams) ([]FailureView, error)

RecentFailures returns the most recently discarded jobs (newest finalized first) within the window, each joined to the error on its latest attempt, reading through db. Soft-deleted jobs are excluded. It powers the failures section of `flywheel status` so an operator can glance and see what broke.

The errors come from a second read keyed by the page's job ids rather than a SQL join, so the page size bounds both queries and the dialects stay identical: the runs for the page are loaded ordered by attempt, and the last row seen per job — the highest attempt — wins, which is the attempt that discarded it.
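
The "last row seen per job wins" step can be illustrated with a plain map overwrite. This is a minimal in-memory sketch, assuming the runs arrive already ordered by attempt as described above; runRow and finalErrors are hypothetical names, not the package's internals.

```go
package main

import "fmt"

// runRow is a hypothetical projection of one job_runs row.
type runRow struct {
	JobID        string
	Attempt      int
	ErrorClass   string
	ErrorMessage string
}

// finalErrors keeps, for each job, the last row it sees. Because the input is
// ordered by attempt, the surviving row is the highest attempt.
func finalErrors(runs []runRow) map[string]runRow {
	out := make(map[string]runRow, len(runs))
	for _, r := range runs {
		out[r.JobID] = r // later (higher) attempts overwrite earlier ones
	}
	return out
}

func main() {
	runs := []runRow{
		{JobID: "a", Attempt: 1, ErrorClass: "transient", ErrorMessage: "dial timeout"},
		{JobID: "b", Attempt: 1, ErrorClass: "validation", ErrorMessage: "missing field"},
		{JobID: "a", Attempt: 2, ErrorClass: "permanent", ErrorMessage: "account closed"},
	}
	for id, r := range finalErrors(runs) {
		fmt.Println(id, r.Attempt, r.ErrorClass, r.ErrorMessage)
	}
}
```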

type FakeHTTPDoer

type FakeHTTPDoer struct {
	// contains filtered or unexported fields
}

FakeHTTPDoer is a programmable HTTPDoer test double. Replies are stubbed per request URL with an optional fallback. It is safe for concurrent use.

func NewFakeHTTPDoer

func NewFakeHTTPDoer() *FakeHTTPDoer

NewFakeHTTPDoer returns an empty FakeHTTPDoer. Unstubbed requests get a 200 OK with an empty body unless StubDefault overrides the fallback.

func (*FakeHTTPDoer) Calls

func (f *FakeHTTPDoer) Calls() int

Calls reports how many requests have been made.

func (*FakeHTTPDoer) Do

func (f *FakeHTTPDoer) Do(req *http.Request) (*http.Response, error)

Do records the request and returns its programmed reply.

func (*FakeHTTPDoer) StubBodyReadError

func (f *FakeHTTPDoer) StubBodyReadError(url string, status int, readErr error) *FakeHTTPDoer

StubBodyReadError programs requests to url to return a response whose Body fails every Read with readErr — modeling a truncated stream (FR-017).

func (*FakeHTTPDoer) StubDefault

func (f *FakeHTTPDoer) StubDefault(status int, body string) *FakeHTTPDoer

StubDefault programs the fallback reply for any unstubbed request URL.

func (*FakeHTTPDoer) StubDefaultBodyReadError

func (f *FakeHTTPDoer) StubDefaultBodyReadError(status int, readErr error) *FakeHTTPDoer

StubDefaultBodyReadError programs the fallback so unstubbed requests return a response whose Body fails every Read with readErr.

func (*FakeHTTPDoer) StubError

func (f *FakeHTTPDoer) StubError(url string, err error) *FakeHTTPDoer

StubError programs requests to url to fail with err.

func (*FakeHTTPDoer) StubURL

func (f *FakeHTTPDoer) StubURL(url string, status int, body string) *FakeHTTPDoer

StubURL programs the reply for requests to url.

type FinishEvent added in v0.2.0

type FinishEvent struct {
	JobEvent
	// Outcome is the attempt's recorded outcome (success, error, snooze,
	// cancelled, or timeout).
	Outcome RunOutcome
	// ErrorClass is the failure classification; it is the zero value on success.
	ErrorClass ErrorClass
	// Err is the worker error, or nil on success.
	Err error
	// Duration is the wall time the attempt took.
	Duration time.Duration
}

FinishEvent reports one completed attempt.

type FixedClock

type FixedClock struct {
	// contains filtered or unexported fields
}

FixedClock is a Clock that always returns a constant time, regardless of the context or the wall clock.

func NewFixedClock

func NewFixedClock(t time.Time) FixedClock

NewFixedClock returns a FixedClock anchored at t.

func (FixedClock) Now

Now returns the anchored time.

type FollowUp

type FollowUp struct {
	Kind       string
	Args       any
	Queue      string
	UniqueKey  string
	ScheduleAt *time.Time
	// Parent, when true, sets the child's parent_job_id to the spawning job.
	Parent   bool
	Priority int
	// ExecutorClass routes the child job to a specific executor pool. Empty
	// (AnyClass) leaves the child claimable by any runner.
	ExecutorClass ExecutorClass
}

FollowUp describes a child job a worker requests be enqueued.

type HTTPDoer

type HTTPDoer interface {
	Do(req *http.Request) (*http.Response, error)
}

HTTPDoer is the seam through which workers make external HTTP calls. Tests substitute a FakeHTTPDoer so no real network call is made.

func DefaultHTTPDoer

func DefaultHTTPDoer() HTTPDoer

DefaultHTTPDoer returns an HTTPDoer backed by http.DefaultClient.
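
To show why the seam is useful, here is a sketch of code that depends only on the one-method interface and is exercised without a network call. The interface is copied locally; stubDoer is a hand-rolled stand-in written for this example (it is not FakeHTTPDoer and, unlike it, is not safe for concurrent use), and fetchBody and demoFetch are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// HTTPDoer is a local copy of the documented one-method interface.
type HTTPDoer interface {
	Do(req *http.Request) (*http.Response, error)
}

// stubDoer is a hypothetical test double that always returns one canned reply.
type stubDoer struct {
	status int
	body   string
	calls  int
}

func (s *stubDoer) Do(req *http.Request) (*http.Response, error) {
	s.calls++
	return &http.Response{
		StatusCode: s.status,
		Header:     make(http.Header),
		Body:       io.NopCloser(strings.NewReader(s.body)),
		Request:    req,
	}, nil
}

// fetchBody performs a GET through the seam and returns the response body.
func fetchBody(ctx context.Context, d HTTPDoer, url string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return "", err
	}
	resp, err := d.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	b, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// demoFetch runs fetchBody against the stub and reports what happened.
func demoFetch() (string, int, error) {
	d := &stubDoer{status: http.StatusOK, body: "pong"}
	body, err := fetchBody(context.Background(), d, "http://example.invalid/ping")
	return body, d.calls, err
}

func main() {
	body, calls, err := demoFetch()
	fmt.Println(body, calls, err)
}
```

In production code the same fetchBody would receive whatever DefaultHTTPDoer returns; only the injected value changes.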

type HealthConfig added in v0.2.0

type HealthConfig struct {
	// Addr is the listen address (e.g. ":8080"). Empty disables the server.
	Addr string
	// Readiness reports whether the node is ready to serve work. When nil, the
	// Node installs a default that pings the first runner's database. /healthz is
	// always a shallow liveness 200; /readyz gates its 200 on Readiness.
	Readiness func(ctx context.Context) error
	// MetricsHandler, when non-nil, is served at /metrics. The Node stays format-
	// agnostic: a host passes any http.Handler (e.g. observers.MetricsHandler for
	// Prometheus), and a nil handler simply leaves /metrics unrouted (404). Liveness
	// and readiness are unaffected either way.
	MetricsHandler http.Handler
	// ShutdownTimeout bounds the health server's graceful shutdown. Optional;
	// defaults to five seconds.
	ShutdownTimeout time.Duration
}

HealthConfig configures a Node's optional in-process health server. It uses only net/http from the standard library, so enabling it adds no dependency. A zero value (empty Addr) disables the server.

type InsertOpts

type InsertOpts struct {
	Queue string
	// UniqueKey enforces idempotency forever: an insert collides with any job
	// that ever carried the same key, terminal or not. Use it for "enqueue this
	// exact unit of work at most once, ever".
	UniqueKey string
	// UniqueActiveKey enforces idempotency only while a job is active (available,
	// running, retryable, or scheduled): an insert collides only with a still-live
	// job carrying the same key, and the key frees up once that job reaches a
	// terminal state. Use it for "at most one in-flight job for this subject",
	// where a later run is expected once the current one finishes.
	UniqueActiveKey string
	ScheduleAt      *time.Time
	Parent          *string
	Priority        int
	ExecutorClass   ExecutorClass
	MaxAttempts     int
	// Timeout, when > 0, bounds this job's worker execution: the Runner cancels
	// the worker's ctx after it elapses. It overrides the worker's Timeouter and
	// the runner's DefaultTimeout.
	Timeout time.Duration
	// Tx, when set, writes the job row on the caller's transaction (outbox).
	Tx *gorm.DB
	// RequestID, when non-empty, is stamped on the job's metadata so the
	// Runner can thread it through ctx + slog on dequeue. Falls back to
	// [RequestIDFrom] on the caller's ctx when this field is empty.
	RequestID string
}

InsertOpts configures a single Insert.
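
The difference between UniqueKey and UniqueActiveKey is easiest to see in a small in-memory model. The real enforcement is presumably done by database constraints; storedJob, isActive, and collides below are hypothetical names used only to illustrate the documented semantics.

```go
package main

import "fmt"

// JobState values are copied locally from the documented constants.
type JobState string

const (
	StateAvailable JobState = "available"
	StateRunning   JobState = "running"
	StateRetryable JobState = "retryable"
	StateScheduled JobState = "scheduled"
	StateSucceeded JobState = "succeeded"
	StateCancelled JobState = "cancelled"
	StateDiscarded JobState = "discarded"
)

// isActive reports whether a job in state s still holds its active key.
func isActive(s JobState) bool {
	switch s {
	case StateAvailable, StateRunning, StateRetryable, StateScheduled:
		return true
	}
	return false
}

// storedJob is a hypothetical view of an existing jobs row.
type storedJob struct {
	UniqueKey       string
	UniqueActiveKey string
	State           JobState
}

// collides reports whether a new insert with the given keys would be rejected.
func collides(existing []storedJob, uniqueKey, uniqueActiveKey string) bool {
	for _, j := range existing {
		// UniqueKey collides with any job that ever carried it.
		if uniqueKey != "" && j.UniqueKey == uniqueKey {
			return true
		}
		// UniqueActiveKey collides only while the other job is still live.
		if uniqueActiveKey != "" && j.UniqueActiveKey == uniqueActiveKey && isActive(j.State) {
			return true
		}
	}
	return false
}

func main() {
	jobs := []storedJob{
		{UniqueKey: "invoice-42", State: StateSucceeded},
		{UniqueActiveKey: "sync-user-7", State: StateRunning},
		{UniqueActiveKey: "sync-user-8", State: StateSucceeded},
	}
	fmt.Println(collides(jobs, "invoice-42", ""))  // key used once, forever taken
	fmt.Println(collides(jobs, "", "sync-user-7")) // holder still running
	fmt.Println(collides(jobs, "", "sync-user-8")) // holder finished, key is free
}
```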

type Job

type Job[A Args] struct {
	ID          string
	Kind        string
	Queue       string
	Args        A
	Attempt     int
	MaxAttempts int
	ParentJobID *string
	EnqueuedAt  time.Time
	Tags        []string
	Logger      *slog.Logger
	// RunID is the pre-allocated job_runs.id for this attempt. A side-effect
	// row may set its job_run_id to RunID safely — the run row already exists.
	RunID string
}

Job is what a worker receives. RunID and Logger are injected by the Runner.

type JobArgsView

type JobArgsView struct {
	ID   string
	Kind string
	Args []byte
}

JobArgsView is a host-internal read projection that carries a job's raw args payload so a host can match jobs on their typed arguments without binding to the unexported row. Unlike JobView it is not a wire contract — it exists so a host (e.g. a "do I already have an active job for this subject?" lookup) can inspect args server-side.

func ListActiveByKind

func ListActiveByKind(ctx context.Context, db *gorm.DB, kind string) ([]JobArgsView, error)

ListActiveByKind returns the non-terminal jobs of the given kind, each with its raw args payload, reading through db. Soft-deleted jobs are excluded. A host uses it to answer "is there already an in-flight job of this kind for some subject?" by inspecting the returned args.

type JobEvent added in v0.2.0

type JobEvent struct {
	JobID   string
	RunID   string
	Kind    string
	Queue   string
	Attempt int
}

JobEvent identifies one attempt. It is embedded in the finish and retry events.

type JobRunView

type JobRunView struct {
	ID            string     `json:"id"`
	Outcome       string     `json:"outcome"`
	ExecutorClass string     `json:"executor_class"`
	StartedAt     time.Time  `json:"started_at"`
	FinishedAt    *time.Time `json:"finished_at"`
	// Error is the worker error recorded for a failed attempt, if any.
	Error *string `json:"error,omitempty"`
	// Output is the worker's structured Result.Output as stored in job_runs.output
	// (for the command workers, an ExecOutput with exit code and captured streams).
	// It is empty when the attempt produced no output.
	Output json.RawMessage `json:"output,omitempty"`
}

JobRunView is the public read projection of a single job attempt.

func ListRuns

func ListRuns(ctx context.Context, db *gorm.DB, jobID string, p ListRunsParams) ([]JobRunView, error)

ListRuns returns a job's runs newest-first (created_at desc, id desc), reading through db. When p.Before is non-zero only rows strictly older than the cursor are returned; a positive p.Limit caps the page.

type JobState

type JobState string

JobState is a job's lifecycle state. Its string values are the stable wire form persisted on jobs rows.

const (
	StateAvailable JobState = "available"
	StateRunning   JobState = "running"
	StateRetryable JobState = "retryable"
	StateScheduled JobState = "scheduled"
	StateSucceeded JobState = "succeeded"
	StateCancelled JobState = "cancelled"
	StateDiscarded JobState = "discarded"
)

Recognized JobState values.

func NonTerminalStates

func NonTerminalStates() []JobState

NonTerminalStates returns the job states from which a job may still progress. The terminal states (succeeded, cancelled, discarded) are excluded. A host uses it to scope "still in flight" queries without re-deriving the runtime's state vocabulary.

func TerminalStates added in v0.3.0

func TerminalStates() []JobState

TerminalStates returns the job states a job can no longer progress from (succeeded, cancelled, discarded). A host uses it to scope "finished" queries — e.g. retention — without re-deriving the runtime's state vocabulary.

func (JobState) Valid

func (s JobState) Valid() bool

Valid reports whether s is a recognized JobState.

type JobView

type JobView struct {
	ID          string    `json:"id"`
	Kind        string    `json:"kind"`
	State       string    `json:"state"`
	ParentJobID string    `json:"parent_job_id"`
	EnqueuedAt  time.Time `json:"enqueued_at"`
	Attempt     int       `json:"attempt"`
}

JobView is the public read projection of a job. The runtime keeps its row struct unexported and exposes this stable, JSON-tagged view instead, so a host inspection API binds to flywheel's contract rather than the mutable schema.

func FindJob

func FindJob(ctx context.Context, db *gorm.DB, id string) (JobView, error)

FindJob returns the JobView for id, reading through the host-provided db. A soft-deleted job is excluded (gorm scopes deleted_at IS NULL). A miss returns ErrJobNotFound so the caller can map it to a 404.

func ListJobs added in v0.2.0

func ListJobs(ctx context.Context, db *gorm.DB, p ListJobsParams) ([]JobView, error)

ListJobs returns jobs newest-first (created_at desc, id desc), reading through db and optionally filtered by exact state and kind. Soft-deleted jobs are excluded. It is the inspection seam behind a "list jobs" CLI or dashboard.

type JobsOverview

type JobsOverview struct {
	CountsByState map[string]int `json:"counts_by_state"`
	Total         int            `json:"total"`
}

JobsOverview is the aggregate job-state report: a count per state plus the total across all states in scope.

func Overview

func Overview(ctx context.Context, db *gorm.DB, p OverviewParams) (JobsOverview, error)

Overview returns the job count grouped by state, optionally scoped to a single kind, reading through db. Soft-deleted jobs are excluded.

type ListJobsParams added in v0.2.0

type ListJobsParams struct {
	State string
	Kind  string
	Limit int
}

ListJobsParams filters and pages a ListJobs query. State and Kind, when set, are exact-match filters; Limit caps the page (default 50).

type ListRunsParams

type ListRunsParams struct {
	Before time.Time
	Limit  int
}

ListRunsParams configures a ListRuns page. Before is a created_at cursor (zero means newest); Limit caps the rows returned. A host that wants a has-more sentinel passes Limit+1 and trims the extra row itself.
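
The has-more sentinel mentioned above works by asking for one row more than the page size and trimming it afterwards. A minimal sketch of the trimming half, using a hypothetical generic helper named trimPage (the caller would pass pageSize+1 as ListRunsParams.Limit):

```go
package main

import "fmt"

// trimPage cuts rows down to pageSize and reports whether an extra sentinel
// row was present, meaning at least one more page exists.
func trimPage[T any](rows []T, pageSize int) (page []T, hasMore bool) {
	if pageSize > 0 && len(rows) > pageSize {
		return rows[:pageSize], true
	}
	return rows, false
}

func main() {
	// Pretend the query ran with Limit = pageSize+1 = 3 and returned 3 rows.
	page, more := trimPage([]string{"run-3", "run-2", "run-1"}, 2)
	fmt.Println(page, more)
}
```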

type Node added in v0.2.0

type Node struct {
	// contains filtered or unexported fields
}

Node is a self-contained job-runtime process: it owns its runners, an optional scheduler, and an optional health server, starts them together, and drains cleanly when its context is cancelled. It turns the ~100 lines of lifecycle boilerplate every host re-implements into a single Run call.

func NewNode added in v0.2.0

func NewNode(cfg NodeConfig) (*Node, error)

NewNode validates cfg and constructs the Node. Each runner is built through NewRunner (so the SQLite concurrency-1 guard and every zero-value default still apply) and the scheduler, when configured, through NewSchedulerWithConfig.

Example

ExampleNewNode wires a complete job-runtime daemon — a runner, the periodic scheduler, and a health server — and runs it until SIGINT/SIGTERM. This is the whole daemon: define workers, register them, and let the Node own the runner/scheduler/health/drain lifecycle.

package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"

	"github.com/glebarez/sqlite"
	flywheel "github.com/mrz1836/go-flywheel"
	"gorm.io/gorm"
)

// EmailArgs is a job's typed arguments. Its Kind method names the worker that
// handles it, and the struct is JSON-serialized as the job payload.
type EmailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

// Kind names the worker for these args.
func (EmailArgs) Kind() string { return "send_email" }

// EmailWorker handles EmailArgs jobs.
type EmailWorker struct{}

// Kind is the stable worker name, matching EmailArgs.Kind.
func (EmailWorker) Kind() string { return "send_email" }

// Work runs one job. It receives the decoded args and a logger pre-tagged with
// the job id, kind, and run id.
func (EmailWorker) Work(ctx context.Context, job *flywheel.Job[EmailArgs]) (flywheel.Result, error) {
	job.Logger.InfoContext(ctx, "sending email", "to", job.Args.To, "subject", job.Args.Subject)
	return flywheel.Result{}, nil
}

func main() {
	db, err := gorm.Open(sqlite.Open("flywheel.db"), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	_ = flywheel.Migrate(db)

	reg := flywheel.NewRegistry()
	flywheel.Register(reg, EmailWorker{})

	node, err := flywheel.NewNode(flywheel.NodeConfig{
		Runners: []flywheel.RunnerConfig{{
			DB:       db,
			Driver:   flywheel.NewSQLiteDriver(db),
			Registry: reg,
			Queues:   []string{"default", "periodic"},
			// SQLite is single-writer: keep Concurrency at 1 and claim every class.
			Concurrency:   1,
			ClaimAnyClass: true,
		}},
		Scheduler: &flywheel.SchedulerConfig{DB: db, Client: flywheel.NewClient(db)},
		Health:    flywheel.HealthConfig{Addr: ":8080"},
		Logger:    slog.New(slog.NewJSONHandler(os.Stdout, nil)),
	})
	if err != nil {
		panic(err)
	}

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()
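	// Run returns nil on a clean drain and the first non-cancellation error otherwise.
	if err := node.Run(ctx); err != nil {
		panic(err)
	}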
}

func (*Node) Run added in v0.2.0

func (n *Node) Run(ctx context.Context) error

Run starts every component and blocks until ctx is cancelled and all components have drained (lease-bounded, or DrainTimeout if it is shorter). It returns the first non-cancellation error any component reported, or nil on a clean drain. Signal handling stays with the caller: pass a context from signal.NotifyContext.

type NodeConfig added in v0.2.0

type NodeConfig struct {
	// Runners are the dispatch loops this Node hosts. Each may claim a different
	// set of queues at a different concurrency for a different executor class. At
	// least one is required.
	Runners []RunnerConfig
	// Scheduler, when non-nil, runs periodic ticks plus the stuck-lease sweep.
	// Leave it nil on a pure worker node where another process owns scheduling.
	Scheduler *SchedulerConfig
	// Health configures the optional liveness/readiness server.
	Health HealthConfig
	// Logger logs the Node's own lifecycle events. Optional; defaults to
	// slog.Default(). It does not override a RunnerConfig.Logger.
	Logger *slog.Logger
	// DrainTimeout bounds how long Run waits for in-flight work after ctx is
	// cancelled before returning regardless. Zero waits for the natural,
	// lease-bounded drain.
	DrainTimeout time.Duration
}

NodeConfig declares everything a Node runs: one or more runners, an optional scheduler, and an optional health server. It is the one-call replacement for a hand-wired runner + scheduler + health-server + signal-drain main().

type Observer added in v0.2.0

type Observer interface {
	// OnClaim fires once per non-empty claimed batch, after Dequeue and before
	// any dispatch.
	OnClaim(ctx context.Context, ev ClaimEvent)
	// OnStart fires immediately before a worker's Work runs.
	OnStart(ctx context.Context, ev JobEvent)
	// OnFinish fires after each attempt is decided, for every terminal-or-retry
	// outcome.
	OnFinish(ctx context.Context, ev FinishEvent)
	// OnRetry fires when an attempt is scheduled for another try — a subset of
	// OnFinish — so a metric can count retries without re-deriving the state
	// machine.
	OnRetry(ctx context.Context, ev RetryEvent)
}

Observer is the optional lifecycle hook the Runner invokes around each attempt. It is the dependency-free telemetry seam: the core never imports a metrics or tracing library — a consumer implements Observer against their own stack (OpenTelemetry, Prometheus, statsd, slog) and wires it via RunnerConfig.Observer.

Every method is called synchronously on the dispatch path and must not block; an implementation that needs to do I/O should buffer and return immediately. All methods receive the worker ctx, so a tracing implementation can pull the active span and RequestIDFrom(ctx) without extra plumbing.

OnStart fires only for a registered kind; a job whose kind has no worker goes straight to OnFinish (with a permanent error) and never reaches OnStart.
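
One way to honor the "must not block" rule is to hand events to a buffered channel and drop them when it is full. The sketch below shows only the OnFinish path with a trimmed local FinishEvent; bufferedObserver is a hypothetical type, and a complete implementation would also provide OnClaim, OnStart, and OnRetry plus a background goroutine that drains the channel and does the actual I/O.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// FinishEvent is a trimmed local stand-in for the documented event.
type FinishEvent struct {
	Kind     string
	Duration time.Duration
}

// bufferedObserver queues events for later export and never blocks the caller.
type bufferedObserver struct {
	events  chan FinishEvent
	dropped atomic.Int64
}

func newBufferedObserver(size int) *bufferedObserver {
	return &bufferedObserver{events: make(chan FinishEvent, size)}
}

// OnFinish enqueues the event, or counts a drop when the buffer is full.
func (o *bufferedObserver) OnFinish(_ context.Context, ev FinishEvent) {
	select {
	case o.events <- ev:
	default:
		o.dropped.Add(1) // losing a metric is preferable to stalling dispatch
	}
}

// demoDrops sends two events into a one-slot buffer with no consumer.
func demoDrops() (queued int, dropped int64) {
	o := newBufferedObserver(1)
	ev := FinishEvent{Kind: "send_email", Duration: time.Millisecond}
	o.OnFinish(context.Background(), ev)
	o.OnFinish(context.Background(), ev)
	return len(o.events), o.dropped.Load()
}

func main() {
	queued, dropped := demoDrops()
	fmt.Println(queued, dropped)
}
```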

type OverviewParams

type OverviewParams struct {
	Kind string
}

OverviewParams configures an Overview query. Kind, when non-empty, scopes the counts to a single job kind.

type PeriodicSpec added in v0.2.0

type PeriodicSpec struct {
	// Slug is the stable identity of the schedule; re-upserting the same slug
	// updates the existing definition rather than creating a duplicate.
	Slug string
	// Kind is the worker kind enqueued on each fire.
	Kind string
	// Queue is the queue the enqueued jobs land on. Empty defaults to "periodic".
	Queue string
	// ArgsTemplate is the JSON args payload for each enqueued job. Empty defaults
	// to an empty object.
	ArgsTemplate []byte
	// Cron is a standard 5-field cron expression. Mutually exclusive with Every.
	Cron string
	// Every is a fixed interval between fires. Mutually exclusive with Cron.
	Every time.Duration
	// Active toggles the definition. An inactive definition is preserved but never
	// fires.
	Active bool
}

PeriodicSpec declares a periodic (cron or fixed-interval) job. It is the exported, host-facing form of a job_periodics row: UpsertPeriodic reconciles it by slug, and the Scheduler fires the matching kind on each due tick. Exactly one of Cron or Every must be set.
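
A host may want to check a spec before handing it to UpsertPeriodic. The following is a sketch of such a pre-flight check under stated assumptions: the struct is a trimmed local copy, normalize is a hypothetical helper, and the field-count test is only a rough stand-in for real cron parsing. The validation UpsertPeriodic itself performs is not shown in this section.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// PeriodicSpec is a trimmed local copy of the documented struct.
type PeriodicSpec struct {
	Slug  string
	Kind  string
	Queue string
	Cron  string
	Every time.Duration
}

// normalize enforces "exactly one of Cron or Every" and applies the
// documented default queue.
func normalize(s PeriodicSpec) (PeriodicSpec, error) {
	hasCron := s.Cron != ""
	hasEvery := s.Every > 0
	if hasCron == hasEvery {
		return s, errors.New("exactly one of Cron or Every must be set")
	}
	if hasCron && len(strings.Fields(s.Cron)) != 5 {
		return s, errors.New("cron expression must have 5 fields")
	}
	if s.Queue == "" {
		s.Queue = "periodic"
	}
	return s, nil
}

func main() {
	spec, err := normalize(PeriodicSpec{Slug: "nightly-report", Kind: "report", Cron: "0 3 * * *"})
	fmt.Println(spec.Queue, err)

	_, err = normalize(PeriodicSpec{Slug: "bad", Kind: "report", Cron: "0 3 * * *", Every: time.Hour})
	fmt.Println(err)
}
```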

type PeriodicView added in v0.2.0

type PeriodicView struct {
	Slug            string     `json:"slug"`
	Kind            string     `json:"kind"`
	Queue           string     `json:"queue"`
	Cron            string     `json:"cron,omitempty"`
	IntervalSeconds int        `json:"interval_seconds,omitempty"`
	NextRunAt       time.Time  `json:"next_run_at"`
	LastEnqueuedAt  *time.Time `json:"last_enqueued_at,omitempty"`
	Active          bool       `json:"active"`
}

PeriodicView is the public read projection of a periodic definition.

func ListPeriodics added in v0.2.0

func ListPeriodics(ctx context.Context, db *gorm.DB) ([]PeriodicView, error)

ListPeriodics returns every periodic definition, ordered by slug.

type QueueHealth added in v0.4.0

type QueueHealth struct {
	// CountsByState is the job count per state, soft-deleted jobs excluded (the
	// same scope as Overview). Every present state maps to its count; absent
	// states are zero.
	CountsByState map[string]int64 `json:"counts_by_state"`
	// Ready is the number of jobs claimable right now: a claimable state
	// (available, retryable, scheduled) whose scheduled_at is at or before the
	// sample instant.
	Ready int64 `json:"ready"`
	// InFlight is the number of running jobs (claimed, not yet finalized).
	InFlight int64 `json:"inflight"`
	// ScheduledAhead is the number of claimable jobs not yet due (scheduled_at in
	// the future) — work that is waiting on the clock, not on a runner.
	ScheduledAhead int64 `json:"scheduled_ahead"`
	// OldestReadyAge is the lag: how long the oldest ready job has been claimable
	// (sample instant minus its scheduled_at). It is zero when nothing is ready.
	// A growing OldestReadyAge is the canonical "the runners are falling behind"
	// signal.
	OldestReadyAge time.Duration `json:"oldest_ready_age"`
	// SampledAt is the clock instant the snapshot was taken (from the context
	// Clock), so a caller can reason about staleness deterministically.
	SampledAt time.Time `json:"sampled_at"`
}

QueueHealth is a point-in-time gauge snapshot of the queue: how much work is piling up and how far behind the oldest claimable job has fallen. Where the Observer seam reports per-attempt flows (counters, durations), QueueHealth is the depth/lag view an operator scrapes to answer "is work starving?". It is read by SampleQueueHealth and surfaced by the Scheduler heartbeat, the `/metrics` gauges, and `flywheel status`.

func SampleQueueHealth added in v0.4.0

func SampleQueueHealth(ctx context.Context, db *gorm.DB) (QueueHealth, error)

SampleQueueHealth reads a QueueHealth gauge snapshot through db. "Now" comes from the context Clock (ClockFrom), so a test drives it deterministically with a FixedClock and production uses the wall clock.

It runs four index-backed reads, none of which mutate: a GROUP BY state count (soft-deleted excluded), a ready count and a scheduled-ahead count over the claimable states (both served by the jobs_ready partial index), and an ordered single-row read for the oldest ready scheduled_at. The lag is read by ordering rather than MIN(scheduled_at) because SQLite returns a bare aggregate as text and drops the column's datetime affinity, which would fail the time scan; an ordered read of the real typed column round-trips on both dialects.

The snapshot is a sample, not a transaction: the reads are not serialized against concurrent claims, so a busy queue's numbers may not be mutually consistent to the single job. That is the right trade for a gauge an operator scrapes infrequently — correctness of each number, not a frozen global view.

type RawJob

type RawJob struct {
	ID          string
	Kind        string
	Queue       string
	Args        []byte
	Attempt     int
	MaxAttempts int
	// TimeoutMs, when non-nil, is this job's per-job execution timeout in
	// milliseconds, applied by the Runner around the worker call.
	TimeoutMs   *int
	ParentJobID *string
	Tags        []string
	ScheduledAt time.Time
	// Metadata is the raw jobs.metadata JSON blob. The Runner uses it to
	// thread request_id through to the worker's ctx and slog attrs; workers
	// generally read the value via [RequestIDFrom] rather than parsing it.
	Metadata []byte
}

RawJob is a claimed jobs row as the Driver returns it, before the Runner binds it into a typed Job[A].

type RecentFailuresParams added in v0.4.0

type RecentFailuresParams struct {
	Since time.Time
	Limit int
}

RecentFailuresParams scopes a RecentFailures query. Since bounds the window — only jobs finalized at or after it are returned; a zero Since means no lower bound. Limit caps the rows (default 20 when not positive).

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps a job kind to its registered worker. It is safe for concurrent reads after registration.

func NewRegistry

func NewRegistry() *Registry

NewRegistry returns an empty Registry.

type Result

type Result struct {
	// Output is the worker's structured output, stored on the JobRun.
	Output any
	// Snooze, when non-nil, reschedules the job for the given delay without
	// consuming an attempt.
	Snooze *time.Duration
	// Cancel, when true, moves the job to a terminal cancelled state.
	Cancel bool
	// FollowUps are child jobs enqueued atomically with finalization.
	FollowUps []FollowUp
	// CostMicros is the accumulated external-call cost for this attempt.
	CostMicros int64
	// SourceFetchIDs records side-effect source fetches this attempt produced.
	SourceFetchIDs []string
}

Result is what a worker returns on success.

type RetryEvent added in v0.2.0

type RetryEvent struct {
	JobEvent
	// NextAttempt is the attempt number the retry will run as.
	NextAttempt int
	// Delay is the backoff before the retry becomes claimable.
	Delay time.Duration
	// ErrorClass is the failure classification that triggered the retry.
	ErrorClass ErrorClass
}

RetryEvent reports an attempt that has been scheduled to retry.

type Retryable

type Retryable interface {
	NextRetry(err error, attempt int) time.Duration
}

Retryable is an optional worker interface. When implemented, the Runner uses NextRetry to compute the backoff delay instead of the default schedule.
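
As an illustration of overriding the delay, the sketch below gives a type a NextRetry method with the documented signature and a capped doubling schedule. The schedule itself is an assumption made for this example: it treats attempt as 1-based and omits jitter to stay deterministic, and it is not the Runner's default formula. cappedBackoff and demo are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// cappedBackoff doubles the delay on every attempt up to a ceiling.
type cappedBackoff struct {
	base time.Duration
	max  time.Duration
}

// NextRetry matches the Retryable method signature. The error is ignored
// here; a real worker might lengthen the delay for rate-limit errors.
func (b cappedBackoff) NextRetry(_ error, attempt int) time.Duration {
	d := b.base
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= b.max {
			return b.max // stop early so the doubling cannot overflow
		}
	}
	if d > b.max {
		return b.max
	}
	return d
}

// demo is a sample policy: 1s, 2s, 4s, ... capped at 30s.
var demo = cappedBackoff{base: time.Second, max: 30 * time.Second}

func main() {
	for attempt := 1; attempt <= 7; attempt++ {
		fmt.Println(attempt, demo.NextRetry(nil, attempt))
	}
}
```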

type RunOutcome

type RunOutcome string

RunOutcome is the outcome of a single job attempt. Its string values are the stable wire form persisted on job_runs rows.

const (
	OutcomeStarted   RunOutcome = "started"
	OutcomeSuccess   RunOutcome = "success"
	OutcomeError     RunOutcome = "error"
	OutcomeSnooze    RunOutcome = "snooze"
	OutcomeCancelled RunOutcome = "cancelled"
	OutcomeTimeout   RunOutcome = "timeout"
	OutcomeCrashed   RunOutcome = "crashed"
)

Recognized RunOutcome values.

func (RunOutcome) Valid

func (o RunOutcome) Valid() bool

Valid reports whether o is a recognized RunOutcome.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner claims jobs from a Driver and dispatches them to registered workers.

func NewRunner

func NewRunner(cfg RunnerConfig) (*Runner, error)

NewRunner validates cfg and returns a Runner. It returns ErrSQLiteConcurrency when a SQLite driver is wired with Concurrency greater than 1 (FR-039).

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run drives the dispatch loop until ctx is cancelled.

func (*Runner) RunUntilIdle

func (r *Runner) RunUntilIdle(ctx context.Context) error

RunUntilIdle drives the dispatch loop until every job has reached a terminal state, then returns. It is the deterministic test driver.

type RunnerConfig

type RunnerConfig struct {
	// DB is the database the Runner reads queue state from (used by RunUntilIdle).
	DB *gorm.DB
	// Driver claims and finalizes jobs.
	Driver Driver
	// Registry maps job kinds to workers.
	Registry *Registry
	// Queues are the logical queues this Runner claims from.
	Queues []string
	// ExecutorClass is the routing label this Runner serves: it claims jobs whose
	// executor_class equals it (or is the empty wildcard) unless ClaimAnyClass is
	// set, and stamps it on every job_runs row this Runner writes.
	ExecutorClass ExecutorClass
	// ClaimAnyClass, when true, makes this Runner claim jobs of every executor
	// class, not only its own class and the wildcard. A single-node local
	// deployment typically sets it so one Runner drains the whole queue.
	ClaimAnyClass bool
	// LeaseDuration is the visibility timeout on a claimed job.
	LeaseDuration time.Duration
	// PollInterval is the pause between empty polls.
	PollInterval time.Duration
	// Concurrency is the number of jobs claimed and run per poll. A SQLite
	// driver requires 1.
	Concurrency int
	// RetryBackoffBase is the base delay for the exponential retry backoff.
	// Optional; defaults to one second.
	RetryBackoffBase time.Duration
	// DefaultTimeout, when > 0, is the execution ceiling applied to every attempt
	// that specifies no timeout of its own (per-job InsertOpts.Timeout or per-kind
	// Timeouter). Optional; zero means no default timeout.
	DefaultTimeout time.Duration
	// Observer, when set, receives lifecycle events (claim/start/finish/retry) for
	// metrics or tracing. Optional; a nil Observer installs an internal no-op.
	Observer Observer
	// Logger is the base logger bound onto each Job. Optional.
	Logger *slog.Logger
}

RunnerConfig configures a Runner.
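
The timeout precedence spelled out in the InsertOpts.Timeout and DefaultTimeout comments (per-job first, then the per-kind Timeouter, then the runner default) can be sketched as follows. Both functions are hypothetical illustrations of the rule, not the Runner's code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// effectiveTimeout returns the first positive value in precedence order, or
// zero when no timeout applies.
func effectiveTimeout(perJob, perKind, runnerDefault time.Duration) time.Duration {
	for _, d := range []time.Duration{perJob, perKind, runnerDefault} {
		if d > 0 {
			return d
		}
	}
	return 0
}

// runWithTimeout cancels the worker's context once timeout elapses. A zero
// timeout leaves the context untouched.
func runWithTimeout(ctx context.Context, timeout time.Duration, work func(context.Context) error) error {
	if timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}
	return work(ctx)
}

// demoTimedOut runs a worker that only returns when its context ends.
func demoTimedOut() bool {
	err := runWithTimeout(context.Background(), 10*time.Millisecond, func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	})
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(effectiveTimeout(0, 5*time.Second, time.Minute))
	fmt.Println(demoTimedOut())
}
```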

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler enqueues jobs from periodic definitions and reclaims stuck jobs.

func NewScheduler

func NewScheduler(db *gorm.DB, client *Client) *Scheduler

NewScheduler returns a Scheduler over db and the producer client with the default cadence and backfill cap.

func NewSchedulerWithConfig added in v0.2.0

func NewSchedulerWithConfig(cfg SchedulerConfig) *Scheduler

NewSchedulerWithConfig returns a Scheduler from cfg, applying the cadence and backfill defaults for any field left zero.

func (*Scheduler) PruneRetention added in v0.3.0

func (s *Scheduler) PruneRetention(ctx context.Context) (int64, error)

PruneRetention hard-deletes terminal jobs (and their job_runs) finalized longer ago than RetentionMaxAge, reporting how many jobs were removed. It is a no-op returning (0, nil) when retention is disabled, so calling it on a retention-less Scheduler can never delete anything.

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

Run ticks periodic definitions and runs the stuck-lease sweep until ctx is cancelled. The sweep runs on a 30-second cadence (FR-030). When retention is enabled (RetentionMaxAge > 0) it also runs a retention sweep on its own cadence; otherwise no retention ticker is armed.

func (*Scheduler) SampleHealth added in v0.4.0

func (s *Scheduler) SampleHealth(ctx context.Context) (QueueHealth, error)

SampleHealth reads a QueueHealth gauge snapshot through the Scheduler's database. It parallels Tick, Sweep, and PruneRetention so the heartbeat cadence is testable directly, and lets a host reuse the Scheduler's db handle as the sampler behind a `/metrics` endpoint.

func (*Scheduler) Sweep

func (s *Scheduler) Sweep(ctx context.Context) (int, error)

Sweep reclaims jobs whose lease has expired (FR-030, FR-031).

func (*Scheduler) Tick

func (s *Scheduler) Tick(ctx context.Context) (int, error)

Tick processes every due, active periodic definition once and reports how many jobs were enqueued.

type SchedulerConfig added in v0.2.0

type SchedulerConfig struct {
	// DB is the database the Scheduler reads periodic definitions from and runs
	// the stuck-lease sweep against.
	DB *gorm.DB
	// Client is the producer the Scheduler enqueues periodic jobs through.
	Client *Client
	// Logger logs tick and sweep failures. Optional; defaults to slog.Default().
	Logger *slog.Logger
	// BackfillCap bounds how many missed buckets a single due definition
	// enqueues on catch-up. Optional; defaults to 10.
	BackfillCap int
	// TickInterval is the cadence at which due periodic definitions are checked.
	// Optional; defaults to one second.
	TickInterval time.Duration
	// SweepInterval is the cadence of the stuck-lease reclaim sweep. Optional;
	// defaults to 30 seconds.
	SweepInterval time.Duration
	// RetentionMaxAge enables the retention sweep: terminal jobs (and their
	// job_runs) finalized longer ago than this are hard-deleted. Zero (the
	// default) disables retention entirely — no surprise deletes for an embedded
	// consumer that never asked for them.
	RetentionMaxAge time.Duration
	// RetentionInterval is the cadence of the retention sweep. It applies only
	// when RetentionMaxAge is set; left zero, it defaults to one hour.
	RetentionInterval time.Duration
	// HealthSampleInterval enables the queue-health heartbeat: when > 0 the
	// Scheduler samples QueueHealth on this cadence and logs a one-line pulse
	// (ready, in-flight, oldest-ready lag, discarded). Zero (the default) disables
	// it — no surprise log output for an embedded consumer that never asked for a
	// heartbeat; a `/metrics` scrape samples fresh regardless.
	HealthSampleInterval time.Duration
}

SchedulerConfig configures a Scheduler. Only DB and Client are required; the cadence and backfill knobs default when left zero. It is the config form a Node composes; NewScheduler is the two-argument shorthand for the common case.

type Timeouter added in v0.2.0

type Timeouter interface {
	Timeout() time.Duration
}

Timeouter is an optional worker interface. When implemented, the Runner cancels the worker's ctx after the returned duration, turning a hung attempt into a context.DeadlineExceeded that retries via the normal backoff and records a timeout outcome. A zero or negative duration means no per-kind timeout; an InsertOpts.Timeout overrides it. A worker that ignores ctx cancellation still runs to completion — the lease sweep remains the ultimate backstop.

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

ValidationError is a single field's validation failure raised by a row's lifecycle hook (BeforeCreate/BeforeSave). It unwraps to ErrValidation.

func (*ValidationError) Error

func (e *ValidationError) Error() string

Error renders the field and message.

func (*ValidationError) Unwrap

func (e *ValidationError) Unwrap() error

Unwrap exposes ErrValidation for errors.Is.

type Worker

type Worker[A Args] interface {
	// Kind is the stable worker name, persisted as jobs.kind.
	Kind() string
	// Work runs the job. It executes outside any transaction.
	Work(ctx context.Context, job *Job[A]) (Result, error)
}

Worker is the interface domain authors implement. The runtime ships the interface; implementations arrive in later phases.

Directories

Path Synopsis
cmd
flywheel command
Command flywheel is the local daemon and operator CLI for the go-flywheel job runtime.
flywheel/internal/update
Package update implements flywheel's self-update: a cached "new version available" check against the GitHub Releases API, a TTY-aware banner, and a checksum-verified self-replace of the running binary.
flywheel/internal/version
Package version provides semantic version comparison for the self-updater.
Package config holds the job-runtime configuration that travels with the flywheel runtime.
examples
exec-cron command
Command exec-cron shows flywheel replacing cron.
local-tasks command
Command local-tasks shows flywheel running local developer tasks durably: a shell script, a Python script, and a magex/mage build target — each registered as a typed worker, enqueued once immediately, and (for the shell task) scheduled to repeat.
sqlite-quickstart command
Command sqlite-quickstart is a minimal flywheel daemon over a local SQLite file.
Package observers provides ready-made, dependency-free implementations of flywheel.Observer — the telemetry seam the Runner invokes around each attempt — so a consumer gets metrics, structured logs, and a Prometheus endpoint without hand-rolling and wiring an adapter first.
Package workers provides ready-made flywheel workers for the cases that need no custom Go:
