db

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package db provides PostgreSQL connection pooling, query execution, transactions, typed scanning, batch dispatch, and LISTEN/NOTIFY support, built on top of github.com/jackc/pgx/v5.

Use github.com/rajangupta9/pgkit/qb standalone when you only need to build SQL strings. Use this package when you also need pool management, transactions, and execution.

Creating a Client

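Construct a Client once at startup with one or more named pools. By convention "write" targets the primary and "read" targets a replica. Any names work for pools that are addressed explicitly (Client.Pool, Client.QueryPool, Client.ExecPoolSQL, Client.WithPoolTx), but the convenience methods such as Client.Query and Client.Insert are documented to run on the pools named "read" and "write":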

client, err := db.New(ctx, db.Config{},
    db.NamedPool{Name: "write", PoolConfig: db.PoolConfig{ConnString: writeDSN}},
    db.NamedPool{Name: "read",  PoolConfig: db.PoolConfig{ConnString: readDSN}},
)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

Querying

Use Client.QB to get a query builder, then pass it to Client.Query:

rows, err := client.Query(ctx, client.QB("users").
    Columns("id", "name").
    Where(qb.Where("active", qb.OpEq, true)).
    Limit(20),
)

Typed Scanning

Use the generic QueryInto and InsertInto functions for compile-time type safety:

type User struct {
    ID   uuid.UUID `db:"id"`
    Name string    `db:"name"`
}
users, err := db.QueryInto[User](ctx, client, client.QB("users").Limit(20))

Transactions

Client.WithTx commits on nil return and rolls back otherwise:

err := client.WithTx(ctx, func(tx db.Tx) error {
    _, err := tx.Insert(ctx, tx.QB("orders"), data)
    return err
})

Batch Queries

Client.SendWrite and Client.SendRead dispatch multiple queries in a single network round-trip:

b := db.NewBatch()
b.AddSelect(client.QB("users").Where(qb.Where("id", qb.OpEq, uid)))
b.AddExec("UPDATE sessions SET last_seen = NOW() WHERE user_id = $1", uid)
results, err := client.SendWrite(ctx, b)

Error Helpers

Predicate helpers report whether an error carries a specific PostgreSQL error code:

if db.IsUniqueViolation(err) { ... }
if db.IsSerializationFailure(err) { ... }

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoRows is returned by QueryOne, QueryOneInto, and scan helpers when
	// the query matches no rows. Use [IsNoRows] to check for this error.
	ErrNoRows = errors.New("db: no rows found")

	// ErrEmptyRows is returned by [Client.SendWrite] and [Client.SendRead]
	// when a Batch with no queued queries is submitted.
	ErrEmptyRows = errors.New("db: batch rows must not be empty")
)

Functions

func AcquireAdvisoryLock

func AcquireAdvisoryLock(ctx context.Context, tx Tx, key int64) error

AcquireAdvisoryLock acquires a transaction-scoped advisory lock. It blocks until the lock is available, and the lock is released automatically when the transaction ends.

client.WithTx(ctx, func(tx db.Tx) error {
    return db.AcquireAdvisoryLock(ctx, tx, userID)
})

func InsertInto

func InsertInto[T any](ctx context.Context, c *Client, b *qb.Builder, data map[string]any) (*T, error)

InsertInto executes BuildInsert and scans RETURNING columns into T. The Builder must have Returning(…) or ReturningAll() set.

type CreatedOrder struct {
    ID        uuid.UUID `db:"id"`
    CreatedAt time.Time `db:"created_at"`
}
order, err := db.InsertInto[CreatedOrder](ctx, client,
    client.QB("orders").ReturningAll(),
    map[string]any{"user_id": uid, "total": 99.99},
)

func IsCheckViolation

func IsCheckViolation(err error) bool

IsCheckViolation reports a CHECK constraint violation (23514).

func IsConnectionException

func IsConnectionException(err error) bool

IsConnectionException reports a connection failure (08xxx class).

func IsDeadlock

func IsDeadlock(err error) bool

IsDeadlock reports a PostgreSQL deadlock (40P01).

func IsForeignKeyViolation

func IsForeignKeyViolation(err error) bool

IsForeignKeyViolation reports a foreign key violation (23503).

func IsInvalidTextRepresentation

func IsInvalidTextRepresentation(err error) bool

IsInvalidTextRepresentation reports invalid input syntax (22P02). Commonly caused by malformed UUIDs or invalid enum values.

func IsNoRows

func IsNoRows(err error) bool

IsNoRows reports whether err represents a "no rows" result.

func IsNotNullViolation

func IsNotNullViolation(err error) bool

IsNotNullViolation reports a NOT NULL violation (23502).

func IsSerializationFailure

func IsSerializationFailure(err error) bool

IsSerializationFailure reports a serialization failure (40001). These are safe to retry in SERIALIZABLE transactions.

func IsUndefinedTable

func IsUndefinedTable(err error) bool

IsUndefinedTable reports a missing table error (42P01).

func IsUniqueViolation

func IsUniqueViolation(err error) bool

IsUniqueViolation reports a unique constraint violation (23505).

func PgError

func PgError(err error) (*pgconn.PgError, bool)

PgError unwraps the raw pgconn.PgError from err. The error gives access to Code, Detail, Hint, Schema, Table, Column, etc.

func QueryInto

func QueryInto[T any](ctx context.Context, c *Client, b *qb.Builder) ([]T, error)

QueryInto executes b.BuildSelect() and scans each row into T using struct field tags `db:"column_name"` (pgx RowToStructByName).

type Student struct {
    ID   uuid.UUID `db:"id"`
    Name string    `db:"name"`
}
students, err := db.QueryInto[Student](ctx, client, client.QB("students").Limit(20))

func QueryOneInto

func QueryOneInto[T any](ctx context.Context, c *Client, b *qb.Builder) (*T, error)

QueryOneInto executes b.BuildSelect() with LIMIT 1 applied and scans the first row into T. It returns ErrNoRows if no row is found and never mutates the supplied builder.

func ScanUUID

func ScanUUID(row pgx.Row) (uuid.UUID, error)

ScanUUID scans a single uuid.UUID from a QueryRow result.

func TryAdvisoryLock

func TryAdvisoryLock(ctx context.Context, tx Tx, key int64) (bool, error)

TryAdvisoryLock tries to acquire a transaction-scoped advisory lock without blocking. Returns true if acquired, false if already held by another session. Must be called inside a WithTx callback; the lock is released when the tx ends.

func TxInsertInto

func TxInsertInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder, data map[string]any) (*T, error)

TxInsertInto executes BuildInsert inside a raw pgx.Tx and scans RETURNING columns into T.

func TxQueryInto

func TxQueryInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder) ([]T, error)

TxQueryInto executes b.BuildSelect() inside a raw pgx.Tx and scans rows into T. Use when you need the low-level pgx transaction directly.

func TxQueryOneInto

func TxQueryOneInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder) (*T, error)

TxQueryOneInto executes b.BuildSelect() with LIMIT 1 applied inside a raw pgx.Tx and scans the first row into T. It returns ErrNoRows if no row is found and never mutates the supplied builder.

func TxUpdateInto

func TxUpdateInto[T any](ctx context.Context, tx pgx.Tx, b *qb.Builder, data map[string]any) (*T, error)

TxUpdateInto executes BuildUpdate inside a raw pgx.Tx and scans the first RETURNING row into T.

func UpdateInto

func UpdateInto[T any](ctx context.Context, c *Client, b *qb.Builder, data map[string]any) (*T, error)

UpdateInto executes BuildUpdate and scans the first RETURNING row into T. Requires Returning(…) or ReturningAll() on the Builder.

Types

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

Batch accumulates multiple queries and sends them in a single network round-trip. Results are returned in the same order queries were added.

func NewBatch

func NewBatch() *Batch

NewBatch creates an empty Batch.

func (*Batch) Add

func (b *Batch) Add(sql string, args ...any) *Batch

Add queues a raw SQL statement that returns rows (e.g. SELECT, or INSERT … RETURNING).

func (*Batch) AddDelete

func (b *Batch) AddDelete(builder *qb.Builder) *Batch

AddDelete queues a DELETE from a Builder.

func (*Batch) AddExec

func (b *Batch) AddExec(sql string, args ...any) *Batch

AddExec queues a raw SQL statement that does not return rows (e.g. UPDATE, DELETE without RETURNING).

func (*Batch) AddInsert

func (b *Batch) AddInsert(builder *qb.Builder, data map[string]any) *Batch

AddInsert queues an INSERT from a Builder + data map.

func (*Batch) AddSelect

func (b *Batch) AddSelect(builder *qb.Builder) *Batch

AddSelect queues a SELECT from a Builder.

func (*Batch) AddUpdate

func (b *Batch) AddUpdate(builder *qb.Builder, data map[string]any) *Batch

AddUpdate queues an UPDATE from a Builder + data map.

func (*Batch) Len

func (b *Batch) Len() int

Len returns the number of queued queries.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client owns one or more named connection pools and exposes a query builder and execution API.

Create once at startup; safe for concurrent use across goroutines.

func New

func New(ctx context.Context, cfg Config, pools ...NamedPool) (*Client, error)

New creates a Client and establishes all provided named pools. At least one NamedPool is required. Each pool can have independent credentials, DSN, and sizing. Cancel ctx to abort connection attempts.

func (*Client) Close

func (c *Client) Close()

Close closes all pools. Call during graceful shutdown.

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, b *qb.Builder) (int64, error)

Delete executes b.BuildDelete() on the "write" pool and returns rows affected.

func (*Client) ExecPoolSQL

func (c *Client) ExecPoolSQL(ctx context.Context, poolName string, sql string, args ...any) (int64, error)

ExecPoolSQL executes a raw write statement on the named pool.

func (*Client) ExecSQL

func (c *Client) ExecSQL(ctx context.Context, sql string, args ...any) (int64, error)

ExecSQL executes a raw write statement on the "write" pool.

func (*Client) HealthCheck

func (c *Client) HealthCheck(ctx context.Context) error

HealthCheck pings all registered pools concurrently.

func (*Client) Insert

func (c *Client) Insert(ctx context.Context, b *qb.Builder, data map[string]any) (uuid.UUID, error)

Insert executes b.BuildInsert(data) on the "write" pool and returns the generated UUID from RETURNING "id".

func (*Client) InsertBatch

func (c *Client) InsertBatch(ctx context.Context, b *qb.Builder, rows []map[string]any) (uuid.UUID, error)

InsertBatch executes b.BuildInsertBatch(rows) on the "write" pool. Returns the first generated UUID from RETURNING "id".

func (*Client) Listen

func (c *Client) Listen(ctx context.Context, channel string, handler func(Notification) error) error

Listen acquires a dedicated connection, issues LISTEN channel, and calls handler for every notification until ctx is cancelled.

handler is called synchronously; return an error to stop listening.

IMPORTANT: This holds a connection open indefinitely. Ensure your pool has enough capacity for long-running listeners.

err := client.Listen(ctx, "orders", func(n db.Notification) error {
    fmt.Println(n.Payload)
    return nil
})

func (*Client) ListenMulti

func (c *Client) ListenMulti(ctx context.Context, channels []string, handler func(Notification) error) error

ListenMulti listens on multiple channels simultaneously. Notifications from any channel are dispatched to handler.

IMPORTANT: This holds a dedicated connection open indefinitely.

func (*Client) Notify

func (c *Client) Notify(ctx context.Context, channel, payload string) error

Notify sends a notification to channel with payload. The channel name is safely quoted; payload is a bound parameter.

client.Notify(ctx, "orders", `{"id":"123"}`)

func (*Client) Pool

func (c *Client) Pool(name string) *pgxpool.Pool

Pool returns the *pgxpool.Pool registered under name. Returns nil if no pool with that name was registered.

func (*Client) QB

func (c *Client) QB(table string) *qb.Builder

QB returns a new qb.Builder targeting table.

Example
package main

import (
	"fmt"

	"github.com/rajangupta9/pgkit/db"
	"github.com/rajangupta9/pgkit/qb"
)

func main() {
	c := &db.Client{}
	b := c.QB("orders").Columns("id", "amount").Where(qb.Where("status", qb.OpEq, "active"))

	fmt.Println(b != nil)

}
Output:
true

func (*Client) Query

func (c *Client) Query(ctx context.Context, b *qb.Builder) ([]map[string]any, error)

Query executes b.BuildSelect() on the "read" pool and returns all rows.

func (*Client) QueryOne

func (c *Client) QueryOne(ctx context.Context, b *qb.Builder) (map[string]any, error)

QueryOne executes b.BuildSelect() on the "read" pool and returns the first row. Returns ErrNoRows if no rows match. Never mutates the supplied builder.

func (*Client) QueryPool

func (c *Client) QueryPool(ctx context.Context, poolName string, b *qb.Builder) ([]map[string]any, error)

QueryPool executes b.BuildSelect() on the named pool and returns all rows. Use when you need to target a specific pool by name.

func (*Client) QuerySQL

func (c *Client) QuerySQL(ctx context.Context, sql string, args ...any) ([]map[string]any, error)

QuerySQL executes a raw SELECT on the "read" pool and returns all rows.

func (*Client) QueryWrite

func (c *Client) QueryWrite(ctx context.Context, b *qb.Builder) ([]map[string]any, error)

QueryWrite executes a SELECT on the "write" pool. Use immediately after a write when read replicas may lag.

func (*Client) SendRead

func (c *Client) SendRead(ctx context.Context, b *Batch) ([][]map[string]any, error)

SendRead sends all batched queries over the read pool in one round-trip.

func (*Client) SendWrite

func (c *Client) SendWrite(ctx context.Context, b *Batch) ([][]map[string]any, error)

SendWrite sends all batched queries over the write pool in one round-trip.

func (*Client) Update

func (c *Client) Update(ctx context.Context, b *qb.Builder, data map[string]any) (int64, error)

Update executes b.BuildUpdate(data) on the "write" pool and returns rows affected.

func (*Client) WithPoolTx

func (c *Client) WithPoolTx(ctx context.Context, poolName string, fn func(Tx) error) error

WithPoolTx runs fn in a transaction on the named pool.

func (*Client) WithRetryTx

func (c *Client) WithRetryTx(ctx context.Context, maxRetries int, fn func(Tx) error) error

WithRetryTx runs fn in a SERIALIZABLE transaction on the "write" pool and retries automatically on serialization failures (40001). maxRetries caps the attempt count, and exponential backoff is applied between retries.

func (*Client) WithTx

func (c *Client) WithTx(ctx context.Context, fn func(Tx) error) error

WithTx runs fn inside a transaction on the "write" pool. Commits on nil return, rolls back otherwise.

err := client.WithTx(ctx, func(tx db.Tx) error {
    _, err := tx.Insert(ctx, tx.QB("orders"), data)
    return err
})

func (*Client) WithTxOpts

func (c *Client) WithTxOpts(ctx context.Context, opts TxOptions, fn func(Tx) error) error

WithTxOpts is like WithTx but accepts custom transaction options (isolation level, access mode).

type Config

type Config struct {
	// QueryTimeout is applied per-query via context. 0 = no limit.
	QueryTimeout time.Duration

	// Logger is used for query debug/error logging.
	// Defaults to slog.Default() if nil.
	Logger *slog.Logger
}

Config holds executor-level settings shared across all pools.

type NamedPool

type NamedPool struct {
	// Name is the unique identifier for this pool, e.g. "read" or "write".
	Name string
	PoolConfig
}

NamedPool pairs a logical name with its pool configuration. Each NamedPool can have completely independent credentials and DSN. The name is used to retrieve the pool via Client.Pool and is referenced internally by convention ("read", "write").

type Notification

type Notification struct {
	Channel string // PostgreSQL notification channel.
	Payload string // Notification payload.
	PID     uint32 // Process ID of the sender.
}

Notification is a LISTEN/NOTIFY message received from PostgreSQL.

type PoolConfig

type PoolConfig struct {
	// ConnString is the libpq-compatible connection string or DSN.
	// Example: "postgres://user:pass@host:5432/dbname?sslmode=require"
	ConnString string

	// MaxConns is the maximum number of open connections. Default: 10.
	MaxConns int32

	// MinConns is the minimum number of idle connections kept open. Default: 2.
	MinConns int32

	// MaxConnIdleTime is how long a connection may sit idle before being
	// closed. Default: 5 minutes.
	MaxConnIdleTime time.Duration

	// MaxConnLifetime is the maximum total lifetime of a connection.
	// Default: 1 hour.
	MaxConnLifetime time.Duration

	// HealthCheckPeriod is how often idle connections are pinged.
	// Default: 1 minute.
	HealthCheckPeriod time.Duration

	// ConnectTimeout is the deadline applied to each initial connection
	// attempt and to the startup ping. Default: 20 seconds.
	ConnectTimeout time.Duration

	// ForceIPv4 rejects IPv6 literals and fails if no A record exists for the
	// host. Use on environments without an IPv6 internet route (e.g. GCP Cloud
	// Run). When false (default) IPv4 is preferred but IPv6 is the fallback.
	ForceIPv4 bool
}

PoolConfig holds settings for a single named connection pool. ConnString is required; all other fields default to production-safe values when left at zero.

type Tx

type Tx interface {
	// QB returns a new query builder — same as Client.QB.
	QB(table string) *qb.Builder

	// Insert executes INSERT … RETURNING "id" and returns the generated UUID.
	Insert(ctx context.Context, b *qb.Builder, data map[string]any) (uuid.UUID, error)

	// Update executes UPDATE and returns rows affected.
	Update(ctx context.Context, b *qb.Builder, data map[string]any) (int64, error)

	// Delete executes DELETE and returns rows affected.
	Delete(ctx context.Context, b *qb.Builder) (int64, error)

	// Select executes SELECT and returns all rows as []map[string]any.
	Select(ctx context.Context, b *qb.Builder) ([]map[string]any, error)

	// SelectOne executes SELECT and returns the first row or ErrNoRows.
	SelectOne(ctx context.Context, b *qb.Builder) (map[string]any, error)

	// ExecRaw executes a raw write statement and returns rows affected.
	ExecRaw(ctx context.Context, sql string, args ...any) (int64, error)

	// QueryRaw executes a raw SELECT statement and returns all rows.
	QueryRaw(ctx context.Context, sql string, args ...any) ([]map[string]any, error)

	// Savepoint creates a named savepoint within the transaction.
	Savepoint(ctx context.Context, name string) error

	// RollbackTo rolls back to a named savepoint.
	RollbackTo(ctx context.Context, name string) error

	// ReleaseSavepoint releases a named savepoint.
	ReleaseSavepoint(ctx context.Context, name string) error
}

Tx is the transaction API passed to the callbacks of Client.WithTx, Client.WithTxOpts, Client.WithPoolTx, and Client.WithRetryTx.

type TxOptions

type TxOptions = pgx.TxOptions

TxOptions is a convenience alias for pgx.TxOptions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL