dbutil

Version: v0.2.2 (Latest)

Warning: This package is not in the latest version of its module.

Published: Jul 10, 2025 License: MIT Imports: 15 Imported by: 0

README

Database Utilities

A reusable Go package that provides database connection utilities and testing infrastructure for applications using PostgreSQL with pgx and sqlc.

Overview

This package is designed specifically for sqlc users who want:

  • Reusable database utilities that work with any sqlc-generated queries
  • Optimized testing infrastructure with shared connections for faster tests
  • Type-safe PostgreSQL operations with comprehensive pgx type helpers
  • Structured error handling with consistent error types
  • Production-ready features like health checks, metrics, retry logic, and connection hooks

Installation

go get github.com/nhalm/dbutil

Quick Start

package main

import (
    "context"
    "log"
    
    "github.com/nhalm/dbutil"
    "your-project/internal/repository/sqlc" // Your sqlc-generated package
)

func main() {
    ctx := context.Background()
    
    // Create connection with your sqlc queries
    conn, err := dbutil.NewConnection(ctx, "", sqlc.New)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    
    // Use your queries directly
    queries := conn.Queries()
    users, err := queries.GetAllUsers(ctx)
    if err != nil {
        log.Fatal(err)
    }
    
    log.Printf("Found %d users", len(users))
}

Configuration

Environment Variables

The package uses these environment variables with sensible defaults:

  • POSTGRES_HOST (default: "localhost")
  • POSTGRES_PORT (default: 5432)
  • POSTGRES_USER (default: "postgres")
  • POSTGRES_PASSWORD (default: "")
  • POSTGRES_DB (default: "postgres")
  • POSTGRES_SSLMODE (default: "disable")
  • TEST_DATABASE_URL (for integration tests)
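
As a rough illustration of how the POSTGRES_* variables and their defaults could be turned into a connection string, here is a minimal sketch. The helper names (envOr, buildDSN) and the postgres:// URL layout are assumptions made for this example; the string dbutil actually produces may be formatted differently.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

// envOr returns the value from getenv, or def when the variable is unset or empty.
func envOr(getenv func(string) string, key, def string) string {
	if v := getenv(key); v != "" {
		return v
	}
	return def
}

// buildDSN assembles a postgres:// URL from the POSTGRES_* variables.
// The URL layout is an assumption; dbutil may format its DSN differently.
func buildDSN(getenv func(string) string) string {
	user := envOr(getenv, "POSTGRES_USER", "postgres")
	password := envOr(getenv, "POSTGRES_PASSWORD", "")
	host := envOr(getenv, "POSTGRES_HOST", "localhost")
	port := envOr(getenv, "POSTGRES_PORT", "5432")

	// Only include a password component when one is actually set.
	userinfo := url.User(user)
	if password != "" {
		userinfo = url.UserPassword(user, password)
	}
	u := url.URL{
		Scheme:   "postgres",
		User:     userinfo,
		Host:     net.JoinHostPort(host, port),
		Path:     "/" + envOr(getenv, "POSTGRES_DB", "postgres"),
		RawQuery: "sslmode=" + url.QueryEscape(envOr(getenv, "POSTGRES_SSLMODE", "disable")),
	}
	return u.String()
}

func main() {
	fmt.Println(buildDSN(os.Getenv))
}
```

Passing the lookup function as a parameter keeps the sketch testable without touching the real process environment.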

Custom Configuration

config := &dbutil.Config{
    MaxConns:        20,
    MinConns:        5,
    MaxConnLifetime: 1 * time.Hour,
    SearchPath:      "myschema",
}
conn, err := dbutil.NewConnectionWithConfig(ctx, "", sqlc.New, config)

Key Features

Generic Design

Works with any sqlc-generated queries without coupling to specific packages:

conn, err := dbutil.NewConnection(ctx, "", myapp.New)   // in one application
conn, err := dbutil.NewConnection(ctx, "", yourapp.New) // in another, with a different sqlc package
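
To show why a generic constructor can accept any generated New function, here is a small standalone sketch. The pool, connection, userQueries, and orderQueries types are stand-ins invented for this example (pool replaces *pgxpool.Pool so no database is needed), and the type parameter is constrained with any rather than the package's Querier interface.

```go
package main

import "fmt"

// pool is a stand-in for *pgxpool.Pool so the sketch runs without a database.
type pool struct{ name string }

// connection mirrors the idea behind a generic Connection[T]: it owns the pool
// and caches whatever query type the constructor function returns.
type connection[T any] struct {
	pool    *pool
	queries T
}

// newConnection accepts any constructor with the shape func(*pool) T,
// which is how a generated New function can be passed in unchanged.
func newConnection[T any](p *pool, newQueries func(*pool) T) *connection[T] {
	return &connection[T]{pool: p, queries: newQueries(p)}
}

// Queries returns the cached query object with its concrete type intact.
func (c *connection[T]) Queries() T { return c.queries }

// userQueries and orderQueries play the role of two unrelated generated packages.
type userQueries struct{ p *pool }
type orderQueries struct{ p *pool }

func newUserQueries(p *pool) *userQueries   { return &userQueries{p: p} }
func newOrderQueries(p *pool) *orderQueries { return &orderQueries{p: p} }

func (q *userQueries) Describe() string  { return "users via " + q.p.name }
func (q *orderQueries) Describe() string { return "orders via " + q.p.name }

func main() {
	p := &pool{name: "primary"}
	users := newConnection(p, newUserQueries)   // T is inferred as *userQueries
	orders := newConnection(p, newOrderQueries) // T is inferred as *orderQueries
	fmt.Println(users.Queries().Describe())
	fmt.Println(orders.Queries().Describe())
}
```

Because T is inferred from the constructor argument, callers never write the type parameter explicitly, and Queries() returns the concrete generated type without a type assertion.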

Transaction Support

err = conn.WithTransaction(ctx, func(ctx context.Context, tx *sqlc.Queries) error {
    // All operations run in transaction, automatically rolled back on error
    _, err := tx.CreateUser(ctx, params) // discard the created row; only the error is needed here
    if err != nil {
        return err
    }
    return tx.CreateUserProfile(ctx, profileParams)
})

Health Checks & Monitoring

if conn.IsReady(ctx) {
    log.Println("Database is ready")
}

stats := conn.Stats()
log.Printf("Total connections: %d", stats.TotalConns())

// With metrics and hooks
conn = conn.WithMetrics(myMetricsCollector)
conn = conn.WithHooks(myHooks)

Read/Write Splitting

rwConn, err := dbutil.NewReadWriteConnection(ctx, readDSN, writeDSN, sqlc.New)
readQueries := rwConn.ReadQueries()   // Use for SELECT queries
writeQueries := rwConn.WriteQueries() // Use for INSERT/UPDATE/DELETE

Retry Logic

retryableConn := conn.WithRetry(nil) // Uses defaults
err = retryableConn.WithRetryableTransaction(ctx, func(ctx context.Context, tx *sqlc.Queries) error {
    _, err := tx.CreateUser(ctx, params) // CreateUser returns (user, error), as in the transaction example
    return err
})

Testing

This package provides optimized testing utilities with shared connections for faster integration tests:

func TestUserOperations(t *testing.T) {
    conn := dbutil.RequireTestDB(t, sqlc.New)     // Shared connection
    dbutil.CleanupTestData(conn,                  // Clean data between tests
        "DELETE FROM users WHERE email LIKE 'test_%'",
    )
    
    // Run your test logic
    ctx := context.Background()
    queries := conn.Queries()
    user, err := queries.CreateUser(ctx, params)
    if err != nil {
        t.Fatalf("CreateUser failed: %v", err)
    }
    // ... test assertions on user
}

Test Database Setup

# Start test database
docker run --name test-postgres -e POSTGRES_PASSWORD=testpass -p 5433:5432 -d postgres:15

# Set environment variable
export TEST_DATABASE_URL="postgres://postgres:testpass@localhost:5433/postgres?sslmode=disable"

# Run integration tests
go test ./...

Test Utilities

  • RequireTestDB(t, sqlc.New) - Returns shared test connection, skips if no database
  • CleanupTestData(conn, "DELETE ...") - Cleans test data between tests
  • GetTestConnection(sqlc.New) - Returns connection or nil if unavailable

Type Helpers

Comprehensive pgx type conversion utilities:

// String conversions
pgxText := dbutil.ToPgxText(&myString)
stringPtr := dbutil.FromPgxText(pgxText)

// Numeric conversions
pgxNum := dbutil.ToPgxNumericFromFloat64Ptr(&myFloat)
floatPtr := dbutil.FromPgxNumericPtr(pgxNum)

// Time conversions
pgxTime := dbutil.ToPgxTimestamptz(&myTime)
timePtr := dbutil.FromPgxTimestamptzPtr(pgxTime)

// UUID conversions
pgxUUID := dbutil.ToPgxUUID(myUUID)
roundTripped := dbutil.FromPgxUUID(pgxUUID)

Error Handling

Structured error types for consistent error handling:

// Create structured errors (each constructor returns its own pointer type)
var err error
err = dbutil.NewNotFoundError("User", userID)
err = dbutil.NewValidationError("Email", "create", "address", "invalid format", nil)
err = dbutil.NewDatabaseError("Order", "query", originalErr)

// Use with errors.As for type checking
var notFoundErr *dbutil.NotFoundError
if errors.As(err, &notFoundErr) {
    // Handle not found case
}
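
The following standalone sketch shows how errors.As and errors.Is behave with pointer error types shaped like the ones above. The lowercase types mirror the documented fields of NotFoundError and DatabaseError, but the message formats, errConnReset, loadUser, and classify are all hypothetical and exist only for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// notFoundError and databaseError mirror the exported fields of the dbutil
// error types. The message formats below are assumptions.
type notFoundError struct {
	Entity     string
	Identifier interface{}
}

func (e *notFoundError) Error() string {
	return fmt.Sprintf("%s not found: %v", e.Entity, e.Identifier)
}

type databaseError struct {
	Entity    string
	Operation string
	Err       error
}

func (e *databaseError) Error() string {
	return fmt.Sprintf("%s %s failed: %v", e.Entity, e.Operation, e.Err)
}

// Unwrap exposes the underlying error so errors.Is and errors.As can reach it.
func (e *databaseError) Unwrap() error { return e.Err }

// errConnReset is a hypothetical low-level failure used for the demo.
var errConnReset = errors.New("connection reset")

// loadUser simulates a repository call: id 0 is missing, negative ids hit a database failure.
func loadUser(id int) error {
	switch {
	case id == 0:
		return fmt.Errorf("load user: %w", &notFoundError{Entity: "User", Identifier: id})
	case id < 0:
		return &databaseError{Entity: "User", Operation: "query", Err: errConnReset}
	}
	return nil
}

// classify maps an error to a coarse outcome, the way an HTTP handler might.
func classify(err error) string {
	var nf *notFoundError
	var dbErr *databaseError
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &nf):
		return "not found"
	case errors.As(err, &dbErr) && errors.Is(err, errConnReset):
		return "retryable"
	default:
		return "internal"
	}
}

func main() {
	for _, id := range []int{1, 0, -1} {
		fmt.Println(id, classify(loadUser(id)))
	}
}
```

errors.As still matches when the structured error has been wrapped with %w further up the call stack, which is the main reason to prefer it over a direct type assertion.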

Examples

See examples.md for comprehensive usage examples including:

  • Custom configuration
  • Transaction handling
  • Error handling patterns
  • Read/write splitting
  • Retry logic
  • Connection hooks
  • Integration testing
  • Type conversion helpers

Integration with golang-migrate

Use GetDSN() with golang-migrate for database migrations:

import (
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
)

m, err := migrate.New("file://migrations", dbutil.GetDSN())
if err != nil {
    log.Fatal(err)
}
defer m.Close()

if err := m.Up(); err != nil {
    log.Fatal(err)
}

Documentation

Overview

Package dbutil provides database connection utilities and testing infrastructure for applications using PostgreSQL with pgx and sqlc.

This package is designed specifically for sqlc users who want:

  • Type-safe database operations with any sqlc-generated queries
  • Optimized testing infrastructure with shared connections
  • Comprehensive pgx type helpers for seamless type conversions
  • Structured error handling with consistent error types
  • Connection lifecycle management with hooks
  • Production-ready features like health checks, metrics, and retry logic
  • Read/write connection splitting for scaled deployments

The package uses Go generics to work with any sqlc-generated queries without requiring code generation or importing specific sqlc packages.

Example usage:

conn, err := dbutil.NewConnection(ctx, "", sqlc.New)
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

queries := conn.Queries()
users, err := queries.GetAllUsers(ctx)

Functions

func CleanupTestData

func CleanupTestData[T Querier](conn *Connection[T], sqlStatements ...string)

CleanupTestData executes cleanup SQL statements. This is a generic cleanup utility that takes SQL statements as parameters.

func FromPgxBool

func FromPgxBool(b pgtype.Bool) *bool

FromPgxBool converts pgtype.Bool to *bool

func FromPgxInt4

func FromPgxInt4(val pgtype.Int4) *int

func FromPgxInt8

func FromPgxInt8(i pgtype.Int8) *int64

func FromPgxNumericPtr

func FromPgxNumericPtr(val pgtype.Numeric) *float64

FromPgxNumericPtr converts pgtype.Numeric to *float64

func FromPgxText

func FromPgxText(t pgtype.Text) *string

FromPgxText converts a pgtype.Text to a string pointer. If the pgtype.Text is invalid (NULL), returns nil.

func FromPgxTextToString

func FromPgxTextToString(val pgtype.Text) string

func FromPgxTimestamptz

func FromPgxTimestamptz(ts pgtype.Timestamptz) time.Time

func FromPgxTimestamptzPtr

func FromPgxTimestamptzPtr(val pgtype.Timestamptz) *time.Time

func FromPgxUUID

func FromPgxUUID(pgxID pgtype.UUID) uuid.UUID

FromPgxUUID converts a pgtype.UUID to uuid.UUID. Returns uuid.Nil if the pgtype.UUID is invalid or cannot be parsed.

func GetDSN

func GetDSN() string

GetDSN returns the database connection string using the same POSTGRES_* environment variables that NewConnection uses. This is useful for tools like golang-migrate that need a connection string rather than a pgxpool.Pool.

func ToPgxBool

func ToPgxBool(b *bool) pgtype.Bool

ToPgxBool converts a *bool to pgtype.Bool

func ToPgxInt4FromInt

func ToPgxInt4FromInt(val *int) pgtype.Int4

func ToPgxInt8

func ToPgxInt8(i *int64) pgtype.Int8

ToPgxInt8 converts an int64 pointer to pgtype.Int8. If the input is nil, returns an invalid pgtype.Int8 (NULL in database).

func ToPgxNumericFromFloat64Ptr

func ToPgxNumericFromFloat64Ptr(val *float64) pgtype.Numeric

ToPgxNumericFromFloat64Ptr converts *float64 to pgtype.Numeric with configurable precision

func ToPgxText

func ToPgxText(s *string) pgtype.Text

ToPgxText converts a string pointer to pgtype.Text. If the input is nil, returns an invalid pgtype.Text (NULL in database).
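
The nil-to-NULL mapping described here can be sketched with the standard library alone. In this example sql.NullString from database/sql stands in for pgtype.Text, since both pair a value with a Valid flag; toNullString and fromNullString are hypothetical names and not part of dbutil.

```go
package main

import (
	"database/sql"
	"fmt"
)

// toNullString maps a *string to a value/Valid pair: nil becomes NULL.
func toNullString(s *string) sql.NullString {
	if s == nil {
		return sql.NullString{} // Valid is false, which represents NULL
	}
	return sql.NullString{String: *s, Valid: true}
}

// fromNullString is the inverse: an invalid (NULL) value becomes nil.
func fromNullString(n sql.NullString) *string {
	if !n.Valid {
		return nil
	}
	s := n.String // copy so the caller cannot alias the struct field
	return &s
}

func main() {
	name := "Ada"
	empty := ""
	fmt.Println(toNullString(&name).Valid, toNullString(&empty).Valid, toNullString(nil).Valid)
	fmt.Println(*fromNullString(toNullString(&name)), fromNullString(toNullString(nil)) == nil)
}
```

Note that a pointer to an empty string stays a valid, non-NULL value; only a nil pointer maps to NULL.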

func ToPgxTimestamptz

func ToPgxTimestamptz(t *time.Time) pgtype.Timestamptz

func ToPgxUUID

func ToPgxUUID(id uuid.UUID) pgtype.UUID

ToPgxUUID converts a uuid.UUID to pgtype.UUID.

func WithTimeout

func WithTimeout[T any](ctx context.Context, timeout time.Duration, fn func(context.Context) (T, error)) (T, error)

WithTimeout executes a function with a timeout. It takes no retry configuration; use WithTimeoutAndRetry to add retries.
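
One plausible way to implement a helper with this signature is to derive a deadline-bound context and pass it to the callback. The sketch below is an assumption about the approach, not the package's actual code; fastLookup, slowLookup, runFast, runSlow, and timedOut are hypothetical helpers for the demo.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// withTimeout has the same signature shape as WithTimeout. The body is an
// assumption: derive a deadline-bound context and hand it to fn.
func withTimeout[T any](ctx context.Context, timeout time.Duration, fn func(context.Context) (T, error)) (T, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel() // release the timer even when fn returns early
	return fn(ctx)
}

// fastLookup returns immediately, well inside any reasonable deadline.
func fastLookup(ctx context.Context) (int, error) {
	return 42, nil
}

// slowLookup blocks until the context is cancelled, then reports why.
func slowLookup(ctx context.Context) (int, error) {
	<-ctx.Done()
	return 0, ctx.Err()
}

// runFast and runSlow wrap the two cases with a short timeout.
func runFast() (int, error) {
	return withTimeout(context.Background(), 50*time.Millisecond, fastLookup)
}

func runSlow() (int, error) {
	return withTimeout(context.Background(), 50*time.Millisecond, slowLookup)
}

// timedOut reports whether err was caused by the deadline expiring.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	v, err := runFast()
	fmt.Println(v, err)
	_, err = runSlow()
	fmt.Println(timedOut(err))
}
```

In this form the timeout only takes effect if fn honors its context, which pgx query methods do when the context is passed through.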

func WithTimeoutAndRetry

func WithTimeoutAndRetry[T any](ctx context.Context, timeout time.Duration, retryConfig *RetryConfig, fn func(context.Context) (T, error)) (T, error)

WithTimeoutAndRetry executes a function with timeout and retry logic

Types

type Config

type Config struct {
	MaxConns        int32
	MinConns        int32
	MaxConnLifetime time.Duration
	SearchPath      string
	OnConnect       func(*pgx.Conn) error
	OnDisconnect    func(*pgx.Conn)
	Hooks           *ConnectionHooks
}

Config holds configuration options for database connections

type Connection

type Connection[T Querier] struct {
	// contains filtered or unexported fields
}

Connection represents a database connection with sqlc queries

func GetTestConnection

func GetTestConnection[T Querier](newQueriesFunc func(*pgxpool.Pool) T) *Connection[T]

GetTestConnection returns a shared test database connection, initializing it once. This is a generic function that must be called with the appropriate type parameter.

func NewConnection

func NewConnection[T Querier](ctx context.Context, dsn string, newQueriesFunc func(*pgxpool.Pool) T) (*Connection[T], error)

NewConnection initializes a new pgxpool.Pool connection with user-provided sqlc queries

func NewConnectionWithConfig

func NewConnectionWithConfig[T Querier](ctx context.Context, dsn string, newQueriesFunc func(*pgxpool.Pool) T, cfg *Config) (*Connection[T], error)

NewConnectionWithConfig initializes a new pgxpool.Pool connection with configuration options

func NewConnectionWithHooks

func NewConnectionWithHooks[T Querier](ctx context.Context, dsn string, newQueriesFunc func(*pgxpool.Pool) T, hooks *ConnectionHooks) (*Connection[T], error)

NewConnectionWithHooks creates a connection with hooks enabled

func NewConnectionWithLoggingHooks

func NewConnectionWithLoggingHooks[T Querier](ctx context.Context, dsn string, newQueriesFunc func(*pgxpool.Pool) T, logger Logger) (*Connection[T], error)

NewConnectionWithLoggingHooks creates a connection with logging hooks enabled

func NewConnectionWithValidationHooks

func NewConnectionWithValidationHooks[T Querier](ctx context.Context, dsn string, newQueriesFunc func(*pgxpool.Pool) T) (*Connection[T], error)

NewConnectionWithValidationHooks creates a connection with validation hooks enabled

func RequireTestDB

func RequireTestDB[T Querier](t TestingT, newQueriesFunc func(*pgxpool.Pool) T) *Connection[T]

RequireTestDB ensures a test database is available or skips the test

func (*Connection[T]) AddHook

func (c *Connection[T]) AddHook(hook *ConnectionHooks) *Connection[T]

AddHook adds a hook to the existing connection's hooks (creates hooks if none exist)

func (*Connection[T]) BeginTransaction

func (c *Connection[T]) BeginTransaction(ctx context.Context) (pgx.Tx, T, error)

BeginTransaction starts a new transaction and returns both the transaction and a querier. The caller is responsible for committing or rolling back the transaction. This is useful for more complex transaction management scenarios.

func (*Connection[T]) Close

func (c *Connection[T]) Close()

Close closes the pool

func (*Connection[T]) GetDB

func (c *Connection[T]) GetDB() *pgxpool.Pool

GetDB returns the underlying *pgxpool.Pool

func (*Connection[T]) GetHooks

func (c *Connection[T]) GetHooks() *ConnectionHooks

GetHooks returns the current hooks (may be nil)

func (*Connection[T]) HealthCheck

func (c *Connection[T]) HealthCheck(ctx context.Context) error

HealthCheck performs a simple health check by pinging the database

func (*Connection[T]) IsReady

func (c *Connection[T]) IsReady(ctx context.Context) bool

IsReady checks if the database connection is ready to accept queries

func (*Connection[T]) Queries

func (c *Connection[T]) Queries() T

Queries returns the cached sqlc queries instance for this connection

func (*Connection[T]) Stats

func (c *Connection[T]) Stats() *pgxpool.Stat

Stats returns the current connection pool statistics

func (*Connection[T]) WithHooks

func (c *Connection[T]) WithHooks(hooks *ConnectionHooks) *Connection[T]

WithHooks returns a new connection with hooks enabled

func (*Connection[T]) WithLogging

func (c *Connection[T]) WithLogging(logger Logger) *LoggingConnection[T]

WithLogging returns a new connection with logging enabled

func (*Connection[T]) WithMetrics

func (c *Connection[T]) WithMetrics(metrics MetricsCollector) *Connection[T]

WithMetrics returns a new connection with metrics collection enabled

func (*Connection[T]) WithRetry

func (c *Connection[T]) WithRetry(config *RetryConfig) *RetryableConnection[T]

WithRetry returns a new connection with retry logic enabled

func (*Connection[T]) WithTransaction

func (c *Connection[T]) WithTransaction(ctx context.Context, fn TransactionFunc[T]) error

WithTransaction executes the given function within a database transaction. If the function returns an error, the transaction is rolled back. If the function completes successfully, the transaction is committed.
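
The commit-or-rollback rule can be sketched independently of pgx. Here tx is a minimal stand-in interface (pgx.Tx methods also take a context, omitted for brevity), and fakeTx, errInsert, and withTransaction are hypothetical names used only in this example.

```go
package main

import (
	"errors"
	"fmt"
)

// tx is the minimal surface the wrapper needs. It is a stand-in for pgx.Tx.
type tx interface {
	Commit() error
	Rollback() error
}

// fakeTx records which terminal operation was called.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

// errInsert is a hypothetical failure returned by the callback.
var errInsert = errors.New("insert failed")

// withTransaction commits when fn succeeds and rolls back when it fails.
// The callback's error is always returned to the caller.
func withTransaction(t tx, fn func() error) error {
	if err := fn(); err != nil {
		if rbErr := t.Rollback(); rbErr != nil {
			return fmt.Errorf("%w (rollback also failed: %v)", err, rbErr)
		}
		return err
	}
	return t.Commit()
}

func main() {
	ok := &fakeTx{}
	fmt.Println(withTransaction(ok, func() error { return nil }), ok.committed, ok.rolledBack)

	bad := &fakeTx{}
	fmt.Println(withTransaction(bad, func() error { return errInsert }), bad.committed, bad.rolledBack)
}
```

A production wrapper would usually also roll back when the callback panics; that is left out here to keep the control flow easy to follow.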

type ConnectionHooks

type ConnectionHooks struct {
	// contains filtered or unexported fields
}

ConnectionHooks manages connection lifecycle hooks

func CombineHooks

func CombineHooks(hooksList ...*ConnectionHooks) *ConnectionHooks

CombineHooks combines multiple hook managers into one

func LoggingHook

func LoggingHook(logger Logger) *ConnectionHooks

LoggingHook creates a hook that logs connection events

func MetricsHook

func MetricsHook(metrics MetricsCollector) *ConnectionHooks

MetricsHook creates a hook that records connection metrics. Note: Duration tracking for acquire/release is not implemented in hooks as it requires pool-level instrumentation. Use Connection.WithMetrics() for comprehensive metrics.

func NewConnectionHooks

func NewConnectionHooks() *ConnectionHooks

NewConnectionHooks creates a new connection hooks manager

func SetupHook

func SetupHook(setupSQL string) *ConnectionHooks

SetupHook creates a hook that sets up connection-specific settings

func ValidationHook

func ValidationHook() *ConnectionHooks

ValidationHook creates a hook that validates connections

func (*ConnectionHooks) AddOnAcquire

func (h *ConnectionHooks) AddOnAcquire(fn func(context.Context, *pgx.Conn) error)

AddOnAcquire adds a callback that will be called when a connection is acquired from the pool

func (*ConnectionHooks) AddOnConnect

func (h *ConnectionHooks) AddOnConnect(fn func(*pgx.Conn) error)

AddOnConnect adds a callback that will be called when a new connection is established

func (*ConnectionHooks) AddOnDisconnect

func (h *ConnectionHooks) AddOnDisconnect(fn func(*pgx.Conn))

AddOnDisconnect adds a callback that will be called when a connection is closed

func (*ConnectionHooks) AddOnRelease

func (h *ConnectionHooks) AddOnRelease(fn func(*pgx.Conn))

AddOnRelease adds a callback that will be called when a connection is released back to the pool

func (*ConnectionHooks) ExecuteOnAcquire

func (h *ConnectionHooks) ExecuteOnAcquire(ctx context.Context, conn *pgx.Conn) error

ExecuteOnAcquire executes all OnAcquire callbacks

func (*ConnectionHooks) ExecuteOnConnect

func (h *ConnectionHooks) ExecuteOnConnect(conn *pgx.Conn) error

ExecuteOnConnect executes all OnConnect callbacks

func (*ConnectionHooks) ExecuteOnDisconnect

func (h *ConnectionHooks) ExecuteOnDisconnect(conn *pgx.Conn)

ExecuteOnDisconnect executes all OnDisconnect callbacks

func (*ConnectionHooks) ExecuteOnRelease

func (h *ConnectionHooks) ExecuteOnRelease(conn *pgx.Conn)

ExecuteOnRelease executes all OnRelease callbacks
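
The add, execute, and combine pattern behind a hook manager can be illustrated with a small stand-in. In this sketch conn replaces *pgx.Conn, only the OnConnect list is modeled, and stopping at the first callback error is an assumption about the semantics rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a stand-in for *pgx.Conn.
type conn struct{ id int }

// hooks holds OnConnect callbacks in registration order.
type hooks struct {
	onConnect []func(*conn) error
}

// addOnConnect appends a callback to the end of the list.
func (h *hooks) addOnConnect(fn func(*conn) error) {
	h.onConnect = append(h.onConnect, fn)
}

// executeOnConnect runs callbacks in order and stops at the first error.
// Stopping early is an assumption about the desired semantics.
func (h *hooks) executeOnConnect(c *conn) error {
	for _, fn := range h.onConnect {
		if err := fn(c); err != nil {
			return err
		}
	}
	return nil
}

// combine merges several hook sets into one, preserving order and skipping nil sets.
func combine(list ...*hooks) *hooks {
	out := &hooks{}
	for _, h := range list {
		if h != nil {
			out.onConnect = append(out.onConnect, h.onConnect...)
		}
	}
	return out
}

// errRejected is a hypothetical validation failure.
var errRejected = errors.New("connection rejected")

func main() {
	logging := &hooks{}
	logging.addOnConnect(func(c *conn) error {
		fmt.Println("connected:", c.id)
		return nil
	})
	validation := &hooks{}
	validation.addOnConnect(func(c *conn) error {
		if c.id < 0 {
			return errRejected
		}
		return nil
	})
	all := combine(logging, validation)
	fmt.Println(all.executeOnConnect(&conn{id: 1}))
	fmt.Println(all.executeOnConnect(&conn{id: -1}))
}
```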

type DatabaseError

type DatabaseError struct {
	Entity    string
	Operation string // "create", "update", "delete", "query"
	Err       error
}

DatabaseError represents database operation failures such as connection errors, constraint violations, or other database-specific errors.

func NewDatabaseError

func NewDatabaseError(entity, operation string, err error) *DatabaseError

NewDatabaseError creates a new DatabaseError with the given entity, operation, and underlying error.

func (*DatabaseError) Error

func (e *DatabaseError) Error() string

func (*DatabaseError) Unwrap

func (e *DatabaseError) Unwrap() error

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

DefaultLogger is a simple logger implementation using Go's standard log package

func NewDefaultLogger

func NewDefaultLogger(minLevel LogLevel) *DefaultLogger

NewDefaultLogger creates a new default logger with the specified minimum log level

func (*DefaultLogger) Log

func (l *DefaultLogger) Log(ctx context.Context, level LogLevel, msg string, data map[string]interface{})

Log implements the Logger interface

type LogLevel

type LogLevel int

LogLevel represents the severity level of a log message

const (
	LogLevelTrace LogLevel = iota
	LogLevelDebug
	LogLevelInfo
	LogLevelWarn
	LogLevelError
	LogLevelNone
)

func (LogLevel) String

func (l LogLevel) String() string

String returns the string representation of the log level

type Logger

type Logger interface {
	Log(ctx context.Context, level LogLevel, msg string, data map[string]interface{})
}

Logger interface for database logging
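
A custom logger only needs one method with this parameter list. The sketch below keeps entries in memory and drops anything below a minimum level; logLevel and its constants are local copies of the level ordering shown above, and memoryLogger and demo are hypothetical names for this example.

```go
package main

import (
	"context"
	"fmt"
)

// logLevel and its constants copy the ordering of the LogLevel values.
type logLevel int

const (
	levelTrace logLevel = iota
	levelDebug
	levelInfo
	levelWarn
	levelError
	levelNone
)

// memoryLogger keeps formatted entries in memory and drops anything below minLevel.
type memoryLogger struct {
	minLevel logLevel
	entries  []string
}

// Log has the same parameter list as the Logger interface method.
func (l *memoryLogger) Log(ctx context.Context, level logLevel, msg string, data map[string]interface{}) {
	if level < l.minLevel {
		return // below the configured threshold
	}
	l.entries = append(l.entries, fmt.Sprintf("level=%d msg=%s data=%v", level, msg, data))
}

// demo emits one message per level from trace to error and returns what was kept.
func demo(minLevel logLevel) []string {
	l := &memoryLogger{minLevel: minLevel}
	ctx := context.Background()
	for lvl := levelTrace; lvl <= levelError; lvl++ {
		l.Log(ctx, lvl, "event", map[string]interface{}{"level": int(lvl)})
	}
	return l.entries
}

func main() {
	for _, entry := range demo(levelWarn) {
		fmt.Println(entry)
	}
}
```

Because the none level sorts above error, using it as the minimum silences the logger entirely in this sketch.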

type LoggingConfig

type LoggingConfig struct {
	Logger             Logger
	LogLevel           LogLevel
	LogSlowQueries     bool
	SlowQueryThreshold time.Duration
	LogConnections     bool
	LogTransactions    bool
}

LoggingConfig holds configuration for database logging

func DefaultLoggingConfig

func DefaultLoggingConfig() *LoggingConfig

DefaultLoggingConfig returns a sensible default logging configuration

type LoggingConnection

type LoggingConnection[T Querier] struct {
	*Connection[T]
	// contains filtered or unexported fields
}

LoggingConnection wraps a Connection with logging capabilities

func NewConnectionWithConfigAndLogging

func NewConnectionWithConfigAndLogging[T Querier](ctx context.Context, dsn string, newQueriesFunc func(*pgxpool.Pool) T, cfg *Config, loggingConfig *LoggingConfig) (*LoggingConnection[T], error)

NewConnectionWithConfigAndLogging creates a new connection with both config and logging

func NewConnectionWithLogging

func NewConnectionWithLogging[T Querier](ctx context.Context, dsn string, newQueriesFunc func(*pgxpool.Pool) T, loggingConfig *LoggingConfig) (*LoggingConnection[T], error)

NewConnectionWithLogging creates a new connection with logging enabled

func (*LoggingConnection[T]) WithTransaction

func (lc *LoggingConnection[T]) WithTransaction(ctx context.Context, fn TransactionFunc[T]) error

WithTransaction executes the given function within a database transaction with logging

type MetricsCollector

type MetricsCollector interface {
	RecordConnectionAcquired(duration time.Duration)
	RecordConnectionReleased(duration time.Duration)
	RecordQueryExecuted(queryName string, duration time.Duration, err error)
	RecordTransactionStarted()
	RecordTransactionCommitted(duration time.Duration)
	RecordTransactionRolledBack(duration time.Duration)
}

MetricsCollector interface for collecting database metrics
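
For tests or local debugging, a counter-only implementation of this method set can be enough. The sketch below repeats the interface locally so it compiles on its own; countingMetrics, bump, and errSample are hypothetical names and not part of dbutil.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// metricsCollector repeats the method set of the MetricsCollector interface.
type metricsCollector interface {
	RecordConnectionAcquired(duration time.Duration)
	RecordConnectionReleased(duration time.Duration)
	RecordQueryExecuted(queryName string, duration time.Duration, err error)
	RecordTransactionStarted()
	RecordTransactionCommitted(duration time.Duration)
	RecordTransactionRolledBack(duration time.Duration)
}

// countingMetrics is a hypothetical collector that only keeps counters.
type countingMetrics struct {
	mu                                   sync.Mutex
	acquired, released                   int
	queries, queryErrors                 int
	txStarted, txCommitted, txRolledBack int
	queryTime                            time.Duration
}

// bump runs f while holding the lock so concurrent callers stay safe.
func (m *countingMetrics) bump(f func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	f()
}

func (m *countingMetrics) RecordConnectionAcquired(time.Duration) { m.bump(func() { m.acquired++ }) }
func (m *countingMetrics) RecordConnectionReleased(time.Duration) { m.bump(func() { m.released++ }) }
func (m *countingMetrics) RecordTransactionStarted()              { m.bump(func() { m.txStarted++ }) }
func (m *countingMetrics) RecordTransactionCommitted(time.Duration) {
	m.bump(func() { m.txCommitted++ })
}
func (m *countingMetrics) RecordTransactionRolledBack(time.Duration) {
	m.bump(func() { m.txRolledBack++ })
}

// RecordQueryExecuted counts the query, adds its duration, and tracks failures.
func (m *countingMetrics) RecordQueryExecuted(queryName string, d time.Duration, err error) {
	m.bump(func() {
		m.queries++
		m.queryTime += d
		if err != nil {
			m.queryErrors++
		}
	})
}

// Compile-time check that countingMetrics satisfies the interface.
var _ metricsCollector = (*countingMetrics)(nil)

// errSample is a hypothetical query failure for the demo.
var errSample = errors.New("sample query failure")

func main() {
	m := &countingMetrics{}
	m.RecordTransactionStarted()
	m.RecordQueryExecuted("GetUser", 3*time.Millisecond, nil)
	m.RecordQueryExecuted("CreateUser", 5*time.Millisecond, errSample)
	m.RecordTransactionRolledBack(9 * time.Millisecond)
	fmt.Println(m.queries, m.queryErrors, m.queryTime, m.txRolledBack)
}
```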

type NotFoundError

type NotFoundError struct {
	Entity     string
	Identifier interface{}
}

NotFoundError represents when a requested entity is not found in the database. This error should be used instead of returning pgx.ErrNoRows directly.
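
A repository layer might do this translation in one small helper. In the sketch below sql.ErrNoRows from database/sql stands in for pgx.ErrNoRows so the example runs with the standard library alone; notFoundError, translate, and isNotFound are hypothetical names, and the message format is an assumption.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// notFoundError mirrors the exported fields of NotFoundError.
type notFoundError struct {
	Entity     string
	Identifier interface{}
}

func (e *notFoundError) Error() string {
	return fmt.Sprintf("%s not found: %v", e.Entity, e.Identifier)
}

// errNoRows is the driver-level sentinel; sql.ErrNoRows stands in for pgx.ErrNoRows.
var errNoRows = sql.ErrNoRows

// errOther is a hypothetical unrelated failure.
var errOther = errors.New("connection refused")

// translate converts a "no rows" sentinel into a notFoundError and leaves
// every other error, including nil, untouched.
func translate(entity string, id interface{}, err error) error {
	if errors.Is(err, errNoRows) {
		return &notFoundError{Entity: entity, Identifier: id}
	}
	return err
}

// isNotFound reports whether err is, or wraps, a notFoundError.
func isNotFound(err error) bool {
	var nf *notFoundError
	return errors.As(err, &nf)
}

func main() {
	fmt.Println(translate("User", 7, errNoRows))
	fmt.Println(translate("User", 7, errOther))
	fmt.Println(translate("User", 7, nil))
}
```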

func NewNotFoundError

func NewNotFoundError(entity string, identifier interface{}) *NotFoundError

NewNotFoundError creates a new NotFoundError with the given entity and identifier.

func (*NotFoundError) Error

func (e *NotFoundError) Error() string

type Querier

type Querier interface {
	WithTx(tx pgx.Tx) Querier
}

Querier represents the interface that sqlc-generated queries implement

type QueryLogger

type QueryLogger[T Querier] struct {
	// contains filtered or unexported fields
}

QueryLogger wraps query execution with logging

func NewQueryLogger

func NewQueryLogger[T Querier](queries T, logger Logger) *QueryLogger[T]

NewQueryLogger creates a new query logger wrapper

func (*QueryLogger[T]) LogQuery

func (ql *QueryLogger[T]) LogQuery(ctx context.Context, queryName string, fn func() error) error

LogQuery logs a query execution with timing

type ReadWriteConnection

type ReadWriteConnection[T Querier] struct {
	// contains filtered or unexported fields
}

ReadWriteConnection represents a database connection with separate read and write pools

func NewReadWriteConnection

func NewReadWriteConnection[T Querier](ctx context.Context, readDSN, writeDSN string, newQueriesFunc func(*pgxpool.Pool) T) (*ReadWriteConnection[T], error)

NewReadWriteConnection creates a new connection with separate read and write pools

func NewReadWriteConnectionWithConfig

func NewReadWriteConnectionWithConfig[T Querier](ctx context.Context, readDSN, writeDSN string, newQueriesFunc func(*pgxpool.Pool) T, readConfig, writeConfig *Config) (*ReadWriteConnection[T], error)

NewReadWriteConnectionWithConfig creates a new read/write connection with configuration options

func (*ReadWriteConnection[T]) BeginTransaction

func (rw *ReadWriteConnection[T]) BeginTransaction(ctx context.Context) (pgx.Tx, T, error)

BeginTransaction starts a new transaction on the write pool

func (*ReadWriteConnection[T]) Close

func (rw *ReadWriteConnection[T]) Close()

Close closes both read and write pools

func (*ReadWriteConnection[T]) HealthCheck

func (rw *ReadWriteConnection[T]) HealthCheck(ctx context.Context) error

HealthCheck performs health checks on both read and write connections

func (*ReadWriteConnection[T]) IsReady

func (rw *ReadWriteConnection[T]) IsReady(ctx context.Context) bool

IsReady checks if both read and write connections are ready

func (*ReadWriteConnection[T]) ReadDB

func (rw *ReadWriteConnection[T]) ReadDB() *pgxpool.Pool

ReadDB returns the underlying read pool

func (*ReadWriteConnection[T]) ReadQueries

func (rw *ReadWriteConnection[T]) ReadQueries() T

ReadQueries returns the queries instance for read operations

func (*ReadWriteConnection[T]) ReadStats

func (rw *ReadWriteConnection[T]) ReadStats() *pgxpool.Stat

ReadStats returns statistics for the read connection pool

func (*ReadWriteConnection[T]) WithHooks

func (rw *ReadWriteConnection[T]) WithHooks(hooks *ConnectionHooks) *ReadWriteConnection[T]

WithHooks returns a new read/write connection with hooks enabled

func (*ReadWriteConnection[T]) WithMetrics

func (rw *ReadWriteConnection[T]) WithMetrics(metrics MetricsCollector) *ReadWriteConnection[T]

WithMetrics returns a new read/write connection with metrics collection enabled

func (*ReadWriteConnection[T]) WithRetry

func (rw *ReadWriteConnection[T]) WithRetry(config *RetryConfig) *RetryableReadWriteConnection[T]

WithRetry returns a new read/write connection with retry logic enabled

func (*ReadWriteConnection[T]) WithTransaction

func (rw *ReadWriteConnection[T]) WithTransaction(ctx context.Context, fn TransactionFunc[T]) error

WithTransaction executes the given function within a database transaction on the write pool

func (*ReadWriteConnection[T]) WriteDB

func (rw *ReadWriteConnection[T]) WriteDB() *pgxpool.Pool

WriteDB returns the underlying write pool

func (*ReadWriteConnection[T]) WriteQueries

func (rw *ReadWriteConnection[T]) WriteQueries() T

WriteQueries returns the queries instance for write operations

func (*ReadWriteConnection[T]) WriteStats

func (rw *ReadWriteConnection[T]) WriteStats() *pgxpool.Stat

WriteStats returns statistics for the write connection pool

type RetryConfig

type RetryConfig struct {
	MaxRetries int
	BaseDelay  time.Duration
	MaxDelay   time.Duration
	Multiplier float64
}

RetryConfig holds configuration for retry logic
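
These four fields are the usual ingredients of capped exponential backoff. The sketch below shows one common interpretation, delay = BaseDelay * Multiplier^attempt capped at MaxDelay; the formula, the example values, and the names retryConfig, delayFor, retry, noSleep, and errTransient are assumptions for illustration, not the package's documented behavior or defaults.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"time"
)

// retryConfig mirrors the exported fields of RetryConfig.
type retryConfig struct {
	MaxRetries int
	BaseDelay  time.Duration
	MaxDelay   time.Duration
	Multiplier float64
}

// exampleConfig uses illustrative values, not the package defaults.
var exampleConfig = retryConfig{
	MaxRetries: 4,
	BaseDelay:  100 * time.Millisecond,
	MaxDelay:   time.Second,
	Multiplier: 2,
}

// errTransient is a hypothetical retryable failure.
var errTransient = errors.New("transient failure")

// delayFor returns BaseDelay * Multiplier^attempt, capped at MaxDelay.
// attempt is zero-based: 0 is the wait before the first retry.
func delayFor(cfg retryConfig, attempt int) time.Duration {
	d := float64(cfg.BaseDelay) * math.Pow(cfg.Multiplier, float64(attempt))
	if d > float64(cfg.MaxDelay) {
		return cfg.MaxDelay
	}
	return time.Duration(d)
}

// retry calls fn until it succeeds or MaxRetries retries are used up.
// sleep is injected so the sketch can run without actually waiting.
func retry(cfg retryConfig, sleep func(time.Duration), fn func() error) error {
	var err error
	for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt < cfg.MaxRetries {
			sleep(delayFor(cfg, attempt))
		}
	}
	return err
}

// noSleep skips waiting entirely; useful in tests.
func noSleep(time.Duration) {}

func main() {
	calls := 0
	err := retry(exampleConfig, func(d time.Duration) { fmt.Println("waiting", d) }, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```

A real retry wrapper would normally also check whether the error is worth retrying (for example a serialization failure) and add jitter; both are left out here.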

func DefaultRetryConfig

func DefaultRetryConfig() *RetryConfig

DefaultRetryConfig returns a sensible default retry configuration

type RetryableConnection

type RetryableConnection[T Querier] struct {
	*Connection[T]
	// contains filtered or unexported fields
}

RetryableConnection wraps a Connection with retry logic

func (*RetryableConnection[T]) WithRetryableTransaction

func (rc *RetryableConnection[T]) WithRetryableTransaction(ctx context.Context, fn TransactionFunc[T]) error

WithRetryableTransaction executes the given function within a database transaction with retry logic

type RetryableReadWriteConnection

type RetryableReadWriteConnection[T Querier] struct {
	*ReadWriteConnection[T]
	// contains filtered or unexported fields
}

RetryableReadWriteConnection wraps a ReadWriteConnection with retry logic

func (*RetryableReadWriteConnection[T]) WithRetryableTransaction

func (rrc *RetryableReadWriteConnection[T]) WithRetryableTransaction(ctx context.Context, fn TransactionFunc[T]) error

WithRetryableTransaction executes the given function within a database transaction with retry logic

type SlowQueryLogger

type SlowQueryLogger struct {
	// contains filtered or unexported fields
}

SlowQueryLogger logs only slow queries

func NewSlowQueryLogger

func NewSlowQueryLogger(logger Logger, threshold time.Duration) *SlowQueryLogger

NewSlowQueryLogger creates a new slow query logger

func (*SlowQueryLogger) LogIfSlow

func (sql *SlowQueryLogger) LogIfSlow(ctx context.Context, queryName string, duration time.Duration, err error)

LogIfSlow logs a query only if it exceeds the threshold
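
The threshold check combines naturally with a timing wrapper around each query call. The sketch below is a standalone illustration; slowLog, logIfSlow, and timed are hypothetical names, and the recorded message format is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// slowLog collects a line for each query that ran longer than threshold.
type slowLog struct {
	threshold time.Duration
	slow      []string
}

// logIfSlow records the query only when duration strictly exceeds the threshold.
func (s *slowLog) logIfSlow(queryName string, duration time.Duration, err error) {
	if duration > s.threshold {
		s.slow = append(s.slow, fmt.Sprintf("%s took %s (err=%v)", queryName, duration, err))
	}
}

// timed runs fn, measures how long it took, and reports the result to the log.
// The error from fn is returned unchanged.
func (s *slowLog) timed(queryName string, fn func() error) error {
	start := time.Now()
	err := fn()
	s.logIfSlow(queryName, time.Since(start), err)
	return err
}

func main() {
	s := &slowLog{threshold: 10 * time.Millisecond}
	_ = s.timed("GetAllUsers", func() error {
		time.Sleep(20 * time.Millisecond) // simulate a slow query
		return nil
	})
	_ = s.timed("Ping", func() error { return nil })
	fmt.Println(s.slow)
}
```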

type TestingT

type TestingT interface {
	Skip(args ...interface{})
	Logf(format string, args ...interface{})
}

TestingT is an interface that matches both *testing.T and *testing.B
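
Accepting a small interface instead of *testing.T also makes skip logic itself testable with a fake. The sketch below copies the two-method interface locally; fakeT and requireDSN are hypothetical names, and reading TEST_DATABASE_URL through an injected lookup function is a choice made for this example.

```go
package main

import "fmt"

// testingT copies the two-method TestingT interface.
type testingT interface {
	Skip(args ...interface{})
	Logf(format string, args ...interface{})
}

// fakeT records calls instead of aborting, which makes skip logic unit-testable.
type fakeT struct {
	skipped bool
	logs    []string
}

func (f *fakeT) Skip(args ...interface{}) { f.skipped = true }

func (f *fakeT) Logf(format string, args ...interface{}) {
	f.logs = append(f.logs, fmt.Sprintf(format, args...))
}

// requireDSN returns the test DSN, or calls t.Skip when TEST_DATABASE_URL is unset.
// With a real *testing.T, Skip stops the test, so the boolean result only
// matters for fakes.
func requireDSN(t testingT, getenv func(string) string) (string, bool) {
	dsn := getenv("TEST_DATABASE_URL")
	if dsn == "" {
		t.Logf("TEST_DATABASE_URL is not set")
		t.Skip("no test database available")
		return "", false
	}
	return dsn, true
}

func main() {
	ft := &fakeT{}
	_, ok := requireDSN(ft, func(string) string { return "" })
	fmt.Println(ok, ft.skipped, ft.logs)
}
```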

type TransactionFunc

type TransactionFunc[T Querier] func(ctx context.Context, tx T) error

TransactionFunc is a function that executes within a transaction

type ValidationError

type ValidationError struct {
	Entity    string
	Operation string
	Field     string
	Reason    string
	Err       error
}

ValidationError represents validation failures that occur before database operations. Use this for input validation, constraint violations, or business rule failures.

func NewValidationError

func NewValidationError(entity, operation, field, reason string, err error) *ValidationError

NewValidationError creates a new ValidationError with the given parameters.

func (*ValidationError) Error

func (e *ValidationError) Error() string

func (*ValidationError) Unwrap

func (e *ValidationError) Unwrap() error
