dbkit

package module
v0.0.0-...-d28b152 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2026 License: MIT Imports: 20 Imported by: 0

README

DBKit

An opinionated database layer for Go applications built on top of Bun ORM and pgx.

Features

  • Connection Pooling: Full control over pool size, lifetimes, and timeouts
  • Migrations: Execute SQL migrations with checksum verification
  • Transactions: Callback-based (auto commit/rollback) + Manual + Savepoints
  • Rich Errors: Typed errors with Code, Constraint, Table, Column, Detail, Hint
  • Observability: Structured logging, Prometheus metrics, OpenTelemetry tracing

Installation

go get github.com/fernandezvara/dbkit

Common Database Patterns with Bun

Here are common database operations and how to implement them using Bun ORM directly:

Pattern Bun Implementation
FindByID db.NewSelect().Model(&model).Where("id = ?", id).Scan(ctx)
FindByPK db.NewSelect().Model(model).WherePK().Scan(ctx)
FindOne db.NewSelect().Model(&model).Limit(1).Scan(ctx)
FindAll db.NewSelect().Model(&models).Scan(ctx)
Create db.NewInsert().Model(&model).Exec(ctx)
CreateReturning db.NewInsert().Model(&model).Returning("id").Exec(ctx)
CreateMany db.NewInsert().Model(&models).Exec(ctx)
Update db.NewUpdate().Model(&model).WherePK().Exec(ctx)
UpdateColumns db.NewUpdate().Model(&model).Column("name").WherePK().Exec(ctx)
Delete db.NewDelete().Model(&model).WherePK().Exec(ctx)
DeleteByID db.NewDelete().Model(&model).Where("id = ?", id).Exec(ctx)
Exists db.NewSelect().Model(&model).Where("name = ?", name).Exists(ctx)
ExistsByID db.NewSelect().Model(&model).Where("id = ?", id).Exists(ctx)
Count db.NewSelect().Model(&model).Where("active = ?", true).Count(ctx)
CountAll db.NewSelect().Model(&model).Count(ctx)
Reload db.NewSelect().Model(&model).WherePK().Scan(ctx)
Upsert db.NewInsert().Model(&model).On("CONFLICT (email) DO UPDATE").Exec(ctx)
Transaction db.Transaction(ctx, func(tx *bun.Tx) error { ... })
Raw Query db.NewRaw("SELECT * FROM users WHERE age > ?", age).Scan(ctx, &users)

Enhanced Error Handling with dbkit:

// Wrap any Bun operation with enhanced error context:
err := dbkit.WithErr1(db.NewSelect().Model(&user).Where("id = ?", id).Scan(ctx), "FindByID").Err()
if err != nil {
    // Get rich error information:
    if dbkit.IsNotFound(err) {
        // Handle not found
    }
    var dbErr *dbkit.Error
    if errors.As(err, &dbErr) {
        fmt.Printf("Table: %s, Constraint: %s\n", dbErr.Table, dbErr.Constraint)
    }
}

Quick Start

package main

import (
    "context"
    "log/slog"
    "os"
    "time"

    "github.com/fernandezvara/dbkit"
    "github.com/uptrace/bun"
)

type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`

    ID    string `bun:"id,pk,type:uuid,default:gen_random_uuid()"`
    Email string `bun:"email,notnull,unique"`
    Name  string `bun:"name,notnull"`
}

func main() {
    ctx := context.Background()

    // Connect
    db, err := dbkit.New(dbkit.Config{
        URL:             os.Getenv("DATABASE_URL"),
        MaxOpenConns:    25,
        MaxIdleConns:    5,
        ConnMaxLifetime: 5 * time.Minute,
        Logger:          slog.Default(),
        LogSlowQueries:  100 * time.Millisecond,
    })
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // Run migrations
    _, err = db.Migrate(ctx, []dbkit.Migration{
        {ID: "001", Description: "Create users", SQL: `
            CREATE TABLE users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                email VARCHAR(255) UNIQUE NOT NULL,
                name VARCHAR(255) NOT NULL
            );
        `},
    })
    if err != nil {
        panic(err)
    }

    // Create using direct Bun call
    user := &User{Email: "john@example.com", Name: "John"}
    if _, err := db.NewInsert().Model(user).Exec(ctx); err != nil {
        panic(err)
    }

    // Find using direct Bun call
    var found User
    err = db.NewSelect().Model(&found).Where("id = ?", user.ID).Scan(ctx)
    if err != nil {
        if dbkit.IsNotFound(err) {
            println("user not found")
        }
    }

    // Update using direct Bun call
    found.Name = "John Doe"
    db.NewUpdate().Model(&found).WherePK().Exec(ctx)

    // Transaction
    err = db.Transaction(ctx, func(tx *dbkit.Tx) error {
        _, err := tx.NewDelete().Model(&found).WherePK().Exec(ctx)
        return err
    })
}

Migrations

Migrations are provided as a slice and executed in order:

migrations := []dbkit.Migration{
    {
        ID:          "001",
        Description: "Create users table",
        SQL: `
            CREATE TABLE users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                email VARCHAR(255) UNIQUE NOT NULL
            );
        `,
    },
    {
        ID:          "002",
        Description: "Add name column",
        SQL:         `ALTER TABLE users ADD COLUMN name VARCHAR(255);`,
    },
}

result, err := db.Migrate(ctx, migrations)
// result.Applied: migrations that were applied
// result.Skipped: migrations already in database

DBKit tracks applied migrations in the _dbkit_migrations table, storing a checksum for each one so that later changes to a migration can be detected.

⚠️ Important: Migration ID Collision Prevention

Migration IDs must be unique across your entire application to prevent conflicts.

When using migrations from multiple libraries or packages, ensure that migration IDs do not collide. Each migration ID must be globally unique within your application context.

Examples of problematic collisions:

  • Library A has migration ID "001"
  • Library B also has migration ID "001"
  • This will cause conflicts and unpredictable behavior

Recommended strategies:

  1. Use prefixed IDs: libraryA_001, libraryB_001
  2. Use timestamps: 20240115_001_create_users
  3. Use package names: auth_001, billing_001
  4. Use UUIDs: Generate unique IDs for each migration
// Good - prefixed with library/package name
migrations := []dbkit.Migration{
    {ID: "auth_001", Description: "Create users table", SQL: "..."},
    {ID: "billing_001", Description: "Create invoices table", SQL: "..."},
}

// Bad - potential collision with other libraries
migrations := []dbkit.Migration{
    {ID: "001", Description: "Create users table", SQL: "..."},
    {ID: "002", Description: "Create invoices table", SQL: "..."},
}

Failure to ensure unique migration IDs can result in:

  • Migrations being skipped or applied out of order
  • Database schema inconsistencies
  • Difficult-to-debug migration conflicts

Transactions

Callback-based (auto commit/rollback)
err := db.Transaction(ctx, func(tx *dbkit.Tx) error {
    if _, err := tx.NewInsert().Model(&user).Exec(ctx); err != nil {
        return err // auto rollback
    }
    if _, err := tx.NewInsert().Model(&profile).Exec(ctx); err != nil {
        return err // auto rollback
    }
    return nil // auto commit
})
Manual control
tx, err := db.Begin(ctx)
if err != nil {
    return err
}
defer tx.Rollback() // no-op if committed

// ... do work ...

return tx.Commit()
Nested transactions (savepoints)
err := db.Transaction(ctx, func(tx *dbkit.Tx) error {
    tx.NewInsert().Model(&outer).Exec(ctx)

    // Nested - uses SAVEPOINT
    err := tx.Transaction(ctx, func(tx2 *dbkit.Tx) error {
        tx2.NewInsert().Model(&inner).Exec(ctx)
        return errors.New("fail") // only rolls back inner
    })

    // outer is still committed
    return nil
})

Chainable Error Wrapping

DBKit provides chainable error wrapping to add meaningful context to database errors:

// For operations that return (sql.Result, error)
result, err := dbkit.WithErr(db.NewInsert().Model(&user).Exec(ctx), "CreateUser").Unwrap()
if err != nil {
    // err is wrapped with operation context
}

// For operations that return only error (like Scan)
err := dbkit.WithErr1(db.NewSelect().Model(&user).Where("id = ?", id).Scan(ctx), "FindByID").Err()

// Check error directly
if dbkit.WithErr(db.NewInsert().Model(&user).Exec(ctx), "CreateUser").HasError() {
    // handle error
}

// Get result without error check
qr := dbkit.WithErr(db.NewInsert().Model(&user).Exec(ctx), "CreateUser")
if !qr.HasError() {
    result := qr.Result()
}

Error Handling

_, err := db.NewInsert().Model(&user).Exec(ctx)
if err != nil {
    // Quick checks with sentinel errors
    if dbkit.IsNotFound(err) { ... }
    if dbkit.IsDuplicate(err) { ... }
    if dbkit.IsForeignKey(err) { ... }
    if dbkit.IsRetryable(err) { ... } // serialization or deadlock

    // Rich error details
    var dbErr *dbkit.Error
    if errors.As(err, &dbErr) {
        fmt.Println(dbErr.Code)       // DUPLICATE
        fmt.Println(dbErr.Table)      // users
        fmt.Println(dbErr.Column)     // email
        fmt.Println(dbErr.Constraint) // users_email_key
        fmt.Println(dbErr.Detail)     // from PostgreSQL
        fmt.Println(dbErr.Hint)       // from PostgreSQL
    }
}

Base Models

DBKit provides composable base models for common patterns:

// Basic model with ID and timestamps
type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`
    dbkit.BaseModel
    Email string `bun:"email,notnull,unique"`
}

// With soft delete capability
type Post struct {
    bun.BaseModel `bun:"table:posts,alias:p"`
    dbkit.BaseModel
    dbkit.SoftDeletableModel
    Title string `bun:"title,notnull"`
}

// With optimistic locking (versioning)
type Account struct {
    bun.BaseModel `bun:"table:accounts,alias:a"`
    dbkit.BaseModel
    dbkit.VersionedModel
    Balance int64 `bun:"balance,notnull"`
}

// Full model with all features
type Document struct {
    bun.BaseModel `bun:"table:documents,alias:d"`
    dbkit.FullModel  // ID, timestamps, soft delete, version
    Content string `bun:"content"`
}
Available Base Models
Model Fields Use Case
BaseModel ID, CreatedAt, UpdatedAt Standard models
SoftDeletableModel DeletedAt Add soft delete capability
VersionedModel Version Add optimistic locking
TimestampedModel CreatedAt, UpdatedAt Timestamps without UUID ID
FullModel All fields combined Models needing all features
Soft Delete Operations
// Soft delete a record
dbkit.SoftDelete(ctx, db, &user)
dbkit.SoftDeleteByID[User](ctx, db, userID)

// Restore a soft-deleted record
dbkit.Restore(ctx, db, &user)
dbkit.RestoreByID[User](ctx, db, userID)

// Permanently delete (bypass soft delete)
dbkit.HardDelete(ctx, db, &user)
dbkit.HardDeleteByID[User](ctx, db, userID)

// Query modifiers for soft delete
db.NewSelect().Model(&users).Apply(dbkit.NotDeleted).Scan(ctx)   // Exclude deleted
db.NewSelect().Model(&users).Apply(dbkit.OnlyDeleted).Scan(ctx)  // Only deleted
db.NewSelect().Model(&users).Apply(dbkit.WithDeleted).Scan(ctx)  // Include all
Optimistic Locking
// Update with version check (returns ErrConflict on mismatch)
err := dbkit.UpdateWithVersion(ctx, db, &account, account.Version)
if dbkit.IsConflict(err) {
    // Handle conflict - reload and retry
}

// Update specific columns with version check
err := dbkit.UpdateColumnsWithVersion(ctx, db, &account, account.Version, "balance")

// Retry on conflict automatically
err := dbkit.RetryOnConflict(ctx, 3, func() error {
    db.NewSelect().Model(&account).WherePK().Scan(ctx)  // Reload
    account.Balance += 100
    return dbkit.UpdateWithVersion(ctx, db, &account, account.Version)
})

// Builder pattern for versioned updates
result, err := dbkit.NewVersionedUpdate(db, &account, account.Version).
    Columns("balance", "updated_at").
    Exec(ctx)
Audit Trail
// Create a database audit handler
handler := dbkit.NewDatabaseAuditHandler(db)

// Log operations manually
dbkit.AuditCreate(ctx, handler, "users", user.ID, &user)
dbkit.AuditUpdate(ctx, handler, "users", user.ID, &oldUser, &newUser)
dbkit.AuditDelete(ctx, handler, "users", user.ID, &user)

// Add audit context (user ID, IP, user agent)
ctx = dbkit.WithAuditContext(ctx, userID, ipAddress, userAgent)

// Configure audit hook
hook := dbkit.NewAuditHook(dbkit.AuditConfig{
    Handler:        handler,
    Tables:         []string{"users", "orders"},  // Optional: specific tables
    ExcludeTables:  []string{"sessions"},         // Optional: exclude tables
    IncludeOldData: true,
    IncludeNewData: true,
    UserIDExtractor: dbkit.DefaultUserIDExtractor,
})

Pagination

Offset-based Pagination
// Simple pagination modifier
var users []User
db.NewSelect().Model(&users).Apply(dbkit.Paginate(2, 10)).Scan(ctx)

// With count and metadata
page, err := dbkit.PaginateWithCount[User](ctx, db, 1, 10, func(q *bun.SelectQuery) *bun.SelectQuery {
    return q.Where("active = ?", true).Order("created_at DESC")
})
// page.Items, page.TotalItems, page.TotalPages, page.PageInfo
Cursor-based Pagination
// Forward pagination
var users []User
db.NewSelect().Model(&users).
    Apply(dbkit.CursorPaginate("id", "", afterCursor, 10, true)).
    Scan(ctx)

// Process results
items, pageInfo := dbkit.CursorPaginateResult(users, 10, true, func(u User) string {
    return dbkit.EncodeCursor(u.ID, "")
})
// pageInfo.HasNextPage, pageInfo.EndCursor
Keyset Pagination
// Efficient for large datasets
var users []User
db.NewSelect().Model(&users).
    Apply(dbkit.KeysetPaginate("id", lastID, 10)).
    Order("id ASC").
    Scan(ctx)

Batch Operations

// Batch insert (handles large datasets)
count, err := dbkit.BatchInsert(ctx, db, users, 100)

// Batch update
count, err := dbkit.BatchUpdate(ctx, db, users, 100)

// Batch delete by IDs
count, err := dbkit.BatchDelete[User](ctx, db, ids, 100)

// Batch upsert
count, err := dbkit.BatchUpsert(ctx, db, users, []string{"email"}, []string{"name"}, 100)

// Bulk insert with returning
inserted, err := dbkit.BulkInsertReturning(ctx, db, users)

Query Helpers

// Check existence
exists, err := dbkit.Exists[User](ctx, db, func(q *bun.SelectQuery) *bun.SelectQuery {
    return q.Where("email = ?", email)
})

// Count records
count, err := dbkit.Count[User](ctx, db, func(q *bun.SelectQuery) *bun.SelectQuery {
    return q.Where("active = ?", true)
})

// Pluck single column
emails, err := dbkit.Pluck[User, string](ctx, db, "email", nil)

// Find or create
user, created, err := dbkit.FindOrCreate(ctx, db, &User{Email: "test@example.com"},
    func(q *bun.SelectQuery) *bun.SelectQuery {
        return q.Where("email = ?", "test@example.com")
    })

// Update with returning
updated, err := dbkit.UpdateReturning(ctx, db, &user)

// Delete with returning
deleted, err := dbkit.DeleteReturning(ctx, db, &user)

Multi-tenancy

// Add tenant to context
ctx = dbkit.WithTenant(ctx, "tenant-123")

// Get tenant from context
tenantID := dbkit.GetTenant(ctx)
tenantID, err := dbkit.RequireTenant(ctx)  // Returns error if not set

// Tenant-scoped queries
db.NewSelect().Model(&users).Apply(dbkit.TenantScope(ctx)).Scan(ctx)
db.NewUpdate().Model(&user).Apply(dbkit.TenantUpdateScope(ctx)).WherePK().Exec(ctx)
db.NewDelete().Model(&user).Apply(dbkit.TenantDeleteScope(ctx)).WherePK().Exec(ctx)

// Tenant model embedding
type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`
    dbkit.BaseModel
    dbkit.TenantModel  // Adds tenant_id field
    Email string `bun:"email,notnull"`
}

// Tenant isolation helper
ti := dbkit.NewTenantIsolation(db, dbkit.DefaultTenantConfig())
ti.Select(ctx).Model(&users).Scan(ctx)
ti.Update(ctx).Model(&user).WherePK().Exec(ctx)
ti.Delete(ctx).Model(&user).WherePK().Exec(ctx)

Observability

Logging
db, _ := dbkit.New(dbkit.Config{
    URL:            url,
    Logger:         slog.Default(),
    LogQueries:     true,                    // Log all queries (debug)
    LogSlowQueries: 100 * time.Millisecond,  // Log slow queries (warn)
})
Prometheus Metrics
import "github.com/prometheus/client_golang/prometheus"

db, _ := dbkit.New(dbkit.Config{
    URL:             url,
    MetricsRegistry: prometheus.DefaultRegisterer,
})

// Exposed metrics:
// - dbkit_query_duration_seconds (histogram)
// - dbkit_queries_total (counter)
// - dbkit_query_errors_total (counter)
OpenTelemetry Tracing
import "go.opentelemetry.io/otel"

db, _ := dbkit.New(dbkit.Config{
    URL:    url,
    Tracer: otel.Tracer("dbkit"),
})

Health Checks

// Simple check
if db.IsHealthy(ctx) {
    // OK
}

// Detailed status
status := db.Health(ctx)
fmt.Println(status.Healthy)              // true/false
fmt.Println(status.Latency)              // ping latency
fmt.Println(status.PoolStats.InUse)      // connections in use
fmt.Println(status.PoolStats.Idle)       // idle connections

Direct Bun Access

For complex queries, access Bun directly:

// Raw queries
var results []MyStruct
db.NewRaw("SELECT ... COMPLEX QUERY ...", args...).Scan(ctx, &results)

// Full query builder
db.NewSelect().
    Model(&users).
    Column("email").
    ColumnExpr("COUNT(*) AS count").
    Group("email").
    Having("COUNT(*) > 1").
    Scan(ctx)

// Get underlying bun.DB
bunDB := db.Bun()

License

MIT

Documentation

Overview

Package dbkit provides a consistent database layer for Go applications. It wraps Bun ORM with connection pooling, migrations, transactions, generic CRUD helpers, rich error handling, and configurable observability.

Package dbkit provides a consistent database layer for Go applications.

DBKit wraps Bun ORM with additional features:

  • Connection pooling with full configuration
  • Migration execution with checksum verification
  • Transaction support with auto commit/rollback and savepoints
  • Rich error handling with PostgreSQL error parsing
  • Configurable observability (logging, metrics, tracing)
  • Health check utilities

Basic Usage

cfg := dbkit.DefaultConfig(os.Getenv("DATABASE_URL"))
cfg.Logger = slog.Default()
cfg.LogSlowQueries = 100 * time.Millisecond

db, err := dbkit.New(cfg)
if err != nil {
    log.Fatal(err)
}
defer db.Close()

Migrations

migrations := []dbkit.Migration{
    {ID: "001", Description: "Create users", SQL: "CREATE TABLE users (...)"},
    {ID: "002", Description: "Add index", SQL: "CREATE INDEX ..."},
}

result, err := db.Migrate(ctx, migrations)

Database Operations

Use Bun ORM directly for all CRUD operations:

// Find by ID
var user User
err := db.NewSelect().Model(&user).Where("id = ?", id).Scan(ctx)

// Find with query
var users []User
err := db.NewSelect().Model(&users).Where("active = ?", true).Order("created_at DESC").Scan(ctx)

// Create
_, err := db.NewInsert().Model(&user).Exec(ctx)

// Update
_, err := db.NewUpdate().Model(&user).WherePK().Exec(ctx)

// Delete
_, err := db.NewDelete().Model(&user).WherePK().Exec(ctx)

Transactions

Callback-based (auto commit/rollback):

err := db.Transaction(ctx, func(tx *dbkit.Tx) error {
    if _, err := tx.NewInsert().Model(&user).Exec(ctx); err != nil {
        return err // rollback
    }
    return nil // commit
})

Manual control:

tx, err := db.Begin(ctx)
if err != nil {
    return err
}
defer tx.Rollback()

// ... operations ...

return tx.Commit()

Nested transactions (savepoints):

err := db.Transaction(ctx, func(tx *dbkit.Tx) error {
    tx.NewInsert().Model(&outer).Exec(ctx)

    err := tx.Transaction(ctx, func(tx2 *dbkit.Tx) error {
        return errors.New("fail") // only rolls back inner
    })

    return nil // outer commits
})

Error Handling

DBKit provides rich error types for PostgreSQL errors:

if _, err := db.NewInsert().Model(&user).Exec(ctx); err != nil {
    if dbkit.IsDuplicate(err) {
        // Handle duplicate key
    }

    var dbErr *dbkit.Error
    if errors.As(err, &dbErr) {
        fmt.Println(dbErr.Code)       // DUPLICATE
        fmt.Println(dbErr.Constraint) // users_email_key
        fmt.Println(dbErr.Detail)     // Key (email)=(test@example.com) already exists
    }
}

Index

Constants

View Source
const BatchSize = 100

BatchSize is the default batch size for batch operations.

View Source
const DefaultPageSize = 20

DefaultPageSize is the default number of items per page.

View Source
const MaxPageSize = 100

MaxPageSize is the maximum allowed page size.

Variables

View Source
var (
	ErrNotFound         = errors.New("dbkit: record not found")
	ErrDuplicate        = errors.New("dbkit: duplicate key violation")
	ErrForeignKey       = errors.New("dbkit: foreign key violation")
	ErrCheckViolation   = errors.New("dbkit: check constraint violation")
	ErrNotNullViolation = errors.New("dbkit: not null violation")
	ErrConnection       = errors.New("dbkit: connection failed")
	ErrTimeout          = errors.New("dbkit: operation timeout")
	ErrSerialization    = errors.New("dbkit: serialization failure")
	ErrDeadlock         = errors.New("dbkit: deadlock detected")
)

Sentinel errors for quick checks

View Source
var ErrConflict = errors.New("dbkit: optimistic locking conflict - record was modified")

ErrConflict is returned when an optimistic locking conflict is detected.

View Source
var ErrNoTenant = errors.New("dbkit: tenant ID not found in context")

ErrNoTenant is returned when tenant ID is required but not found in context.

Functions

func AuditCreate

func AuditCreate(ctx context.Context, handler AuditHandler, tableName, recordID string, newData interface{}) error

AuditCreate logs a create action for a model. Call this after inserting a record.

Usage:

_, err := db.NewInsert().Model(&user).Exec(ctx)
if err == nil {
    dbkit.AuditCreate(ctx, auditor, "users", user.ID, &user)
}

func AuditDelete

func AuditDelete(ctx context.Context, handler AuditHandler, tableName, recordID string, oldData interface{}) error

AuditDelete logs a delete action for a model. Call this when deleting a record; the usage below records the audit entry before issuing the delete.

Usage:

dbkit.AuditDelete(ctx, auditor, "users", user.ID, &user)
_, err := db.NewDelete().Model(&user).WherePK().Exec(ctx)

func AuditUpdate

func AuditUpdate(ctx context.Context, handler AuditHandler, tableName, recordID string, oldData, newData interface{}) error

AuditUpdate logs an update action for a model. Call this after updating a record.

Usage:

oldUser := user  // Copy before update
_, err := db.NewUpdate().Model(&user).WherePK().Exec(ctx)
if err == nil {
    dbkit.AuditUpdate(ctx, auditor, "users", user.ID, &oldUser, &user)
}

func BatchDelete

func BatchDelete[T any](ctx context.Context, db bun.IDB, ids []string, batchSize int) (int64, error)

BatchDelete deletes records in batches by their IDs. Returns the total number of rows affected.

Usage:

ids := []string{"id1", "id2", "id3"}
count, err := dbkit.BatchDelete[User](ctx, db, ids, 100)

func BatchInsert

func BatchInsert[T any](ctx context.Context, db bun.IDB, items []T, batchSize int) (int64, error)

BatchInsert inserts records in batches to avoid exceeding PostgreSQL limits. Returns the total number of rows affected.

Usage:

users := []User{{Name: "A"}, {Name: "B"}, ...}
count, err := dbkit.BatchInsert(ctx, db, users, 100)

func BatchUpdate

func BatchUpdate[T any](ctx context.Context, db bun.IDB, items []T, batchSize int) (int64, error)

BatchUpdate updates records in batches. Returns the total number of rows affected.

Usage:

users := []User{{ID: "1", Name: "Updated1"}, {ID: "2", Name: "Updated2"}}
count, err := dbkit.BatchUpdate(ctx, db, users, 100)

func BatchUpsert

func BatchUpsert[T any](ctx context.Context, db bun.IDB, items []T, conflictColumns, updateColumns []string, batchSize int) (int64, error)

BatchUpsert performs upsert (insert or update) in batches. conflictColumns specifies which columns to check for conflicts. updateColumns specifies which columns to update on conflict.

Usage:

users := []User{{Email: "a@example.com", Name: "A"}, ...}
count, err := dbkit.BatchUpsert(ctx, db, users, []string{"email"}, []string{"name", "updated_at"}, 100)

func BulkInsertReturning

func BulkInsertReturning[T any](ctx context.Context, db bun.IDB, items []T) ([]T, error)

BulkInsertReturning inserts records and returns the inserted rows with generated values.

Usage:

users := []User{{Name: "A"}, {Name: "B"}}
inserted, err := dbkit.BulkInsertReturning(ctx, db, users)
// inserted now has IDs filled in

func CheckVersion

func CheckVersion[T any](ctx context.Context, db bun.IDB, id string, expectedVersion int64) error

CheckVersion verifies that a record's version matches the expected version. Returns ErrConflict if versions don't match.

Usage:

if err := dbkit.CheckVersion[Account](ctx, db, accountID, expectedVersion); err != nil {
    // Version mismatch - reload required
}

func Count

func Count[T any](ctx context.Context, db bun.IDB, queryFn func(*bun.SelectQuery) *bun.SelectQuery) (int, error)

Count returns the count of records matching the query.

Usage:

count, err := dbkit.Count[User](ctx, db, func(q *bun.SelectQuery) *bun.SelectQuery {
    return q.Where("active = ?", true)
})

func CursorPaginate

func CursorPaginate(idColumn, sortColumn, cursor string, limit int, forward bool) func(*bun.SelectQuery) *bun.SelectQuery

CursorPaginate applies cursor-based pagination to a query. The idColumn should be the primary key or a unique column. The sortColumn is optional and used for sorting before the ID.

Usage:

var users []User
db.NewSelect().Model(&users).
    Apply(dbkit.CursorPaginate("id", "", afterCursor, 10, true)).
    Scan(ctx)

func DefaultUserIDExtractor

func DefaultUserIDExtractor(ctx context.Context) string

DefaultUserIDExtractor extracts the user ID from the context.

func DeleteReturning

func DeleteReturning[T any](ctx context.Context, db bun.IDB, model *T) (*T, error)

DeleteReturning deletes a record and returns the deleted row.

Usage:

deleted, err := dbkit.DeleteReturning(ctx, db, &user)

func EncodeCursor

func EncodeCursor(id string, sortValue string) string

EncodeCursor encodes a cursor to a base64 string.

func Exists

func Exists[T any](ctx context.Context, db bun.IDB, queryFn func(*bun.SelectQuery) *bun.SelectQuery) (bool, error)

Exists checks if any record matches the query.

Usage:

exists, err := dbkit.Exists[User](ctx, db, func(q *bun.SelectQuery) *bun.SelectQuery {
    return q.Where("email = ?", email)
})

func FindOrCreate

func FindOrCreate[T any](ctx context.Context, db bun.IDB, model *T, findFn func(*bun.SelectQuery) *bun.SelectQuery) (*T, bool, error)

FindOrCreate finds a record or creates it if it doesn't exist. Returns the record and a boolean indicating if it was created.

Usage:

user, created, err := dbkit.FindOrCreate(ctx, db, &User{Email: "test@example.com"},
    func(q *bun.SelectQuery) *bun.SelectQuery {
        return q.Where("email = ?", "test@example.com")
    })

func GetColumn

func GetColumn(err error) (string, bool)

GetColumn extracts the column name if available

func GetConstraint

func GetConstraint(err error) (string, bool)

GetConstraint extracts the constraint name if available

func GetDetail

func GetDetail(err error) (string, bool)

GetDetail extracts the error detail if available

func GetHint

func GetHint(err error) (string, bool)

GetHint extracts the error hint if available

func GetTable

func GetTable(err error) (string, bool)

GetTable extracts the table name if available

func GetTenant

func GetTenant(ctx context.Context) string

GetTenant extracts tenant ID from the context. Returns empty string if not found.

Usage:

tenantID := dbkit.GetTenant(ctx)

func HardDelete

func HardDelete[T any](ctx context.Context, db bun.IDB, model *T) (sql.Result, error)

HardDelete permanently removes a soft-deleted record. This bypasses the soft delete and actually deletes the record.

Usage:

_, err := dbkit.HardDelete(ctx, db, &user)

func HardDeleteByID

func HardDeleteByID[T any](ctx context.Context, db bun.IDB, id string) (sql.Result, error)

HardDeleteByID permanently removes a record by its ID.

Usage:

_, err := dbkit.HardDeleteByID[User](ctx, db, userID)

func InTransaction

func InTransaction(ctx context.Context, db *bun.DB, fn func(ctx context.Context, tx bun.Tx) error) error

InTransaction executes a function within a transaction. This is an alias for DBKit.Transaction for use with a plain *bun.DB.

Usage:

err := dbkit.InTransaction(ctx, db, func(ctx context.Context, tx bun.Tx) error {
    // do work
    return nil
})

func IsCheckViolation

func IsCheckViolation(err error) bool

IsCheckViolation checks if error is a check constraint error

func IsConflict

func IsConflict(err error) bool

IsConflict checks if the error is an optimistic locking conflict.

func IsConnection

func IsConnection(err error) bool

IsConnection checks if error is a connection error

func IsDuplicate

func IsDuplicate(err error) bool

IsDuplicate checks if error is a duplicate key error

func IsForeignKey

func IsForeignKey(err error) bool

IsForeignKey checks if error is a foreign key error

func IsNotFound

func IsNotFound(err error) bool

IsNotFound checks if error is a not found error. This also checks for sql.ErrNoRows for direct Bun calls.

func IsNotNullViolation

func IsNotNullViolation(err error) bool

IsNotNullViolation checks if error is a not null violation error

func IsRetryable

func IsRetryable(err error) bool

IsRetryable checks if the error is retryable (serialization, deadlock)

func IsTimeout

func IsTimeout(err error) bool

IsTimeout checks if error is a timeout error

func KeysetPaginate

func KeysetPaginate(column string, lastValue interface{}, limit int) func(*bun.SelectQuery) *bun.SelectQuery

KeysetPaginate applies keyset pagination (also known as seek method). This is more efficient than offset for large datasets.

Usage:

var users []User
db.NewSelect().Model(&users).
    Apply(dbkit.KeysetPaginate("id", lastID, 10)).
    Order("id ASC").
    Scan(ctx)

func NotDeleted

func NotDeleted(q *bun.SelectQuery) *bun.SelectQuery

NotDeleted is a query modifier that filters out soft-deleted records. Pass it to Apply on Bun's query builder to exclude deleted records.

Usage:

var users []User
db.NewSelect().Model(&users).Apply(dbkit.NotDeleted).Scan(ctx)

func OnlyDeleted

func OnlyDeleted(q *bun.SelectQuery) *bun.SelectQuery

OnlyDeleted is a query modifier that includes only soft-deleted records. Pass it to Apply to find records that have been soft deleted.

Usage:

var deletedUsers []User
db.NewSelect().Model(&deletedUsers).Apply(dbkit.OnlyDeleted).Scan(ctx)

func Paginate

func Paginate(page, pageSize int) func(*bun.SelectQuery) *bun.SelectQuery

Paginate applies offset-based pagination to a query. Returns a query modifier that can be used with Apply().

Usage:

var users []User
db.NewSelect().Model(&users).Apply(dbkit.Paginate(2, 10)).Scan(ctx)

func Pluck

func Pluck[T any, V any](ctx context.Context, db bun.IDB, column string, queryFn func(*bun.SelectQuery) *bun.SelectQuery) ([]V, error)

Pluck extracts a single column from matching records.

Usage:

emails, err := dbkit.Pluck[User, string](ctx, db, "email", func(q *bun.SelectQuery) *bun.SelectQuery {
    return q.Where("active = ?", true)
})

func RawExec

func RawExec(ctx context.Context, db bun.IDB, query string, args ...interface{}) (sql.Result, error)

RawExec executes a raw SQL statement.

Usage:

result, err := dbkit.RawExec(ctx, db, "UPDATE users SET active = ? WHERE last_login < ?", false, cutoffDate)

func RawQuery

func RawQuery(ctx context.Context, db bun.IDB, dest interface{}, query string, args ...interface{}) error

RawQuery executes a raw SQL query and scans results into the destination.

Usage:

var results []map[string]interface{}
err := dbkit.RawQuery(ctx, db, &results, "SELECT * FROM users WHERE age > ?", 18)

func RequireTenant

func RequireTenant(ctx context.Context) (string, error)

RequireTenant extracts tenant ID from context or returns an error.

Usage:

tenantID, err := dbkit.RequireTenant(ctx)

func Restore

func Restore[T any](ctx context.Context, db bun.IDB, model *T) (sql.Result, error)

Restore removes the soft delete mark from a model.

Usage:

_, err := dbkit.Restore(ctx, db, &user)

func RestoreByID

func RestoreByID[T any](ctx context.Context, db bun.IDB, id string) (sql.Result, error)

RestoreByID removes the soft delete mark from a record by its ID.

Usage:

_, err := dbkit.RestoreByID[User](ctx, db, userID)

func RetryOnConflict

func RetryOnConflict(ctx context.Context, maxRetries int, fn func() error) error

RetryOnConflict executes a function and retries on optimistic locking conflicts. The function should reload the model and retry the operation.

Usage:

err := dbkit.RetryOnConflict(ctx, 3, func() error {
    // Reload the record; abort the attempt if the reload itself fails
    if err := db.NewSelect().Model(&account).WherePK().Scan(ctx); err != nil {
        return err
    }
    // Modify and update
    account.Balance += 100
    return dbkit.UpdateWithVersion(ctx, db, &account, account.Version)
})

func SetTenantID

func SetTenantID(ctx context.Context, model interface{}) error

SetTenantID sets the tenant ID on a model from context. The model must have a TenantID field.

Usage:

user := &User{Email: "test@example.com"}
if err := dbkit.SetTenantID(ctx, user); err != nil {
    return err
}
db.NewInsert().Model(user).Exec(ctx)

func SoftDelete

func SoftDelete[T any](ctx context.Context, db bun.IDB, model *T) (sql.Result, error)

SoftDelete marks a model as deleted by setting the DeletedAt field. The model must embed SoftDeletableModel or have a DeletedAt field.

Usage:

_, err := dbkit.SoftDelete(ctx, db, &user)

func SoftDeleteByID

func SoftDeleteByID[T any](ctx context.Context, db bun.IDB, id string) (sql.Result, error)

SoftDeleteByID marks a record as deleted by its ID.

Usage:

_, err := dbkit.SoftDeleteByID[User](ctx, db, userID)

func TenantDeleteScope

func TenantDeleteScope(ctx context.Context) func(*bun.DeleteQuery) *bun.DeleteQuery

TenantDeleteScope returns a query modifier for delete queries.

Usage:

db.NewDelete().Model(&user).Apply(dbkit.TenantDeleteScope(ctx)).WherePK().Exec(ctx)

func TenantScope

func TenantScope(ctx context.Context) func(*bun.SelectQuery) *bun.SelectQuery

TenantScope returns a query modifier that filters by tenant ID from context. Use this with Bun's query builder to scope queries to the current tenant.

Usage:

var users []User
db.NewSelect().Model(&users).Apply(dbkit.TenantScope(ctx)).Scan(ctx)

func TenantUpdateScope

func TenantUpdateScope(ctx context.Context) func(*bun.UpdateQuery) *bun.UpdateQuery

TenantUpdateScope returns a query modifier for update queries.

Usage:

db.NewUpdate().Model(&user).Apply(dbkit.TenantUpdateScope(ctx)).WherePK().Exec(ctx)

func UpdateColumnsWithVersion

func UpdateColumnsWithVersion[T any](ctx context.Context, db bun.IDB, model *T, version int64, columns ...string) error

UpdateColumnsWithVersion performs an optimistic locking update on specific columns. It increments the version and only succeeds if the current version matches.

Usage:

err := dbkit.UpdateColumnsWithVersion(ctx, db, &account, account.Version, "balance", "updated_at")

func UpdateReturning

func UpdateReturning[T any](ctx context.Context, db bun.IDB, model *T) (*T, error)

UpdateReturning updates a record and returns the updated row.

Usage:

user.Name = "Updated"
updated, err := dbkit.UpdateReturning(ctx, db, &user)

func UpdateWithVersion

func UpdateWithVersion[T any](ctx context.Context, db bun.IDB, model *T, version int64) error

UpdateWithVersion performs an optimistic locking update. It increments the version and only succeeds if the current version matches. Returns ErrConflict if the record was modified by another process.

Usage:

account.Balance += 100
err := dbkit.UpdateWithVersion(ctx, db, &account, account.Version)
if errors.Is(err, dbkit.ErrConflict) {
    // Handle conflict - reload and retry
}

func ValidateTenant

func ValidateTenant(ctx context.Context, db bun.IDB) error

ValidateTenant checks if the tenant ID in context is valid. Returns error if tenant doesn't exist or is not active.

func WithAuditContext

func WithAuditContext(ctx context.Context, userID, ipAddress, userAgent string) context.Context

WithAuditContext adds audit context information to a context.

Usage:

ctx = dbkit.WithAuditContext(ctx, userID, ipAddress, userAgent)

func WithDeleted

func WithDeleted(q *bun.SelectQuery) *bun.SelectQuery

WithDeleted returns a query modifier that includes all records (both deleted and not). This is useful when you need to see all records regardless of deletion status. Note: By default, models with soft_delete tag are automatically filtered.

Usage:

var allUsers []User
db.NewSelect().Model(&allUsers).Apply(dbkit.WithDeleted).Scan(ctx)

func WithTenant

func WithTenant(ctx context.Context, tenantID string) context.Context

WithTenant adds tenant ID to the context.

Usage:

ctx = dbkit.WithTenant(ctx, "tenant-123")

Types

type AppliedMigration

type AppliedMigration struct {
	ID          string
	Description string
	AppliedAt   time.Time
	Duration    time.Duration
	Checksum    string
}

AppliedMigration represents a successfully applied migration

type AuditAction

type AuditAction string

AuditAction represents the type of action being audited.

const (
	AuditActionCreate AuditAction = "CREATE"
	AuditActionUpdate AuditAction = "UPDATE"
	AuditActionDelete AuditAction = "DELETE"
)

type AuditConfig

type AuditConfig struct {
	// Handler is called for each audit entry.
	Handler AuditHandler

	// Tables specifies which tables to audit. If empty, all tables are audited.
	Tables []string

	// ExcludeTables specifies tables to exclude from auditing.
	ExcludeTables []string

	// IncludeOldData includes the old data in update/delete operations.
	IncludeOldData bool

	// IncludeNewData includes the new data in create/update operations.
	IncludeNewData bool

	// UserIDExtractor extracts the user ID from the context.
	UserIDExtractor func(ctx context.Context) string

	// MetadataExtractor extracts additional metadata from the context.
	MetadataExtractor func(ctx context.Context) map[string]interface{}
}

AuditConfig configures the audit system.

type AuditEntry

type AuditEntry struct {
	ID        string          `json:"id,omitempty"`
	Action    AuditAction     `json:"action"`
	TableName string          `json:"table_name"`
	RecordID  string          `json:"record_id"`
	OldData   json.RawMessage `json:"old_data,omitempty"`
	NewData   json.RawMessage `json:"new_data,omitempty"`
	UserID    string          `json:"user_id,omitempty"`
	IPAddress string          `json:"ip_address,omitempty"`
	UserAgent string          `json:"user_agent,omitempty"`
	Metadata  json.RawMessage `json:"metadata,omitempty"`
	CreatedAt time.Time       `json:"created_at"`
}

AuditEntry represents a single audit log entry.

type AuditHandler

type AuditHandler func(ctx context.Context, entry *AuditEntry) error

AuditHandler is a function that handles audit entries. Implement this to store audit logs in your preferred backend.

func NewDatabaseAuditHandler

func NewDatabaseAuditHandler(db bun.IDB) AuditHandler

NewDatabaseAuditHandler creates an AuditHandler that stores entries in the database.

Usage:

handler := dbkit.NewDatabaseAuditHandler(db)
dbkit.AuditCreate(ctx, handler, "users", user.ID, &user)

type AuditHook

type AuditHook struct {
	// contains filtered or unexported fields
}

AuditHook is a Bun query hook that creates audit log entries.

func NewAuditHook

func NewAuditHook(config AuditConfig) *AuditHook

NewAuditHook creates a new audit hook with the given configuration.

func (*AuditHook) CreateEntry

func (h *AuditHook) CreateEntry(ctx context.Context, action AuditAction, tableName, recordID string, oldData, newData interface{}) *AuditEntry

CreateEntry creates an audit entry from the context and query.

type AuditLog

type AuditLog struct {
	bun.BaseModel `bun:"table:audit_logs,alias:al"`

	ID        string          `bun:"id,pk,type:uuid,default:gen_random_uuid()"`
	Action    AuditAction     `bun:"action,notnull"`
	TableName string          `bun:"table_name,notnull"`
	RecordID  string          `bun:"record_id,notnull"`
	OldData   json.RawMessage `bun:"old_data,type:jsonb"`
	NewData   json.RawMessage `bun:"new_data,type:jsonb"`
	UserID    string          `bun:"user_id"`
	IPAddress string          `bun:"ip_address"`
	UserAgent string          `bun:"user_agent"`
	Metadata  json.RawMessage `bun:"metadata,type:jsonb"`
	CreatedAt time.Time       `bun:"created_at,notnull,default:current_timestamp"`
}

AuditLog is a database model for storing audit entries. Use this if you want to store audit logs in the database.

Create the table with:

CREATE TABLE audit_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    action VARCHAR(20) NOT NULL,
    table_name VARCHAR(255) NOT NULL,
    record_id VARCHAR(255) NOT NULL,
    old_data JSONB,
    new_data JSONB,
    user_id VARCHAR(255),
    ip_address VARCHAR(45),
    user_agent TEXT,
    metadata JSONB,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_audit_logs_table_record ON audit_logs(table_name, record_id);
CREATE INDEX idx_audit_logs_user ON audit_logs(user_id);
CREATE INDEX idx_audit_logs_created_at ON audit_logs(created_at);

type Auditable

type Auditable interface {
	// AuditID returns the ID of the record for audit purposes.
	AuditID() string
	// AuditTableName returns the table name for audit purposes.
	AuditTableName() string
}

Auditable is an interface that models can implement to provide audit information.

type AuditableModel

type AuditableModel struct{}

AuditableModel is a base model that implements the Auditable interface. Embed this in your models to enable audit logging.

Usage:

type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`
    dbkit.BaseModel
    dbkit.AuditableModel
    Email string `bun:"email,notnull,unique"`
}

type BaseModel

type BaseModel struct {
	ID        string    `bun:"id,pk,type:uuid,default:gen_random_uuid()"`
	CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
	UpdatedAt time.Time `bun:"updated_at,nullzero,notnull,default:current_timestamp"`
}

BaseModel provides common fields for all models: ID and timestamps. Embed this in your model structs for standard ID and timestamp handling.

Usage:

type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`
    dbkit.BaseModel
    Email string `bun:"email,notnull,unique"`
}

func (*BaseModel) BeforeAppendModel

func (m *BaseModel) BeforeAppendModel(ctx context.Context, query schema.Query) error

type Config

type Config struct {
	// Connection
	URL string // PostgreSQL connection string (required)

	// Pool settings
	MaxOpenConns    int           // Max open connections (default: 25)
	MaxIdleConns    int           // Max idle connections (default: 5)
	ConnMaxLifetime time.Duration // Max connection lifetime (default: 5m)
	ConnMaxIdleTime time.Duration // Max idle time (default: 1m)

	// Timeouts
	DialTimeout  time.Duration // Connection dial timeout (default: 5s)
	ReadTimeout  time.Duration // Read timeout (default: 30s)
	WriteTimeout time.Duration // Write timeout (default: 30s)

	// Observability (all optional)
	Logger          *slog.Logger          // Structured logger
	LogQueries      bool                  // Log all queries
	LogSlowQueries  time.Duration         // Log queries slower than this (0 = disabled)
	MetricsRegistry prometheus.Registerer // Prometheus registry for metrics
	Tracer          trace.Tracer          // OpenTelemetry tracer
}

Config holds database configuration

func DefaultConfig

func DefaultConfig(url string) Config

DefaultConfig returns sensible defaults

func (Config) WithLogger

func (c Config) WithLogger(logger *slog.Logger) Config

WithLogger enables query logging

func (Config) WithMetrics

func (c Config) WithMetrics(registry prometheus.Registerer) Config

WithMetrics enables Prometheus metrics

func (Config) WithSlowQueryLog

func (c Config) WithSlowQueryLog(threshold time.Duration) Config

WithSlowQueryLog logs queries slower than the threshold

func (Config) WithTracing

func (c Config) WithTracing(tracer trace.Tracer) Config

WithTracing enables OpenTelemetry tracing

type ContextKey

type ContextKey string

ContextKey is a type for context keys used by the audit system.

const (
	// ContextKeyUserID is the context key for the user ID.
	ContextKeyUserID ContextKey = "dbkit_user_id"
	// ContextKeyIPAddress is the context key for the IP address.
	ContextKeyIPAddress ContextKey = "dbkit_ip_address"
	// ContextKeyUserAgent is the context key for the user agent.
	ContextKeyUserAgent ContextKey = "dbkit_user_agent"
)

type Cursor

type Cursor struct {
	ID        string `json:"id"`
	SortValue string `json:"sv,omitempty"`
}

Cursor represents a pagination cursor.

func DecodeCursor

func DecodeCursor(cursor string) (*Cursor, error)

DecodeCursor decodes a base64 cursor string.

type CursorPage

type CursorPage[T any] struct {
	Items    []T      `json:"items"`
	PageInfo PageInfo `json:"page_info"`
}

CursorPage represents a cursor-based paginated result.

type DBKit

type DBKit struct {
	*bun.DB
	// contains filtered or unexported fields
}

DBKit wraps bun.DB with additional functionality

func New

func New(cfg Config) (*DBKit, error)

New creates a new database connection with the given configuration

func (*DBKit) Begin

func (db *DBKit) Begin(ctx context.Context) (*Tx, error)

Begin starts a new transaction (manual control)

func (*DBKit) BeginWithOptions

func (db *DBKit) BeginWithOptions(ctx context.Context, opts TxOptions) (*Tx, error)

BeginWithOptions starts a new transaction with custom options

func (*DBKit) Bun

func (db *DBKit) Bun() *bun.DB

Bun returns the underlying bun.DB for direct access

func (*DBKit) Close

func (db *DBKit) Close() error

Close closes the database connection

func (*DBKit) Config

func (db *DBKit) Config() Config

Config returns the current configuration

func (*DBKit) GetAppliedMigrations

func (db *DBKit) GetAppliedMigrations(ctx context.Context) ([]AppliedMigration, error)

GetAppliedMigrations returns all migrations that have been applied

func (*DBKit) Health

func (db *DBKit) Health(ctx context.Context) HealthStatus

Health performs a health check with detailed status

func (*DBKit) IsHealthy

func (db *DBKit) IsHealthy(ctx context.Context) bool

IsHealthy returns true if the database is reachable

func (*DBKit) Migrate

func (db *DBKit) Migrate(ctx context.Context, migrations []Migration) (*MigrationResult, error)

Migrate executes migrations in order, skipping already-applied ones

func (*DBKit) MigrationStatus

func (db *DBKit) MigrationStatus(ctx context.Context, migrations []Migration) ([]MigrationStatusEntry, error)

MigrationStatus returns the status of all known migrations

func (*DBKit) Ping

func (db *DBKit) Ping(ctx context.Context) error

Ping verifies the database connection is alive

func (*DBKit) ReadOnlyTransaction

func (db *DBKit) ReadOnlyTransaction(ctx context.Context, fn TxFunc) error

ReadOnlyTransaction executes fn within a read-only transaction

func (*DBKit) Stats

func (db *DBKit) Stats() sql.DBStats

Stats returns connection pool statistics

func (*DBKit) Transaction

func (db *DBKit) Transaction(ctx context.Context, fn TxFunc) error

Transaction executes fn within a transaction with automatic commit/rollback

func (*DBKit) TransactionWithOptions

func (db *DBKit) TransactionWithOptions(ctx context.Context, opts TxOptions, fn TxFunc) error

TransactionWithOptions executes fn within a transaction with custom options

type Error

type Error struct {
	Code       ErrorCode // Error classification
	Message    string    // Human-readable message
	Op         string    // Operation that failed (e.g., "FindByID", "Create")
	Table      string    // Table name if known
	Column     string    // Column name if known
	Constraint string    // Constraint name if applicable
	Detail     string    // Additional detail from PostgreSQL
	Hint       string    // Hint from PostgreSQL
	Query      string    // Query that failed (may be empty for security)
	Cause      error     // Underlying error
}

Error is a rich database error with context

func (*Error) Error

func (e *Error) Error() string

func (*Error) Is

func (e *Error) Is(target error) bool

Is implements errors.Is for sentinel error matching

func (*Error) Unwrap

func (e *Error) Unwrap() error

type ErrorCode

type ErrorCode string

ErrorCode represents a database error classification

const (
	CodeNotFound         ErrorCode = "NOT_FOUND"
	CodeDuplicate        ErrorCode = "DUPLICATE"
	CodeForeignKey       ErrorCode = "FOREIGN_KEY"
	CodeCheckViolation   ErrorCode = "CHECK_VIOLATION"
	CodeNotNullViolation ErrorCode = "NOT_NULL"
	CodeConnectionFailed ErrorCode = "CONNECTION_FAILED"
	CodeTimeout          ErrorCode = "TIMEOUT"
	CodeSerialization    ErrorCode = "SERIALIZATION"
	CodeDeadlock         ErrorCode = "DEADLOCK"
	CodeConflict         ErrorCode = "CONFLICT"
	CodeUnknown          ErrorCode = "UNKNOWN"
)

func GetErrorCode

func GetErrorCode(err error) (ErrorCode, bool)

GetErrorCode extracts the error code if it's a dbkit error

type FullModel

type FullModel struct {
	ID        string     `bun:"id,pk,type:uuid,default:gen_random_uuid()"`
	CreatedAt time.Time  `bun:"created_at,nullzero,notnull,default:current_timestamp"`
	UpdatedAt time.Time  `bun:"updated_at,nullzero,notnull,default:current_timestamp"`
	DeletedAt *time.Time `bun:"deleted_at,soft_delete,nullzero"`
	Version   int64      `bun:"version,notnull,default:1"`
}

FullModel combines BaseModel, SoftDeletableModel, and VersionedModel. Use this for models that need all features.

Usage:

type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`
    dbkit.FullModel
    Email string `bun:"email,notnull,unique"`
}

func (*FullModel) BeforeAppendModel

func (m *FullModel) BeforeAppendModel(ctx context.Context, query schema.Query) error

func (*FullModel) IsDeleted

func (m *FullModel) IsDeleted() bool

IsDeleted returns true if the model has been soft deleted.

type HealthStatus

type HealthStatus struct {
	Healthy   bool          `json:"healthy"`
	Latency   time.Duration `json:"latency"`
	Error     string        `json:"error,omitempty"`
	PoolStats PoolStats     `json:"pool_stats"`
}

HealthStatus represents the database health status

type IDB

type IDB interface {
	bun.IDB
	NewSelect() *bun.SelectQuery
	NewInsert() *bun.InsertQuery
	NewUpdate() *bun.UpdateQuery
	NewDelete() *bun.DeleteQuery
	NewRaw(query string, args ...any) *bun.RawQuery
	NewCreateTable() *bun.CreateTableQuery
	NewDropTable() *bun.DropTableQuery
	NewCreateIndex() *bun.CreateIndexQuery
	NewDropIndex() *bun.DropIndexQuery
	NewTruncateTable() *bun.TruncateTableQuery
	NewAddColumn() *bun.AddColumnQuery
	NewDropColumn() *bun.DropColumnQuery
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

IDB is the interface for both DB and Tx to enable function reuse

type Migration

type Migration struct {
	ID          string // Unique identifier (e.g., "001", "20240115120000", or any string)
	Description string // Human-readable description
	SQL         string // SQL statements to execute
}

Migration represents a single migration to execute

type MigrationResult

type MigrationResult struct {
	Applied   []AppliedMigration
	Skipped   []string // IDs that were already applied
	TotalTime time.Duration
}

MigrationResult represents the result of running migrations

type MigrationStatusEntry

type MigrationStatusEntry struct {
	ID            string
	Description   string
	Checksum      string
	Applied       bool
	ChecksumMatch bool // Only relevant if Applied is true
}

MigrationStatusEntry represents the status of a single migration

type OffsetPage

type OffsetPage[T any] struct {
	Items      []T      `json:"items"`
	Page       int      `json:"page"`
	PageSize   int      `json:"page_size"`
	TotalItems int      `json:"total_items"`
	TotalPages int      `json:"total_pages"`
	PageInfo   PageInfo `json:"page_info"`
}

OffsetPage represents an offset-based paginated result.

func PaginateWithCount

func PaginateWithCount[T any](ctx context.Context, db bun.IDB, page, pageSize int, queryFn func(*bun.SelectQuery) *bun.SelectQuery) (*OffsetPage[T], error)

PaginateWithCount executes an offset-paginated query and returns results with metadata.

Usage:

page, err := dbkit.PaginateWithCount[User](ctx, db, 1, 10, func(q *bun.SelectQuery) *bun.SelectQuery {
    return q.Where("active = ?", true).Order("created_at DESC")
})

type PageInfo

type PageInfo struct {
	HasNextPage     bool   `json:"has_next_page"`
	HasPreviousPage bool   `json:"has_previous_page"`
	StartCursor     string `json:"start_cursor,omitempty"`
	EndCursor       string `json:"end_cursor,omitempty"`
	TotalCount      int    `json:"total_count,omitempty"`
}

PageInfo contains pagination metadata.

func CursorPaginateResult

func CursorPaginateResult[T any](items []T, limit int, forward bool, cursorFn func(T) string) ([]T, PageInfo)

CursorPaginateResult processes cursor pagination results and builds page info. Pass the items fetched with limit+1, and it will trim and determine hasMore.

Usage:

items, pageInfo := dbkit.CursorPaginateResult(users, 10, true, func(u User) string {
    return dbkit.EncodeCursor(u.ID, "")
})

type PaginationOptions

type PaginationOptions struct {
	// Page number (1-indexed) for offset pagination
	Page int
	// PageSize is the number of items per page
	PageSize int
	// After cursor for forward cursor pagination
	After string
	// Before cursor for backward cursor pagination
	Before string
	// First N items (cursor pagination)
	First int
	// Last N items (cursor pagination)
	Last int
	// IncludeTotalCount includes total count in response (can be expensive)
	IncludeTotalCount bool
}

PaginationOptions configures pagination behavior.

type PoolStats

type PoolStats struct {
	MaxOpenConnections int           `json:"max_open_connections"`
	OpenConnections    int           `json:"open_connections"`
	InUse              int           `json:"in_use"`
	Idle               int           `json:"idle"`
	WaitCount          int64         `json:"wait_count"`
	WaitDuration       time.Duration `json:"wait_duration"`
	MaxIdleClosed      int64         `json:"max_idle_closed"`
	MaxIdleTimeClosed  int64         `json:"max_idle_time_closed"`
	MaxLifetimeClosed  int64         `json:"max_lifetime_closed"`
}

PoolStats contains connection pool statistics

func PoolStatsFromSQL

func PoolStatsFromSQL(stats sql.DBStats) PoolStats

PoolStatsFromSQL converts sql.DBStats to PoolStats

type QueryResult

type QueryResult[T any] struct {
	// contains filtered or unexported fields
}

QueryResult wraps a query result with error context for chainable error handling. It provides a way to add meaningful context to errors without depending on Bun internals.

func WithErr

func WithErr[T any](result T, err error, op string) *QueryResult[T]

WithErr wraps a result and error with operation context for enhanced error handling. This function allows chainable error handling with meaningful context.

Usage:

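// For operations that return (sql.Result, error).
// Go does not allow a multi-value call to be mixed with other arguments,
// so capture the two return values first.
res, execErr := db.NewInsert().Model(&user).Exec(ctx)
result, err := dbkit.WithErr(res, execErr, "CreateUser").Unwrap()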

// For operations that return only error (like Scan)
err := dbkit.WithErr1(db.NewSelect().Model(&user).Where("id = ?", id).Scan(ctx), "FindByID").Err()

// Check error directly
if r, e := db.NewInsert().Model(&user).Exec(ctx); dbkit.WithErr(r, e, "CreateUser").HasError() {
    // handle error
}

func WithErr1

func WithErr1(err error, op string) *QueryResult[struct{}]

WithErr1 is a convenience function for operations that return only an error. This is useful for Scan() operations which don't return a result.

Usage:

err := dbkit.WithErr1(db.NewSelect().Model(&user).Where("id = ?", id).Scan(ctx), "FindByID").Err()

func (*QueryResult[T]) Err

func (qr *QueryResult[T]) Err() error

Err returns the wrapped error with enhanced context. If there was no error, it returns nil.

func (*QueryResult[T]) HasError

func (qr *QueryResult[T]) HasError() bool

HasError returns true if there was an error.

func (*QueryResult[T]) Result

func (qr *QueryResult[T]) Result() T

Result returns only the result value. Use Err() to check for errors first.

func (*QueryResult[T]) Unwrap

func (qr *QueryResult[T]) Unwrap() (T, error)

Unwrap returns the result and the wrapped error. Use this when you need both the result and the error.

type SoftDeletableModel

type SoftDeletableModel struct {
	DeletedAt *time.Time `bun:"deleted_at,soft_delete,nullzero"`
}

SoftDeletableModel adds soft delete capability to models. Embed this alongside BaseModel for soft delete functionality.

Usage:

type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`
    dbkit.BaseModel
    dbkit.SoftDeletableModel
    Email string `bun:"email,notnull,unique"`
}

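Because DeletedAt carries Bun's soft_delete tag, soft-deleted records are excluded from queries automatically. The explicit filter below is equivalent and can be added when you want the condition to be visible in the query: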

db.NewSelect().Model(&users).Where("deleted_at IS NULL").Scan(ctx)

To soft delete:

db.NewUpdate().Model(&user).Set("deleted_at = ?", time.Now()).WherePK().Exec(ctx)

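To restore (soft-deleted rows are hidden from UPDATE queries by default, so include them explicitly):

db.NewUpdate().Model(&user).Set("deleted_at = NULL").WherePK().WhereAllWithDeleted().Exec(ctx)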

func (*SoftDeletableModel) IsDeleted

func (m *SoftDeletableModel) IsDeleted() bool

IsDeleted returns true if the model has been soft deleted.

type Tenant

type Tenant struct {
	bun.BaseModel `bun:"table:tenants,alias:t"`

	ID        string `bun:"id,pk,type:uuid,default:gen_random_uuid()"`
	Name      string `bun:"name,notnull"`
	Subdomain string `bun:"subdomain,notnull,unique"`
	Active    bool   `bun:"active,notnull,default:true"`
	Metadata  string `bun:"metadata,type:jsonb"`

	TimestampedModel
}

Tenant represents a tenant entity.

type TenantConfig

type TenantConfig struct {
	// Column is the tenant ID column name
	Column string

	// EnforceOnSelect automatically filters SELECT queries by tenant
	EnforceOnSelect bool

	// EnforceOnUpdate automatically filters UPDATE queries by tenant
	EnforceOnUpdate bool

	// EnforceOnDelete automatically filters DELETE queries by tenant
	EnforceOnDelete bool

	// SetOnInsert automatically sets tenant ID on INSERT
	SetOnInsert bool
}

TenantConfig configures multi-tenancy behavior.

func DefaultTenantConfig

func DefaultTenantConfig() TenantConfig

DefaultTenantConfig returns the default tenant configuration.

type TenantContextKey

type TenantContextKey struct{}

TenantContextKey is the context key for tenant ID.

type TenantHook

type TenantHook struct {
	// Column is the tenant ID column name (default: "tenant_id")
	Column string
}

TenantHook is a Bun query hook that automatically applies tenant filtering.

func NewTenantHook

func NewTenantHook(column string) *TenantHook

NewTenantHook creates a new tenant hook.

type TenantIsolation

type TenantIsolation struct {
	// contains filtered or unexported fields
}

TenantIsolation provides methods for tenant-isolated operations.

func NewTenantIsolation

func NewTenantIsolation(db bun.IDB, config TenantConfig) *TenantIsolation

NewTenantIsolation creates a new tenant isolation helper.

func (*TenantIsolation) Delete

func (ti *TenantIsolation) Delete(ctx context.Context) *bun.DeleteQuery

Delete creates a tenant-scoped DELETE query.

Usage:

ti.Delete(ctx).Model(&user).WherePK().Exec(ctx)

func (*TenantIsolation) Insert

func (ti *TenantIsolation) Insert(ctx context.Context) *bun.InsertQuery

Insert creates a query and can set tenant ID automatically. Note: You should still set the tenant ID on the model before insert.

func (*TenantIsolation) Select

func (ti *TenantIsolation) Select(ctx context.Context) *bun.SelectQuery

Select creates a tenant-scoped SELECT query.

Usage:

var users []User
ti.Select(ctx).Model(&users).Scan(ctx)

func (*TenantIsolation) Update

func (ti *TenantIsolation) Update(ctx context.Context) *bun.UpdateQuery

Update creates a tenant-scoped UPDATE query.

Usage:

ti.Update(ctx).Model(&user).WherePK().Exec(ctx)

type TenantModel

type TenantModel struct {
	TenantID string `bun:"tenant_id,notnull"`
}

TenantModel provides tenant isolation for models. Embed this in your model structs to add tenant_id field.

Usage:

type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`
    dbkit.BaseModel
    dbkit.TenantModel
    Email string `bun:"email,notnull"`
}

func (*TenantModel) SetTenantID

func (m *TenantModel) SetTenantID(tenantID string)

SetTenantID sets the tenant ID on the model.

type TimestampedModel

type TimestampedModel struct {
	CreatedAt time.Time `bun:"created_at,nullzero,notnull,default:current_timestamp"`
	UpdatedAt time.Time `bun:"updated_at,nullzero,notnull,default:current_timestamp"`
}

TimestampedModel provides only the timestamp fields of BaseModel (CreatedAt and UpdatedAt). Use this if you only need timestamps without the ID field.

Usage:

type AuditLog struct {
    bun.BaseModel `bun:"table:audit_logs,alias:al"`
    ID            int64 `bun:"id,pk,autoincrement"`
    dbkit.TimestampedModel
    Action string `bun:"action,notnull"`
}

func (*TimestampedModel) BeforeAppendModel

func (m *TimestampedModel) BeforeAppendModel(ctx context.Context, query schema.Query) error

type Tx

type Tx struct {
	bun.Tx
	// contains filtered or unexported fields
}

Tx wraps bun.Tx with additional functionality

func (*Tx) Commit

func (tx *Tx) Commit() error

Commit commits the transaction

func (*Tx) DBKit

func (tx *Tx) DBKit() *DBKit

DBKit returns the parent database

func (*Tx) ReleaseSavepoint

func (tx *Tx) ReleaseSavepoint(ctx context.Context, name string) error

ReleaseSavepoint releases a named savepoint

func (*Tx) Rollback

func (tx *Tx) Rollback() error

Rollback aborts the transaction

func (*Tx) RollbackTo

func (tx *Tx) RollbackTo(ctx context.Context, name string) error

RollbackTo rolls back to a named savepoint

func (*Tx) Savepoint

func (tx *Tx) Savepoint(ctx context.Context, name string) error

Savepoint creates a named savepoint for manual control

func (*Tx) Transaction

func (tx *Tx) Transaction(ctx context.Context, fn TxFunc) error

Transaction creates a savepoint for nested transaction support

type TxFunc

type TxFunc func(tx *Tx) error

TxFunc is a function executed within a transaction

type TxOptions

type TxOptions struct {
	Isolation sql.IsolationLevel
	ReadOnly  bool
}

TxOptions configures transaction behavior

func DefaultTxOptions

func DefaultTxOptions() TxOptions

DefaultTxOptions returns default transaction options

func ReadOnlyTxOptions

func ReadOnlyTxOptions() TxOptions

ReadOnlyTxOptions returns options for read-only transactions

func SerializableTxOptions

func SerializableTxOptions() TxOptions

SerializableTxOptions returns options for serializable transactions

type VersionedModel

type VersionedModel struct {
	Version int64 `bun:"version,notnull,default:1"`
}

VersionedModel adds optimistic locking capability to models. Embed this alongside BaseModel for version-based conflict detection.

Usage:

type User struct {
    bun.BaseModel `bun:"table:users,alias:u"`
    dbkit.BaseModel
    dbkit.VersionedModel
    Email string `bun:"email,notnull,unique"`
}

When updating, include version check:

result, err := db.NewUpdate().
    Model(&user).
    Set("version = version + 1").
    Where("id = ?", user.ID).
    Where("version = ?", user.Version).
    Exec(ctx)

Check if update was successful:

rows, _ := result.RowsAffected()
if rows == 0 {
    // Conflict detected - record was modified by another process
}

type VersionedUpdate

type VersionedUpdate[T any] struct {
	// contains filtered or unexported fields
}

VersionedUpdate is a helper struct for building versioned update queries.

func NewVersionedUpdate

func NewVersionedUpdate[T any](db bun.IDB, model *T, version int64) *VersionedUpdate[T]

NewVersionedUpdate creates a new versioned update builder.

func (*VersionedUpdate[T]) Columns

func (v *VersionedUpdate[T]) Columns(cols ...string) *VersionedUpdate[T]

Columns specifies which columns to update.

func (*VersionedUpdate[T]) Exec

func (v *VersionedUpdate[T]) Exec(ctx context.Context) (sql.Result, error)

Exec executes the versioned update.

Directories

Path Synopsis
hooks
Package hooks provides observability hooks for dbkit

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL