pgxkit

package module
v2.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2026 License: MIT Imports: 23 Imported by: 0

README

pgxkit

CI Go Reference Go Report Card

A focused PostgreSQL toolkit for Go, built on pgx. Connection pool with read/write split, an extensible hook system, retry helpers, plan-regression and golden-transcript testing, graceful shutdown, and small type helpers — and nothing else.

Install

go get github.com/nhalm/pgxkit/v2

Go 1.25+ · PostgreSQL 12+.

Quick start

package main

import (
    "context"
    "log"

    "github.com/nhalm/pgxkit/v2"
)

func main() {
    ctx := context.Background()
    db := pgxkit.NewDB()
    if err := db.Connect(ctx, ""); err != nil { // "" → POSTGRES_* env vars
        log.Fatal(err)
    }
    defer db.Shutdown(ctx)

    if _, err := db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice"); err != nil {
        log.Fatal(err)
    }
    rows, err := db.ReadQuery(ctx, "SELECT id, name FROM users")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()
}

Query / QueryRow / Exec go through the write pool. ReadQuery / ReadQueryRow go through the read pool when one is configured via ConnectReadWrite; otherwise they share the single pool.

Configuration

If dsn == "", pgxkit builds one from POSTGRES_HOST, POSTGRES_PORT, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB, POSTGRES_SSLMODE. pgxkit.GetDSN() returns the same string for tools that need it externally.

db.Connect(ctx, "",
    pgxkit.WithMaxConns(25),
    pgxkit.WithMinConns(5),
    pgxkit.WithMaxConnLifetime(time.Hour),
)

// Or with read/write split:
db.ConnectReadWrite(ctx, replicaDSN, primaryDSN)

Hooks

HookFunc is func(ctx, sql, args, tag pgconn.CommandTag, err error) error. tag carries the real command tag for AfterOperation on Exec; it's the zero value everywhere else (including AfterOperation on Query — pgx doesn't fill the tag until rows are closed).

db.Connect(ctx, "",
    pgxkit.WithBeforeOperation(func(ctx context.Context, sql string, _ []interface{}, _ pgconn.CommandTag, _ error) error {
        slog.InfoContext(ctx, "query", "sql", sql)
        return nil
    }),
    pgxkit.WithAfterOperation(func(ctx context.Context, _ string, _ []interface{}, tag pgconn.CommandTag, err error) error {
        if err != nil {
            metrics.QueryErrors.Inc()
        } else if tag.String() != "" { // Exec
            metrics.RowsAffected.Add(float64(tag.RowsAffected()))
        }
        return nil
    }),
)

Lifecycle hooks: WithBeforeOperation, WithAfterOperation, WithBeforeTransaction, WithAfterTransaction, WithOnShutdown. Connection-level hooks: WithOnConnect, WithOnDisconnect, WithOnAcquire, WithOnRelease.

Retry

RetryOperation retries transient PostgreSQL errors (serialization failures, deadlocks, connection drops). Constraint violations and other deterministic errors pass through.

err := pgxkit.RetryOperation(ctx, func(ctx context.Context) error {
    _, err := db.Exec(ctx, "UPDATE accounts SET balance = balance - $1 WHERE id = $2", amt, id)
    return err
}, pgxkit.WithMaxRetries(5))

The typed form pgxkit.Retry[T] returns a value. The retry budget is the context deadline — all attempts share it.

Testing

make test-db-up    # containerized Postgres on a free port
make test
make test-db-down

make is the canonical entry point — see the project Makefile for test-coverage, bench, lint, etc.

func TestThing(t *testing.T) {
    testDB := pgxkit.RequireDB(t) // skips when TEST_DATABASE_URL is unset
    // ... use testDB.DB ...
}
Plan-regression

AssertPlan captures EXPLAIN (FORMAT JSON, COSTS OFF) per query and compares to testdata/plans/<name>.json. A plan-shape change (index-scan → seq-scan, hash-join → nested-loop, etc.) fails with a unified diff.

db := testDB.EnableAssertPlan("TestUserSummary")
_, _ = db.Query(ctx, "SELECT ...")
db.AssertPlan(t, "TestUserSummary")

Refresh: go test -overwrite-plan.

Golden transcript

AssertGolden records the ordered event stream (BEGIN, every Query/Exec with SQL + normalized args, rows_affected for Exec, COMMIT/ROLLBACK) and compares to testdata/golden/<name>.json. Catches an extra UPDATE, missing INSERT, different argument, COMMIT becoming ROLLBACK.

golden := testDB.EnableGolden("TestCreateOrder")
// ... run code under test ...
golden.AssertGolden(t, "TestCreateOrder")

Refresh: go test -overwrite-golden.

Type helpers

Thin converters between Go types and pgx's pgtype.*. See the API reference for the full list.

pgxText := pgxkit.ToPgxText(&myString)
str     := pgxkit.FromPgxText(pgxText)
pgxUUID := pgxkit.ToPgxUUID(myUUID)

Graceful shutdown

db.Shutdown(ctx) waits for active operations (including in-flight transactions started by BeginTx), runs OnShutdown hooks, then closes the pools.

sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
<-sig
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_ = db.Shutdown(ctx)

Code generation

*pgxkit.DB implements the same Query/QueryRow/Exec shape generators expect, and db.WritePool() / db.ReadPool() return *pgxpool.Pool for tools that want it directly.

queries := sqlc.New(db.WritePool())

Documentation

Full docs are in the wiki:

License

MIT — see LICENSE.

Documentation

Overview

Package pgxkit provides a production-ready PostgreSQL toolkit for Go applications.

pgxkit is a tool-agnostic PostgreSQL toolkit that works with any approach to PostgreSQL development - raw pgx usage, code generation tools like sqlc or Skimatik, or any other PostgreSQL development workflow.

Key Features:

  • Read/Write Pool Abstraction: Safe by default with write pool, explicit read pool methods for optimization
  • Extensible Hook System: Add logging, tracing, metrics, circuit breakers through hooks
  • Smart Retry Logic: PostgreSQL-aware error detection with exponential backoff
  • Testing Infrastructure: Plan-regression test support for catching query-plan changes
  • Type Helpers: Seamless pgx type conversions for clean architecture
  • Health Checks: Built-in database connectivity monitoring
  • Graceful Shutdown: Production-ready lifecycle management

Basic Usage:

db := pgxkit.NewDB()
err := db.Connect(ctx, "", pgxkit.WithMaxConns(25))
if err != nil {
    log.Fatal(err)
}
defer db.Shutdown(ctx)

// Execute queries (uses write pool by default - safe)
_, err = db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "John")

// Optimize reads with explicit read pool usage
rows, err := db.ReadQuery(ctx, "SELECT id, name FROM users")

Hook System:

db := pgxkit.NewDB()
err := db.Connect(ctx, "",
    pgxkit.WithBeforeOperation(func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
        log.Printf("Executing: %s", sql)
        return nil
    }),
)

The package follows a "safety first" design - all default methods use the write pool for consistency, with explicit ReadQuery() methods available for read optimization.

Transaction Usage:

pgxkit provides a Tx type that wraps pgx.Tx and implements the Executor interface, allowing you to write functions that work with both *DB and *Tx interchangeably.

func CreateUser(ctx context.Context, exec pgxkit.Executor, name string) (int, error) {
    var id int
    err := exec.QueryRow(ctx, "INSERT INTO users (name) VALUES ($1) RETURNING id", name).Scan(&id)
    return id, err
}

// Works with *DB
id, err := CreateUser(ctx, db, "Alice")

// Works with *Tx
tx, _ := db.BeginTx(ctx, pgx.TxOptions{})
id, err := CreateUser(ctx, tx, "Bob")

The recommended transaction pattern uses defer for safety:

tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
    return err
}
defer tx.Rollback(ctx) // Safe no-op if already committed

_, err = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")
if err != nil {
    return err
}

return tx.Commit(ctx)

Transactions are tracked by activeOps for graceful shutdown - Shutdown will wait for active transactions to complete. The *Tx type is NOT goroutine-safe.

Index

Constants

View Source
const (
	TxCommit   = "TX:COMMIT"
	TxRollback = "TX:ROLLBACK"
)

Variables

View Source
var ErrTxFinalized = errors.New("transaction already finalized")

Functions

func CleanupTestData

func CleanupTestData(sqlStatements ...string)

CleanupTestData executes cleanup SQL statements on the shared test database

func FromPgxBool

func FromPgxBool(b pgtype.Bool) *bool

FromPgxBool converts a pgtype.Bool to a bool pointer. If the pgtype.Bool is invalid (NULL), returns nil.

func FromPgxBoolToBool

func FromPgxBoolToBool(b pgtype.Bool) bool

FromPgxBoolToBool converts a pgtype.Bool to a bool value. If the pgtype.Bool is invalid (NULL), returns false.

func FromPgxDate

func FromPgxDate(d pgtype.Date) *time.Time

FromPgxDate converts a pgtype.Date to a time.Time pointer. If the pgtype.Date is invalid (NULL), returns nil.

func FromPgxFloat4

func FromPgxFloat4(f pgtype.Float4) *float32

FromPgxFloat4 converts a pgtype.Float4 to a float32 pointer. If the pgtype.Float4 is invalid (NULL), returns nil.

func FromPgxFloat8

func FromPgxFloat8(f pgtype.Float8) *float64

FromPgxFloat8 converts a pgtype.Float8 to a float64 pointer. If the pgtype.Float8 is invalid (NULL), returns nil.

func FromPgxInt2

func FromPgxInt2(i pgtype.Int2) *int16

FromPgxInt2 converts a pgtype.Int2 to an int16 pointer. If the pgtype.Int2 is invalid (NULL), returns nil.

func FromPgxInt4

func FromPgxInt4(i pgtype.Int4) *int32

FromPgxInt4 converts a pgtype.Int4 to an int32 pointer. If the pgtype.Int4 is invalid (NULL), returns nil.

func FromPgxInt4ToInt

func FromPgxInt4ToInt(i pgtype.Int4) *int

FromPgxInt4ToInt converts a pgtype.Int4 to an int pointer. If the pgtype.Int4 is invalid (NULL), returns nil.

func FromPgxInt8

func FromPgxInt8(i pgtype.Int8) *int64

FromPgxInt8 converts a pgtype.Int8 to an int64 pointer. If the pgtype.Int8 is invalid (NULL), returns nil.

func FromPgxInt8Array

func FromPgxInt8Array(a pgtype.Array[pgtype.Int8]) []int64

FromPgxInt8Array converts a pgtype.Array[pgtype.Int8] to an int64 slice. If the array is invalid (NULL), returns nil.

func FromPgxNumeric

func FromPgxNumeric(n pgtype.Numeric) *float64

FromPgxNumeric converts a pgtype.Numeric to a float64 pointer. If the pgtype.Numeric is invalid (NULL), returns nil.

func FromPgxText

func FromPgxText(t pgtype.Text) *string

FromPgxText converts a pgtype.Text to a string pointer. If the pgtype.Text is invalid (NULL), returns nil.

func FromPgxTextArray

func FromPgxTextArray(a pgtype.Array[pgtype.Text]) []string

FromPgxTextArray converts a pgtype.Array[pgtype.Text] to a string slice. If the array is invalid (NULL), returns nil.

func FromPgxTextToString

func FromPgxTextToString(t pgtype.Text) string

FromPgxTextToString converts a pgtype.Text to a string value. If the pgtype.Text is invalid (NULL), returns empty string.

func FromPgxTime

func FromPgxTime(t pgtype.Time) *time.Time

FromPgxTime converts a pgtype.Time to a time.Time pointer. If the pgtype.Time is invalid (NULL), returns nil. The returned time will be on the current date with the time component.

func FromPgxTimestamp

func FromPgxTimestamp(t pgtype.Timestamp) *time.Time

FromPgxTimestamp converts a pgtype.Timestamp to a time.Time pointer. If the pgtype.Timestamp is invalid (NULL), returns nil.

func FromPgxTimestamptz

func FromPgxTimestamptz(t pgtype.Timestamptz) time.Time

FromPgxTimestamptz converts a pgtype.Timestamptz to a time.Time value. If the pgtype.Timestamptz is invalid (NULL), returns zero time.

func FromPgxTimestamptzPtr

func FromPgxTimestamptzPtr(t pgtype.Timestamptz) *time.Time

FromPgxTimestamptzPtr converts a pgtype.Timestamptz to a time.Time pointer. If the pgtype.Timestamptz is invalid (NULL), returns nil.

func FromPgxUUID

func FromPgxUUID(pgxID pgtype.UUID) uuid.UUID

FromPgxUUID converts a pgtype.UUID to uuid.UUID. Returns uuid.Nil if the pgtype.UUID is invalid or cannot be parsed.

func FromPgxUUIDToPtr

func FromPgxUUIDToPtr(pgxID pgtype.UUID) *uuid.UUID

FromPgxUUIDToPtr converts a pgtype.UUID to a uuid.UUID pointer. If the pgtype.UUID is invalid (NULL), returns nil.

func GetDSN

func GetDSN() string

GetDSN returns a PostgreSQL connection string built from environment variables. This is useful for scripts and tools that need a connection string rather than a pgxpool.Pool.

Environment variables used:

  • POSTGRES_HOST (default: "localhost")
  • POSTGRES_PORT (default: 5432)
  • POSTGRES_USER (default: "postgres")
  • POSTGRES_PASSWORD (default: "")
  • POSTGRES_DB (default: "postgres")
  • POSTGRES_SSLMODE (default: "disable")

Example:

dsn := pgxkit.GetDSN()

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError determines if an error is worth retrying

func Retry

func Retry[T any](ctx context.Context, fn func(context.Context) (T, error), opts ...RetryOption) (T, error)

Retry executes a generic operation with configurable retry logic. It uses exponential backoff to avoid thundering herd problems.

func RetryOperation

func RetryOperation(ctx context.Context, operation func(context.Context) error, opts ...RetryOption) error

RetryOperation executes an operation with configurable retry logic. It uses exponential backoff to avoid thundering herd problems.

Example:

err := pgxkit.RetryOperation(ctx, func(ctx context.Context) error {
    return doSomething(ctx)
}, pgxkit.WithMaxRetries(5), pgxkit.WithMaxDelay(5*time.Second))

func ToPgxBool

func ToPgxBool(b *bool) pgtype.Bool

ToPgxBool converts a bool pointer to pgtype.Bool. If the input is nil, returns an invalid pgtype.Bool (NULL in database).

func ToPgxBoolFromBool

func ToPgxBoolFromBool(b bool) pgtype.Bool

ToPgxBoolFromBool converts a bool value to pgtype.Bool. Use this when you have a bool value instead of a pointer.

func ToPgxDate

func ToPgxDate(t *time.Time) pgtype.Date

ToPgxDate converts a time.Time pointer to pgtype.Date. If the input is nil, returns an invalid pgtype.Date (NULL in database).

func ToPgxFloat4

func ToPgxFloat4(f *float32) pgtype.Float4

ToPgxFloat4 converts a float32 pointer to pgtype.Float4. If the input is nil, returns an invalid pgtype.Float4 (NULL in database).

func ToPgxFloat8

func ToPgxFloat8(f *float64) pgtype.Float8

ToPgxFloat8 converts a float64 pointer to pgtype.Float8. If the input is nil, returns an invalid pgtype.Float8 (NULL in database).

func ToPgxInt2

func ToPgxInt2(i *int16) pgtype.Int2

ToPgxInt2 converts an int16 pointer to pgtype.Int2. If the input is nil, returns an invalid pgtype.Int2 (NULL in database).

func ToPgxInt4

func ToPgxInt4(i *int32) pgtype.Int4

ToPgxInt4 converts an int32 pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).

func ToPgxInt4FromInt

func ToPgxInt4FromInt(i *int) pgtype.Int4

ToPgxInt4FromInt converts an int pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).

func ToPgxInt8

func ToPgxInt8(i *int64) pgtype.Int8

ToPgxInt8 converts an int64 pointer to pgtype.Int8. If the input is nil, returns an invalid pgtype.Int8 (NULL in database).

func ToPgxInt8Array

func ToPgxInt8Array(s []int64) pgtype.Array[pgtype.Int8]

ToPgxInt8Array converts an int64 slice to pgtype.Array[pgtype.Int8]. If the input is nil, returns an invalid array (NULL in database).

func ToPgxNumeric

func ToPgxNumeric(f *float64) pgtype.Numeric

ToPgxNumeric converts a float64 pointer to pgtype.Numeric. If the input is nil, returns an invalid pgtype.Numeric (NULL in database). Uses 6 decimal places as standard precision.

func ToPgxText

func ToPgxText(s *string) pgtype.Text

ToPgxText converts a string pointer to pgtype.Text. If the input is nil, returns an invalid pgtype.Text (NULL in database).

func ToPgxTextArray

func ToPgxTextArray(s []string) pgtype.Array[pgtype.Text]

ToPgxTextArray converts a string slice to pgtype.Array[pgtype.Text]. If the input is nil, returns an invalid array (NULL in database).

func ToPgxTextFromString

func ToPgxTextFromString(s string) pgtype.Text

ToPgxTextFromString converts a string value to pgtype.Text. Use this when you have a string value instead of a pointer.

func ToPgxTime

func ToPgxTime(t *time.Time) pgtype.Time

ToPgxTime converts a time.Time pointer to pgtype.Time. If the input is nil, returns an invalid pgtype.Time (NULL in database).

func ToPgxTimestamp

func ToPgxTimestamp(t *time.Time) pgtype.Timestamp

ToPgxTimestamp converts a time.Time pointer to pgtype.Timestamp. If the input is nil, returns an invalid pgtype.Timestamp (NULL in database).

func ToPgxTimestamptz

func ToPgxTimestamptz(t *time.Time) pgtype.Timestamptz

ToPgxTimestamptz converts a time.Time pointer to pgtype.Timestamptz. If the input is nil, returns an invalid pgtype.Timestamptz (NULL in database).

func ToPgxUUID

func ToPgxUUID(id uuid.UUID) pgtype.UUID

ToPgxUUID converts a uuid.UUID to pgtype.UUID.

func ToPgxUUIDFromPtr

func ToPgxUUIDFromPtr(id *uuid.UUID) pgtype.UUID

ToPgxUUIDFromPtr converts a uuid.UUID pointer to pgtype.UUID. If the input is nil, returns an invalid pgtype.UUID (NULL in database).

Types

type ConnectOption

type ConnectOption func(*connectConfig)

ConnectOption configures a database connection.

func WithAfterOperation

func WithAfterOperation(fn HookFunc) ConnectOption

func WithAfterTransaction

func WithAfterTransaction(fn HookFunc) ConnectOption

func WithBeforeOperation

func WithBeforeOperation(fn HookFunc) ConnectOption

func WithBeforeTransaction

func WithBeforeTransaction(fn HookFunc) ConnectOption

func WithMaxConnIdleTime

func WithMaxConnIdleTime(d time.Duration) ConnectOption

func WithMaxConnLifetime

func WithMaxConnLifetime(d time.Duration) ConnectOption

func WithMaxConns

func WithMaxConns(n int32) ConnectOption

func WithMinConns

func WithMinConns(n int32) ConnectOption

func WithOnAcquire

func WithOnAcquire(fn func(context.Context, *pgx.Conn) error) ConnectOption

func WithOnConnect

func WithOnConnect(fn func(*pgx.Conn) error) ConnectOption

func WithOnDisconnect

func WithOnDisconnect(fn func(*pgx.Conn)) ConnectOption

func WithOnRelease

func WithOnRelease(fn func(*pgx.Conn)) ConnectOption

func WithOnShutdown

func WithOnShutdown(fn HookFunc) ConnectOption

func WithReadMaxConns

func WithReadMaxConns(n int32) ConnectOption

func WithReadMinConns

func WithReadMinConns(n int32) ConnectOption

func WithWriteMaxConns

func WithWriteMaxConns(n int32) ConnectOption

func WithWriteMinConns

func WithWriteMinConns(n int32) ConnectOption

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB represents a database connection with read/write pool abstraction. It provides a safe-by-default approach where all operations use the write pool unless explicitly using Read* methods for optimization.

The DB supports:

  • Single pool mode (same pool for read/write)
  • Read/write split mode (separate pools for optimization)
  • Extensible hook system for logging, tracing, metrics
  • Graceful shutdown with active operation tracking
  • Built-in retry logic for transient failures
  • Health checks and connection statistics

func NewDB

func NewDB() *DB

NewDB creates a new unconnected DB instance. Call Connect() with options to establish the database connection.

Example:

db := pgxkit.NewDB()
err := db.Connect(ctx, "postgres://user:pass@localhost/db",
    pgxkit.WithMaxConns(25),
    pgxkit.WithBeforeOperation(myLoggingHook),
)

func (*DB) AssertGolden

func (db *DB) AssertGolden(t *testing.T, testName string)

AssertGolden compares the captured transcript against testdata/golden/<testName>.json. First run (or with -overwrite-golden) writes the baseline; later runs fail with a unified diff if it changes.

func (*DB) AssertPlan added in v2.1.0

func (db *DB) AssertPlan(t *testing.T, testName string)

AssertPlan compares the captured plans against testdata/plans/<testName>.json.

func (*DB) BeginTx

func (db *DB) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (*Tx, error)

BeginTx starts a transaction using the write pool. Transactions always use the write pool to ensure consistency. The transaction will execute BeforeTransaction hook on start and AfterTransaction hook on Commit/Rollback.

Example:

tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
    return err
}
defer tx.Rollback(ctx) // Safe to call even after commit

_, err = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", name)
if err != nil {
    return err
}
return tx.Commit(ctx)

func (*DB) Connect

func (db *DB) Connect(ctx context.Context, dsn string, opts ...ConnectOption) error

Connect establishes a database connection with a single pool (same pool for read/write). If dsn is empty, it uses environment variables to construct the connection string. Options are applied to configure pool settings and hooks.

This is the recommended approach for most applications as it provides safety by default while still allowing read optimization through ReadQuery methods.

Example:

db := pgxkit.NewDB()
err := db.Connect(ctx, "postgres://user:pass@localhost/db",
    pgxkit.WithMaxConns(25),
    pgxkit.WithOnConnect(func(conn *pgx.Conn) error {
        _, err := conn.Exec(context.Background(), "SET application_name = 'myapp'")
        return err
    }),
)
// Or use environment variables:
err := db.Connect(ctx, "")

func (*DB) ConnectReadWrite

func (db *DB) ConnectReadWrite(ctx context.Context, readDSN, writeDSN string, opts ...ConnectOption) error

ConnectReadWrite establishes database connections with separate read and write pools. If readDSN or writeDSN is empty, it uses environment variables to construct the connection string. Options are applied to both pools.

This is useful for applications that want to optimize read performance by routing read queries to read replicas while ensuring writes go to the primary database.

Example:

db := pgxkit.NewDB()
err := db.ConnectReadWrite(ctx, "postgres://user:pass@read-replica/db", "postgres://user:pass@primary/db",
    pgxkit.WithMaxConns(25),
)
// Now ReadQuery methods will use the read pool, while Query/Exec use the write pool

func (*DB) Exec

func (db *DB) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

Exec executes a statement using the write pool. This method is used for INSERT, UPDATE, DELETE, and other write operations.

Example:

tag, err := db.Exec(ctx, "INSERT INTO users (name, email) VALUES ($1, $2)", name, email)
if err != nil {
    return err
}
fmt.Printf("Inserted %d rows\n", tag.RowsAffected())

func (*DB) HealthCheck

func (db *DB) HealthCheck(ctx context.Context) error

HealthCheck performs a simple health check by pinging the database. This is useful for health check endpoints and monitoring systems. It returns an error if the database is not connected, shutting down, or unreachable.

Example:

if err := db.HealthCheck(ctx); err != nil {
    log.Printf("Database health check failed: %v", err)
    http.Error(w, "Database unavailable", http.StatusServiceUnavailable)
    return
}

func (*DB) IsReady

func (db *DB) IsReady(ctx context.Context) bool

IsReady checks if the database connection is ready to accept queries. This is a convenience method that returns true if HealthCheck() succeeds. It's useful for readiness probes and quick status checks.

Example:

if db.IsReady(ctx) {
    log.Println("Database is ready to accept queries")
}

func (*DB) Query

func (db *DB) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query using the write pool (safe by default). This ensures consistency by always using the primary database connection. Use ReadQuery for read-only queries that can benefit from read replicas.

Example:

rows, err := db.Query(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
    return err
}
defer rows.Close()

func (*DB) QueryRow

func (db *DB) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that returns a single row using the write pool. This ensures consistency by always using the primary database connection. Use ReadQueryRow for read-only queries that can benefit from read replicas.

Example:

var userID int
err := db.QueryRow(ctx, "SELECT id FROM users WHERE email = $1", email).Scan(&userID)

func (*DB) ReadPool

func (db *DB) ReadPool() *pgxpool.Pool

ReadPool returns the underlying read connection pool. Returns nil if no separate read pool is configured.

func (*DB) ReadQuery

func (db *DB) ReadQuery(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

ReadQuery executes a query using the read pool (explicit optimization). This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.

Example:

rows, err := db.ReadQuery(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
    return err
}
defer rows.Close()

func (*DB) ReadQueryRow

func (db *DB) ReadQueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

ReadQueryRow executes a query that returns a single row using the read pool. This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.

Example:

var count int
err := db.ReadQueryRow(ctx, "SELECT COUNT(*) FROM users").Scan(&count)

func (*DB) ReadStats

func (db *DB) ReadStats() *pgxpool.Stat

ReadStats returns statistics for the read pool. This provides information about read connection usage, which is useful for monitoring read replica performance and connection pool health.

Example:

stats := db.ReadStats()
if stats != nil {
    log.Printf("Read pool active connections: %d", stats.AcquiredConns())
}

func (*DB) Shutdown

func (db *DB) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the database connections. It waits for active operations to complete, respecting the context timeout. If the context times out, shutdown proceeds anyway to prevent hanging.

The shutdown process: 1. Marks the database as shutting down (new operations will fail) 2. Waits for active operations to complete (respects context timeout) 3. Executes OnShutdown hooks 4. Closes connection pools

Example:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := db.Shutdown(ctx)

func (*DB) Stats

func (db *DB) Stats() *pgxpool.Stat

Stats returns statistics for the write pool. This provides information about connection usage, which is useful for monitoring and debugging connection pool performance.

Example:

stats := db.Stats()
if stats != nil {
    log.Printf("Active connections: %d", stats.AcquiredConns())
    log.Printf("Idle connections: %d", stats.IdleConns())
}

func (*DB) WritePool

func (db *DB) WritePool() *pgxpool.Pool

WritePool returns the underlying write connection pool. Useful for integrating with code generation tools like sqlc.

type DatabaseError

type DatabaseError struct {
	Entity    string
	Operation string // "create", "update", "delete", "query"
	Err       error
}

DatabaseError represents database operation failures such as connection errors, constraint violations, or other database-specific errors.

func NewDatabaseError

func NewDatabaseError(entity, operation string, err error) *DatabaseError

NewDatabaseError creates a new DatabaseError with the given entity, operation, and underlying error.

func (*DatabaseError) Error

func (e *DatabaseError) Error() string

func (*DatabaseError) Unwrap

func (e *DatabaseError) Unwrap() error

type Executor

type Executor interface {
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
}

Executor is a unified interface for database operations that both *DB and *Tx implement. This allows passing a single interface for database operations whether in a transaction or not.

type GoldenOption added in v2.1.0

type GoldenOption func(*assertGoldenHook)

GoldenOption configures the assertGoldenHook installed by EnableGolden.

func WithGoldenNormalizer added in v2.1.0

func WithGoldenNormalizer(fn func(any) (any, bool)) GoldenOption

WithGoldenNormalizer registers a custom normalizer that runs before the defaults (timestamps, UUIDs). Return ok=true to take over normalization for the value; ok=false to fall through.

type HookFunc

type HookFunc func(ctx context.Context, sql string, args []interface{}, tag pgconn.CommandTag, operationErr error) error

HookFunc is the universal hook function signature for operation-level hooks.

tag carries pool.Exec's CommandTag on AfterOperation for Exec calls. It is the zero value everywhere else — including AfterOperation for Query, because pgx fills the tag only after rows are closed and AfterOperation fires before iteration. operationErr is nil on before-hooks. Returning an error from a before-hook aborts the operation; from an after-hook it does not affect the original result but is reported.

type HookType

type HookType int

HookType represents the type of hook for operation-level hooks. These hooks are executed during database operations and provide extensibility for logging, tracing, metrics, circuit breakers, and other cross-cutting concerns.

const (
	// BeforeOperation is called before any query/exec operation.
	// The operationErr parameter will always be nil.
	BeforeOperation HookType = iota

	// AfterOperation is called after any query/exec operation.
	// The operationErr parameter contains the result of the operation.
	AfterOperation

	// BeforeTransaction is called before starting a transaction.
	// The operationErr parameter will always be nil.
	BeforeTransaction

	// AfterTransaction is called after a transaction completes.
	// The operationErr parameter contains the result of the transaction.
	AfterTransaction

	// OnShutdown is called during graceful shutdown.
	// The sql and args parameters will be empty, operationErr will be nil.
	OnShutdown
)

type NotFoundError

type NotFoundError struct {
	Entity     string
	Identifier interface{}
}

NotFoundError represents when a requested entity is not found in the database. This error should be used instead of returning pgx.ErrNoRows directly.

Example:

if err == pgx.ErrNoRows {
    return nil, pgxkit.NewNotFoundError("User", userID)
}

func NewNotFoundError

func NewNotFoundError(entity string, identifier interface{}) *NotFoundError

NewNotFoundError creates a new NotFoundError with the given entity and identifier.

func (*NotFoundError) Error

func (e *NotFoundError) Error() string

type QueryPlan

type QueryPlan struct {
	Query int                      `json:"query"`
	SQL   string                   `json:"sql"`
	Plan  []map[string]interface{} `json:"plan"`
}

QueryPlan is one captured structural query plan.

type RetryOption

type RetryOption func(*retryConfig)

RetryOption configures retry behavior for operations.

func WithBackoffMultiplier

func WithBackoffMultiplier(m float64) RetryOption

WithBackoffMultiplier sets the multiplier for exponential backoff.

func WithBaseDelay

func WithBaseDelay(d time.Duration) RetryOption

WithBaseDelay sets the initial delay between retries.

func WithMaxDelay

func WithMaxDelay(d time.Duration) RetryOption

WithMaxDelay sets the maximum delay between retries.

func WithMaxRetries

func WithMaxRetries(n int) RetryOption

WithMaxRetries sets the maximum number of retry attempts.

type TestDB

type TestDB struct {
	*DB
}

TestDB is a testing utility that wraps DB with testing-specific functionality.

func NewTestDB

func NewTestDB() *TestDB

func RequireDB

func RequireDB(t *testing.T) *TestDB

RequireDB ensures a test database is available or skips the test.

func (*TestDB) Clean

func (tdb *TestDB) Clean() error

func (*TestDB) EnableAssertPlan added in v2.1.0

func (tdb *TestDB) EnableAssertPlan(testName string) *DB

EnableAssertPlan returns a *DB that captures the structural EXPLAIN plan of each SELECT/INSERT/UPDATE/DELETE/WITH query into memory.

func (*TestDB) EnableGolden

func (tdb *TestDB) EnableGolden(testName string, opts ...GoldenOption) *DB

EnableGolden returns a *DB that records database events (BEGIN, QUERY, COMMIT, ROLLBACK) for the test scenario via the hook system. Call AssertGolden after the scenario to compare against testdata/golden/<testName>.json.

func (*TestDB) Setup

func (tdb *TestDB) Setup() error

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

Tx wraps a pgx.Tx to implement the Executor interface and provide transaction lifecycle management integrated with pgxkit's activeOps tracking and hook system.

func (*Tx) Commit

func (t *Tx) Commit(ctx context.Context) error

Commit commits the transaction and fires AfterTransaction. Atomic finalization makes "defer Rollback() + explicit Commit()" safe.

func (*Tx) Exec

func (t *Tx) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

Exec executes a statement within the transaction. Fires BeforeOperation / AfterOperation hooks on the parent DB; AfterOperation receives the command tag.

func (*Tx) IsFinalized

func (t *Tx) IsFinalized() bool

IsFinalized returns true if the transaction has been committed or rolled back.

func (*Tx) Query

func (t *Tx) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query within the transaction. Fires BeforeOperation / AfterOperation hooks on the parent DB.

func (*Tx) QueryRow

func (t *Tx) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that returns a single row within the transaction. Fires BeforeOperation / AfterOperation hooks on the parent DB.

func (*Tx) Rollback

func (t *Tx) Rollback(ctx context.Context) error

Rollback rolls back the transaction and fires AfterTransaction. Atomic finalization makes "defer Rollback() + explicit Commit()" safe.

func (*Tx) Tx

func (t *Tx) Tx() pgx.Tx

Tx returns the underlying pgx.Tx for advanced use cases that require direct access to pgx transaction functionality.

type ValidationError

type ValidationError struct {
	Entity    string
	Operation string
	Field     string
	Reason    string
	Err       error
}

ValidationError represents validation failures that occur before database operations. Use this for input validation, constraint violations, or business rule failures.

func NewValidationError

func NewValidationError(entity, operation, field, reason string, err error) *ValidationError

NewValidationError creates a new ValidationError with the given parameters.

func (*ValidationError) Error

func (e *ValidationError) Error() string

func (*ValidationError) Unwrap

func (e *ValidationError) Unwrap() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL