harmonyquery

package module
v0.0.0-...-9243aa5 Latest
Published: Feb 16, 2026 License: MIT Imports: 32 Imported by: 0

README

HarmonyQuery

A Postgres/Yugabyte adapter in harmony with busy developers. It guards against the top few developer mistakes with databases.

Features

  • Rolling to secondary database servers on connection failure
  • Convenience features for Go + SQL
  • Prevention of SQL injection vulnerabilities
  • Prevents non-transaction calls in a transaction (really hard to debug)
  • Requires context, so cancel at your lowest layer is guaranteed plumbed.
  • Includes your migrations and test isolation, so a developer's single DB instance can run tests alongside a main instance
  • Discourages dangling transactions & dangling result cursors.
  • Conveniences: QueryRow().Scan(&a) and: var a []resultType; db.Select(ctx, &a, "query")
  • Monitors behavior via Prometheus stats and logging of errors
  • Entirely within the Go1 promise. No unsafe.
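
The first feature, rolling to a secondary server on connection failure, can be pictured as a loop over the configured hosts. The following is a minimal sketch of that idea only, not the package's implementation: `connectFunc`, `firstReachable`, and `failFor` are hypothetical names, and the "connection" is simulated so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
)

// connectFunc is a hypothetical stand-in for whatever opens a connection to one host.
type connectFunc func(host string) error

// firstReachable tries each host in order and returns the index and name of the
// first one that connects. If none connect, it returns every failure joined together.
func firstReachable(hosts []string, connect connectFunc) (int, string, error) {
	if len(hosts) == 0 {
		return -1, "", errors.New("no hosts configured")
	}
	var errs []error
	for i, h := range hosts {
		if err := connect(h); err != nil {
			errs = append(errs, fmt.Errorf("host %s: %w", h, err))
			continue // roll over to the next host
		}
		return i, h, nil
	}
	return -1, "", errors.Join(errs...)
}

// failFor builds a simulated connectFunc that refuses the listed hosts.
func failFor(down ...string) connectFunc {
	return func(host string) error {
		for _, d := range down {
			if d == host {
				return errors.New("connection refused")
			}
		}
		return nil
	}
}

func main() {
	// The primary is down, so the secondary (index 1) is chosen.
	idx, host, err := firstReachable([]string{"db-primary", "db-secondary"}, failFor("db-primary"))
	fmt.Println(idx, host, err)
}
```

The returned index is the kind of value a metric such as which_host could record, although how the package populates that metric is not shown in this document.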

Notes

  • Only the *DB may be shared across goroutines.

Installation

go get github.com/curiostorage/harmonyquery

Usage

package main

import (
    "context"
    "embed"
    "fmt"

    "github.com/curiostorage/harmonyquery"
)

//go:embed sql
var upgradeFS embed.FS

//go:embed downgrade
var downgradeFS embed.FS

func main() {
    // Field names follow the Config type documented below.
    db, err := harmonyquery.NewFromConfig(harmonyquery.Config{
        Hosts:            []string{"localhost"},
        Username:         "yugabyte",
        Password:         "yugabyte",
        Database:         "yugabyte",
        Port:             "5433",
        LoadBalance:      false,
        Schema:           "BobsFishery",
        SqlEmbedFS:       &upgradeFS,
        DowngradeEmbedFS: &downgradeFS,
        SSLMode:          "require",
    })
    if err != nil {
        panic(err)
    }

    // Execute queries
    ctx := context.Background()

    // Insert/Update/Delete
    count, err := db.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", "Alice")
    if err != nil {
        panic(err)
    }
    fmt.Println("rows affected:", count)

    // Select into struct slice
    var users []struct {
        ID   int
        Name string
    }
    if err = db.Select(ctx, &users, "SELECT id, name FROM users"); err != nil {
        panic(err)
    }
    fmt.Println("users found:", len(users))

    // Query single row
    var name string
    if err = db.QueryRow(ctx, "SELECT name FROM users WHERE id = $1", 1).Scan(&name); err != nil {
        panic(err)
    }
    fmt.Println("name:", name)

    // Transactions
    committed, err := db.BeginTransaction(ctx, func(tx *harmonyquery.Tx) (bool, error) {
        _, err := tx.Exec("UPDATE users SET name = $1 WHERE id = $2", "Bob", 1)
        if err != nil {
            return false, err // roll back
        }
        return true, nil // commit
    })
    if err != nil {
        panic(err)
    }
    fmt.Println("committed:", committed)
}

Schema Migrations

SQL migrations are embedded in the package and automatically applied on connection.

Place migration files in the sql/ folder with naming convention: YYYYMMDD-description.sql

Rules:

  • CREATE TABLE should NOT have a schema (managed automatically)
  • Never change shipped SQL files - create new files for corrections
  • All migrations run once in order
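
The naming convention can be checked mechanically before a file is committed. Here is a small sketch of such a check; it is not part of harmonyquery. The helper name `checkMigrationName` and the allowed characters in the description (lowercase letters, digits, `-`, `_`) are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"regexp"
	"time"
)

// migrationName matches names such as "20231231-add-users.sql".
// The character set allowed in the description part is an assumption.
var migrationName = regexp.MustCompile(`^(\d{8})-[a-z0-9][a-z0-9_-]*\.sql$`)

// checkMigrationName returns an error unless name follows
// YYYYMMDD-description.sql and the prefix is a real calendar date.
func checkMigrationName(name string) error {
	m := migrationName.FindStringSubmatch(name)
	if m == nil {
		return fmt.Errorf("%q does not match YYYYMMDD-description.sql", name)
	}
	// "20060102" is Go's reference layout for YYYYMMDD.
	if _, err := time.Parse("20060102", m[1]); err != nil {
		return fmt.Errorf("%q has an invalid date prefix: %w", name, err)
	}
	return nil
}

func main() {
	names := []string{"20231231-add-users.sql", "20231341-bad-date.sql", "users.sql"}
	for _, name := range names {
		fmt.Println(name, "->", checkMigrationName(name))
	}
}
```

Only the first name passes: the second has month 13, and the third has no date prefix.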

License

MIT License - see LICENSE for details.

Documentation

Overview

HarmonyQuery provides database abstractions over SP-wide Postgres-compatible instance(s).

Features

Rolling to secondary database servers on connection failure
Convenience features for Go + SQL
Prevention of SQL injection vulnerabilities
Monitors behavior via Prometheus stats and logging of errors.

Usage

Processes should use New() to instantiate a *DB and keep it. Consumers can use this *DB concurrently. Creating and changing tables & views should happen in the ./sql/ folder. Name each file with today's date in the format YYYYMMDD.sql (ex: 20231231.sql for the year's last day).

a. CREATE TABLE should NOT have a schema:
	GOOD: CREATE TABLE foo ();
	BAD:  CREATE TABLE me.foo ();
b. Schema is managed for you. It provides isolation for integration tests & multi-use.
c. Git Merges: All run once, so old-after-new is OK when there are no deps.
d. NEVER change shipped sql files. Have later files make corrections.
e. Anything not yet run will be run, so an older date making it to master is OK.
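
Rules c and e amount to a set difference: whatever has not been applied yet gets applied, regardless of whether newer files have already run. A minimal sketch of that selection, assuming applied file names are tracked in a set (the function `pending` and the tracking map are hypothetical, not the package's internals):

```go
package main

import (
	"fmt"
	"sort"
)

// pending returns the files that have not been applied yet, sorted by name so
// that date-prefixed files run in chronological order.
func pending(all []string, applied map[string]bool) []string {
	var todo []string
	for _, name := range all {
		if !applied[name] {
			todo = append(todo, name)
		}
	}
	sort.Strings(todo)
	return todo
}

func main() {
	all := []string{"20240301.sql", "20231231.sql", "20240115.sql"}
	// 20240115.sql arrived late through a merge; the newer 20240301.sql already ran.
	applied := map[string]bool{"20231231.sql": true, "20240301.sql": true}
	fmt.Println(pending(all, applied)) // [20240115.sql]
}
```

This is also why rule d matters: a file that was edited after it shipped is already in the applied set, so the edit would never run.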

Write SQL with context, raw strings, and args:

name := "Alice"
var ID int
err := db.QueryRow(ctx, "SELECT id from people where first_name=$1", name).Scan(&ID)
fmt.Println(ID)

Note: Scan() is column-oriented, while Select() & StructScan() are field name/tag oriented.

Index

Constants

const SQL_START = ctxkey("sqlStart")
const SQL_STRING = ctxkey("sqlString")

Variables

var DBMeasures = struct {
	Hits            *stats.Int64Measure
	TotalWait       *stats.Int64Measure
	Waits           prometheus.Histogram
	OpenConnections *stats.Int64Measure
	Errors          *stats.Int64Measure
	WhichHost       prometheus.Histogram
}{
	Hits:      stats.Int64(pre+"hits", "Total number of uses.", stats.UnitDimensionless),
	TotalWait: stats.Int64(pre+"total_wait", "Total delay. A numerator over hits to get average wait.", stats.UnitMilliseconds),
	Waits: prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    pre + "waits",
		Buckets: waitsBuckets,
		Help:    "The histogram of waits for query completions.",
	}),
	OpenConnections: stats.Int64(pre+"open_connections", "Total connection count.", stats.UnitDimensionless),
	Errors:          stats.Int64(pre+"errors", "Total error count.", stats.UnitDimensionless),
	WhichHost: prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    pre + "which_host",
		Buckets: whichHostBuckets,
		Help:    "The index of the hostname being used",
	}),
}

DBMeasures groups all db metrics.

var DefaultHostEnv = "HARMONYQUERY_HOSTS"
var DefaultSchema = "curio"
var ITestUpgradeFunc func(*pgxpool.Pool, string, string)

Functions

func IsErrDDLConflict

func IsErrDDLConflict(err error) bool

IsErrDDLConflict returns true if the error is a DDL conflict (object already exists or doesn't exist).

func IsErrSerialization

func IsErrSerialization(err error) bool

func IsErrUniqueContraint

func IsErrUniqueContraint(err error) bool

func Iter

func Iter[T any](db *DB, ctx context.Context, errOut *error, sql rawStringOnly, args ...any) iter.Seq[T]

Iter returns an iterator that scans each row into a value of type T. T is typically a struct with fields matching the query's columns (via name or `db` tags), but a single-column result can scan into a primitive type. The underlying cursor is closed automatically when the iterator finishes or the caller breaks out of the loop.

On any error (query, scan, or cursor), iteration stops and *errOut is set.

Example:

var err error
for user := range harmonyquery.Iter[User](db, ctx, &err, "SELECT name, id FROM users WHERE active = $1", true) {
    fmt.Println(user.Name, user.ID)
}
if err != nil {
    return err
}

func TxIter

func TxIter[T any](tx *Tx, errOut *error, sql rawStringOnly, args ...any) iter.Seq[T]

TxIter is the transaction variant of Iter. It returns an iterator that scans each row into a value of type T within an open transaction.

Example:

db.BeginTransaction(ctx, func(tx *harmonyquery.Tx) (bool, error) {
    var err error
    for user := range harmonyquery.TxIter[User](tx, &err, "SELECT name, id FROM users") {
        fmt.Println(user.Name, user.ID)
    }
    return err == nil, err
})

Types

type Config

type Config struct {
	// HOSTS is a list of hostnames to nodes running YugabyteDB
	// in a cluster. Only 1 is required
	Hosts []string

	// The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default.
	Username string

	// The password for the related username. Blank for default.
	Password string

	// The database (logical partition) within Yugabyte. Blank for default.
	Database string

	// The port to find Yugabyte. Blank for default.
	Port string

	// Load Balance the connection over multiple nodes
	LoadBalance bool

	// SSL Mode for the connection
	SSLMode string

	// Schema to use for the connection
	Schema string

	// ApplicationName is sent to PostgreSQL as the application_name connection parameter,
	// visible in pg_stat_activity. Defaults to the binary name (os.Args[0]).
	ApplicationName string

	SqlEmbedFS *embed.FS

	DowngradeEmbedFS *embed.FS

	ITestID ITestID

	*PoolConfig // Set all or nothing. We use every value.
}

type DB

type DB struct {
	BTFPOnce sync.Once
	BTFP     uintptr // A PC only in your stack when you call BeginTransaction()
	// contains filtered or unexported fields
}

func New

func New(hosts []string, username, password, database, port string, loadBalance bool, itestID ITestID) (*DB, error)

New is to be called once per binary to establish the pool. log() is for errors. It returns a connection to an upgraded database. This entry point serves both production and integration tests, so it leans toward dependency injection (DI).

func NewFromConfig

func NewFromConfig(options Config) (*DB, error)

func NewFromConfigWithITestID

func NewFromConfigWithITestID(t *testing.T, id ITestID) (*DB, error)

func (*DB) BeginTransaction

func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error), opt ...TransactionOption) (didCommit bool, retErr error)

BeginTransaction is how you access transactions using this library. The entire transaction happens in the function passed in. That function must return true or a rollback will occur. Be sure to test the error with IsErrSerialization() if you want to retry when there is a DB serialization error.
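
A retry on serialization errors might look like the loop below. This is a sketch with stand-ins so that it runs without a database: `errSerialization`, `retryOnSerialization`, and `flaky` are hypothetical. In real use, the `run` callback would wrap db.BeginTransaction, and the check would presumably be IsErrSerialization(err) instead of errors.Is.

```go
package main

import (
	"errors"
	"fmt"
)

// errSerialization is a hypothetical stand-in for a database serialization failure.
var errSerialization = errors.New("serialization failure")

// retryOnSerialization calls run until it succeeds, fails with some other
// error, or has been attempted maxAttempts times.
func retryOnSerialization(maxAttempts int, run func() (bool, error)) (bool, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		var committed bool
		committed, err = run()
		if err == nil || !errors.Is(err, errSerialization) {
			return committed, err // success, or an error that retrying will not fix
		}
	}
	return false, fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

// flaky simulates a transaction that hits a serialization error on its first n calls.
func flaky(n int) func() (bool, error) {
	calls := 0
	return func() (bool, error) {
		calls++
		if calls <= n {
			return false, errSerialization
		}
		return true, nil
	}
}

func main() {
	fmt.Println(retryOnSerialization(3, flaky(2))) // succeeds on the third attempt
	fmt.Println(retryOnSerialization(2, flaky(5))) // gives up
}
```

Because the whole transaction body is re-run on each attempt, it should not have side effects outside the database that cannot safely be repeated.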

func (*DB) DowngradeTo

func (db *DB) DowngradeTo(ctx context.Context, dateNum int) error

DowngradeTo downgrades the database schema to a previous date (when an upgrade was applied). Note: these dates (YYYYMMDD) are not the SQL date but the date the user did an upgrade.

func (*DB) Exec

func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error)

Exec executes changes (INSERT, DELETE, or UPDATE). Note, for CREATE & DROP please keep these permanent and express them in the ./sql/ files (next number).
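
The sql parameter's type, rawStringOnly, is unexported, which is a common Go technique for steering callers toward string literals. The sketch below illustrates the general technique under the assumption that the type is declared as a plain string-based type; the declaration is not shown in this document, and `rawString` and `describe` are made-up names.

```go
package main

import "fmt"

// rawString imitates an unexported string type used as a parameter type.
// Code outside the defining package cannot name it, so it cannot convert
// a runtime string into it.
type rawString string

// describe accepts only rawString for the SQL text; values go in args.
func describe(sql rawString, args ...any) string {
	return fmt.Sprintf("sql=%q args=%d", string(sql), len(args))
}

func main() {
	// An untyped string constant converts implicitly to rawString.
	fmt.Println(describe("SELECT name FROM users WHERE id = $1", 1))

	// A value of type string does not convert implicitly, so building SQL
	// from input would fail to compile:
	//   q := "SELECT name FROM users WHERE id = " + userInput
	//   describe(q) // cannot use q (variable of type string) as rawString value
}
```

The effect is that user-supplied values can only travel through the args list, where the driver handles them as parameters.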

func (*DB) GetRoutableIP

func (db *DB) GetRoutableIP() (string, error)

func (*DB) ITestDeleteAll

func (db *DB) ITestDeleteAll()

ITestDeleteAll will delete everything created for "this" integration test. This must be called at the end of each integration test.

func (*DB) Query

func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error)

Query allows iterating returned values to save memory consumption, with the downside of needing to `defer q.Close()`. For a simpler interface, try Select(). Next() must be called to advance the row cursor, including the first time. Ex:

q, err := db.Query(ctx, "SELECT id, name FROM users")
handleError(err)
defer q.Close()

for q.Next() {
	var id int
	var name string
	handleError(q.Scan(&id, &name))
	fmt.Println(id, name)
}

func (*DB) QueryRow

func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row

QueryRow gets 1 row using column order matching. This is a timesaver for the special case of wanting the first row returned only. EX:

var name, pet string
var ID = 123
err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=$1", ID).Scan(&name, &pet)

func (*DB) Select

func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error

Select multiple rows into a slice using name matching. Ex:

type user struct {
	Name string
	ID int
	Number string `db:"tel_no"`
}

var users []user
pet := "cat"
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=$1", pet)
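
Name matching of this kind is typically done with reflection over struct fields and their `db` tags. The following sketch shows one plausible mapping for the user struct above. The exact rules harmonyquery applies are not stated here, so falling back to the lowercased field name is an assumption, and `columnIndex` and `fieldFor` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// columnIndex maps each column name to the index of the struct field that
// would receive it: a `db` tag wins, otherwise the lowercased field name.
func columnIndex(structType reflect.Type) map[string]int {
	out := make(map[string]int)
	for i := 0; i < structType.NumField(); i++ {
		f := structType.Field(i)
		if !f.IsExported() {
			continue // unexported fields cannot be set through reflection
		}
		name := f.Tag.Get("db")
		if name == "" {
			name = strings.ToLower(f.Name)
		}
		out[name] = i
	}
	return out
}

type user struct {
	Name   string
	ID     int
	Number string `db:"tel_no"`
}

// fieldFor reports which field of user a given column would be stored in.
func fieldFor(column string) (string, bool) {
	t := reflect.TypeOf(user{})
	i, ok := columnIndex(t)[column]
	if !ok {
		return "", false
	}
	return t.Field(i).Name, true
}

func main() {
	for _, col := range []string{"name", "id", "tel_no", "pet"} {
		field, ok := fieldFor(col)
		fmt.Println(col, field, ok)
	}
}
```

Under these assumptions, tel_no lands in Number because of the tag, while pet has no matching field.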

type ITestID

type ITestID string

func ITestNewID

func ITestNewID() ITestID

ITestNewID see ITestWithID doc

type PoolConfig

type PoolConfig struct {
	MaxConnections        int
	MinConnections        int
	MaxConnectionLifetime time.Duration
	MaxIdleTime           time.Duration
}

type Qry

type Qry interface {
	Next() bool
	Err() error
	Close()
	Scan(...any) error
	Values() ([]any, error)
}

type Query

type Query struct {
	Qry
}

Query offers Next/Err/Close/Scan/Values

func (*Query) StructScan

func (q *Query) StructScan(s any) error

StructScan allows scanning a single row into a struct. This improves efficiency of processing large result sets by avoiding the need to allocate a slice of structs.

type Row

type Row interface {
	Scan(...any) error
}

type TransactionOption

type TransactionOption func(*TransactionOptions)

func OptionRetry

func OptionRetry() TransactionOption

type TransactionOptions

type TransactionOptions struct {
	RetrySerializationError bool
}
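
TransactionOption follows Go's functional-options pattern: each option is a function that mutates a TransactionOptions value. The sketch below reuses the two type declarations from the listing and shows how such options are usually applied. `withRetry` and `resolve` are hypothetical; whether OptionRetry and BeginTransaction behave exactly this way is an assumption.

```go
package main

import "fmt"

// These two declarations are copied from the listing above.
type TransactionOptions struct {
	RetrySerializationError bool
}

type TransactionOption func(*TransactionOptions)

// withRetry is a hypothetical option constructor shaped like OptionRetry.
func withRetry() TransactionOption {
	return func(o *TransactionOptions) { o.RetrySerializationError = true }
}

// resolve applies each option, in order, to a zero-valued TransactionOptions.
func resolve(opts ...TransactionOption) TransactionOptions {
	var o TransactionOptions
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Println(resolve().RetrySerializationError)            // false
	fmt.Println(resolve(withRetry()).RetrySerializationError) // true
}
```

The variadic `opt ...TransactionOption` parameter on BeginTransaction fits this shape: callers that pass no options get the zero-value defaults.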

type Tx

type Tx struct {
	// contains filtered or unexported fields
}

func (*Tx) Exec

func (t *Tx) Exec(sql rawStringOnly, arguments ...any) (count int, err error)

Exec in a transaction.

func (*Tx) Query

func (t *Tx) Query(sql rawStringOnly, arguments ...any) (*Query, error)

Query in a transaction.

func (*Tx) QueryRow

func (t *Tx) QueryRow(sql rawStringOnly, arguments ...any) Row

QueryRow in a transaction.

func (*Tx) Select

func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error

Select in a transaction.

func (*Tx) SendBatch

func (t *Tx) SendBatch(ctx context.Context, b *pgx.Batch) (pgx.BatchResults, error)

SendBatch in a transaction.
