Documentation
¶
Overview ¶
Package db provides PostgreSQL connection pooling, query execution, transactions, typed scanning, batch dispatch, and LISTEN/NOTIFY support, built on top of github.com/jackc/pgx/v5.
Use github.com/rajangupta9/pgkit/qb standalone when you only need to build SQL strings. Use this package when you also need pool management, transactions, and execution.
Creating a Client ¶
Construct a Client once at startup with one or more named pools. By convention "write" targets the primary and "read" targets a replica, but any names work:
client, err := db.New(ctx, db.Config{},
db.NamedPool{Name: "write", PoolConfig: db.PoolConfig{ConnString: writeDSN}},
db.NamedPool{Name: "read", PoolConfig: db.PoolConfig{ConnString: readDSN}},
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
Querying ¶
Use Client.QB to get a query builder, then pass it to Client.Query:
rows, err := client.Query(ctx, client.QB("users").
Columns("id", "name").
Where(qb.Where("active", qb.OpEq, true)).
Limit(20),
)
Typed Scanning ¶
Use the generic QueryInto and InsertInto functions for compile-time type safety:
type User struct {
ID uuid.UUID `db:"id"`
Name string `db:"name"`
}
users, err := db.QueryInto[User](ctx, client, client.QB("users").Limit(20))
Transactions ¶
Client.WithTx commits on nil return and rolls back otherwise:
err := client.WithTx(ctx, func(tx db.Tx) error {
id, err := tx.Insert(ctx, tx.QB("orders"), data)
return err
})
Batch Queries ¶
Client.SendWrite and Client.SendRead dispatch multiple queries in a single network round-trip:
b := db.NewBatch()
b.AddSelect(client.QB("users").Where(qb.Where("id", qb.OpEq, uid)))
b.AddExec("UPDATE sessions SET last_seen = NOW() WHERE user_id = $1", uid)
results, err := client.SendWrite(ctx, b)
Error Helpers ¶
PostgreSQL error codes are wrapped in typed predicates:
if db.IsUniqueViolation(err) { ... }
if db.IsSerializationFailure(err) { ... }
Index ¶
- Variables
- func AcquireAdvisoryLock(ctx context.Context, tx Tx, key int64) error
- func InsertInto[T any](ctx context.Context, c *Client, b *qb.Builder, data map[string]any) (*T, error)
- func IsCheckViolation(err error) bool
- func IsConnectionException(err error) bool
- func IsDeadlock(err error) bool
- func IsForeignKeyViolation(err error) bool
- func IsInvalidTextRepresentation(err error) bool
- func IsNoRows(err error) bool
- func IsNotNullViolation(err error) bool
- func IsSerializationFailure(err error) bool
- func IsUndefinedTable(err error) bool
- func IsUniqueViolation(err error) bool
- func PgError(err error) (*pgconn.PgError, bool)
- func QueryInto[T any](ctx context.Context, c *Client, b *qb.Builder) ([]T, error)
- func QueryOneInto[T any](ctx context.Context, c *Client, b *qb.Builder) (*T, error)
- func ScanUUID(row pgx.Row) (uuid.UUID, error)
- func TryAdvisoryLock(ctx context.Context, tx Tx, key int64) (bool, error)
- func TxInsertInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder, data map[string]any) (*T, error)
- func TxQueryInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder) ([]T, error)
- func TxQueryOneInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder) (*T, error)
- func TxUpdateInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder, data map[string]any) (*T, error)
- func UpdateInto[T any](ctx context.Context, c *Client, b *qb.Builder, data map[string]any) (*T, error)
- type Batch
- func (b *Batch) Add(sql string, args ...any) *Batch
- func (b *Batch) AddDelete(builder *qb.Builder) *Batch
- func (b *Batch) AddExec(sql string, args ...any) *Batch
- func (b *Batch) AddInsert(builder *qb.Builder, data map[string]any) *Batch
- func (b *Batch) AddSelect(builder *qb.Builder) *Batch
- func (b *Batch) AddUpdate(builder *qb.Builder, data map[string]any) *Batch
- func (b *Batch) Len() int
- type Client
- func (c *Client) Close()
- func (c *Client) Delete(ctx context.Context, b *qb.Builder) (int64, error)
- func (c *Client) ExecPoolSQL(ctx context.Context, poolName string, sql string, args ...any) (int64, error)
- func (c *Client) ExecSQL(ctx context.Context, sql string, args ...any) (int64, error)
- func (c *Client) HealthCheck(ctx context.Context) error
- func (c *Client) Insert(ctx context.Context, b *qb.Builder, data map[string]any) (uuid.UUID, error)
- func (c *Client) InsertBatch(ctx context.Context, b *qb.Builder, rows []map[string]any) (uuid.UUID, error)
- func (c *Client) Listen(ctx context.Context, channel string, handler func(Notification) error) error
- func (c *Client) ListenMulti(ctx context.Context, channels []string, handler func(Notification) error) error
- func (c *Client) Notify(ctx context.Context, channel, payload string) error
- func (c *Client) Pool(name string) *pgxpool.Pool
- func (c *Client) QB(table string) *qb.Builder
- func (c *Client) Query(ctx context.Context, b *qb.Builder) ([]map[string]any, error)
- func (c *Client) QueryOne(ctx context.Context, b *qb.Builder) (map[string]any, error)
- func (c *Client) QueryPool(ctx context.Context, poolName string, b *qb.Builder) ([]map[string]any, error)
- func (c *Client) QuerySQL(ctx context.Context, sql string, args ...any) ([]map[string]any, error)
- func (c *Client) QueryWrite(ctx context.Context, b *qb.Builder) ([]map[string]any, error)
- func (c *Client) SendRead(ctx context.Context, b *Batch) ([][]map[string]any, error)
- func (c *Client) SendWrite(ctx context.Context, b *Batch) ([][]map[string]any, error)
- func (c *Client) Update(ctx context.Context, b *qb.Builder, data map[string]any) (int64, error)
- func (c *Client) WithPoolTx(ctx context.Context, poolName string, fn func(Tx) error) error
- func (c *Client) WithRetryTx(ctx context.Context, maxRetries int, fn func(Tx) error) error
- func (c *Client) WithTx(ctx context.Context, fn func(Tx) error) error
- func (c *Client) WithTxOpts(ctx context.Context, opts TxOptions, fn func(Tx) error) error
- type Config
- type NamedPool
- type Notification
- type PoolConfig
- type Tx
- type TxOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoRows is returned by QueryOne, QueryOneInto, and scan helpers when // the query matches no rows. Use [IsNoRows] to check for this error. ErrNoRows = errors.New("db: no rows found") // ErrEmptyRows is returned by [Client.SendWrite] and [Client.SendRead] // when a Batch with no queued queries is submitted. ErrEmptyRows = errors.New("db: batch rows must not be empty") )
Functions ¶
func AcquireAdvisoryLock ¶
AcquireAdvisoryLock acquires a transaction-scoped advisory lock. Blocks until the lock is available. Released automatically when the tx ends.
client.WithTx(ctx, func(tx db.Tx) error {
return db.AcquireAdvisoryLock(ctx, tx, userID)
})
func InsertInto ¶
func InsertInto[T any](ctx context.Context, c *Client, b *qb.Builder, data map[string]any) (*T, error)
InsertInto executes BuildInsert and scans RETURNING columns into T. The Builder must have Returning(…) or ReturningAll() set.
type CreatedOrder struct {
ID uuid.UUID `db:"id"`
CreatedAt time.Time `db:"created_at"`
}
order, err := db.InsertInto[CreatedOrder](ctx, client,
client.QB("orders").ReturningAll(),
map[string]any{"user_id": uid, "total": 99.99},
)
func IsCheckViolation ¶
IsCheckViolation reports a CHECK constraint violation (23514).
func IsConnectionException ¶
IsConnectionException reports a connection failure (08xxx class).
func IsForeignKeyViolation ¶
IsForeignKeyViolation reports a foreign key violation (23503).
func IsInvalidTextRepresentation ¶
IsInvalidTextRepresentation reports invalid input syntax (22P02). Commonly caused by malformed UUIDs or invalid enum values.
func IsNotNullViolation ¶
IsNotNullViolation reports a NOT NULL violation (23502).
func IsSerializationFailure ¶
IsSerializationFailure reports a serialization failure (40001). These are safe to retry in SERIALIZABLE transactions.
func IsUndefinedTable ¶
IsUndefinedTable reports a missing table error (42P01).
func IsUniqueViolation ¶
IsUniqueViolation reports a unique constraint violation (23505).
func PgError ¶
PgError unwraps the raw pgconn.PgError from err. The error gives access to Code, Detail, Hint, Schema, Table, Column, etc.
func QueryInto ¶
QueryInto executes b.BuildSelect() and scans each row into T using struct field tags `db:"column_name"` (pgx RowToStructByName).
type Student struct {
ID uuid.UUID `db:"id"`
Name string `db:"name"`
}
students, err := db.QueryInto[Student](ctx, client, client.QB("students").Limit(20))
func QueryOneInto ¶
QueryOneInto executes b.BuildSelect() LIMIT 1 and scans the first row into T. Returns ErrNoRows if no row is found. Never mutates the supplied builder.
func TryAdvisoryLock ¶
TryAdvisoryLock tries to acquire a transaction-scoped advisory lock without blocking. Returns true if acquired, false if already held by another session. Must be called inside a WithTx callback; the lock is released when the tx ends.
func TxInsertInto ¶
func TxInsertInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder, data map[string]any) (*T, error)
TxInsertInto executes BuildInsert inside a raw pgx.Tx and scans RETURNING columns into T.
func TxQueryInto ¶
TxQueryInto executes b.BuildSelect() inside a raw pgx.Tx and scans rows into T. Use when you need the low-level pgx transaction directly.
func TxQueryOneInto ¶
TxQueryOneInto executes b.BuildSelect() LIMIT 1 inside a raw pgx.Tx and scans the first row into T. Returns ErrNoRows if no row is found. Never mutates the supplied builder.
Types ¶
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch accumulates multiple queries and sends them in a single network round-trip. Results are returned in the same order queries were added.
func (*Batch) Add ¶
Add queues a raw SQL statement that returns rows (e.g. SELECT, or INSERT … RETURNING).
func (*Batch) AddExec ¶
AddExec queues a raw SQL statement that does not return rows (e.g. UPDATE, DELETE without RETURNING).
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client owns one or more named connection pools and exposes a query builder and execution API.
Create once at startup; safe for concurrent use across goroutines.
func New ¶
New creates a Client and establishes all provided named pools. At least one NamedPool is required. Each pool can have independent credentials, DSN, and sizing. Cancel ctx to abort connection attempts.
func (*Client) Close ¶
func (c *Client) Close()
Close closes all pools. Call during graceful shutdown.
func (*Client) Delete ¶
Delete executes b.BuildDelete() on the "write" pool and returns rows affected.
func (*Client) ExecPoolSQL ¶
func (c *Client) ExecPoolSQL(ctx context.Context, poolName string, sql string, args ...any) (int64, error)
ExecPoolSQL executes a raw write statement on the named pool.
func (*Client) HealthCheck ¶
HealthCheck pings all registered pools concurrently.
func (*Client) Insert ¶
Insert executes b.BuildInsert(data) on the "write" pool and returns the generated UUID from RETURNING "id".
func (*Client) InsertBatch ¶
func (c *Client) InsertBatch(ctx context.Context, b *qb.Builder, rows []map[string]any) (uuid.UUID, error)
InsertBatch executes b.BuildInsertBatch(rows) on the "write" pool. Returns the first generated UUID from RETURNING "id".
func (*Client) Listen ¶
func (c *Client) Listen(ctx context.Context, channel string, handler func(Notification) error) error
Listen acquires a dedicated connection, issues LISTEN channel, and calls handler for every notification until ctx is cancelled.
handler is called synchronously; return an error to stop listening.
IMPORTANT: This holds a connection open indefinitely. Ensure your pool has enough capacity for long-running listeners.
err := client.Listen(ctx, "orders", func(n db.Notification) error {
fmt.Println(n.Payload)
return nil
})
func (*Client) ListenMulti ¶
func (c *Client) ListenMulti(ctx context.Context, channels []string, handler func(Notification) error) error
ListenMulti listens on multiple channels simultaneously. Notifications from any channel are dispatched to handler.
IMPORTANT: This holds a dedicated connection open indefinitely.
func (*Client) Notify ¶
Notify sends a notification to channel with payload. The channel name is safely quoted; payload is a bound parameter.
client.Notify(ctx, "orders", `{"id":"123"}`)
func (*Client) Pool ¶
Pool returns the *pgxpool.Pool registered under name. Returns nil if no pool with that name was registered.
func (*Client) QB ¶
QB returns a new qb.Builder targeting table.
Example ¶
package main
import (
"fmt"
"github.com/rajangupta9/pgkit/db"
"github.com/rajangupta9/pgkit/qb"
)
func main() {
c := &db.Client{}
b := c.QB("orders").Columns("id", "amount").Where(qb.Where("status", qb.OpEq, "active"))
fmt.Println(b != nil)
}
Output: true
func (*Client) QueryOne ¶
QueryOne executes b.BuildSelect() on the "read" pool and returns the first row. Returns ErrNoRows if no rows match. Never mutates the supplied builder.
func (*Client) QueryPool ¶
func (c *Client) QueryPool(ctx context.Context, poolName string, b *qb.Builder) ([]map[string]any, error)
QueryPool executes b.BuildSelect() on the named pool and returns all rows. Use when you need to target a specific pool by name.
func (*Client) QueryWrite ¶
QueryWrite executes a SELECT on the "write" pool. Use immediately after a write when read replicas may lag.
func (*Client) SendWrite ¶
SendWrite sends all batched queries over the write pool in one round-trip.
func (*Client) Update ¶
Update executes b.BuildUpdate(data) on the "write" pool and returns rows affected.
func (*Client) WithPoolTx ¶
WithPoolTx runs fn in a transaction on the named pool.
func (*Client) WithRetryTx ¶
WithRetryTx runs fn in a SERIALIZABLE transaction on the "write" pool and retries automatically on serialization failures (40001). maxRetries caps the attempt count. Includes exponential backoff between retries.
type Config ¶
type Config struct {
// QueryTimeout is applied per-query via context. 0 = no limit.
QueryTimeout time.Duration
// Logger is used for query debug/error logging.
// Defaults to slog.Default() if nil.
Logger *slog.Logger
}
Config holds executor-level settings shared across all pools.
type NamedPool ¶
type NamedPool struct {
// Name is the unique identifier for this pool, e.g. "read" or "write".
Name string
PoolConfig
}
NamedPool pairs a logical name with its pool configuration. Each NamedPool can have completely independent credentials and DSN. The name is used to retrieve the pool via Client.Pool and is referenced internally by convention ("read", "write").
type Notification ¶
type Notification struct {
Channel string // PostgreSQL notification channel.
Payload string // Notification payload.
PID uint32 // Process ID of the sender.
}
Notification is a LISTEN/NOTIFY message received from PostgreSQL.
type PoolConfig ¶
type PoolConfig struct {
// ConnString is the libpq-compatible connection string or DSN.
// Example: "postgres://user:pass@host:5432/dbname?sslmode=require"
ConnString string
// MaxConns is the maximum number of open connections. Default: 10.
MaxConns int32
// MinConns is the minimum number of idle connections kept open. Default: 2.
MinConns int32
// MaxConnIdleTime is how long a connection may sit idle before being
// closed. Default: 5 minutes.
MaxConnIdleTime time.Duration
// MaxConnLifetime is the maximum total lifetime of a connection.
// Default: 1 hour.
MaxConnLifetime time.Duration
// HealthCheckPeriod is how often idle connections are pinged.
// Default: 1 minute.
HealthCheckPeriod time.Duration
// ConnectTimeout is the deadline applied to each initial connection
// attempt and to the startup ping. Default: 20 seconds.
ConnectTimeout time.Duration
// ForceIPv4 rejects IPv6 literals and fails if no A record exists for the
// host. Use on environments without an IPv6 internet route (e.g. GCP Cloud
// Run). When false (default) IPv4 is preferred but IPv6 is the fallback.
ForceIPv4 bool
}
PoolConfig holds settings for a single named connection pool. ConnString is required; all other fields default to production-safe values when left at zero.
type Tx ¶
type Tx interface {
// QB returns a new query builder — same as Client.QB.
QB(table string) *qb.Builder
// Insert executes INSERT … RETURNING "id" and returns the generated UUID.
Insert(ctx context.Context, b *qb.Builder, data map[string]any) (uuid.UUID, error)
// Update executes UPDATE and returns rows affected.
Update(ctx context.Context, b *qb.Builder, data map[string]any) (int64, error)
// Delete executes DELETE and returns rows affected.
Delete(ctx context.Context, b *qb.Builder) (int64, error)
// Select executes SELECT and returns all rows as []map[string]any.
Select(ctx context.Context, b *qb.Builder) ([]map[string]any, error)
// SelectOne executes SELECT and returns the first row or ErrNoRows.
SelectOne(ctx context.Context, b *qb.Builder) (map[string]any, error)
// ExecRaw executes a raw write statement and returns rows affected.
ExecRaw(ctx context.Context, sql string, args ...any) (int64, error)
// QueryRaw executes a raw SELECT statement and returns all rows.
QueryRaw(ctx context.Context, sql string, args ...any) ([]map[string]any, error)
// Savepoint creates a named savepoint within the transaction.
Savepoint(ctx context.Context, name string) error
// RollbackTo rolls back to a named savepoint.
RollbackTo(ctx context.Context, name string) error
// ReleaseSavepoint releases a named savepoint.
ReleaseSavepoint(ctx context.Context, name string) error
}
Tx is the transaction API exposed to WithTx callbacks.