Documentation
Overview ¶
Package gopgbase provides a unified PostgreSQL client with constructor injection.
gopgbase abstracts multiple PostgreSQL-compatible databases (PostgreSQL, AWS RDS, Supabase, CockroachDB, Neon, Redshift, TimescaleDB, Railway, Render, and others) behind the DataStore interface.
All database access flows through the DataStore interface — never directly through *sql.DB or *sql.Tx. Users inject a DataStore into NewClient and interact exclusively via Client methods.
Custom DataStore implementations (for mocking, alternative drivers, or custom connection pools) are encouraged and require no internal types.
Index ¶
- type Client
- func (c *Client) BatchTransaction(ctx context.Context, operations ...func(tx *sql.Tx) error) error
- func (c *Client) BulkCopy(ctx context.Context, table string, columns []string, data [][]any) (int64, error)
- func (c *Client) BulkInsert(ctx context.Context, table string, columns []string, values [][]any) (int64, error)
- func (c *Client) Count(ctx context.Context, table string, condition string, args ...any) (int64, error)
- func (c *Client) DataStore() DataStore
- func (c *Client) EnableObservability(_ context.Context) *ObservabilityLibrary
- func (c *Client) EnablePreparedStatements(_ context.Context) error
- func (c *Client) Exists(ctx context.Context, query string, args ...any) (bool, error)
- func (c *Client) ForEachRow(ctx context.Context, query string, args []any, ...) error
- func (c *Client) GrafanaDashboardJSON() string
- func (c *Client) HealthCheckHandler(w http.ResponseWriter, r *http.Request)
- func (c *Client) ImportGrafanaDashboard(grafanaURL, apiKey string) error
- func (c *Client) QueryBuilder() *QueryBuilderDSL
- func (c *Client) ReadOnlyTransaction(ctx context.Context, fn func(tx *sql.Tx) error) error
- func (c *Client) Savepoint(ctx context.Context, tx *sql.Tx, name string, fn func(tx *sql.Tx) error) (err error)
- func (c *Client) StructScan(_ context.Context, rows *sql.Rows, dest any) error
- func (c *Client) Transaction(ctx context.Context, fn func(tx *sql.Tx) error) error
- func (c *Client) TransactionWithIsolation(ctx context.Context, level sql.IsolationLevel, fn func(tx *sql.Tx) error) error
- func (c *Client) TunePool(_ context.Context, cpuCores, qps int) error
- func (c *Client) WithReadReplica(_ context.Context, replica DataStore) *Client
- type DataStore
- type ExplainPlan
- type HealthStatus
- type ObservabilityLibrary
- func (o *ObservabilityLibrary) ConnectionPoolMetrics() map[string]int
- func (o *ObservabilityLibrary) Enable(_ context.Context)
- func (o *ObservabilityLibrary) ExplainAnalyze(ctx context.Context, query string, args ...any) (*ExplainPlan, error)
- func (o *ObservabilityLibrary) PrometheusExporter(_ context.Context, port int) error
- func (o *ObservabilityLibrary) QueryMetrics() map[string]float64
- func (o *ObservabilityLibrary) RecordQuery(operation, query string, duration time.Duration, err error)
- func (o *ObservabilityLibrary) SlowQueryDetector(_ context.Context, thresholdMS int) []SlowQuery
- func (o *ObservabilityLibrary) TraceQueries(_ context.Context, sampleRate float64)
- func (o *ObservabilityLibrary) TraceQuery(ctx context.Context, operation, query string) (context.Context, trace.Span)
- func (o *ObservabilityLibrary) UpdatePoolMetrics()
- type QueryBuilderDSL
- func (qb *QueryBuilderDSL) Build() (string, []any, error)
- func (qb *QueryBuilderDSL) Columns(cols ...string) *QueryBuilderDSL
- func (qb *QueryBuilderDSL) Exec(ctx context.Context) (sql.Result, error)
- func (qb *QueryBuilderDSL) GroupBy(group string) *QueryBuilderDSL
- func (qb *QueryBuilderDSL) Having(having string, args ...any) *QueryBuilderDSL
- func (qb *QueryBuilderDSL) Join(join string) *QueryBuilderDSL
- func (qb *QueryBuilderDSL) Limit(n int) *QueryBuilderDSL
- func (qb *QueryBuilderDSL) Offset(n int) *QueryBuilderDSL
- func (qb *QueryBuilderDSL) OrderBy(order string) *QueryBuilderDSL
- func (qb *QueryBuilderDSL) Query(ctx context.Context) (*sql.Rows, error)
- func (qb *QueryBuilderDSL) Select(table string) *QueryBuilderDSL
- func (qb *QueryBuilderDSL) Where(condition string, args ...any) *QueryBuilderDSL
- type SlowQuery
- type Unwrapper
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the main entry point for all database operations.
It wraps a DataStore (injected via NewClient) and provides helpers for transactions, queries, bulk operations, struct scanning, and more.
Client never constructs its own database connections — all access flows through the injected DataStore. Users may inject different adaptors, share a single adaptor across multiple Clients, or provide a custom DataStore implementation.
All Client methods are safe for concurrent use.
func NewClient ¶
NewClient creates a new Client backed by the given DataStore.
This is the constructor injection point: the caller is responsible for creating and configuring the DataStore (e.g., via an adaptor constructor).
Example:
ds, err := adaptors.NewPostgresAdaptor(cfg)
if err != nil { log.Fatal(err) }
client := gopgbase.NewClient(ds)
func (*Client) BatchTransaction ¶
BatchTransaction executes multiple operations sequentially within a single transaction.
If any operation returns an error, the entire transaction is rolled back.
func (*Client) BulkCopy ¶
func (c *Client) BulkCopy(ctx context.Context, table string, columns []string, data [][]any) (int64, error)
BulkCopy performs a high-performance COPY operation for bulk data loading.
This method uses the COPY protocol when the DataStore is pgx-backed (i.e., implements Unwrap). For non-pgx DataStores, it transparently falls back to BulkInsert.
Example:
n, err := client.BulkCopy(ctx, "metrics", []string{"time", "value"},
[][]any{{time.Now(), 42.0}, {time.Now(), 43.0}})
func (*Client) BulkInsert ¶
func (c *Client) BulkInsert(ctx context.Context, table string, columns []string, values [][]any) (int64, error)
BulkInsert inserts multiple rows into the given table using parameterized queries.
columns lists the column names, and values is a slice of rows where each row is a slice of column values. Returns the number of rows affected.
The insert is performed as a single statement with multiple value groups. For very large inserts (>65535 parameters), consider using BulkCopy instead.
Example:
n, err := client.BulkInsert(ctx, "users", []string{"name", "age"},
[][]any{{"Alice", 30}, {"Bob", 25}})
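The single-statement shape described above — one INSERT with numbered placeholder groups — can be sketched as follows. This is an illustration of the documented behavior, not the package's actual internals; the helper name is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// buildBulkInsert assembles one multi-row INSERT statement with $N
// placeholders, the general shape BulkInsert is documented to produce.
func buildBulkInsert(table string, columns []string, rows [][]any) (string, []any) {
	var sb strings.Builder
	sb.WriteString("INSERT INTO " + table + " (" + strings.Join(columns, ", ") + ") VALUES ")
	args := make([]any, 0, len(rows)*len(columns))
	n := 1
	for i, row := range rows {
		if i > 0 {
			sb.WriteString(", ")
		}
		ph := make([]string, len(row))
		for j, v := range row {
			ph[j] = fmt.Sprintf("$%d", n)
			n++
			args = append(args, v)
		}
		sb.WriteString("(" + strings.Join(ph, ", ") + ")")
	}
	return sb.String(), args
}

func main() {
	q, args := buildBulkInsert("users", []string{"name", "age"},
		[][]any{{"Alice", 30}, {"Bob", 25}})
	fmt.Println(q)        // INSERT INTO users (name, age) VALUES ($1, $2), ($3, $4)
	fmt.Println(len(args)) // 4
}
```

Two rows of two columns consume four parameters; at PostgreSQL's 65535-parameter ceiling this style runs out, which is why the documentation points large loads at BulkCopy.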
func (*Client) Count ¶
func (c *Client) Count(ctx context.Context, table string, condition string, args ...any) (int64, error)
Count returns the number of rows matching the condition in the given table.
The table parameter must be a trusted identifier (e.g., a constant or validated name) — it is quoted as an identifier but not parameterized. The condition is placed in a WHERE clause with args passed as placeholders. An empty condition counts all rows.
Example:
n, err := client.Count(ctx, "users", "active = $1", true)
func (*Client) DataStore ¶
DataStore returns the underlying DataStore for advanced or escape-hatch usage.
func (*Client) EnableObservability ¶
func (c *Client) EnableObservability(_ context.Context) *ObservabilityLibrary
EnableObservability initializes and returns the observability subsystem.
After calling this, Prometheus metrics are registered and available at the standard /metrics endpoint (via promhttp.Handler).
func (*Client) EnablePreparedStatements ¶
EnablePreparedStatements enables prepared statement caching on the underlying connection pool. This requires an Unwrapper-capable DataStore.
func (*Client) Exists ¶
Exists runs the provided SELECT query and returns true if at least one row is returned, false otherwise.
The query is wrapped in SELECT EXISTS(...). Always use placeholders for values in the query.
Example:
ok, err := client.Exists(ctx, "SELECT 1 FROM users WHERE email = $1", email)
func (*Client) ForEachRow ¶
func (c *Client) ForEachRow(ctx context.Context, query string, args []any, fn func(row map[string]any) error) error
ForEachRow executes query and calls fn for each row. Rows are not buffered in memory — fn is called as each row is scanned.
If fn returns an error, iteration stops and that error is returned. The rows are closed automatically.
Example:
err := client.ForEachRow(ctx, "SELECT id, name FROM users", nil,
func(row map[string]any) error {
fmt.Println(row["name"])
return nil
})
func (*Client) GrafanaDashboardJSON ¶
GrafanaDashboardJSON returns a pre-built Grafana dashboard JSON for gopgbase metrics. Users can import this into their Grafana instance.
func (*Client) HealthCheckHandler ¶
func (c *Client) HealthCheckHandler(w http.ResponseWriter, r *http.Request)
HealthCheckHandler is an http.HandlerFunc-compatible method that performs a database health check and responds with JSON (see HealthStatus).
func (*Client) ImportGrafanaDashboard ¶
ImportGrafanaDashboard pushes the gopgbase dashboard to a Grafana instance.
grafanaURL is the base URL (e.g., "http://grafana.local:3000"). apiKey is a Grafana API key with dashboard creation permissions.
func (*Client) QueryBuilder ¶
func (c *Client) QueryBuilder() *QueryBuilderDSL
QueryBuilder provides a fluent interface for constructing SQL queries.
It supports two placeholder styles:
- MySQL-style ? placeholders: auto-converted to PostgreSQL $N before execution.
- Native PostgreSQL $N placeholders: passed through as-is.
Mixing ? and $N in the same query is an error.
Example:
results, err := client.QueryBuilder().
Select("users").
Columns("id", "name", "email").
Where("age > ?", 18).
OrderBy("name ASC").
Limit(10).
Query(ctx)
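The ?-to-$N rewrite described above can be sketched as follows. This is an illustration of the documented behavior, not gopgbase's actual code, and it deliberately ignores edge cases such as ? inside string literals.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"strings"
)

// convertPlaceholders rewrites ? placeholders to $1, $2, ... and rejects
// queries that mix both styles, per the documented rule.
func convertPlaceholders(q string) (string, error) {
	hasDollar := regexp.MustCompile(`\$[0-9]`).MatchString(q)
	hasQuestion := strings.Contains(q, "?")
	if hasDollar && hasQuestion {
		return "", errors.New("mixed ? and $N placeholders in one query")
	}
	if !hasQuestion {
		return q, nil // native $N placeholders pass through as-is
	}
	var sb strings.Builder
	n := 0
	for _, r := range q {
		if r == '?' {
			n++
			fmt.Fprintf(&sb, "$%d", n)
		} else {
			sb.WriteRune(r)
		}
	}
	return sb.String(), nil
}

func main() {
	q, _ := convertPlaceholders("age > ? AND name = ?")
	fmt.Println(q) // age > $1 AND name = $2
}
```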
func (*Client) ReadOnlyTransaction ¶
ReadOnlyTransaction executes fn within a read-only transaction.
The database will reject any write operations (INSERT, UPDATE, DELETE) inside fn, which is useful for ensuring SELECT-only logic.
func (*Client) Savepoint ¶
func (c *Client) Savepoint(ctx context.Context, tx *sql.Tx, name string, fn func(tx *sql.Tx) error) (err error)
Savepoint executes fn within a named savepoint inside an existing transaction.
If fn returns an error, the savepoint is rolled back (but the outer transaction remains active). If fn succeeds, the savepoint is released.
func (*Client) StructScan ¶
StructScan scans the current row of rows into the struct pointed to by dest.
It maps column names to struct fields using the "db" tag, falling back to the lowercase field name. Supports JSONB (scanned as json.RawMessage or any json.Unmarshaler) and PostgreSQL arrays (scanned as slices).
Example:
type User struct {
ID int `db:"id"`
Name string `db:"name"`
}
rows, _ := client.DataStore().QueryContext(ctx, "SELECT id, name FROM users")
for rows.Next() {
var u User
if err := client.StructScan(ctx, rows, &u); err != nil { ... }
}
func (*Client) Transaction ¶
Transaction executes fn within a database transaction.
Behavior:
- Starts a read/write transaction with default isolation via BeginTx.
- If fn returns nil, the transaction is committed.
- If fn returns an error, the transaction is rolled back and the error is returned.
- If fn panics, the transaction is rolled back and the panic is re-raised.
- Respects ctx cancellation and deadlines throughout.
Transaction is safe for concurrent use — each call gets its own *sql.Tx.
func (*Client) TransactionWithIsolation ¶
func (c *Client) TransactionWithIsolation(ctx context.Context, level sql.IsolationLevel, fn func(tx *sql.Tx) error) error
TransactionWithIsolation executes fn within a transaction at the given isolation level.
See sql.IsolationLevel constants (e.g., sql.LevelSerializable).
func (*Client) TunePool ¶
TunePool adjusts the connection pool parameters based on CPU cores and expected QPS.
It applies a heuristic: maxOpen = cpuCores * 2 + 1 (capped by qps/100), maxIdle = cpuCores, idleTimeout = 5 minutes. These are starting-point defaults; users should monitor and adjust as needed.
type DataStore ¶
type DataStore interface {
// QueryRowContext executes a query expected to return at most one row.
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
// QueryContext executes a query that returns rows, typically a SELECT.
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
// ExecContext executes a query without returning rows (INSERT, UPDATE, DELETE, etc.).
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
// BeginTx starts a transaction with the given options.
BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
// PingContext verifies the connection to the database is alive.
PingContext(ctx context.Context) error
// Close closes the underlying connection pool and releases resources.
Close() error
}
DataStore defines the minimal database operations required by Client.
It mirrors key parts of *sql.DB but is abstract enough for custom implementations (mocks, alternative pools, or wrapped drivers).
All concrete adaptors in the adaptors package implement this interface. Users may also provide their own implementation for testing or custom setups.
Concrete adaptor types additionally expose Unwrap() *sql.DB for interop with tools (like goose) that require a raw *sql.DB. Unwrap is intentionally NOT part of this interface to keep it driver-agnostic.
type ExplainPlan ¶
type ExplainPlan struct {
JSONPlan map[string]any `json:"json_plan,omitempty"`
Query string `json:"query"`
Plan string `json:"plan"`
}
ExplainPlan holds the output of EXPLAIN ANALYZE.
type HealthStatus ¶
type HealthStatus struct {
Metadata map[string]any `json:"metadata,omitempty"`
Error string `json:"error,omitempty"`
Latency time.Duration `json:"latency_ns"`
Healthy bool `json:"healthy"`
}
HealthStatus represents the result of a database health check.
type ObservabilityLibrary ¶
type ObservabilityLibrary struct {
// contains filtered or unexported fields
}
ObservabilityLibrary provides database monitoring, metrics, and tracing.
func (*ObservabilityLibrary) ConnectionPoolMetrics ¶
func (o *ObservabilityLibrary) ConnectionPoolMetrics() map[string]int
ConnectionPoolMetrics returns connection pool statistics.
func (*ObservabilityLibrary) Enable ¶
func (o *ObservabilityLibrary) Enable(_ context.Context)
Enable activates observability collection.
func (*ObservabilityLibrary) ExplainAnalyze ¶
func (o *ObservabilityLibrary) ExplainAnalyze(ctx context.Context, query string, args ...any) (*ExplainPlan, error)
ExplainAnalyze runs EXPLAIN ANALYZE on the given query and returns the plan.
func (*ObservabilityLibrary) PrometheusExporter ¶
func (o *ObservabilityLibrary) PrometheusExporter(_ context.Context, port int) error
PrometheusExporter starts a Prometheus metrics HTTP server on the given port.
func (*ObservabilityLibrary) QueryMetrics ¶
func (o *ObservabilityLibrary) QueryMetrics() map[string]float64
QueryMetrics returns current query performance metrics as a map. Keys include "qps", "p95_ms", "p99_ms", "error_rate".
func (*ObservabilityLibrary) RecordQuery ¶
func (o *ObservabilityLibrary) RecordQuery(operation, query string, duration time.Duration, err error)
RecordQuery records a query execution for observability purposes.
func (*ObservabilityLibrary) SlowQueryDetector ¶
func (o *ObservabilityLibrary) SlowQueryDetector(_ context.Context, thresholdMS int) []SlowQuery
SlowQueryDetector returns queries that exceeded the given threshold.
func (*ObservabilityLibrary) TraceQueries ¶
func (o *ObservabilityLibrary) TraceQueries(_ context.Context, sampleRate float64)
TraceQueries enables OpenTelemetry tracing for sampled queries.
func (*ObservabilityLibrary) TraceQuery ¶
func (o *ObservabilityLibrary) TraceQuery(ctx context.Context, operation, query string) (context.Context, trace.Span)
TraceQuery creates an OTEL span for a query if sampling allows it.
func (*ObservabilityLibrary) UpdatePoolMetrics ¶
func (o *ObservabilityLibrary) UpdatePoolMetrics()
UpdatePoolMetrics refreshes connection pool gauge metrics.
type QueryBuilderDSL ¶
type QueryBuilderDSL struct {
// contains filtered or unexported fields
}
QueryBuilderDSL is a fluent SQL query builder.
func (*QueryBuilderDSL) Build ¶
func (qb *QueryBuilderDSL) Build() (string, []any, error)
Build constructs the final SQL query string and arguments. Placeholder conversion (? → $N) is applied here.
func (*QueryBuilderDSL) Columns ¶
func (qb *QueryBuilderDSL) Columns(cols ...string) *QueryBuilderDSL
Columns sets the columns to select. If not called, "*" is used.
func (*QueryBuilderDSL) Exec ¶
Exec executes the built query (for non-SELECT statements adapted to the builder).
func (*QueryBuilderDSL) GroupBy ¶
func (qb *QueryBuilderDSL) GroupBy(group string) *QueryBuilderDSL
GroupBy sets the GROUP BY clause.
func (*QueryBuilderDSL) Having ¶
func (qb *QueryBuilderDSL) Having(having string, args ...any) *QueryBuilderDSL
Having sets the HAVING clause (used with GroupBy).
func (*QueryBuilderDSL) Join ¶
func (qb *QueryBuilderDSL) Join(join string) *QueryBuilderDSL
Join adds a JOIN clause (e.g., "INNER JOIN orders ON users.id = orders.user_id").
func (*QueryBuilderDSL) Limit ¶
func (qb *QueryBuilderDSL) Limit(n int) *QueryBuilderDSL
Limit sets the maximum number of rows to return.
func (*QueryBuilderDSL) Offset ¶
func (qb *QueryBuilderDSL) Offset(n int) *QueryBuilderDSL
Offset sets the number of rows to skip.
func (*QueryBuilderDSL) OrderBy ¶
func (qb *QueryBuilderDSL) OrderBy(order string) *QueryBuilderDSL
OrderBy sets the ORDER BY clause.
func (*QueryBuilderDSL) Select ¶
func (qb *QueryBuilderDSL) Select(table string) *QueryBuilderDSL
Select sets the table for a SELECT query.
func (*QueryBuilderDSL) Where ¶
func (qb *QueryBuilderDSL) Where(condition string, args ...any) *QueryBuilderDSL
Where adds a WHERE condition. Multiple calls are ANDed together. Use ? or $N for placeholders.
type SlowQuery ¶
type SlowQuery struct {
Time time.Time `json:"time"`
Query string `json:"query"`
Duration time.Duration `json:"duration_ns"`
}
SlowQuery represents a query that exceeded the configured latency threshold.
Directories ¶
| Path | Synopsis |
|---|---|
| adaptors | Package adaptors provides DataStore implementations for various PostgreSQL-compatible databases and services. |
| libs | |
| cockroachdb | Package cockroachdb provides CockroachDB-specific operations including multi-region management, distributed SQL, backup/restore, and CDC. |
| common | Package common provides shared utility functions that work across all PostgreSQL-compatible adaptors in gopgbase. |
| neon | Package neon provides Neon serverless PostgreSQL-specific operations including database branching, compute scaling, connection pooler configuration, and pgvector support. |
| postgres | Package postgres provides PostgreSQL-specific convenience operations including extension management, maintenance, and monitoring. |
| redshift | Package redshift provides Amazon Redshift-specific operations including vacuum/analyze, materialized views, WLM queue management, concurrency scaling, and Spectrum external tables. |
| supabase | Package supabase provides Supabase-specific convenience operations including Row Level Security, auth/JWT helpers, Edge Functions, and storage operations. |
| timescale | Package timescale provides TimescaleDB-specific operations including hypertable management, continuous aggregates, compression policies, retention policies, and hyperfunctions. |