sql

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package sql provides a bun-based SQL storage implementation for gqs.

This package implements gqs interfaces (Pusher, Puller, Observer, Cleaner) using a relational database via github.com/uptrace/bun.

Overview

The SQL backend provides:

  • durable persistence of jobs
  • atomic state transitions
  • visibility timeout (lease) semantics
  • retry-safe Pull using UPDATE ... RETURNING

It is compatible with SQLite, PostgreSQL and other bun-supported dialects, subject to their transactional guarantees.

Concurrency Model

Pull operations are implemented using a single atomic UPDATE statement with a subquery to avoid race conditions between selection and state transition.

Correct behavior under high concurrency depends on:

  • proper indexing
  • database isolation guarantees
  • write contention characteristics of the chosen backend

SQLite users are strongly encouraged to enable WAL mode and configure an appropriate busy_timeout.

Schema

The backend expects a "jobs" table corresponding to jobModel. InitDB (or MustInitDB) creates:

  • the jobs table (if not exists)
  • index (status, next_run_at)
  • index (status, locked_until)
  • index (status, updated_at)

These indexes are required for efficient Pull and Clean operations.

InitDB is idempotent and runs inside a transaction. It does not perform destructive migrations. Schema evolution must be handled externally.

Database Lifecycle

This package does not manage connection pooling, migrations, or database lifecycle.

The caller is responsible for:

  • creating and configuring *bun.DB
  • connection limits
  • WAL/busy_timeout configuration (for SQLite)
  • running InitDB before use

Limitations

The SQL backend uses status + timestamp fields to implement lease semantics. It does not use lease tokens or optimistic locking versions.

Exactly-once processing is not guaranteed. Delivery semantics remain at-least-once.

Summary

Package sql provides a pragmatic, storage-backed implementation of gqs suitable for embedded (SQLite) and server-grade (PostgreSQL) deployments, while keeping queue logic storage-agnostic.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitDB

func InitDB(ctx context.Context, db *bun.DB) error

InitDB initializes the database schema required by the SQL backend.

It creates the jobs table and required indexes inside a single transaction. If any step fails, the transaction is rolled back.

InitDB is idempotent and may be safely called multiple times. It does not drop or modify existing tables beyond creating missing objects.

The caller is responsible for providing a properly configured *bun.DB.

func MustInitDB

func MustInitDB(ctx context.Context, db *bun.DB)

MustInitDB behaves like InitDB but panics if initialization fails.

This helper is intended for application bootstrap code where failure to initialize schema is considered unrecoverable.

Types

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner implements gqs.Cleaner using a SQL backend.

Cleaner permanently removes terminal jobs from storage. It is intended for retention management and administrative cleanup.

This implementation deletes rows directly from the jobs table and does not participate in visibility timeout or processing logic.

func NewCleaner

func NewCleaner(db *bun.DB) *Cleaner

NewCleaner creates a new SQL-backed Cleaner.

The provided *bun.DB must be properly configured and connected. Schema initialization must be completed before using Cleaner.

func (*Cleaner) Clean

func (c *Cleaner) Clean(ctx context.Context, status job.Status, before *time.Time) (int64, error)

Clean deletes jobs matching the provided status and time filter.

Only terminal states are allowed:

  • job.Done
  • job.Dead

If status is job.Unknown (zero value), both Done and Dead jobs are eligible for deletion.

If status refers to a non-terminal state (such as Pending or Processing), ErrBadStatus is returned.

If before is non-nil, only jobs with updated_at <= *before are deleted. If before is nil, no time-based filtering is applied.

Clean returns the number of deleted rows.

Clean does not attempt to lock or coordinate with running workers. Deleting Processing jobs is explicitly disallowed by status checks.

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

Observer implements gqs.Observer using a SQL backend.

Observer provides read-only access to job state stored in the database. It does not participate in visibility timeout handling or state transitions and must not modify job records.

Returned Job values represent authoritative snapshots of storage state at the time of the query.

func NewObserver

func NewObserver(db *bun.DB) *Observer

NewObserver creates a new SQL-backed Observer.

The provided *bun.DB must be properly configured and connected. Schema initialization must be completed before using Observer.

func (*Observer) Get

func (o *Observer) Get(ctx context.Context, id uuid.UUID) (*job.Job, error)

Get retrieves a job by its identifier.

If no job with the given id exists, Get returns (nil, nil).

The returned Job is a snapshot of the current database state. Modifying the returned value does not affect storage.

Get performs a simple SELECT query and does not apply any locking or transactional semantics beyond what the underlying database provides.

func (*Observer) List

func (o *Observer) List(ctx context.Context, status job.Status, limit int) ([]*job.Job, error)

List returns up to limit jobs filtered by status.

If status is job.Unknown (zero value), no status filter is applied.

If limit is zero or negative, no LIMIT clause is added and all matching rows may be returned.

The returned slice contains independent Job snapshots. Mutating them does not affect the underlying storage.

List is intended for administrative or diagnostic use and should not be used as part of normal job consumption logic.

type Puller

type Puller struct {
	// contains filtered or unexported fields
}

Puller implements gqs.Puller using a SQL backend.

Puller performs atomic state transitions using UPDATE ... RETURNING semantics to ensure safe concurrent access across multiple workers.

The implementation assumes:

  • durable writes
  • transactional guarantees provided by the underlying database
  • correct indexing of status and scheduling columns

Puller enforces visibility timeout semantics using the locked_until column.

func NewPuller

func NewPuller(db *bun.DB) *Puller

NewPuller creates a new SQL-backed Puller.

The provided *bun.DB must be properly configured and connected. Schema initialization must be completed before using Puller.

func (*Puller) Complete

func (p *Puller) Complete(ctx context.Context, jb *job.Job) error

Complete transitions a Processing job to Done state.

The job must currently be in Processing state. If the update affects no rows, ErrCompleteFailed is returned.

Complete clears locked_until and updates updated_at.

func (*Puller) ExtendLock

func (p *Puller) ExtendLock(ctx context.Context, jb *job.Job, lock time.Duration) error

ExtendLock extends the visibility timeout of a Processing job.

The job must currently be in Processing state. If no rows are affected, ErrLockLost is returned.

ExtendLock updates locked_until and updated_at.

This method does not guarantee exclusive ownership; it only ensures the row was still Processing at update time.

func (*Puller) Kill

func (p *Puller) Kill(ctx context.Context, jb *job.Job) error

Kill transitions a job to Dead state.

The job must be in Pending or Processing state. locked_until is cleared. updated_at is refreshed.

If the update affects no rows, ErrJobLost is returned.

Kill is typically used when retry limits are exceeded.

func (*Puller) Pull

func (p *Puller) Pull(ctx context.Context, batch int, lock time.Duration) ([]*job.Job, error)

Pull selects up to batch eligible jobs and transitions them to Processing state atomically.

A job is eligible if:

  • next_run_at <= now
  • status = Pending OR
  • status = Processing AND locked_until < now

Eligible jobs are transitioned to Processing, attempts are incremented, locked_until is set to now + lock, updated_at is refreshed.

Pull returns the updated job snapshots.

Pull relies on a single UPDATE ... WHERE id IN (subquery) statement with RETURNING to avoid race conditions between selection and state transition.

func (*Puller) Return

func (p *Puller) Return(ctx context.Context, jb *job.Job, backoff time.Duration) error

Return reschedules a Processing job back to Pending state.

next_run_at is set to now + backoff. locked_until is cleared. updated_at is refreshed.

If the update affects no rows, ErrJobLost is returned.

Return is typically used after handler failure when retry attempts to remain.

type Pusher

type Pusher struct {
	// contains filtered or unexported fields
}

Pusher implements gqs.Pusher using a SQL backend.

Pusher inserts new jobs into storage in the Pending state. It does not perform any deduplication or idempotency checks. The caller is responsible for ensuring that message identifiers are unique if required.

func NewPusher

func NewPusher(db *bun.DB) *Pusher

NewPusher creates a new SQL-backed Pusher.

The provided *bun.DB must be properly configured and connected. Schema initialization must be completed before pushing jobs.

func (*Pusher) Push

func (p *Pusher) Push(ctx context.Context, msg *message.Message, delay time.Duration) error

Push inserts a new message into storage.

The message is scheduled for execution after the specified delay. Internally, delay determines the initial NextRunAt timestamp.

Push does not modify the provided message after insertion. If insertion fails, no job is created.

Push respects the provided context for cancellation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL