Documentation ¶
Overview ¶
Package sql provides a bun-based SQL storage implementation for gqs.
This package implements gqs interfaces (Pusher, Puller, Observer, Cleaner) using a relational database via github.com/uptrace/bun.
The SQL backend provides:
- durable persistence of jobs
- atomic state transitions
- visibility timeout (lease) semantics
- retry-safe Pull using UPDATE ... RETURNING
It is compatible with SQLite, PostgreSQL and other bun-supported dialects, subject to their transactional guarantees.
Concurrency Model ¶
Pull operations are implemented using a single atomic UPDATE statement with a subquery to avoid race conditions between selection and state transition.
Correct behavior under high concurrency depends on:
- proper indexing
- database isolation guarantees
- write contention characteristics of the chosen backend
SQLite users are strongly encouraged to enable WAL mode and configure an appropriate busy_timeout.
Schema ¶
The backend expects a "jobs" table corresponding to jobModel. InitDB (or MustInitDB) creates:
- the jobs table (if not exists)
- index (status, next_run_at)
- index (status, locked_until)
- index (status, updated_at)
These indexes are required for efficient Pull and Clean operations.
InitDB is idempotent and runs inside a transaction. It does not perform destructive migrations. Schema evolution must be handled externally.
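For callers who manage the schema externally, the three indexes listed above could be expressed as idempotent DDL along the following lines. This is only a sketch: the index names are hypothetical, and the exact statements InitDB issues are not documented here.

```go
package main

import "fmt"

// indexDDL builds CREATE INDEX IF NOT EXISTS statements for the three column
// pairs listed in the Schema section. The idx_* names are made up for this
// example; InitDB may use different names.
func indexDDL(table string) []string {
	pairs := [][2]string{
		{"status", "next_run_at"},
		{"status", "locked_until"},
		{"status", "updated_at"},
	}
	out := make([]string, 0, len(pairs))
	for _, p := range pairs {
		out = append(out, fmt.Sprintf(
			"CREATE INDEX IF NOT EXISTS idx_%s_%s_%s ON %s (%s, %s);",
			table, p[0], p[1], table, p[0], p[1]))
	}
	return out
}

func main() {
	for _, stmt := range indexDDL("jobs") {
		fmt.Println(stmt)
	}
}
```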
Database Lifecycle ¶
This package does not manage connection pooling, migrations, or database lifecycle.
The caller is responsible for:
- creating and configuring *bun.DB
- connection limits
- WAL/busy_timeout configuration (for SQLite)
- running InitDB before use
Limitations ¶
The SQL backend uses status + timestamp fields to implement lease semantics. It does not use lease tokens or optimistic locking versions.
Exactly-once processing is not guaranteed. Delivery semantics remain at-least-once.
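Because a job may be delivered more than once, handlers generally need to be idempotent. The sketch below shows one way to do that, keyed on the job identifier. The dedup type is an in-memory stand-in invented for this example; a real deployment would need to persist the set of processed identifiers.

```go
package main

import (
	"fmt"
	"sync"
)

// dedup remembers which job IDs have already produced their side effect.
// It is a hypothetical helper, not part of this package.
type dedup struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newDedup() *dedup { return &dedup{seen: make(map[string]bool)} }

// once runs effect only the first time id is observed and reports whether
// it ran. The mutex is held while effect runs, so concurrent deliveries of
// the same id cannot both execute it.
func (d *dedup) once(id string, effect func()) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[id] {
		return false
	}
	effect()
	d.seen[id] = true
	return true
}

func main() {
	d := newDedup()
	sent := 0
	// Simulate the same job being delivered twice, for example after a
	// lease expired while the first worker was still running.
	for i := 0; i < 2; i++ {
		d.once("job-1", func() { sent++ })
	}
	fmt.Println("side effects executed:", sent)
}
```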
Summary ¶
Package sql provides a pragmatic, storage-backed implementation of gqs suitable for embedded (SQLite) and server-grade (PostgreSQL) deployments, while keeping queue logic storage-agnostic.
Index ¶
- func InitDB(ctx context.Context, db *bun.DB) error
- func MustInitDB(ctx context.Context, db *bun.DB)
- type Cleaner
- type Observer
- type Puller
- func (p *Puller) Complete(ctx context.Context, jb *job.Job) error
- func (p *Puller) ExtendLock(ctx context.Context, jb *job.Job, lock time.Duration) error
- func (p *Puller) Kill(ctx context.Context, jb *job.Job) error
- func (p *Puller) Pull(ctx context.Context, batch int, lock time.Duration) ([]*job.Job, error)
- func (p *Puller) Return(ctx context.Context, jb *job.Job, backoff time.Duration) error
- type Pusher
Functions ¶
func InitDB ¶
InitDB initializes the database schema required by the SQL backend.
It creates the jobs table and required indexes inside a single transaction. If any step fails, the transaction is rolled back.
InitDB is idempotent and may be safely called multiple times. It does not drop or modify existing tables beyond creating missing objects.
The caller is responsible for providing a properly configured *bun.DB.
Types ¶
type Cleaner ¶
type Cleaner struct {
// contains filtered or unexported fields
}
Cleaner implements gqs.Cleaner using a SQL backend.
Cleaner permanently removes terminal jobs from storage. It is intended for retention management and administrative cleanup.
This implementation deletes rows directly from the jobs table and does not participate in visibility timeout or processing logic.
func NewCleaner ¶
NewCleaner creates a new SQL-backed Cleaner.
The provided *bun.DB must be properly configured and connected. Schema initialization must be completed before using Cleaner.
func (*Cleaner) Clean ¶
Clean deletes jobs matching the provided status and time filter.
Only terminal states are allowed:
- job.Done
- job.Dead
If status is job.Unknown (zero value), both Done and Dead jobs are eligible for deletion.
If status refers to a non-terminal state (such as Pending or Processing), ErrBadStatus is returned.
If before is non-nil, only jobs with updated_at <= *before are deleted. If before is nil, no time-based filtering is applied.
Clean returns the number of deleted rows.
Clean does not attempt to lock or coordinate with running workers. Deleting Processing jobs is explicitly disallowed by status checks.
type Observer ¶
type Observer struct {
// contains filtered or unexported fields
}
Observer implements gqs.Observer using a SQL backend.
Observer provides read-only access to job state stored in the database. It does not participate in visibility timeout handling or state transitions and must not modify job records.
Returned Job values represent authoritative snapshots of storage state at the time of the query.
func NewObserver ¶
NewObserver creates a new SQL-backed Observer.
The provided *bun.DB must be properly configured and connected. Schema initialization must be completed before using Observer.
func (*Observer) Get ¶
Get retrieves a job by its identifier.
If no job with the given id exists, Get returns (nil, nil).
The returned Job is a snapshot of the current database state. Modifying the returned value does not affect storage.
Get performs a simple SELECT query and does not apply any locking or transactional semantics beyond what the underlying database provides.
func (*Observer) List ¶
List returns up to limit jobs filtered by status.
If status is job.Unknown (zero value), no status filter is applied.
If limit is zero or negative, no LIMIT clause is added and all matching rows may be returned.
The returned slice contains independent Job snapshots. Mutating them does not affect the underlying storage.
List is intended for administrative or diagnostic use and should not be used as part of normal job consumption logic.
type Puller ¶
type Puller struct {
// contains filtered or unexported fields
}
Puller implements gqs.Puller using a SQL backend.
Puller performs atomic state transitions using UPDATE ... RETURNING semantics to ensure safe concurrent access across multiple workers.
The implementation assumes:
- durable writes
- transactional guarantees provided by the underlying database
- correct indexing of status and scheduling columns
Puller enforces visibility timeout semantics using the locked_until column.
func NewPuller ¶
NewPuller creates a new SQL-backed Puller.
The provided *bun.DB must be properly configured and connected. Schema initialization must be completed before using Puller.
func (*Puller) Complete ¶
Complete transitions a Processing job to Done state.
The job must currently be in Processing state. If the update affects no rows, ErrCompleteFailed is returned.
Complete clears locked_until and updates updated_at.
func (*Puller) ExtendLock ¶
ExtendLock extends the visibility timeout of a Processing job.
The job must currently be in Processing state. If no rows are affected, ErrLockLost is returned.
ExtendLock updates locked_until and updated_at.
This method does not guarantee exclusive ownership; it only ensures the row was still Processing at update time.
func (*Puller) Kill ¶
Kill transitions a job to Dead state.
The job must be in Pending or Processing state. Kill clears locked_until and refreshes updated_at.
If the update affects no rows, ErrJobLost is returned.
Kill is typically used when retry limits are exceeded.
func (*Puller) Pull ¶
Pull selects up to batch eligible jobs and transitions them to Processing state atomically.
A job is eligible if:
- next_run_at <= now
- status = Pending OR
- status = Processing AND locked_until < now
Each eligible job is transitioned to Processing: attempts is incremented, locked_until is set to now + lock, and updated_at is refreshed.
Pull returns the updated job snapshots.
Pull relies on a single UPDATE ... WHERE id IN (subquery) statement with RETURNING to avoid race conditions between selection and state transition.
func (*Puller) Return ¶
Return reschedules a Processing job back to Pending state.
Return sets next_run_at to now + backoff, clears locked_until, and refreshes updated_at.
If the update affects no rows, ErrJobLost is returned.
Return is typically used after a handler failure when retry attempts remain.
type Pusher ¶
type Pusher struct {
// contains filtered or unexported fields
}
Pusher implements gqs.Pusher using a SQL backend.
Pusher inserts new jobs into storage in the Pending state. It does not perform any deduplication or idempotency checks. The caller is responsible for ensuring that message identifiers are unique if required.
func NewPusher ¶
NewPusher creates a new SQL-backed Pusher.
The provided *bun.DB must be properly configured and connected. Schema initialization must be completed before pushing jobs.
func (*Pusher) Push ¶
Push inserts a new message into storage.
The message is scheduled for execution after the specified delay. Internally, delay determines the initial NextRunAt timestamp.
Push does not modify the provided message after insertion. If insertion fails, no job is created.
Push respects the provided context for cancellation.