Published: Mar 26, 2026 · License: MIT · Imports: 23 · Imported by: 1

README

qwr - Query Write Reader

A Go library for SQLite that provides serialised writes and concurrent reads with optional context support. qwr uses a worker pool pattern with a single worker to queue writes to SQLite sequentially, alongside a configurable pool of reader connections.

Quick Start

package main

import (
    "log"
    
    "github.com/jpl-au/qwr"
    "github.com/jpl-au/qwr/profile"
)

func main() {
    // Create manager with default options
    manager, err := qwr.New("test.db").
        Reader(profile.ReadBalanced()).
        Writer(profile.WriteBalanced()).
        Open()
    if err != nil {
        log.Fatal(err)
    }
    defer manager.Close()

    // Write bypassing queue
    if _, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Alice").Write(); err != nil {
        log.Fatal(err)
    }

    // Synchronous write
    if _, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Jane").Execute(); err != nil {
        log.Fatal(err)
    }

    // Asynchronous write
    jobID, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Bob").Async()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("queued job %d", jobID)

    // Read data
    rows, err := manager.Query("SELECT * FROM users").Read()
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
}
Write Operations

Direct Write Bypasses the worker queue for immediate execution.

result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Alice").Write()

Synchronous Write Uses the worker pool to serialise the write, blocking until the write is complete.

result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Jane").Execute()

Async Write Asynchronous writes are non-blocking and are guarded by an error queue. The error queue attempts to retry transactions automatically with an exponential backoff + jitter. Certain errors are not recoverable.

jobID, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Bob").Async()

Note: Async operations that fail are automatically added to the error queue. Since there's no immediate error return, you must check the error queue to detect failures:

if jobErr, found := manager.ErrorByID(jobID); found {
    log.Printf("Async job %d failed: %v", jobID, jobErr.Error())
}

Batch Write Batching creates a slice of queries for deferred processing. Writing occurs either at a timed interval, or once the queue depth reaches a pre-determined threshold. An experimental feature inlines common queries in the batch to optimise the write process.

// These will be automatically batched together
manager.Query("INSERT INTO users (name) VALUES (?)", "Charlie").Batch()
manager.Query("INSERT INTO users (name) VALUES (?)", "Diana").Batch()

Transactions Multi-statement atomic operations with pre-declared statements.

tx := manager.Transaction().
    Add("INSERT INTO users (name) VALUES (?)", "Eve").
    Add("UPDATE users SET active = ? WHERE name = ?", true, "Eve")

result, err := tx.Write() // or tx.Exec() for queued

Callback Transactions For interleaved reads and writes within a single transaction. The callback receives a *sql.Tx - qwr manages begin, commit, and rollback.

result, err := manager.TransactionFunc(func(tx *sql.Tx) (any, error) {
    var maxPos int
    if err := tx.QueryRow("SELECT COALESCE(MAX(position), 0) FROM items").Scan(&maxPos); err != nil {
        return nil, err
    }

    _, err := tx.Exec("INSERT INTO items (position) VALUES (?)", maxPos+1)
    if err != nil {
        return nil, err
    }
    return maxPos + 1, nil
}).Exec() // or .Write() for direct execution
Read Operations

Read operations use the reader connection pool and can be executed concurrently:

Multiple Rows:

rows, err := manager.Query("SELECT * FROM users WHERE active = ?", true).Read()
if err != nil {
    log.Fatal(err)
}
defer rows.Close() // Must manually close

for rows.Next() {
    var user User
    if err := rows.Scan(&user.ID, &user.Name); err != nil {
        log.Fatal(err)
    }
    // process user...
}

Multiple Rows with Automatic Cleanup:

var users []User
err := manager.Query("SELECT * FROM users WHERE active = ?", true).ReadClose(func(rows *sql.Rows) error {
    for rows.Next() {
        var user User
        if err := rows.Scan(&user.ID, &user.Name); err != nil {
            return err
        }
        users = append(users, user)
    }
    return nil
})
// rows.Close() called automatically

Single Row:

row, err := manager.Query("SELECT name FROM users WHERE id = ?", 1).ReadRow()
if err != nil {
    log.Fatal(err)
}

var name string
if err := row.Scan(&name); err != nil {
    log.Fatal(err)
}
Prepared Statements

Prepared statements are cached automatically when enabled. This reduces preparation overhead for repeated queries:

result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Alice").
    Prepared().
    Write()
Context Support (Optional)

Contexts can be used for timeouts and cancellation but are not required. The library functions normally without any context usage:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Per-query context
result, err := manager.Query("SELECT * FROM users").
    WithContext(ctx).
    Read()

// Manager-level context (affects all operations)
manager, err := qwr.New("test.db").
    WithContext(ctx).
    Open()

Database Profiles

Database profiles configure connection pools and SQLite PRAGMA settings for different use cases. qwr includes several pre-configured profiles:

Read Profiles
  • profile.ReadLight() - Low resource usage (5 connections, 30MB cache)
  • profile.ReadBalanced() - General purpose (10 connections, 75MB cache)
  • profile.ReadHeavy() - High concurrency (25 connections, 150MB cache)
Write Profiles
  • profile.WriteLight() - Basic performance (50MB cache, 4KB pages)
  • profile.WriteBalanced() - Most applications (100MB cache, 8KB pages)
  • profile.WriteHeavy() - High volume (200MB cache, 8KB pages)
Attached Database Profiles

Use profile.Attached() to configure per-schema PRAGMAs for attached databases. Pool settings (MaxOpenConns, etc.) are ignored since attached databases share the main connection pool.

manager, err := qwr.New("main.db").
    Attach("analytics", "analytics.db", profile.Attached().
        WithJournalMode(profile.JournalWal).
        WithCacheSize(-30720)).
    Open()
Custom Profiles
customProfile := profile.New().
    WithMaxOpenConns(15).
    WithCacheSize(-102400). // 100MB
    WithJournalMode(profile.JournalWal).
    WithSynchronous(profile.SyncNormal).
    WithPageSize(8192)

manager, err := qwr.New("test.db").
    Reader(customProfile).
    Writer(profile.WriteBalanced()).
    Open()

Attached Databases

qwr supports SQLite's ATTACH DATABASE for working with multiple database files through a single manager. Attached databases share the main connection pool and write serialiser, enabling cross-database queries and atomic transactions.

Builder-Level Attach

Attach databases at construction time. An optional profile configures per-schema PRAGMAs for the attached database (pool settings are ignored since attached databases share the main pool).

manager, err := qwr.New("main.db").
    Reader(profile.ReadBalanced()).
    Writer(profile.WriteBalanced()).
    Attach("analytics", "analytics.db", profile.Attached().
        WithJournalMode(profile.JournalWal).
        WithCacheSize(-30720)).
    Attach("cache", "cache.db").
    Open()
Runtime Attach

Attach databases after the manager is opened. The writer gets immediate access. New reader connections pick up the attachment as the pool recycles. Call ResetReaderPool() for immediate reader access.

err := manager.Attach("logs", "logs.db")
if err != nil {
    log.Fatal(err)
}

// Optional: force all reader connections to see the new attachment immediately
manager.ResetReaderPool()
Cross-Database Queries

Reference attached databases using schema-qualified table names:

// Write to an attached database
manager.Query("INSERT INTO analytics.events (action) VALUES (?)", "login").Execute()

// Cross-database join
rows, err := manager.Query(`
    SELECT u.name, e.action
    FROM users u
    JOIN analytics.events e ON u.id = e.user_id
`).Read()

// Cross-database transaction (atomic across both databases)
manager.Transaction().
    Add("INSERT INTO audit.log (action) VALUES (?)", "transfer").
    Add("UPDATE analytics.counters SET total = total + 1").
    Write()
Per-Schema Maintenance

Vacuum and checkpoint operations accept an optional schema parameter:

// Checkpoint an attached database
manager.RunCheckpoint(checkpoint.Full, "analytics")

// Vacuum an attached database
manager.RunVacuum("analytics")
Important Notes
  • Always use schema-qualified names: Use analytics.events, not just events, for attached database tables. Unqualified names resolve to the main database.
  • No bare :memory:: Bare :memory: paths are rejected because each pooled connection would get its own isolated in-memory database. Use file::memory:?cache=shared for a shared in-memory attached database.
  • Relative paths need a file-backed main database: Relative attachment paths are resolved against the main database's directory. If the main database is :memory:, relative paths are rejected.
  • Prepared statement timing: Do not use .Prepared() for schema-qualified queries before the schema is attached. The preparation will fail on every call until the schema exists.
  • Parallel writes: Attached databases share a single serialised writer, which is correct for cross-database transactions. For parallel writes to independent databases, use separate Manager instances instead of ATTACH.
  • Not supported with NewSQL: Attach requires qwr-managed connections. When using NewSQL(), manage ATTACH DATABASE statements on your own connections.
  • Reserved aliases: main and temp are reserved by SQLite and cannot be used as attachment aliases.
  • SQLite limit: SQLite allows up to 10 attached databases by default (compile-time configurable up to 125).

Observing Events

qwr emits structured events for all significant operations. Subscribe to receive events for logging, metrics, tracing, or alerting:

// Subscribe to all events
manager.Subscribe(func(e qwr.Event) {
    fmt.Printf("event: %d at %v\n", e.Type, e.Timestamp)
})

// Subscribe to specific event types using filters
manager.SubscribeFiltered(func(e qwr.Event) {
    fmt.Printf("error: %v\n", e.Err)
}, qwr.ErrorEvents())

// Unsubscribe when no longer needed
id := manager.Subscribe(handler)
manager.Unsubscribe(id)

To capture events from the moment the manager opens (including EventManagerOpened), use WithObserver during construction:

manager, err := qwr.New("test.db").
    WithObserver(func(e qwr.Event) {
        log.Printf("qwr: %d", e.Type)
    }).
    Open()

Available filter constructors:

  • JobEvents() - JobQueued, JobStarted, JobCompleted, JobFailed
  • ErrorEvents() - JobFailed, ErrorStored, ErrorPersisted, ErrorQueueOverflow, RetryExhausted, DirectWriteFailed
  • CacheEvents() - CacheHit, CacheMiss, CacheEvicted, CachePrepError
  • RetryEvents() - RetryScheduled, RetryStarted, RetryExhausted
  • WriteEvents() - JobQueued, JobStarted, JobCompleted, JobFailed, DirectWriteCompleted, DirectWriteFailed
  • BatchEvents() - BatchQueryAdded, BatchFlushed, BatchInlineOptimized, BatchSubmitted, BatchSubmitFailed
  • BackupEvents() - BackupStarted, BackupCompleted, BackupFailed, BackupFallback
  • LifecycleEvents() - ManagerOpened, ManagerClosing, ManagerClosed, WorkerStarted, WorkerStopped

Configuration Options

The Options struct controls various aspects of qwr's behaviour. All options have sensible defaults:

options := qwr.Options{
    // Worker configuration
    WorkerQueueDepth:  50000,         // Queue buffer size (default: 1000)
    EnableReader:      true,          // Enable read operations
    EnableWriter:      true,          // Enable write operations

    // Batching
    BatchSize:         200,           // Queries per batch
    BatchTimeout:      1*time.Second, // Max batch wait time
    InlineInserts:     false,         // Experimental: combine INSERT statements

    // Context behaviour
    UseContexts:       false,         // Default context usage

    // Statement caching
    StmtCacheMaxSize:      1000,      // Max cached prepared statements
    UsePreparedStatements: false,     // Use prepared statements for all queries by default

    // Error handling
    ErrorQueueMaxSize: 1000,          // Max errors in memory
    ErrorLogPath:      "",            // Set via WithErrorDB(); empty disables error logging
    EnableAutoRetry:   false,         // Automatic retry
    MaxRetries:        3,             // Max retry attempts
    BaseRetryDelay:    30*time.Second,// Initial retry delay

    // Timeouts
    JobTimeout:         30*time.Second, // Individual job timeout
    TransactionTimeout: 30*time.Second, // Transaction timeout
    RetrySubmitTimeout: 5*time.Second,  // Retry submission timeout
    QueueSubmitTimeout: 5*time.Minute,  // Timeout for context-free queue submissions
}

Error Handling & Retry

For asynchronous query processing, qwr classifies errors to select an appropriate retry strategy.

Enhanced Error Classification

qwr classifies errors:

  • Connection Errors: File I/O issues, permission problems → Linear backoff retry
  • Lock Errors: Database busy, locked → Exponential backoff retry
  • Constraint Violations: Unique key, foreign key, NOT NULL → No retry (permanent failure)
  • Schema Errors: Missing tables/columns, syntax errors → No retry (permanent failure)
  • Resource Errors: Disk full, out of memory → Linear backoff retry
  • Timeout Errors: Context timeouts, deadlines → Linear backoff retry
  • Permission Errors: Access denied, read-only → No retry (permanent failure)
  • Internal Errors: qwr-specific errors → No retry
  • Unknown Errors: Unclassified → No retry (safe default)
Error Logging

Errors from async operations can be persisted to a separate SQLite database. This is disabled by default and must be explicitly enabled using WithErrorDB():

manager, err := qwr.New("test.db").
    WithErrorDB("errors.db").  // Enable persistent error logging
    Open()

When enabled, errors are logged with full context:

  • SQL statement and parameters (CBOR encoded)
  • Error type and message
  • Retry attempts and timestamps
  • Failure reason

If WithErrorDB() is not called, error logging is disabled. Using :memory: is rejected to prevent unbounded memory growth.

Retry Configuration
options := qwr.Options{
    EnableAutoRetry: true,
    MaxRetries:      3,
    BaseRetryDelay:  30 * time.Second,
}

manager, err := qwr.New("test.db", options).Open()

When auto-retry is enabled, each retriable failure schedules its own retry using the calculated exponential backoff delay. No polling is needed.

Error Queue Management
// Get all errors
errors := manager.Errors()

// Get specific error with enhanced information
if jobErr, found := manager.ErrorByID(jobID); found {
    fmt.Printf("Job %d failed: %v\n", jobID, jobErr.Error())
    
    // Access structured error information
    if qwrErr := jobErr.errType; qwrErr != nil {
        fmt.Printf("Error category: %s\n", qwrErr.Category)
        fmt.Printf("Retry strategy: %s\n", qwrErr.Strategy) 
        fmt.Printf("Context: %+v\n", qwrErr.Context)
    }
}

// Manually retry a failed job
if err := manager.RetryJob(jobID); err != nil {
    switch {
    case errors.Is(err, qwr.ErrJobNotFound):
        fmt.Printf("Job %d not found in error queue\n", jobID)
    case errors.Is(err, qwr.ErrRetrySubmissionFailed):
        fmt.Printf("Failed to resubmit job %d\n", jobID)
    default:
        fmt.Printf("Retry error: %v\n", err)
    }
}

// Clear error queue
manager.ClearErrors()
Automatic Batching

The batch collector groups multiple operations together and executes them in a single transaction. This reduces the number of database round trips:

// Configure batching
options := qwr.Options{
    BatchSize:    200,                // Execute after 200 queries
    BatchTimeout: 1 * time.Second,    // Or after 1 second
    InlineInserts: true,              // Experimental: opt-in for simple INSERTs
}
Statement Caching

Prepared statements are cached using ristretto with LRU eviction. The maximum size is controlled by StmtCacheMaxSize (default: 1000).

Inline INSERT Optimisation (Experimental)

When inline INSERT optimisation is enabled, batched statements with identical SQL are combined into a single multi-value INSERT:

// These individual statements:
INSERT INTO users (name) VALUES ('Alice')
INSERT INTO users (name) VALUES ('Bob')
INSERT INTO users (name) VALUES ('Charlie')

// Become:
INSERT INTO users (name) VALUES ('Alice'), ('Bob'), ('Charlie')

Monitoring

qwr emits structured events for all significant operations. Subscribe to events to build custom monitoring:

// Count completed jobs
var jobsCompleted atomic.Int64
manager.SubscribeFiltered(func(e qwr.Event) {
    jobsCompleted.Add(1)
}, qwr.JobEvents())

// Track cache hit ratio
var hits, misses atomic.Int64
manager.SubscribeFiltered(func(e qwr.Event) {
    if e.Type == qwr.EventCacheHit {
        hits.Add(1)
    } else if e.Type == qwr.EventCacheMiss {
        misses.Add(1)
    }
}, qwr.CacheEvents())

// Log errors
manager.SubscribeFiltered(func(e qwr.Event) {
    log.Printf("qwr error: %v (job %d)", e.Err, e.JobID)
}, qwr.ErrorEvents())
Observer Performance & Safety
  • Synchronous Dispatch: Event handlers are executed synchronously on the caller's goroutine (e.g., the write worker loop). Never perform blocking I/O or slow operations directly in a handler; always offload them to a separate goroutine if they might block.
  • Re-entrancy: It is safe to call Subscribe or Unsubscribe from within an event handler (the subscriber list is snapshot-copied before dispatch). However, this is not recommended as it makes event flow harder to reason about.
  • Panic Isolation: Panics in event handlers are recovered automatically and will not crash the caller. However, handlers should still handle their own errors gracefully to avoid silent failures.
  • Lifecycle Guarantees: EventManagerClosed is the last event emitted and is guaranteed to be delivered before Close() returns. After Close(), no further events are dispatched.
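The offloading pattern recommended in the first bullet can be sketched as follows. The Event struct and newOffloadHandler helper are stand-ins invented for this sketch; with qwr you would pass the returned handler to manager.Subscribe:

```go
package main

import "fmt"

// Event is a stand-in for qwr.Event so the sketch stays self-contained.
type Event struct{ Type int }

// newOffloadHandler returns a handler that only enqueues, so synchronous
// dispatch never blocks the caller; slow work (logging, metrics export)
// runs on a dedicated consumer goroutine. drain stops the consumer.
func newOffloadHandler(buf int, process func(Event)) (handler func(Event), drain func()) {
	events := make(chan Event, buf)
	done := make(chan struct{})
	go func() {
		for e := range events {
			process(e) // the slow part happens off the dispatch goroutine
		}
		close(done)
	}()
	handler = func(e Event) {
		select {
		case events <- e:
		default: // queue full: drop rather than block the write worker
		}
	}
	drain = func() { close(events); <-done }
	return handler, drain
}

func main() {
	var seen []int
	handler, drain := newOffloadHandler(1024, func(e Event) { seen = append(seen, e.Type) })
	handler(Event{Type: 1})
	handler(Event{Type: 2})
	drain() // synchronises with the consumer before reading seen
	fmt.Println(seen)
}
```

Dropping on overflow is one possible policy; a metrics pipeline might instead increment a "dropped events" counter so the loss is visible.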
Wait for Queue to Drain
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err := manager.WaitForIdle(ctx); err != nil {
    log.Printf("Timeout waiting for queue to drain: %v", err)
}
Manual Cache Management
// Clear statement cache (frees memory, cache rebuilds on demand)
manager.ResetCaches()
Database Maintenance
// Full vacuum (rebuilds entire database)
err := manager.RunVacuum()

// Vacuum an attached database
err := manager.RunVacuum("analytics")

// Incremental vacuum (reclaims some space)
err := manager.RunIncrementalVacuum(1000) // 1000 pages

// Manual WAL checkpoint
err := manager.RunCheckpoint(checkpoint.Passive)

// Checkpoint an attached database
err := manager.RunCheckpoint(checkpoint.Full, "analytics")

// Online backup (Default tries API first, falls back to Vacuum)
err := manager.Backup("/path/to/backup.db", backup.Default)

// Backup using SQLite backup API (less locking, better for large DBs)
err := manager.Backup("/path/to/backup.db", backup.API)

// Backup using VACUUM INTO (creates optimized/defragmented copy)
err := manager.Backup("/path/to/backup.db", backup.Vacuum)
Automatic WAL Checkpoint on Close

Configure automatic WAL checkpointing when the manager closes:

import "github.com/jpl-au/qwr/checkpoint"

manager, err := qwr.New("test.db").
    Checkpoint(checkpoint.Truncate).
    Open()

Available checkpoint modes:

  • checkpoint.None - No checkpoint on close (default)
  • checkpoint.Passive - Non-blocking, best-effort checkpoint
  • checkpoint.Full - Wait for writers, checkpoint all frames
  • checkpoint.Restart - Full + restart WAL from beginning
  • checkpoint.Truncate - Restart + truncate WAL to zero bytes

Caveats & Design Decisions

Statement Cache

The prepared statement cache uses ristretto with LRU eviction. The maximum cache size is controlled by StmtCacheMaxSize (default: 1000). When the cache is full, least recently used statements are evicted.

  • Always use parameterised queries with ? placeholders
  • Avoid building SQL strings with dynamic values embedded
Error Queue Overflow Behaviour

When the error queue exceeds ErrorQueueMaxSize, the oldest errors are persisted to the error log database and then removed from the in-memory queue.

  1. In-memory queue fills to ErrorQueueMaxSize (default: 1,000 errors)
  2. New errors trigger overflow handling
  3. Oldest errors are written to the SQLite3 error log database
  4. Oldest errors are removed from memory to make space
  5. Error information is preserved on disk for later analysis

If the error log database write fails (e.g., disk full, permissions), the error data is permanently lost.

Using Custom SQLite Drivers

qwr uses modernc.org/sqlite by default, but you can bring your own SQLite driver using the NewSQL() constructor:

package main

import (
    "database/sql"
    "log"

    "github.com/jpl-au/qwr"
    "github.com/jpl-au/qwr/profile"
    _ "github.com/mattn/go-sqlite3" // Your preferred SQLite driver
)

func main() {
    // Open your own database connections
    readerDB, err := sql.Open("sqlite3", "test.db")
    if err != nil {
        log.Fatal(err)
    }

    writerDB, err := sql.Open("sqlite3", "test.db")
    if err != nil {
        log.Fatal(err)
    }

    // Pass connections to qwr and enable error logging
    manager, err := qwr.NewSQL(readerDB, writerDB).
        WithErrorDB("errors.db").  // Optional: enable persistent error logging
        Reader(profile.ReadBalanced()).
        Writer(profile.WriteBalanced()).
        Open()
    if err != nil {
        log.Fatal(err)
    }
    defer manager.Close() // This will also close readerDB and writerDB

    // Use manager normally...
}

The Manager takes full ownership of your database connections. Call Manager.Close() to close both reader and writer connections. Use WithErrorDB() to enable persistent error logging for async operations.

Profiles can still be applied to your connections (a profile is simply a set of pool settings and SQLite PRAGMAs).

Note: Attach() is not supported with NewSQL(). Since qwr does not control connection creation for user-provided handles, it cannot guarantee ATTACH statements run on every pooled connection. Manage ATTACH DATABASE statements on your own connections before passing them to NewSQL().

qwr was inspired by numerous articles that describe using SQLite3 in production systems.

Consider SQLite

Author: Wesley Aptekar-Cassels URL: https://blog.wesleyac.com/posts/consider-sqlite

Scaling SQLite to 4M QPS on a Single Server

Author: Expensify Engineering URL: https://use.expensify.com/blog/scaling-sqlite-to-4m-qps-on-a-single-server

Your Database is Your Prison - Here's How Expensify Broke Free

Author: First Round Review URL: https://review.firstround.com/your-database-is-your-prison-heres-how-expensify-broke-free/

How (and Why) to Run SQLite in Production

Author: Stephen Margheim URL: https://fractaledmind.github.io/2023/12/23/rubyconftw/

Gotchas with SQLite in Production

Author: Anže Pečar URL: https://blog.pecar.me/sqlite-prod

Documentation

Overview

Package qwr provides serialised writes and concurrent reads for SQLite databases.

qwr (Query Write Reader) uses a worker pool pattern with a single writer to sequentially queue writes to SQLite while allowing concurrent read operations through a configurable connection pool.

Quick Start

manager, err := qwr.New("app.db").
	Reader(profile.ReadBalanced()).
	Writer(profile.WriteBalanced()).
	Open()
if err != nil {
	log.Fatal(err)
}
defer manager.Close()

Write Operations

qwr provides several write modes:

  • Direct Write: Bypasses the queue for immediate execution
  • Synchronous Write: Uses the worker pool, blocks until complete
  • Async Write: Non-blocking, errors captured in error queue
  • Batch Write: Groups multiple operations into single transactions
  • Transactions: Multi-statement atomic operations

Example writes:

// Direct write (bypasses queue)
result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Alice").Write()

// Synchronous write (queued, blocks)
result, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Bob").Execute()

// Async write (queued, non-blocking)
jobID, err := manager.Query("INSERT INTO users (name) VALUES (?)", "Charlie").Async()

Read Operations

Reads use the connection pool and can be executed concurrently:

rows, err := manager.Query("SELECT * FROM users").Read()
if err != nil {
	log.Fatal(err)
}
defer rows.Close()

Attached Databases

qwr supports SQLite's ATTACH DATABASE for working with multiple database files through a single manager. Attached databases share the main connection pool and write serialiser, enabling cross-database queries and atomic transactions across databases.

Attach databases at construction time via the builder:

manager, err := qwr.New("main.db").
	Reader(profile.ReadBalanced()).
	Writer(profile.WriteBalanced()).
	Attach("analytics", "analytics.db", profile.Attached().
		WithJournalMode(profile.JournalWal)).
	Open()

Or at runtime via the manager:

manager.Attach("logs", "logs.db")
manager.ResetReaderPool() // force immediate reader access

Queries reference attached databases using the schema-qualified syntax:

// Cross-database query
rows, _ := manager.Query("SELECT u.name FROM users u JOIN analytics.events e ON ...").Read()

// Write to attached database
manager.Query("INSERT INTO analytics.events (action) VALUES (?)", "login").Execute()

Always use schema-qualified table names for attached databases (e.g. analytics.events, not just events). Unqualified names resolve to the main database.

Bare :memory: paths are rejected because each pooled connection would get its own isolated in-memory database. Use file::memory:?cache=shared for a shared in-memory attached database.

Do not use [Query.Prepared] for schema-qualified queries before the schema is attached - the preparation will fail on every call until the schema exists.

For parallel writes to independent databases, use separate Manager instances rather than ATTACH. Attached databases share a single serialised writer, which is correct for cross-database transactions but does not offer write parallelism.

Attach is not supported with NewSQL because qwr cannot control connection creation for user-provided database handles.

Observing Events

qwr emits structured events for all significant operations. Subscribe to receive events for logging, metrics, tracing, or alerting:

manager.Subscribe(func(e qwr.Event) {
	fmt.Printf("event: %d at %v\n", e.Type, e.Timestamp)
})

Use filters to receive only specific event types:

manager.SubscribeFiltered(func(e qwr.Event) {
	fmt.Printf("error: %v\n", e.Err)
}, qwr.ErrorEvents())

Profiles

Database profiles configure connection pools and SQLite PRAGMA settings. Pre-configured profiles are available in the profile subpackage:

  • profile.ReadLight(), ReadBalanced(), ReadHeavy()
  • profile.WriteLight(), WriteBalanced(), WriteHeavy()
  • profile.Attached() for per-schema PRAGMAs on attached databases

Error Handling

Async operations capture errors in an error queue with automatic retry support for transient failures. Errors are classified by type to determine appropriate retry strategies.

if jobErr, found := manager.ErrorByID(jobID); found {
	log.Printf("Job %d failed: %v", jobID, jobErr.Error())
}

Custom SQLite Drivers

qwr uses modernc.org/sqlite by default. Use NewSQL to provide your own database connections with a different driver. Note that NewSQL does not support Attach - manage ATTACH statements on your own connections.

Index

Constants

This section is empty.

Variables

var (
	ErrAttachReservedAlias  = errors.New("alias is a reserved SQLite schema name")
	ErrAttachInvalidAlias   = errors.New("alias must be a valid SQLite identifier (letters, digits, underscores)")
	ErrAttachEmptyAlias     = errors.New("alias cannot be empty")
	ErrAttachEmptyPath      = errors.New("path cannot be empty")
	ErrAttachDuplicateAlias = errors.New("alias is already attached")
	ErrAttachMemoryPath     = errors.New(":memory: databases are per-connection and cannot be shared across the pool - use file::memory:?cache=shared instead")
	ErrAttachNotSupported   = errors.New("Attach is not supported with NewSQL - manage ATTACH statements on your own connections")
)
var (
	ErrManagerClosed             = errors.New("manager is closed")
	ErrReaderDisabled            = errors.New("reader is disabled")
	ErrWriterDisabled            = errors.New("writer is disabled")
	ErrResultNotFound            = errors.New("result not found")
	ErrInvalidResult             = errors.New("invalid result type")
	ErrQueryTooLarge             = errors.New("query exceeds maximum allowed size")
	ErrStatementCacheFull        = errors.New("statement cache is full")
	ErrHashCollision             = errors.New("statement hash collision")
	ErrErrorQueueDisabled        = errors.New("error queue is disabled")
	ErrJobNotFound               = errors.New("job not found in error queue")
	ErrWorkerNotRunning          = errors.New("worker pool is not running")
	ErrQueueTimeout              = errors.New("timeout waiting for queue to accept submission")
	ErrRetrySubmissionFailed     = errors.New("failed to resubmit job for retry")
	ErrInvalidQuery              = errors.New("query is invalid")
	ErrNilPreparedStatement      = errors.New("internal error: prepared statement is nil before execution")
	ErrNilPreparedStatementCache = errors.New("internal error: global prepared statement cache returned nil statement without error")
	ErrFailedToPrepareStatement  = errors.New("failed to prepare statement")
	ErrPreparedCacheRequired     = errors.New("prepared statement cache is required when using prepared queries")
	ErrCacheClosed               = errors.New("statement cache is closed")
	ErrBatchContainsNonQuery     = errors.New("batch jobs can only contain Query jobs, not Transaction or nested Batch jobs")
	ErrConnectionUnhealthy       = errors.New("database connection is unhealthy")
	ErrBackupDestinationExists   = errors.New("backup destination already exists")
	ErrBackupDriverUnsupported   = errors.New("driver does not support backup API")
	ErrBackupFailed              = errors.New("backup failed")
	ErrBackupInit                = errors.New("failed to initialize backup")
	ErrBackupStep                = errors.New("backup step failed")
	ErrBackupConnection          = errors.New("failed to get connection for backup")
	ErrBackupInvalidMethod       = errors.New("unknown backup method")
	ErrQueueFull                 = errors.New("worker queue is full")
)

Error constants

var DefaultOptions = Options{
	WorkerQueueDepth:   1000,
	EnableReader:       true,
	EnableWriter:       true,
	BatchSize:          200,
	BatchTimeout:       1 * time.Second,
	InlineInserts:      false,
	UseContexts:        false,
	StmtCacheMaxSize:   1000,
	ErrorQueueMaxSize:  1000,
	EnableAutoRetry:    false,
	MaxRetries:         3,
	BaseRetryDelay:     30 * time.Second,
	JobTimeout:         30 * time.Second,
	TransactionTimeout: 30 * time.Second,
	RetrySubmitTimeout: 5 * time.Second,
	QueueSubmitTimeout: 5 * time.Minute,
}

DefaultOptions provides sensible defaults suitable for most applications. WorkerQueueDepth is set conservatively at 1000 (not 50000) to avoid excessive memory use by default - raise it for high-throughput workloads.

Functions

func New

func New(path string, opts ...Options) *qwrBuilder

New creates a new qwr Manager instance builder

Options are immutable after construction. They can only be set here during manager creation and cannot be modified at runtime. If no options are provided, DefaultOptions will be used.

To change options, you must stop the application, create a new manager with different options, and restart.

func NewSQL

func NewSQL(reader, writer *sql.DB, opts ...Options) *qwrBuilder

NewSQL creates a new qwr Manager instance builder using user-provided database connections. This allows you to bring your own SQLite driver (e.g., mattn/go-sqlite3 instead of modernc.org/sqlite).

Parameters:

  • reader: Database connection for read operations (pass nil to disable reader)
  • writer: Database connection for write operations (pass nil to disable writer)
  • opts: Optional configuration options (variadic). If not provided, DefaultOptions will be used.

Important notes:

  • Passing nil for reader/writer automatically disables that connection (sets EnableReader/EnableWriter to false)
  • The Manager takes full ownership of the provided database connections
  • Calling Manager.Close() will close these database connections
  • You should not use these connections directly after passing them to NewSQL()
  • Profiles will be applied to your connections (including SQLite PRAGMAs)
  • If you provide a non-SQLite database, PRAGMA errors are your responsibility
  • Use WithErrorLogPath() to enable persistent error logging to disk
  • Attach() is not supported with NewSQL - manage ATTACH statements on your own connections

Example with mattn/go-sqlite3:

import _ "github.com/mattn/go-sqlite3"

readerDB, _ := sql.Open("sqlite3", "mydb.db")
writerDB, _ := sql.Open("sqlite3", "mydb.db")
opts := qwr.Options{ErrorLogPath: "errors.db"} // Optional error logging
manager, err := qwr.NewSQL(readerDB, writerDB, opts).
    Reader(profile.ReadBalanced()).
    Writer(profile.WriteBalanced()).
    Open()

func ReleaseQueryBuilder

func ReleaseQueryBuilder(qb *QueryBuilder)

ReleaseQueryBuilder returns a QueryBuilder to the pool after cleaning it

Types

type BatchCollector

type BatchCollector struct {
	// contains filtered or unexported fields
}

BatchCollector manages automatic batching of database jobs for asynchronous execution.

func NewBatchCollector

func NewBatchCollector(ctx context.Context, ws *WriteSerialiser, events *EventBus, options Options, dbPath string) *BatchCollector

NewBatchCollector creates a new batch collector with pre-allocated capacity and context.

func (*BatchCollector) Add

func (bc *BatchCollector) Add(job Job) error

Add adds a job to the current batch for eventual execution using the collector's context. Returns ErrQueueFull if a size-triggered flush cannot submit to the worker queue.

func (*BatchCollector) Close

func (bc *BatchCollector) Close() error

Close flushes any pending batch and stops the timer. Returns ErrQueueFull if the final batch cannot be submitted.

type BatchJob

type BatchJob struct {
	Queries []Job
	// contains filtered or unexported fields
}

BatchJob represents a collection of database jobs to be executed as a batch

func (BatchJob) ExecuteWithContext

func (b BatchJob) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext runs each job in the batch within a single transaction. If a statement cache is available, prepared queries reuse cached statements via tx.Stmt() to avoid re-parsing SQL on every execution.

func (BatchJob) ID

func (b BatchJob) ID() int64

ID returns the unique identifier for this batch

type BatchResult

type BatchResult struct {
	Results []JobResult
	// contains filtered or unexported fields
}

BatchResult represents the outcome of a batch execution

func (*BatchResult) Duration

func (r *BatchResult) Duration() time.Duration

Duration returns how long the batch took to execute

func (*BatchResult) Error

func (r *BatchResult) Error() error

Error returns any error that occurred during execution

func (*BatchResult) ID

func (r *BatchResult) ID() int64

ID returns the ID of the batch that produced this result

type ErrorCategory

type ErrorCategory int

ErrorCategory provides granular error classification for better handling

const (
	// ErrorCategoryConnection indicates connection-related errors
	ErrorCategoryConnection ErrorCategory = iota
	// ErrorCategoryLock indicates database locking/concurrency errors
	ErrorCategoryLock
	// ErrorCategoryConstraint indicates constraint violation errors
	ErrorCategoryConstraint
	// ErrorCategorySchema indicates schema-related errors
	ErrorCategorySchema
	// ErrorCategoryResource indicates resource exhaustion errors
	ErrorCategoryResource
	// ErrorCategoryTimeout indicates timeout/deadline errors
	ErrorCategoryTimeout
	// ErrorCategoryPermission indicates access control errors
	ErrorCategoryPermission
	// ErrorCategoryInternal indicates internal QWR errors
	ErrorCategoryInternal
	// ErrorCategoryUnknown indicates unclassified errors
	ErrorCategoryUnknown
)

func (ErrorCategory) String

func (ec ErrorCategory) String() string

String returns the string representation of ErrorCategory

type ErrorQueue

type ErrorQueue struct {
	// contains filtered or unexported fields
}

ErrorQueue maintains a simple registry of failed async operations

func NewErrorQueue

func NewErrorQueue(events *EventBus, opts Options, dbPath string) *ErrorQueue

NewErrorQueue creates a new error queue

func (*ErrorQueue) Clear

func (eq *ErrorQueue) Clear()

Clear removes all errors from the queue

func (*ErrorQueue) Close

func (eq *ErrorQueue) Close()

Close shuts down the error queue

func (*ErrorQueue) Count

func (eq *ErrorQueue) Count() int

Count returns the number of errors in the queue

func (*ErrorQueue) Get

func (eq *ErrorQueue) Get(jobID int64) (JobError, bool)

Get retrieves a specific error by job ID

func (*ErrorQueue) GetAll

func (eq *ErrorQueue) GetAll() []JobError

GetAll returns all errors in the queue in chronological order

func (*ErrorQueue) GetReadyForRetry

func (eq *ErrorQueue) GetReadyForRetry(now time.Time, maxRetries int) []JobError

GetReadyForRetry returns errors that are ready for retry

func (*ErrorQueue) PersistError

func (eq *ErrorQueue) PersistError(jobErr JobError, reason string) error

PersistError logs an error to the database

func (*ErrorQueue) Remove

func (eq *ErrorQueue) Remove(jobID int64) bool

Remove removes an error from the queue

func (*ErrorQueue) Store

func (eq *ErrorQueue) Store(jobErr JobError)

Store adds or overwrites an error in the queue, or immediately persists non-retriable errors

type Event added in v0.2.0

type Event struct {
	// Type identifies the kind of event. All events have a Type and Timestamp.
	Type      EventType
	Timestamp time.Time

	// --- Job context ---
	// Set by: Job lifecycle, Direct write, and Retry events.
	JobID   int64
	JobType JobType
	SQL     string

	// --- Timing ---
	// Set by: EventJobStarted (QueueWait only), EventJobCompleted, EventJobFailed,
	// EventDirectWriteCompleted, EventDirectWriteFailed.
	QueueWait time.Duration
	ExecTime  time.Duration

	// --- Error context ---
	// Err is set by all "Failed" and error-related events.
	Err error
	// EvictedCount is set by EventErrorQueueOverflow: the number of oldest
	// entries removed to bring the queue back within its max size.
	EvictedCount int

	// --- Retry context ---
	// Set by: EventJobCompleted, EventJobFailed (query jobs only),
	// EventRetryScheduled, EventRetryStarted, EventRetryExhausted.
	// Attempt is the zero-based retry count: 0 means the first execution,
	// 1 means the first retry, etc.
	Attempt   int
	NextRetry time.Time

	// --- Batch context ---
	// Set by: Batch lifecycle events.
	BatchID       int64
	BatchSize     int
	BatchReason   string // "size_limit", "timeout", "close"
	OriginalCount int
	CombinedCount int

	// --- Cache context ---
	// Set by: EventCacheHit, EventCacheMiss, EventCachePrepError.
	CacheQuery    string
	CachePrepTime time.Duration

	// --- Backup/maintenance context ---
	// Set by: Backup and Checkpoint events.
	BackupMethod   string
	BackupDest     string
	CheckpointMode string // "PASSIVE", "FULL", "RESTART", "TRUNCATE"

	// --- Result ---
	// Set by: EventDirectWriteCompleted.
	Result sql.Result
}

Event carries data for a single occurrence in the qwr system.

This is a flat struct: all event types share the same fields, and each event type only populates the fields listed in its EventType documentation. Fields not listed for a given event type will be zero-valued. Always check the Type field first to know which other fields are meaningful.

Example handler:

func(e qwr.Event) {
    switch e.Type {
    case qwr.EventJobCompleted:
        log.Printf("job %d completed in %v: %s", e.JobID, e.ExecTime, e.SQL)
    case qwr.EventJobFailed:
        log.Printf("job %d failed (attempt %d): %v", e.JobID, e.Attempt, e.Err)
    }
}

type EventBus added in v0.2.0

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus provides synchronous, in-process event dispatch.

Delivery guarantee: each event is delivered exactly once to every subscriber whose filter matches at the time of the Emit call. Events are never queued, retried, or persisted - if no subscribers are registered, the event is silently discarded. After Close, all Emit calls are no-ops.

Concurrency model:

  • Multiple goroutines may call Emit concurrently.
  • Each handler is called on the emitting goroutine (not a background worker).
  • Subscribe, Unsubscribe, and Emit may all be called concurrently, including from within a handler (see Emit for details).
  • After Close, Emit is a no-op. Close is safe to call concurrently with Emit.

func NewEventBus added in v0.2.0

func NewEventBus() *EventBus

NewEventBus creates a ready-to-use EventBus.

func (*EventBus) Close added in v0.2.0

func (eb *EventBus) Close()

Close stops all future event delivery. After Close returns, Emit is a no-op. Any Emit calls already in progress will finish delivering to their snapshot of subscribers, but no new Emit calls will proceed.

func (*EventBus) Emit added in v0.2.0

func (eb *EventBus) Emit(event Event)

Emit dispatches an event to all matching subscribers synchronously. Safe to call concurrently. The Timestamp field is set automatically if zero.

After Close, Emit is a no-op. Panics in handlers are recovered and do not propagate to the caller.

Handlers are called outside the lock, so it is safe to call Subscribe or Unsubscribe from within a handler without deadlocking. Changes to the subscriber list take effect on the next Emit call, not the current one.

func (*EventBus) Subscribe added in v0.2.0

func (eb *EventBus) Subscribe(handler EventHandler) uint64

Subscribe registers a handler that receives all events. Returns a subscription ID that can be passed to Unsubscribe.

func (*EventBus) SubscribeFiltered added in v0.2.0

func (eb *EventBus) SubscribeFiltered(handler EventHandler, filter EventFilter) uint64

SubscribeFiltered registers a handler that only receives events accepted by filter. If filter is nil the handler receives all events.

func (*EventBus) Unsubscribe added in v0.2.0

func (eb *EventBus) Unsubscribe(id uint64)

Unsubscribe removes the subscription with the given ID.

type EventFilter added in v0.2.0

type EventFilter func(EventType) bool

EventFilter returns true if the handler should receive this event type. Use the predefined filters (JobEvents, ErrorEvents, etc.) or provide your own.
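A custom filter is just a predicate over EventType. For example, to receive only job failures (a sketch; assumes a manager is already open):

```go
failedOnly := func(t qwr.EventType) bool {
	return t == qwr.EventJobFailed
}

id := manager.SubscribeFiltered(func(e qwr.Event) {
	log.Printf("job %d failed: %v", e.JobID, e.Err)
}, failedOnly)
defer manager.Unsubscribe(id)
```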

func BackupEvents added in v0.2.0

func BackupEvents() EventFilter

BackupEvents returns a filter matching backup events: EventBackupStarted, EventBackupCompleted, EventBackupFailed, EventBackupFallback.

func BatchEvents added in v0.2.0

func BatchEvents() EventFilter

BatchEvents returns a filter matching batch lifecycle events: EventBatchQueryAdded, EventBatchFlushed, EventBatchInlineOptimised, EventBatchSubmitted, EventBatchSubmitFailed.

func CacheEvents added in v0.2.0

func CacheEvents() EventFilter

CacheEvents returns a filter matching statement cache events: EventCacheHit, EventCacheMiss, EventCacheEvicted, EventCachePrepError.

func ErrorEvents added in v0.2.0

func ErrorEvents() EventFilter

ErrorEvents returns a filter matching all events that indicate something went wrong, across multiple categories. This includes job failures (EventJobFailed, EventDirectWriteFailed) in addition to error queue and retry exhaustion events.

Matches: EventJobFailed, EventDirectWriteFailed, EventReaderQueryFailed, EventRetrySubmitFailed, EventErrorStored, EventErrorPersisted, EventErrorQueueOverflow, EventRetryExhausted.

func JobEvents added in v0.2.0

func JobEvents() EventFilter

JobEvents returns a filter matching job lifecycle events: EventJobQueued, EventJobStarted, EventJobCompleted, EventJobFailed.

func LifecycleEvents added in v0.2.0

func LifecycleEvents() EventFilter

LifecycleEvents returns a filter matching manager and worker lifecycle events: EventManagerOpened, EventManagerClosing, EventManagerClosed, EventWorkerStarted, EventWorkerStopped.

func ReadEvents added in v0.2.1

func ReadEvents() EventFilter

ReadEvents returns a filter matching reader-side events: EventReaderQueryCompleted, EventReaderQueryFailed.

func RetryEvents added in v0.2.0

func RetryEvents() EventFilter

RetryEvents returns a filter matching retry lifecycle events: EventRetryScheduled, EventRetryStarted, EventRetryExhausted, EventRetrySubmitFailed.

func WriteEvents added in v0.2.0

func WriteEvents() EventFilter

WriteEvents returns a filter matching all write-related events, both queued (job lifecycle) and direct: EventJobQueued, EventJobStarted, EventJobCompleted, EventJobFailed, EventDirectWriteCompleted, EventDirectWriteFailed.

type EventHandler added in v0.2.0

type EventHandler func(Event)

EventHandler is a callback that receives events from the EventBus.

Handlers are called synchronously on the goroutine that called Emit. In practice this means handlers run on the write worker goroutine for job events, or on the caller's goroutine for direct operations like RunVacuum.

When multiple handlers are registered, they are called sequentially in subscription order for each event. A slow handler delays all handlers after it for that event AND blocks the caller (e.g., the write worker).

Handlers MUST NOT block. A slow handler stalls the write worker, which prevents all other queued jobs from executing. If you need to do expensive work (network I/O, disk writes, etc.), send the event to a channel and process it in a separate goroutine.

Panics in handlers are recovered by the EventBus and do not propagate.
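One way to keep handlers fast is to forward events to a buffered channel and do the slow work on a separate goroutine. A sketch; the drop-on-full policy and the writeToAuditLog consumer are illustrative choices, not part of the qwr API:

```go
events := make(chan qwr.Event, 1024)

// The handler only performs a non-blocking send, so the write worker never stalls.
manager.Subscribe(func(e qwr.Event) {
	select {
	case events <- e:
	default:
		// Channel full: drop the event rather than block the worker.
	}
})

// Slow processing (disk, network) happens off the emitting goroutine.
go func() {
	for e := range events {
		writeToAuditLog(e) // hypothetical slow consumer
	}
}()
```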

type EventType added in v0.2.0

type EventType int

EventType identifies the kind of event emitted by the qwr system.

Event uses a single flat struct rather than per-type structs. This avoids type assertions and interface hierarchies. The trade-off is that most fields are zero-valued for any given event. Each constant below documents exactly which Event fields are populated, so you never need to read the emitting code to know what data is available.

const (

	// EventManagerOpened is emitted after Open() completes successfully.
	EventManagerOpened EventType = iota
	// EventManagerClosing is emitted at the start of Close().
	EventManagerClosing
	// EventManagerClosed is emitted at the end of Close(), just before
	// the EventBus itself is closed. This is the last event emitted by
	// a Manager; no events will follow it.
	EventManagerClosed

	// EventJobQueued is emitted when a job enters the worker queue.
	// Fields: JobID, JobType.
	EventJobQueued
	// EventJobStarted is emitted when the worker begins executing a job.
	// Fields: JobID, JobType, QueueWait.
	EventJobStarted
	// EventJobCompleted is emitted when a job finishes successfully.
	// Fields: JobID, JobType, QueueWait, ExecTime.
	// Query jobs also set: SQL, Attempt.
	EventJobCompleted
	// EventJobFailed is emitted when a job finishes with an error.
	// Fields: JobID, JobType, QueueWait, ExecTime, Err.
	// Query jobs also set: SQL, Attempt.
	EventJobFailed

	// EventDirectWriteCompleted is emitted after a direct Write() or
	// Transaction.Write() succeeds.
	// Fields: JobID, ExecTime. Query writes also set SQL and Result.
	EventDirectWriteCompleted
	// EventDirectWriteFailed is emitted after a direct Write() or
	// Transaction.Write() fails.
	// Fields: JobID, ExecTime, Err. Query writes also set SQL.
	EventDirectWriteFailed

	// EventBatchQueryAdded is emitted when a query is added to the batch.
	// Fields: BatchSize (current count after adding).
	EventBatchQueryAdded
	// EventBatchFlushed is emitted when a batch is flushed for execution.
	// Fields: BatchID, BatchSize, BatchReason ("size_limit"|"timeout"|"close").
	EventBatchFlushed
	// EventBatchInlineOptimised is emitted when inline INSERT combining
	// reduces the number of queries in a batch.
	// Fields: BatchID, OriginalCount, CombinedCount.
	EventBatchInlineOptimised
	// EventBatchSubmitted is emitted after a batch is placed on the worker queue.
	// Fields: BatchID, BatchSize.
	EventBatchSubmitted
	// EventBatchSubmitFailed is emitted when a batch cannot be queued.
	// Fields: BatchID, BatchReason ("worker_not_running"|"queue_full").
	EventBatchSubmitFailed

	// EventErrorStored is emitted when a retriable error is added to the queue.
	// Fields: JobID, Err.
	EventErrorStored
	// EventErrorRemoved is emitted when an error is removed from the queue.
	// Fields: JobID.
	EventErrorRemoved
	// EventErrorQueueCleared is emitted when ClearErrors() is called.
	// No extra fields.
	EventErrorQueueCleared
	// EventErrorQueueOverflow is emitted when the queue exceeds max size
	// and oldest entries are evicted.
	// Fields: EvictedCount.
	EventErrorQueueOverflow
	// EventErrorPersisted is emitted when an error is written to the
	// persistent error log database.
	// Fields: JobID. May also set Err for non-retriable errors.
	EventErrorPersisted
	// EventErrorPersistFailed is emitted when writing to the error log
	// database fails.
	// Fields: JobID, Err.
	EventErrorPersistFailed

	// EventRetryScheduled is emitted when a retry is scheduled after a
	// retriable failure.
	// Fields: JobID, Attempt, NextRetry.
	EventRetryScheduled
	// EventRetryStarted is emitted when a scheduled retry begins execution.
	// Fields: JobID, Attempt.
	EventRetryStarted
	// EventRetryExhausted is emitted when all retry attempts are used up.
	// Fields: JobID, Err, Attempt.
	EventRetryExhausted

	// EventCacheHit is emitted when a prepared statement is found in cache.
	// Fields: CacheQuery.
	EventCacheHit
	// EventCacheMiss is emitted when a statement is not cached and must be
	// prepared. Emitted after successful preparation.
	// Fields: CacheQuery, CachePrepTime.
	EventCacheMiss
	// EventCacheEvicted is emitted when a statement is evicted from cache.
	// No extra fields.
	EventCacheEvicted
	// EventCachePrepError is emitted when statement preparation fails.
	// Fields: CacheQuery, Err.
	EventCachePrepError

	// EventBackupStarted is emitted when a backup operation begins.
	// Fields: BackupMethod ("api"|"vacuum"), BackupDest.
	EventBackupStarted
	// EventBackupCompleted is emitted when a backup finishes successfully.
	// Fields: BackupMethod, BackupDest.
	EventBackupCompleted
	// EventBackupFailed is emitted when a backup fails.
	// Fields: BackupMethod, BackupDest, Err.
	EventBackupFailed
	// EventBackupFallback is emitted when the backup API is unsupported
	// and qwr falls back to VACUUM INTO.
	// Fields: BackupDest.
	EventBackupFallback

	// EventVacuumStarted is emitted before a VACUUM or incremental VACUUM.
	// No extra fields.
	EventVacuumStarted
	// EventVacuumCompleted is emitted after a successful VACUUM.
	// No extra fields.
	EventVacuumCompleted
	// EventVacuumFailed is emitted when a VACUUM fails.
	// Fields: Err.
	EventVacuumFailed
	// EventCheckpointStarted is emitted before a WAL checkpoint.
	// Fields: CheckpointMode.
	EventCheckpointStarted
	// EventCheckpointCompleted is emitted after a successful checkpoint.
	// Fields: CheckpointMode.
	EventCheckpointCompleted
	// EventCheckpointFailed is emitted when a WAL checkpoint fails.
	// Fields: CheckpointMode, Err.
	EventCheckpointFailed

	// EventWorkerStarted is emitted when the write worker goroutine starts.
	// No extra fields.
	EventWorkerStarted
	// EventWorkerStopped is emitted when the write worker goroutine exits.
	// No extra fields.
	EventWorkerStopped

	// EventReaderQueryCompleted is emitted after a synchronous Read(), ReadRow(),
	// or ReadClose() succeeds.
	// Fields: ExecTime. Query reads also set SQL.
	EventReaderQueryCompleted
	// EventReaderQueryFailed is emitted when a reader operation fails.
	// Fields: ExecTime, Err. Query reads also set SQL.
	EventReaderQueryFailed

	// EventRetrySubmitFailed is emitted when a background retry cannot be
	// placed on the worker queue.
	// Fields: JobID, Err.
	EventRetrySubmitFailed
)

func (EventType) String added in v0.2.0

func (t EventType) String() string

String returns the name of the event type (e.g. "EventJobCompleted"). This makes test failures and log output readable instead of printing raw ints.

type Job

type Job struct {
	Type            JobType
	Query           Query
	Transaction     Transaction
	BatchJob        BatchJob
	TransactionFunc TransactionFunc
}

Job represents a database job that can be executed

func NewBatchJobJob

func NewBatchJobJob(b BatchJob) Job

NewBatchJobJob creates a Job from a BatchJob

func NewQueryJob

func NewQueryJob(q Query) Job

NewQueryJob creates a Job from a Query

func NewTransactionFuncJob added in v0.2.2

func NewTransactionFuncJob(tf TransactionFunc) Job

NewTransactionFuncJob creates a Job from a TransactionFunc

func NewTransactionJob

func NewTransactionJob(t Transaction) Job

NewTransactionJob creates a Job from a Transaction

func (Job) ExecuteWithContext

func (j Job) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext runs the job against the database with context

func (Job) ID

func (j Job) ID() int64

ID returns the unique identifier for this job

type JobError

type JobError struct {
	Query Query // The query that failed
	// contains filtered or unexported fields
}

JobError represents an error that occurred during async job execution

func (JobError) Age

func (je JobError) Age() time.Duration

Age returns how long ago the error occurred

func (*JobError) CalculateNextRetry

func (je *JobError) CalculateNextRetry(baseDelay time.Duration)

CalculateNextRetry calculates when this job should next be retried. Uses exponential backoff with jitter to prevent a thundering herd.

func (JobError) CreateRetryQuery

func (je JobError) CreateRetryQuery() Query

CreateRetryQuery creates a new query for retry with incremented retry count

func (JobError) ID

func (je JobError) ID() int64

func (JobError) SQL

func (je JobError) SQL() (string, []any)

SQL returns the SQL statement and arguments

func (JobError) ShouldRetry

func (je JobError) ShouldRetry(maxRetries int) bool

ShouldRetry determines if this error should be retried based on error type and retry count

func (JobError) String

func (je JobError) String() string

String returns a string representation of the error

type JobResult

type JobResult struct {
	Type                  ResultType
	QueryResult           QueryResult
	TransactionResult     TransactionResult
	BatchResult           BatchResult
	TransactionFuncResult TransactionFuncResult
}

JobResult represents the outcome of a job execution

func NewBatchResult

func NewBatchResult(br BatchResult) JobResult

NewBatchResult creates a JobResult from a BatchResult

func NewQueryResult

func NewQueryResult(qr QueryResult) JobResult

NewQueryResult creates a JobResult from a QueryResult

func NewTransactionFuncResult added in v0.2.2

func NewTransactionFuncResult(tfr TransactionFuncResult) JobResult

NewTransactionFuncResult creates a JobResult from a TransactionFuncResult

func NewTransactionResult

func NewTransactionResult(tr TransactionResult) JobResult

NewTransactionResult creates a JobResult from a TransactionResult

func (JobResult) Duration

func (jr JobResult) Duration() time.Duration

Duration returns how long the job took to execute

func (JobResult) Error

func (jr JobResult) Error() error

Error returns any error that occurred during execution

func (JobResult) ID

func (jr JobResult) ID() int64

ID returns the ID of the job that produced this result

type JobType

type JobType int
const (
	JobTypeQuery JobType = iota
	JobTypeTransaction
	JobTypeBatch
	JobTypeTransactionFunc
)

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles serialised database operations

func (*Manager) Attach added in v0.3.0

func (m *Manager) Attach(alias, path string, p ...*profile.Profile) error

Attach attaches a database at runtime. The ATTACH statement runs immediately on the writer connection. The reader connector is updated so new reader connections get the attachment automatically as the pool recycles them. Call ResetReaderPool to force immediate reader access to the attached database.

An optional profile configures per-schema PRAGMAs for the attached database. Only PRAGMA settings are used - pool parameters are ignored.

Not supported for managers created with NewSQL.

func (*Manager) Backup

func (m *Manager) Backup(dest string, method backup.Method) error

Backup creates a backup of the database to the specified destination path.

Available methods:

  • backup.Default: Uses backup API if available, falls back to Vacuum
  • backup.API: Uses SQLite's online backup API (less locking)
  • backup.Vacuum: Uses VACUUM INTO (creates optimized copy)

The destination file must not already exist.
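A minimal backup call (the destination path is illustrative and must not already exist):

```go
import "github.com/jpl-au/qwr/backup"

if err := manager.Backup("snapshot.db", backup.Default); err != nil {
	log.Fatal(err) // includes ErrBackupDestinationExists if snapshot.db is present
}
```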

func (*Manager) Batch

func (m *Manager) Batch(job Job) error

Batch adds a job to be executed as part of a batch

func (*Manager) ClearErrors

func (m *Manager) ClearErrors()

ClearErrors removes all errors from the queue

func (*Manager) Close

func (m *Manager) Close() error

Close closes all database connections and stops the worker pool. Safe to call multiple times - subsequent calls return the same result.

func (*Manager) Database

func (m *Manager) Database() string

Database extracts the database filename for logging context

func (*Manager) ErrorByID added in v0.3.0

func (m *Manager) ErrorByID(jobID int64) (JobError, bool)

ErrorByID retrieves a specific error from the queue

func (*Manager) Errors added in v0.3.0

func (m *Manager) Errors() []JobError

Errors returns all errors in the error queue

func (*Manager) JobStatus added in v0.3.0

func (m *Manager) JobStatus(jobID int64) (string, error)

JobStatus checks if a job failed by looking in the error queue

func (*Manager) Query

func (m *Manager) Query(sql string, args ...any) *QueryBuilder

Query creates a new query with the given SQL and arguments

func (*Manager) ReaderProfile added in v0.3.0

func (m *Manager) ReaderProfile() *profile.Profile

ReaderProfile returns the current reader profile

func (*Manager) RemoveError

func (m *Manager) RemoveError(jobID int64) bool

RemoveError removes a specific error from the queue

func (*Manager) ResetCaches

func (m *Manager) ResetCaches()

ResetCaches clears all cached prepared statements, freeing memory. The cache remains usable - new statements will be prepared on demand.

func (*Manager) ResetReaderPool added in v0.3.0

func (m *Manager) ResetReaderPool() error

ResetReaderPool creates a new reader pool using the same connector, then drains and closes the old one. New connections will include any attachments added via Manager.Attach since the pool was last created.

In-flight queries that have already started (rows being iterated) will complete before the old resources are released. However, a read that grabs the old pool pointer just before the swap may see a transient "database is closed" error. This is benign and safe to retry.

func (*Manager) RetryJob

func (m *Manager) RetryJob(jobID int64) error

RetryJob manually retries a failed job by its ID
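Combining Errors with RetryJob gives a simple manual-retry loop over everything currently in the error queue (a sketch):

```go
for _, jobErr := range manager.Errors() {
	sql, args := jobErr.SQL()
	log.Printf("job %d failed %v ago: %s %v", jobErr.ID(), jobErr.Age(), sql, args)

	if err := manager.RetryJob(jobErr.ID()); err != nil {
		log.Printf("retry submit failed for job %d: %v", jobErr.ID(), err)
	}
}
```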

func (*Manager) RunCheckpoint

func (m *Manager) RunCheckpoint(mode checkpoint.Mode, schema ...string) error

RunCheckpoint triggers a WAL checkpoint on the main database or an attached database if a schema name is provided.

func (*Manager) RunIncrementalVacuum

func (m *Manager) RunIncrementalVacuum(pages int, schema ...string) error

RunIncrementalVacuum performs incremental vacuum on the main database or an attached database if a schema name is provided.

func (*Manager) RunVacuum

func (m *Manager) RunVacuum(schema ...string) error

RunVacuum performs a VACUUM operation on the main database or an attached database if a schema name is provided.

func (*Manager) SetSecureDelete

func (m *Manager) SetSecureDelete(enabled bool) error

SetSecureDelete enables or disables secure_delete

func (*Manager) Subscribe added in v0.2.0

func (m *Manager) Subscribe(handler EventHandler) uint64

Subscribe registers an event handler that receives all events. Returns a subscription ID for later removal via Unsubscribe.

func (*Manager) SubscribeFiltered added in v0.2.0

func (m *Manager) SubscribeFiltered(handler EventHandler, filter EventFilter) uint64

SubscribeFiltered registers an event handler with a filter. The handler only receives events for which filter returns true.

func (*Manager) Transaction

func (m *Manager) Transaction(capacity ...int) *Transaction

Transaction creates a new transaction

func (*Manager) TransactionFunc added in v0.2.2

func (m *Manager) TransactionFunc(fn func(*sql.Tx) (any, error)) *TransactionFunc

TransactionFunc creates a new callback-based transaction. The function receives a *sql.Tx and may perform any combination of reads and writes. Return a non-nil error to trigger rollback; return nil to commit.
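The callback receives the *sql.Tx directly, so a read and a conditional write can share one atomic unit. A sketch of the callback itself; execution of the returned *TransactionFunc is not shown here:

```go
tf := manager.TransactionFunc(func(tx *sql.Tx) (any, error) {
	var count int
	if err := tx.QueryRow("SELECT COUNT(*) FROM users").Scan(&count); err != nil {
		return nil, err // non-nil error triggers rollback
	}
	if count < 100 {
		if _, err := tx.Exec("INSERT INTO users (name) VALUES (?)", "Carol"); err != nil {
			return nil, err
		}
	}
	return count, nil // nil error commits; count is the result value
})
```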

func (*Manager) Unsubscribe added in v0.2.0

func (m *Manager) Unsubscribe(id uint64)

Unsubscribe removes a previously registered event handler.

func (*Manager) WaitForIdle

func (m *Manager) WaitForIdle(ctx context.Context) error

WaitForIdle waits until all operations are processed
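A bounded drain before shutdown, assuming the context's error is surfaced when the deadline expires before the queue empties (a sketch):

```go
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

if err := manager.WaitForIdle(ctx); err != nil {
	log.Printf("drain timed out with work still queued: %v", err)
}
```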

func (*Manager) WriterProfile added in v0.3.0

func (m *Manager) WriterProfile() *profile.Profile

WriterProfile returns the current writer profile

type Options

type Options struct {
	// WorkerQueueDepth sets the buffer size for the serialised write queue.
	// A deeper queue absorbs bursts without blocking callers but uses more
	// memory. Too shallow and Execute/Async calls block waiting for space.
	// Default: 1000.
	WorkerQueueDepth int

	// EnableReader creates the reader connection pool. Disable for
	// write-only applications to avoid opening unused connections.
	// Default: true.
	EnableReader bool

	// EnableWriter creates the writer connection and serialised worker.
	// Disable for read-only applications. Without a writer, all write
	// methods (Write, Execute, Async, Batch) return ErrWriterDisabled.
	// Default: true.
	EnableWriter bool

	// BatchSize is the number of queries collected before the batch
	// collector flushes automatically. Larger batches amortise the
	// per-transaction cost across more rows but increase latency for
	// individual writes and memory held by the collector.
	// Default: 200.
	BatchSize int

	// BatchTimeout is the maximum time the collector waits before
	// flushing a partial batch. Prevents stale writes sitting in the
	// buffer when write volume is too low to trigger a size-based flush.
	// Default: 1s.
	BatchTimeout time.Duration

	// InlineInserts enables combining of identically-structured INSERT
	// statements within a batch into a single multi-value INSERT. This
	// reduces round-trips but relies on simple string parsing - it will
	// produce incorrect results for INSERT...SELECT or VALUES clauses
	// containing parentheses in string literals.
	//
	// Only enable when you control the SQL structure and all batched
	// inserts follow the pattern INSERT INTO t (...) VALUES (?,...).
	// Default: false.
	InlineInserts bool

	// UseContexts makes all operations use context-aware database methods
	// by default. When false, context methods are only used when the caller
	// explicitly calls WithContext(). The non-context path avoids the
	// overhead of context propagation for applications that don't need
	// cancellation or deadlines.
	// Default: false.
	UseContexts bool

	// StmtCacheMaxSize caps the number of prepared statements held in
	// cache. Eviction is LRU. Raise this for applications with many
	// distinct queries; lower it to reduce memory under constrained
	// environments. Each cached statement holds a database-side resource.
	// Default: 1000.
	StmtCacheMaxSize int

	// ErrorQueueMaxSize caps the number of errors retained in memory
	// for inspection via ErrorByID. When full, oldest errors are
	// evicted. If ErrorLogPath is set, evicted errors are persisted
	// to disk first.
	// Default: 1000.
	ErrorQueueMaxSize int

	// EnableAutoRetry automatically resubmits failed async jobs that
	// hit retriable errors (SQLITE_BUSY, SQLITE_LOCKED, timeouts).
	// Retries use exponential backoff governed by BaseRetryDelay and
	// MaxRetries. Disable for applications that need manual control
	// over retry logic.
	// Default: false.
	EnableAutoRetry bool

	// MaxRetries is the maximum retry attempts for a failed async job
	// before it is marked permanently failed and added to the error
	// queue. Only applies when EnableAutoRetry is true.
	// Default: 3.
	MaxRetries int

	// BaseRetryDelay is the base for exponential backoff between retries.
	// Actual delay is BaseRetryDelay * 2^(attempt-1) with jitter. Longer
	// delays reduce pressure on a contended database but increase the
	// time before a transient failure resolves.
	// Default: 30s.
	BaseRetryDelay time.Duration

	// JobTimeout is the deadline applied to individual query execution
	// when context is available. Does not apply to transactions (see
	// TransactionTimeout).
	// Default: 30s.
	JobTimeout time.Duration

	// TransactionTimeout is the deadline applied to the entire transaction
	// lifecycle (begin through commit) when context is available. Should
	// be at least as long as JobTimeout since transactions typically
	// contain multiple operations.
	// Default: 30s.
	TransactionTimeout time.Duration

	// RetrySubmitTimeout caps how long the retry handler waits to
	// resubmit a job to the worker queue. Prevents the retry goroutine
	// from blocking indefinitely when the queue is full.
	// Default: 5s.
	RetrySubmitTimeout time.Duration

	// QueueSubmitTimeout caps how long context-free submissions
	// (SubmitWaitNoContext, SubmitNoWaitNoContext) wait for queue space.
	// Prevents deadlock when the queue is saturated and no context
	// cancellation is available.
	// Default: 5m.
	QueueSubmitTimeout time.Duration

	// UsePreparedStatements makes all queries use cached prepared
	// statements by default, avoiding repeated SQL parsing. Individual
	// queries can still override via Prepared(). Most beneficial when
	// the same queries execute repeatedly with different parameters.
	// Default: false.
	UsePreparedStatements bool

	// ErrorLogPath is the file path for persistent error logging. When
	// set, errors evicted from the in-memory queue are written to a
	// SQLite database at this path for post-mortem inspection. Leave
	// empty to keep error logging in-memory only.
	// Default: "" (disabled).
	ErrorLogPath string
}

Options holds configuration for the qwr manager's internal behaviour.

Options are immutable after manager startup. They can only be set during manager construction via New() and cannot be modified at runtime. To change options, close the existing manager and create a new one with the desired values.

func (*Options) SetDefaults

func (o *Options) SetDefaults()

SetDefaults applies default values for any option left at its zero value.

func (*Options) Validate

func (o *Options) Validate() error

Validate validates and sets defaults for all options.
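For instance, a partially populated Options value can be validated before use; fields left at their zero value take the documented defaults. A sketch (the field values chosen here are illustrative only):

```go
package main

import (
	"log"
	"time"

	"github.com/jpl-au/qwr"
)

func main() {
	// Set only the fields that differ from the defaults; Validate
	// fills in the rest (WorkerQueueDepth 1000, BatchTimeout 1s, ...).
	opts := qwr.Options{
		WorkerQueueDepth: 5000,             // absorb larger write bursts
		EnableAutoRetry:  true,             // retry SQLITE_BUSY/LOCKED jobs
		BaseRetryDelay:   10 * time.Second, // gentler backoff schedule
	}
	if err := opts.Validate(); err != nil {
		log.Fatal(err)
	}
}
```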

type QWRError

type QWRError struct {
	// Original error from the underlying operation
	Original error
	// Category of the error for granular handling
	Category ErrorCategory
	// RetryStrategy for this specific error
	Strategy RetryStrategy
	// Context provides additional information about the error
	Context map[string]any
	// Timestamp when the error occurred
	Timestamp time.Time
	// Operation that caused the error (query, transaction, etc.)
	Operation string
}

QWRError provides structured error information with enhanced classification

func ClassifyError

func ClassifyError(err error, operation string) *QWRError

ClassifyError provides enhanced error classification with detailed categorisation. If the error implements the sqliteErrorCode interface (i.e. exposes a Code() int method), classification uses the structured result code. Otherwise, it falls back to string matching on the error message.
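A sketch of granular handling built on ClassifyError, using only the QWRError fields and methods documented below:

```go
package main

import (
	"errors"
	"log"

	"github.com/jpl-au/qwr"
)

// handleWriteError classifies an error and branches on retriability.
func handleWriteError(err error) {
	if err == nil {
		return
	}
	qe := qwr.ClassifyError(err, "query")
	if qe.IsRetriable() {
		// Transient (e.g. SQLITE_BUSY): Strategy describes how to retry.
		log.Printf("retriable %s error, strategy %s: %v",
			qe.Operation, qe.Strategy, qe.Original)
		return
	}
	log.Printf("permanent failure: %v", qe)
}

func main() {
	handleWriteError(errors.New("database is locked"))
}
```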

func NewQWRError

func NewQWRError(original error, category ErrorCategory, strategy RetryStrategy, operation string) *QWRError

NewQWRError creates a new structured QWR error

func (*QWRError) Error

func (qe *QWRError) Error() string

Error implements the error interface

func (*QWRError) IsRetriable

func (qe *QWRError) IsRetriable() bool

IsRetriable returns true if the error should be retried

func (*QWRError) Unwrap

func (qe *QWRError) Unwrap() error

Unwrap allows error unwrapping for errors.Is and errors.As

func (*QWRError) WithContext

func (qe *QWRError) WithContext(key string, value any) *QWRError

WithContext adds context information to the error

type Query

type Query struct {
	SQL  string
	Args []any
	// contains filtered or unexported fields
}

Query represents a database query operation

func (Query) ExecuteWithContext

func (q Query) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext runs the query against the database with context

func (Query) ID

func (q Query) ID() int64

ID returns the unique identifier for this query

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

QueryBuilder provides a fluent API for building and executing queries

func GetQueryBuilder

func GetQueryBuilder() *QueryBuilder

GetQueryBuilder gets a pre-allocated QueryBuilder object from pool

func (*QueryBuilder) Async

func (qb *QueryBuilder) Async() (int64, error)

Async submits the query to the worker pool for background execution. Returns immediately with a job ID that can be used to check for errors later. Failed async queries are automatically added to the error queue for inspection or retry.

func (*QueryBuilder) Batch

func (qb *QueryBuilder) Batch() (int64, error)

Batch adds the query to a batch for deferred execution

IMPORTANT: Batch operations use the manager's internal context, NOT the context set via WithContext(). This means:

  • Query-level contexts (from WithContext()) are ignored for batch operations
  • All queries in a batch share the same manager-level context
  • Timeouts and cancellation apply to the entire batch, not individual queries

If you need query-specific context control, use Execute() or Async() instead.
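Putting Batch() to work might look like the following sketch. The `*qwr.Manager` type name is an assumption, since the manager's type is not shown in this section:

```go
// queueUsers hands each insert to the batch collector, which flushes
// them in a single transaction when BatchSize is reached or
// BatchTimeout fires.
func queueUsers(manager *qwr.Manager, names []string) error {
	for _, name := range names {
		if _, err := manager.Query(
			"INSERT INTO users (name) VALUES (?)", name,
		).Batch(); err != nil {
			return err
		}
	}
	return nil
}
```

With InlineInserts enabled and identically-shaped statements like these, a flushed batch may additionally be combined into one multi-value INSERT.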

func (*QueryBuilder) Execute

func (qb *QueryBuilder) Execute() (*QueryResult, error)

Execute submits the query to the worker pool and waits for completion. Provides queued execution with immediate error feedback. Query will be serialised with other operations but the caller will block until completion.

func (*QueryBuilder) GenID

func (qb *QueryBuilder) GenID() *QueryBuilder

GenID generates a unique ID for this query

func (*QueryBuilder) Prepared

func (qb *QueryBuilder) Prepared() *QueryBuilder

Prepared marks the query to use a prepared statement for execution. Prepared statements are cached and reused, reducing parsing overhead for repeated queries. Most beneficial for queries executed multiple times with different parameters.

func (*QueryBuilder) Read

func (qb *QueryBuilder) Read() (*sql.Rows, error)

Read executes a read operation on the reader connection pool and returns multiple rows. Uses concurrent reader connections for better read performance. Remember to call rows.Close() when finished to prevent connection leaks.

func (*QueryBuilder) ReadClose

func (qb *QueryBuilder) ReadClose(fn func(*sql.Rows) error) error

ReadClose executes a read operation and automatically closes the rows when done. The provided function receives the rows and should iterate/scan them as needed. Rows are automatically closed after the function returns, preventing resource leaks.

This is a convenience method that eliminates the need to manually defer rows.Close(), making it safer and cleaner for typical read operations.

Example:

var users []User
err := mgr.Query("SELECT id, name FROM users").ReadClose(func(rows *sql.Rows) error {
    for rows.Next() {
        var u User
        if err := rows.Scan(&u.ID, &u.Name); err != nil {
            return err
        }
        users = append(users, u)
    }
    return nil
})

Returns any error from the query execution, the callback function, or row iteration.

func (*QueryBuilder) ReadRow

func (qb *QueryBuilder) ReadRow() (*sql.Row, error)

ReadRow executes a read operation on the reader connection pool and returns a single row. Convenient for queries expected to return exactly one row. Use row.Scan() to extract values. sql.ErrNoRows is returned when no rows match the query.
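A single-row read then looks like this sketch (the `*qwr.Manager` type name is an assumption, as above):

```go
// lookupName fetches one user's name, distinguishing "not found"
// from other failures via sql.ErrNoRows returned by Scan.
func lookupName(manager *qwr.Manager, id int64) (string, error) {
	row, err := manager.Query(
		"SELECT name FROM users WHERE id = ?", id,
	).ReadRow()
	if err != nil {
		return "", err
	}
	var name string
	if err := row.Scan(&name); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return "", fmt.Errorf("user %d not found: %w", id, err)
		}
		return "", err
	}
	return name, nil
}
```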

func (*QueryBuilder) Release

func (qb *QueryBuilder) Release()

Release manually returns the QueryBuilder to the object pool for reuse. Only call this if you don't execute the query (Write/Async/Execute/Read/ReadRow). All execution methods automatically release the QueryBuilder when complete.

func (*QueryBuilder) WithContext

func (qb *QueryBuilder) WithContext(ctx context.Context) *QueryBuilder

WithContext adds a context to the query and enables context usage for this specific query. The context will be used for timeouts, cancellation, and deadlines during query execution. Note: Batch operations ignore query-level contexts and use the manager's internal context.
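Combining WithContext with a synchronous write gives a per-query deadline, even when Options.UseContexts is false. A sketch (again assuming a `*qwr.Manager` type name):

```go
// renameUser applies a 2s deadline to one queued write.
func renameUser(manager *qwr.Manager, id int64, name string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// WithContext makes this specific query context-aware; if the
	// deadline expires while queued or executing, the call fails.
	result, err := manager.Query(
		"UPDATE users SET name = ? WHERE id = ?", name, id,
	).WithContext(ctx).Execute()
	if err != nil {
		return err
	}
	return result.Error()
}
```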

func (*QueryBuilder) Write

func (qb *QueryBuilder) Write() (*QueryResult, error)

Write executes the query directly on the writer connection, bypassing the worker queue. This provides immediate execution and error feedback but may block the caller. Returns a QueryResult containing SQL result, error, duration, and query ID.

type QueryResult

type QueryResult struct {
	SQLResult sql.Result
	// contains filtered or unexported fields
}

QueryResult represents the outcome of a query execution

func (*QueryResult) Duration

func (r *QueryResult) Duration() time.Duration

Duration returns how long the query took to execute

func (*QueryResult) Error

func (r *QueryResult) Error() error

Error returns any error that occurred during execution

func (*QueryResult) ID

func (r *QueryResult) ID() int64

ID returns the ID of the query that produced this result

type ResultType

type ResultType int

const (
	ResultTypeQuery ResultType = iota
	ResultTypeTransaction
	ResultTypeBatch
	ResultTypeTransactionFunc
)

type RetryStrategy

type RetryStrategy int

RetryStrategy defines how errors should be retried

const (
	// RetryStrategyNone indicates no retry should be attempted
	RetryStrategyNone RetryStrategy = iota
	// RetryStrategyImmediate indicates immediate retry with no delay
	RetryStrategyImmediate
	// RetryStrategyExponential indicates exponential backoff retry
	RetryStrategyExponential
	// RetryStrategyLinear indicates linear backoff retry
	RetryStrategyLinear
)

func (RetryStrategy) String

func (rs RetryStrategy) String() string

String returns the string representation of RetryStrategy

type StmtCache

type StmtCache struct {
	// contains filtered or unexported fields
}

StmtCache is a high-performance prepared statement cache using ristretto for optimal concurrent access with LRU eviction. Stores SQL prepared statements keyed by their query string.

func NewStmtCache

func NewStmtCache(events *EventBus, options Options) (*StmtCache, error)

NewStmtCache creates a new prepared statement cache with LRU eviction. Options.StmtCacheMaxSize controls the maximum number of statements cached (0 uses the default of 1000).

func (*StmtCache) Clear

func (c *StmtCache) Clear()

Clear closes all cached statements and clears the cache. The cache remains usable after Clear() - new statements will be prepared on demand.

func (*StmtCache) Close

func (c *StmtCache) Close()

Close gracefully shuts down the cache. After Close(), the cache cannot be used - Get() will return ErrCacheClosed.

func (*StmtCache) Get

func (c *StmtCache) Get(db *sql.DB, query string) (*sql.Stmt, error)

Get retrieves a prepared statement from cache or prepares it if not found

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction represents multiple SQL statements to execute in a transaction

func (*Transaction) Add

func (t *Transaction) Add(sql string, args ...any) *Transaction

Add adds a query to the transaction

func (*Transaction) AddPrepared

func (t *Transaction) AddPrepared(sql string, args ...any) *Transaction

AddPrepared adds a prepared query to the transaction

func (*Transaction) Exec

func (t *Transaction) Exec() (*TransactionResult, error)

Exec runs the transaction through the worker pool
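Putting Add and Exec together might look like this sketch. How the Transaction value is obtained from the manager is not shown in this section, so `t` is assumed to be supplied by the caller:

```go
// transfer groups two balance updates into one serialised transaction;
// both succeed or both roll back.
func transfer(t *qwr.Transaction, from, to int64, amount int) error {
	t.Add("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, from).
		Add("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, to)

	// Exec serialises the whole transaction through the worker pool
	// and blocks until it commits or rolls back.
	res, err := t.Exec()
	if err != nil {
		return err
	}
	return res.Error()
}
```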

func (*Transaction) ExecuteWithContext

func (t *Transaction) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext runs all queries within a single transaction. Used by the write serialiser to dispatch Transaction jobs.

func (*Transaction) ID

func (t *Transaction) ID() int64

ID returns the transaction ID

func (*Transaction) WithContext

func (t *Transaction) WithContext(ctx context.Context) *Transaction

WithContext adds context to the transaction

func (*Transaction) Write

func (t *Transaction) Write() (*TransactionResult, error)

Write executes the transaction directly

type TransactionFunc added in v0.2.2

type TransactionFunc struct {
	// contains filtered or unexported fields
}

TransactionFunc executes a caller-provided function within a serialised transaction. The callback receives a *sql.Tx for full read-write access. qwr handles transaction lifecycle (begin, commit on success, rollback on error or panic).
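The callback shape might look like the following sketch. The exact signature is an assumption inferred from TransactionFuncResult.Value, and the constructor that registers the callback with the manager is not shown in this section:

```go
// moveFunds has the shape TransactionFunc executes: it receives a
// live *sql.Tx and returns a value plus an error. qwr commits on a
// nil error and rolls back on error or panic.
moveFunds := func(tx *sql.Tx) (any, error) {
	var balance int
	if err := tx.QueryRow(
		"SELECT balance FROM accounts WHERE id = ?", 1,
	).Scan(&balance); err != nil {
		return nil, err // triggers rollback
	}
	if _, err := tx.Exec(
		"UPDATE accounts SET balance = balance - 100 WHERE id = ?", 1,
	); err != nil {
		return nil, err
	}
	return balance - 100, nil // surfaces as TransactionFuncResult.Value
}
```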

func (*TransactionFunc) Exec added in v0.2.2

func (tf *TransactionFunc) Exec() (*TransactionFuncResult, error)

Exec runs the transaction through the serialised writer queue. Blocks until the callback completes and the transaction is committed or rolled back.

func (*TransactionFunc) ExecuteWithContext added in v0.2.2

func (tf *TransactionFunc) ExecuteWithContext(ctx context.Context, db *sql.DB) JobResult

ExecuteWithContext runs the callback within a transaction on the given db. Used by the write serialiser to dispatch TransactionFunc jobs.

func (*TransactionFunc) ID added in v0.2.2

func (tf *TransactionFunc) ID() int64

ID returns the unique identifier for this transaction.

func (*TransactionFunc) WithContext added in v0.2.2

func (tf *TransactionFunc) WithContext(ctx context.Context) *TransactionFunc

WithContext adds context to the transaction. When set, qwr uses BeginTx with a timeout derived from Options.TransactionTimeout.

func (*TransactionFunc) Write added in v0.2.2

func (tf *TransactionFunc) Write() (*TransactionFuncResult, error)

Write executes the transaction directly on the writer connection, bypassing the queue. The callback still runs within a real transaction.

type TransactionFuncResult added in v0.2.2

type TransactionFuncResult struct {
	// Value is the result returned by the callback on success.
	Value any
	// contains filtered or unexported fields
}

TransactionFuncResult holds the outcome of a TransactionFunc execution.

func (*TransactionFuncResult) Duration added in v0.2.2

func (r *TransactionFuncResult) Duration() time.Duration

Duration returns how long the transaction took to execute.

func (*TransactionFuncResult) Error added in v0.2.2

func (r *TransactionFuncResult) Error() error

Error returns any error from the callback or transaction lifecycle.

func (*TransactionFuncResult) ID added in v0.2.2

func (r *TransactionFuncResult) ID() int64

ID returns the ID of the transaction that produced this result.

type TransactionResult

type TransactionResult struct {
	Results []*QueryResult
	// contains filtered or unexported fields
}

TransactionResult represents the outcome of a transaction execution

func (*TransactionResult) Duration

func (r *TransactionResult) Duration() time.Duration

Duration returns how long the transaction took to execute

func (*TransactionResult) Error

func (r *TransactionResult) Error() error

Error returns any error that occurred during execution

func (*TransactionResult) ID

func (r *TransactionResult) ID() int64

ID returns the ID of the transaction that produced this result

type WriteSerialiser

type WriteSerialiser struct {
	// contains filtered or unexported fields
}

WriteSerialiser manages a single worker that processes database jobs

func NewWorkerPool

func NewWorkerPool(db *sql.DB, queueDepth int, events *EventBus, writeCache *StmtCache, options Options) *WriteSerialiser

NewWorkerPool creates a new single-worker serialiser for database jobs

func (*WriteSerialiser) QueueLen added in v0.2.0

func (wp *WriteSerialiser) QueueLen() int

QueueLen returns the current number of items in the work queue.

func (*WriteSerialiser) Start

func (wp *WriteSerialiser) Start(ctx context.Context)

Start begins the worker processing loop

func (*WriteSerialiser) Stop

func (wp *WriteSerialiser) Stop() error

Stop shuts down the worker pool

func (*WriteSerialiser) SubmitNoWait

func (wp *WriteSerialiser) SubmitNoWait(ctx context.Context, job Job) (int64, error)

SubmitNoWait submits a job to the queue without waiting

func (*WriteSerialiser) SubmitNoWaitNoContext

func (wp *WriteSerialiser) SubmitNoWaitNoContext(job Job) (int64, error)

SubmitNoWaitNoContext submits a job without using any context

func (*WriteSerialiser) SubmitWait

func (wp *WriteSerialiser) SubmitWait(ctx context.Context, job Job) (JobResult, error)

SubmitWait submits a job to the queue and waits for its result

func (*WriteSerialiser) SubmitWaitNoContext

func (wp *WriteSerialiser) SubmitWaitNoContext(job Job) (JobResult, error)

SubmitWaitNoContext submits a job without using any context

Directories

Path	Synopsis
backup	Package backup defines backup methods for SQLite databases.
checkpoint	Package checkpoint defines WAL checkpoint modes for SQLite.
profile	Package profile provides pre-configured database profiles for different workload types.
