Documentation
¶
Overview ¶
Package pgxkit provides a production-ready PostgreSQL toolkit for Go applications.
pgxkit is a tool-agnostic PostgreSQL toolkit that works with any approach to PostgreSQL development - raw pgx usage, code generation tools like sqlc or Skimatik, or any other PostgreSQL development workflow.
Key Features:
- Read/Write Pool Abstraction: Safe by default with write pool, explicit read pool methods for optimization
- Extensible Hook System: Add logging, tracing, metrics, circuit breakers through hooks
- Smart Retry Logic: PostgreSQL-aware error detection with exponential backoff
- Testing Infrastructure: Plan-regression test support for catching query-plan changes
- Type Helpers: Seamless pgx type conversions for clean architecture
- Health Checks: Built-in database connectivity monitoring
- Graceful Shutdown: Production-ready lifecycle management
Basic Usage:
db := pgxkit.NewDB()
err := db.Connect(ctx, "", pgxkit.WithMaxConns(25))
if err != nil {
log.Fatal(err)
}
defer db.Shutdown(ctx)
// Execute queries (uses write pool by default - safe)
_, err = db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "John")
// Optimize reads with explicit read pool usage
rows, err := db.ReadQuery(ctx, "SELECT id, name FROM users")
Hook System:
db := pgxkit.NewDB()
err := db.Connect(ctx, "",
pgxkit.WithBeforeOperation(func(ctx context.Context, sql string, args []interface{}, operationErr error) error {
log.Printf("Executing: %s", sql)
return nil
}),
)
The package follows a "safety first" design - all default methods use the write pool for consistency, with explicit ReadQuery() methods available for read optimization.
Transaction Usage:
pgxkit provides a Tx type that wraps pgx.Tx and implements the Executor interface, allowing you to write functions that work with both *DB and *Tx interchangeably.
func CreateUser(ctx context.Context, exec pgxkit.Executor, name string) (int, error) {
var id int
err := exec.QueryRow(ctx, "INSERT INTO users (name) VALUES ($1) RETURNING id", name).Scan(&id)
return id, err
}
// Works with *DB
id, err := CreateUser(ctx, db, "Alice")
// Works with *Tx
tx, _ := db.BeginTx(ctx, pgx.TxOptions{})
id, err := CreateUser(ctx, tx, "Bob")
The recommended transaction pattern uses defer for safety:
tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
}
defer tx.Rollback(ctx) // Safe no-op if already committed
_, err = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")
if err != nil {
return err
}
return tx.Commit(ctx)
Transactions are tracked by activeOps for graceful shutdown - Shutdown will wait for active transactions to complete. The *Tx type is NOT goroutine-safe.
Index ¶
- Constants
- Variables
- func CleanupTestData(sqlStatements ...string)
- func FromPgxBool(b pgtype.Bool) *bool
- func FromPgxBoolToBool(b pgtype.Bool) bool
- func FromPgxDate(d pgtype.Date) *time.Time
- func FromPgxFloat4(f pgtype.Float4) *float32
- func FromPgxFloat8(f pgtype.Float8) *float64
- func FromPgxInt2(i pgtype.Int2) *int16
- func FromPgxInt4(i pgtype.Int4) *int32
- func FromPgxInt4ToInt(i pgtype.Int4) *int
- func FromPgxInt8(i pgtype.Int8) *int64
- func FromPgxInt8Array(a pgtype.Array[pgtype.Int8]) []int64
- func FromPgxNumeric(n pgtype.Numeric) *float64
- func FromPgxText(t pgtype.Text) *string
- func FromPgxTextArray(a pgtype.Array[pgtype.Text]) []string
- func FromPgxTextToString(t pgtype.Text) string
- func FromPgxTime(t pgtype.Time) *time.Time
- func FromPgxTimestamp(t pgtype.Timestamp) *time.Time
- func FromPgxTimestamptz(t pgtype.Timestamptz) time.Time
- func FromPgxTimestamptzPtr(t pgtype.Timestamptz) *time.Time
- func FromPgxUUID(pgxID pgtype.UUID) uuid.UUID
- func FromPgxUUIDToPtr(pgxID pgtype.UUID) *uuid.UUID
- func GetDSN() string
- func IsRetryableError(err error) bool
- func Retry[T any](ctx context.Context, fn func(context.Context) (T, error), opts ...RetryOption) (T, error)
- func RetryOperation(ctx context.Context, operation func(context.Context) error, ...) error
- func ToPgxBool(b *bool) pgtype.Bool
- func ToPgxBoolFromBool(b bool) pgtype.Bool
- func ToPgxDate(t *time.Time) pgtype.Date
- func ToPgxFloat4(f *float32) pgtype.Float4
- func ToPgxFloat8(f *float64) pgtype.Float8
- func ToPgxInt2(i *int16) pgtype.Int2
- func ToPgxInt4(i *int32) pgtype.Int4
- func ToPgxInt4FromInt(i *int) pgtype.Int4
- func ToPgxInt8(i *int64) pgtype.Int8
- func ToPgxInt8Array(s []int64) pgtype.Array[pgtype.Int8]
- func ToPgxNumeric(f *float64) pgtype.Numeric
- func ToPgxText(s *string) pgtype.Text
- func ToPgxTextArray(s []string) pgtype.Array[pgtype.Text]
- func ToPgxTextFromString(s string) pgtype.Text
- func ToPgxTime(t *time.Time) pgtype.Time
- func ToPgxTimestamp(t *time.Time) pgtype.Timestamp
- func ToPgxTimestamptz(t *time.Time) pgtype.Timestamptz
- func ToPgxUUID(id uuid.UUID) pgtype.UUID
- func ToPgxUUIDFromPtr(id *uuid.UUID) pgtype.UUID
- type ConnectOption
- func WithAfterOperation(fn HookFunc) ConnectOption
- func WithAfterTransaction(fn HookFunc) ConnectOption
- func WithBeforeOperation(fn HookFunc) ConnectOption
- func WithBeforeTransaction(fn HookFunc) ConnectOption
- func WithMaxConnIdleTime(d time.Duration) ConnectOption
- func WithMaxConnLifetime(d time.Duration) ConnectOption
- func WithMaxConns(n int32) ConnectOption
- func WithMinConns(n int32) ConnectOption
- func WithOnAcquire(fn func(context.Context, *pgx.Conn) error) ConnectOption
- func WithOnConnect(fn func(*pgx.Conn) error) ConnectOption
- func WithOnDisconnect(fn func(*pgx.Conn)) ConnectOption
- func WithOnRelease(fn func(*pgx.Conn)) ConnectOption
- func WithOnShutdown(fn HookFunc) ConnectOption
- func WithReadMaxConns(n int32) ConnectOption
- func WithReadMinConns(n int32) ConnectOption
- func WithWriteMaxConns(n int32) ConnectOption
- func WithWriteMinConns(n int32) ConnectOption
- type DB
- func (db *DB) AssertGolden(t *testing.T, testName string)
- func (db *DB) AssertPlan(t *testing.T, testName string)
- func (db *DB) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (*Tx, error)
- func (db *DB) Connect(ctx context.Context, dsn string, opts ...ConnectOption) error
- func (db *DB) ConnectReadWrite(ctx context.Context, readDSN, writeDSN string, opts ...ConnectOption) error
- func (db *DB) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
- func (db *DB) HealthCheck(ctx context.Context) error
- func (db *DB) IsReady(ctx context.Context) bool
- func (db *DB) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (db *DB) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- func (db *DB) ReadPool() *pgxpool.Pool
- func (db *DB) ReadQuery(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (db *DB) ReadQueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- func (db *DB) ReadStats() *pgxpool.Stat
- func (db *DB) Shutdown(ctx context.Context) error
- func (db *DB) Stats() *pgxpool.Stat
- func (db *DB) WritePool() *pgxpool.Pool
- type DatabaseError
- type Executor
- type GoldenOption
- type HookFunc
- type HookType
- type NotFoundError
- type QueryPlan
- type RetryOption
- type TestDB
- type Tx
- func (t *Tx) Commit(ctx context.Context) error
- func (t *Tx) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
- func (t *Tx) IsFinalized() bool
- func (t *Tx) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
- func (t *Tx) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
- func (t *Tx) Rollback(ctx context.Context) error
- func (t *Tx) Tx() pgx.Tx
- type ValidationError
Constants ¶
const ( TxCommit = "TX:COMMIT" TxRollback = "TX:ROLLBACK" )
Variables ¶
var ErrTxFinalized = errors.New("transaction already finalized")
Functions ¶
func CleanupTestData ¶
func CleanupTestData(sqlStatements ...string)
CleanupTestData executes cleanup SQL statements on the shared test database
func FromPgxBool ¶
FromPgxBool converts a pgtype.Bool to a bool pointer. If the pgtype.Bool is invalid (NULL), returns nil.
func FromPgxBoolToBool ¶
FromPgxBoolToBool converts a pgtype.Bool to a bool value. If the pgtype.Bool is invalid (NULL), returns false.
func FromPgxDate ¶
FromPgxDate converts a pgtype.Date to a time.Time pointer. If the pgtype.Date is invalid (NULL), returns nil.
func FromPgxFloat4 ¶
FromPgxFloat4 converts a pgtype.Float4 to a float32 pointer. If the pgtype.Float4 is invalid (NULL), returns nil.
func FromPgxFloat8 ¶
FromPgxFloat8 converts a pgtype.Float8 to a float64 pointer. If the pgtype.Float8 is invalid (NULL), returns nil.
func FromPgxInt2 ¶
FromPgxInt2 converts a pgtype.Int2 to an int16 pointer. If the pgtype.Int2 is invalid (NULL), returns nil.
func FromPgxInt4 ¶
FromPgxInt4 converts a pgtype.Int4 to an int32 pointer. If the pgtype.Int4 is invalid (NULL), returns nil.
func FromPgxInt4ToInt ¶
FromPgxInt4ToInt converts a pgtype.Int4 to an int pointer. If the pgtype.Int4 is invalid (NULL), returns nil.
func FromPgxInt8 ¶
FromPgxInt8 converts a pgtype.Int8 to an int64 pointer. If the pgtype.Int8 is invalid (NULL), returns nil.
func FromPgxInt8Array ¶
FromPgxInt8Array converts a pgtype.Array[pgtype.Int8] to an int64 slice. If the array is invalid (NULL), returns nil.
func FromPgxNumeric ¶
FromPgxNumeric converts a pgtype.Numeric to a float64 pointer. If the pgtype.Numeric is invalid (NULL), returns nil.
func FromPgxText ¶
FromPgxText converts a pgtype.Text to a string pointer. If the pgtype.Text is invalid (NULL), returns nil.
func FromPgxTextArray ¶
FromPgxTextArray converts a pgtype.Array[pgtype.Text] to a string slice. If the array is invalid (NULL), returns nil.
func FromPgxTextToString ¶
FromPgxTextToString converts a pgtype.Text to a string value. If the pgtype.Text is invalid (NULL), returns empty string.
func FromPgxTime ¶
FromPgxTime converts a pgtype.Time to a time.Time pointer. If the pgtype.Time is invalid (NULL), returns nil. The returned time will be on the current date with the time component.
func FromPgxTimestamp ¶
FromPgxTimestamp converts a pgtype.Timestamp to a time.Time pointer. If the pgtype.Timestamp is invalid (NULL), returns nil.
func FromPgxTimestamptz ¶
func FromPgxTimestamptz(t pgtype.Timestamptz) time.Time
FromPgxTimestamptz converts a pgtype.Timestamptz to a time.Time value. If the pgtype.Timestamptz is invalid (NULL), returns zero time.
func FromPgxTimestamptzPtr ¶
func FromPgxTimestamptzPtr(t pgtype.Timestamptz) *time.Time
FromPgxTimestamptzPtr converts a pgtype.Timestamptz to a time.Time pointer. If the pgtype.Timestamptz is invalid (NULL), returns nil.
func FromPgxUUID ¶
FromPgxUUID converts a pgtype.UUID to uuid.UUID. Returns uuid.Nil if the pgtype.UUID is invalid or cannot be parsed.
func FromPgxUUIDToPtr ¶
FromPgxUUIDToPtr converts a pgtype.UUID to a uuid.UUID pointer. If the pgtype.UUID is invalid (NULL), returns nil.
func GetDSN ¶
func GetDSN() string
GetDSN returns a PostgreSQL connection string built from environment variables. This is useful for scripts and tools that need a connection string rather than a pgxpool.Pool.
Environment variables used:
- POSTGRES_HOST (default: "localhost")
- POSTGRES_PORT (default: 5432)
- POSTGRES_USER (default: "postgres")
- POSTGRES_PASSWORD (default: "")
- POSTGRES_DB (default: "postgres")
- POSTGRES_SSLMODE (default: "disable")
Example:
dsn := pgxkit.GetDSN()
func IsRetryableError ¶
IsRetryableError determines if an error is worth retrying
func Retry ¶
func Retry[T any](ctx context.Context, fn func(context.Context) (T, error), opts ...RetryOption) (T, error)
Retry executes a generic operation with configurable retry logic. It uses exponential backoff to avoid thundering herd problems.
func RetryOperation ¶
func RetryOperation(ctx context.Context, operation func(context.Context) error, opts ...RetryOption) error
RetryOperation executes an operation with configurable retry logic. It uses exponential backoff to avoid thundering herd problems.
Example:
err := pgxkit.RetryOperation(ctx, func(ctx context.Context) error {
return doSomething(ctx)
}, pgxkit.WithMaxRetries(5), pgxkit.WithMaxDelay(5*time.Second))
func ToPgxBool ¶
ToPgxBool converts a bool pointer to pgtype.Bool. If the input is nil, returns an invalid pgtype.Bool (NULL in database).
func ToPgxBoolFromBool ¶
ToPgxBoolFromBool converts a bool value to pgtype.Bool. Use this when you have a bool value instead of a pointer.
func ToPgxDate ¶
ToPgxDate converts a time.Time pointer to pgtype.Date. If the input is nil, returns an invalid pgtype.Date (NULL in database).
func ToPgxFloat4 ¶
ToPgxFloat4 converts a float32 pointer to pgtype.Float4. If the input is nil, returns an invalid pgtype.Float4 (NULL in database).
func ToPgxFloat8 ¶
ToPgxFloat8 converts a float64 pointer to pgtype.Float8. If the input is nil, returns an invalid pgtype.Float8 (NULL in database).
func ToPgxInt2 ¶
ToPgxInt2 converts an int16 pointer to pgtype.Int2. If the input is nil, returns an invalid pgtype.Int2 (NULL in database).
func ToPgxInt4 ¶
ToPgxInt4 converts an int32 pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).
func ToPgxInt4FromInt ¶
ToPgxInt4FromInt converts an int pointer to pgtype.Int4. If the input is nil, returns an invalid pgtype.Int4 (NULL in database).
func ToPgxInt8 ¶
ToPgxInt8 converts an int64 pointer to pgtype.Int8. If the input is nil, returns an invalid pgtype.Int8 (NULL in database).
func ToPgxInt8Array ¶
ToPgxInt8Array converts an int64 slice to pgtype.Array[pgtype.Int8]. If the input is nil, returns an invalid array (NULL in database).
func ToPgxNumeric ¶
ToPgxNumeric converts a float64 pointer to pgtype.Numeric. If the input is nil, returns an invalid pgtype.Numeric (NULL in database). Uses 6 decimal places as standard precision.
func ToPgxText ¶
ToPgxText converts a string pointer to pgtype.Text. If the input is nil, returns an invalid pgtype.Text (NULL in database).
func ToPgxTextArray ¶
ToPgxTextArray converts a string slice to pgtype.Array[pgtype.Text]. If the input is nil, returns an invalid array (NULL in database).
func ToPgxTextFromString ¶
ToPgxTextFromString converts a string value to pgtype.Text. Use this when you have a string value instead of a pointer.
func ToPgxTime ¶
ToPgxTime converts a time.Time pointer to pgtype.Time. If the input is nil, returns an invalid pgtype.Time (NULL in database).
func ToPgxTimestamp ¶
ToPgxTimestamp converts a time.Time pointer to pgtype.Timestamp. If the input is nil, returns an invalid pgtype.Timestamp (NULL in database).
func ToPgxTimestamptz ¶
func ToPgxTimestamptz(t *time.Time) pgtype.Timestamptz
ToPgxTimestamptz converts a time.Time pointer to pgtype.Timestamptz. If the input is nil, returns an invalid pgtype.Timestamptz (NULL in database).
Types ¶
type ConnectOption ¶
type ConnectOption func(*connectConfig)
ConnectOption configures a database connection.
func WithAfterOperation ¶
func WithAfterOperation(fn HookFunc) ConnectOption
func WithAfterTransaction ¶
func WithAfterTransaction(fn HookFunc) ConnectOption
func WithBeforeOperation ¶
func WithBeforeOperation(fn HookFunc) ConnectOption
func WithBeforeTransaction ¶
func WithBeforeTransaction(fn HookFunc) ConnectOption
func WithMaxConnIdleTime ¶
func WithMaxConnIdleTime(d time.Duration) ConnectOption
func WithMaxConnLifetime ¶
func WithMaxConnLifetime(d time.Duration) ConnectOption
func WithMaxConns ¶
func WithMaxConns(n int32) ConnectOption
func WithMinConns ¶
func WithMinConns(n int32) ConnectOption
func WithOnAcquire ¶
func WithOnConnect ¶
func WithOnConnect(fn func(*pgx.Conn) error) ConnectOption
func WithOnDisconnect ¶
func WithOnDisconnect(fn func(*pgx.Conn)) ConnectOption
func WithOnRelease ¶
func WithOnRelease(fn func(*pgx.Conn)) ConnectOption
func WithOnShutdown ¶
func WithOnShutdown(fn HookFunc) ConnectOption
func WithReadMaxConns ¶
func WithReadMaxConns(n int32) ConnectOption
func WithReadMinConns ¶
func WithReadMinConns(n int32) ConnectOption
func WithWriteMaxConns ¶
func WithWriteMaxConns(n int32) ConnectOption
func WithWriteMinConns ¶
func WithWriteMinConns(n int32) ConnectOption
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB represents a database connection with read/write pool abstraction. It provides a safe-by-default approach where all operations use the write pool unless explicitly using Read* methods for optimization.
The DB supports:
- Single pool mode (same pool for read/write)
- Read/write split mode (separate pools for optimization)
- Extensible hook system for logging, tracing, metrics
- Graceful shutdown with active operation tracking
- Built-in retry logic for transient failures
- Health checks and connection statistics
func NewDB ¶
func NewDB() *DB
NewDB creates a new unconnected DB instance. Call Connect() with options to establish the database connection.
Example:
db := pgxkit.NewDB()
err := db.Connect(ctx, "postgres://user:pass@localhost/db",
pgxkit.WithMaxConns(25),
pgxkit.WithBeforeOperation(myLoggingHook),
)
func (*DB) AssertGolden ¶
AssertGolden compares the captured transcript against testdata/golden/<testName>.json. First run (or with -overwrite-golden) writes the baseline; later runs fail with a unified diff if it changes.
func (*DB) AssertPlan ¶ added in v2.1.0
AssertPlan compares the captured plans against testdata/plans/<testName>.json.
func (*DB) BeginTx ¶
BeginTx starts a transaction using the write pool. Transactions always use the write pool to ensure consistency. The transaction will execute BeforeTransaction hook on start and AfterTransaction hook on Commit/Rollback.
Example:
tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
}
defer tx.Rollback(ctx) // Safe to call even after commit
_, err = tx.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", name)
if err != nil {
return err
}
return tx.Commit(ctx)
func (*DB) Connect ¶
Connect establishes a database connection with a single pool (same pool for read/write). If dsn is empty, it uses environment variables to construct the connection string. Options are applied to configure pool settings and hooks.
This is the recommended approach for most applications as it provides safety by default while still allowing read optimization through ReadQuery methods.
Example:
db := pgxkit.NewDB()
err := db.Connect(ctx, "postgres://user:pass@localhost/db",
pgxkit.WithMaxConns(25),
pgxkit.WithOnConnect(func(conn *pgx.Conn) error {
_, err := conn.Exec(context.Background(), "SET application_name = 'myapp'")
return err
}),
)
// Or use environment variables:
err := db.Connect(ctx, "")
func (*DB) ConnectReadWrite ¶
func (db *DB) ConnectReadWrite(ctx context.Context, readDSN, writeDSN string, opts ...ConnectOption) error
ConnectReadWrite establishes database connections with separate read and write pools. If readDSN or writeDSN is empty, it uses environment variables to construct the connection string. Options are applied to both pools.
This is useful for applications that want to optimize read performance by routing read queries to read replicas while ensuring writes go to the primary database.
Example:
db := pgxkit.NewDB()
err := db.ConnectReadWrite(ctx, "postgres://user:pass@read-replica/db", "postgres://user:pass@primary/db",
pgxkit.WithMaxConns(25),
)
// Now ReadQuery methods will use the read pool, while Query/Exec use the write pool
func (*DB) Exec ¶
Exec executes a statement using the write pool. This method is used for INSERT, UPDATE, DELETE, and other write operations.
Example:
tag, err := db.Exec(ctx, "INSERT INTO users (name, email) VALUES ($1, $2)", name, email)
if err != nil {
return err
}
fmt.Printf("Inserted %d rows\n", tag.RowsAffected())
func (*DB) HealthCheck ¶
HealthCheck performs a simple health check by pinging the database. This is useful for health check endpoints and monitoring systems. It returns an error if the database is not connected, shutting down, or unreachable.
Example:
if err := db.HealthCheck(ctx); err != nil {
log.Printf("Database health check failed: %v", err)
http.Error(w, "Database unavailable", http.StatusServiceUnavailable)
return
}
func (*DB) IsReady ¶
IsReady checks if the database connection is ready to accept queries. This is a convenience method that returns true if HealthCheck() succeeds. It's useful for readiness probes and quick status checks.
Example:
if db.IsReady(ctx) {
log.Println("Database is ready to accept queries")
}
func (*DB) Query ¶
Query executes a query using the write pool (safe by default). This ensures consistency by always using the primary database connection. Use ReadQuery for read-only queries that can benefit from read replicas.
Example:
rows, err := db.Query(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
return err
}
defer rows.Close()
func (*DB) QueryRow ¶
QueryRow executes a query that returns a single row using the write pool. This ensures consistency by always using the primary database connection. Use ReadQueryRow for read-only queries that can benefit from read replicas.
Example:
var userID int err := db.QueryRow(ctx, "SELECT id FROM users WHERE email = $1", email).Scan(&userID)
func (*DB) ReadPool ¶
ReadPool returns the underlying read connection pool. Returns nil if no separate read pool is configured.
func (*DB) ReadQuery ¶
ReadQuery executes a query using the read pool (explicit optimization). This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.
Example:
rows, err := db.ReadQuery(ctx, "SELECT * FROM users WHERE active = $1", true)
if err != nil {
return err
}
defer rows.Close()
func (*DB) ReadQueryRow ¶
ReadQueryRow executes a query that returns a single row using the read pool. This method routes the query to read replicas when available, improving performance for read-heavy workloads. Only use this for queries that can tolerate read replica lag.
Example:
var count int err := db.ReadQueryRow(ctx, "SELECT COUNT(*) FROM users").Scan(&count)
func (*DB) ReadStats ¶
ReadStats returns statistics for the read pool. This provides information about read connection usage, which is useful for monitoring read replica performance and connection pool health.
Example:
stats := db.ReadStats()
if stats != nil {
log.Printf("Read pool active connections: %d", stats.AcquiredConns())
}
func (*DB) Shutdown ¶
Shutdown gracefully shuts down the database connections. It waits for active operations to complete, respecting the context timeout. If the context times out, shutdown proceeds anyway to prevent hanging.
The shutdown process: 1. Marks the database as shutting down (new operations will fail) 2. Waits for active operations to complete (respects context timeout) 3. Executes OnShutdown hooks 4. Closes connection pools
Example:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() err := db.Shutdown(ctx)
func (*DB) Stats ¶
Stats returns statistics for the write pool. This provides information about connection usage, which is useful for monitoring and debugging connection pool performance.
Example:
stats := db.Stats()
if stats != nil {
log.Printf("Active connections: %d", stats.AcquiredConns())
log.Printf("Idle connections: %d", stats.IdleConns())
}
type DatabaseError ¶
type DatabaseError struct {
Entity string
Operation string // "create", "update", "delete", "query"
Err error
}
DatabaseError represents database operation failures such as connection errors, constraint violations, or other database-specific errors.
func NewDatabaseError ¶
func NewDatabaseError(entity, operation string, err error) *DatabaseError
NewDatabaseError creates a new DatabaseError with the given entity, operation, and underlying error.
func (*DatabaseError) Error ¶
func (e *DatabaseError) Error() string
func (*DatabaseError) Unwrap ¶
func (e *DatabaseError) Unwrap() error
type Executor ¶
type Executor interface {
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
}
Executor is a unified interface for database operations that both *DB and *Tx implement. This allows passing a single interface for database operations whether in a transaction or not.
type GoldenOption ¶ added in v2.1.0
type GoldenOption func(*assertGoldenHook)
GoldenOption configures the assertGoldenHook installed by EnableGolden.
func WithGoldenNormalizer ¶ added in v2.1.0
func WithGoldenNormalizer(fn func(any) (any, bool)) GoldenOption
WithGoldenNormalizer registers a custom normalizer that runs before the defaults (timestamps, UUIDs). Return ok=true to take over normalization for the value; ok=false to fall through.
type HookFunc ¶
type HookFunc func(ctx context.Context, sql string, args []interface{}, tag pgconn.CommandTag, operationErr error) error
HookFunc is the universal hook function signature for operation-level hooks.
tag carries pool.Exec's CommandTag on AfterOperation for Exec calls. It is the zero value everywhere else — including AfterOperation for Query, because pgx fills the tag only after rows are closed and AfterOperation fires before iteration. operationErr is nil on before-hooks. Returning an error from a before-hook aborts the operation; from an after-hook it does not affect the original result but is reported.
type HookType ¶
type HookType int
HookType represents the type of hook for operation-level hooks. These hooks are executed during database operations and provide extensibility for logging, tracing, metrics, circuit breakers, and other cross-cutting concerns.
const ( // BeforeOperation is called before any query/exec operation. // The operationErr parameter will always be nil. BeforeOperation HookType = iota // AfterOperation is called after any query/exec operation. // The operationErr parameter contains the result of the operation. AfterOperation // BeforeTransaction is called before starting a transaction. // The operationErr parameter will always be nil. BeforeTransaction // AfterTransaction is called after a transaction completes. // The operationErr parameter contains the result of the transaction. AfterTransaction // OnShutdown is called during graceful shutdown. // The sql and args parameters will be empty, operationErr will be nil. OnShutdown )
type NotFoundError ¶
type NotFoundError struct {
Entity string
Identifier interface{}
}
NotFoundError represents when a requested entity is not found in the database. This error should be used instead of returning pgx.ErrNoRows directly.
Example:
if err == pgx.ErrNoRows {
return nil, pgxkit.NewNotFoundError("User", userID)
}
func NewNotFoundError ¶
func NewNotFoundError(entity string, identifier interface{}) *NotFoundError
NewNotFoundError creates a new NotFoundError with the given entity and identifier.
func (*NotFoundError) Error ¶
func (e *NotFoundError) Error() string
type QueryPlan ¶
type QueryPlan struct {
Query int `json:"query"`
SQL string `json:"sql"`
Plan []map[string]interface{} `json:"plan"`
}
QueryPlan is one captured structural query plan.
type RetryOption ¶
type RetryOption func(*retryConfig)
RetryOption configures retry behavior for operations.
func WithBackoffMultiplier ¶
func WithBackoffMultiplier(m float64) RetryOption
WithBackoffMultiplier sets the multiplier for exponential backoff.
func WithBaseDelay ¶
func WithBaseDelay(d time.Duration) RetryOption
WithBaseDelay sets the initial delay between retries.
func WithMaxDelay ¶
func WithMaxDelay(d time.Duration) RetryOption
WithMaxDelay sets the maximum delay between retries.
func WithMaxRetries ¶
func WithMaxRetries(n int) RetryOption
WithMaxRetries sets the maximum number of retry attempts.
type TestDB ¶
type TestDB struct {
*DB
}
TestDB is a testing utility that wraps DB with testing-specific functionality.
func (*TestDB) EnableAssertPlan ¶ added in v2.1.0
EnableAssertPlan returns a *DB that captures the structural EXPLAIN plan of each SELECT/INSERT/UPDATE/DELETE/WITH query into memory.
func (*TestDB) EnableGolden ¶
func (tdb *TestDB) EnableGolden(testName string, opts ...GoldenOption) *DB
EnableGolden returns a *DB that records database events (BEGIN, QUERY, COMMIT, ROLLBACK) for the test scenario via the hook system. Call AssertGolden after the scenario to compare against testdata/golden/<testName>.json.
type Tx ¶
type Tx struct {
// contains filtered or unexported fields
}
Tx wraps a pgx.Tx to implement the Executor interface and provide transaction lifecycle management integrated with pgxkit's activeOps tracking and hook system.
func (*Tx) Commit ¶
Commit commits the transaction and fires AfterTransaction. Atomic finalization makes "defer Rollback() + explicit Commit()" safe.
func (*Tx) Exec ¶
Exec executes a statement within the transaction. Fires BeforeOperation / AfterOperation hooks on the parent DB; AfterOperation receives the command tag.
func (*Tx) IsFinalized ¶
IsFinalized returns true if the transaction has been committed or rolled back.
func (*Tx) Query ¶
Query executes a query within the transaction. Fires BeforeOperation / AfterOperation hooks on the parent DB.
func (*Tx) QueryRow ¶
QueryRow executes a query that returns a single row within the transaction. Fires BeforeOperation / AfterOperation hooks on the parent DB.
type ValidationError ¶
ValidationError represents validation failures that occur before database operations. Use this for input validation, constraint violations, or business rule failures.
func NewValidationError ¶
func NewValidationError(entity, operation, field, reason string, err error) *ValidationError
NewValidationError creates a new ValidationError with the given parameters.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
func (*ValidationError) Unwrap ¶
func (e *ValidationError) Unwrap() error