pg

package module
v1.0.16 Latest Latest
Published: Dec 17, 2025 License: Apache-2.0 Imports: 16 Imported by: 0

README

go-pg

PostgreSQL support for Go, built on top of pgx. This module provides:

  • Binding SQL statements to named arguments;
  • Support for mapping Go structures to SQL tables, and vice versa;
  • Easy semantics for Insert, Delete, Update, Get and List operations;
  • Bulk insert operations and transactions;
  • Support for tracing and observability;
  • PostgreSQL Manager for server administration with REST API, an optional frontend and Prometheus metrics;
  • Testing utilities for integration testing with testcontainers.

Documentation: https://pkg.go.dev/github.com/mutablelogic/go-pg

Motivation

The package provides a simple way to interact with a PostgreSQL database from Go and reduces the amount of boilerplate code required. The supported operations align with the HTTP methods POST, PUT, GET, DELETE and PATCH used by API calls:

  • Insert - Insert a row into a table, and return the inserted row (POST or PUT);
  • Delete - Delete one or more rows from a table, and optionally return the deleted rows (DELETE);
  • Update - Update one or more rows in a table, and optionally return the updated rows (PATCH);
  • Get - Get a single row from a table (GET);
  • List - Get a list of rows from a table (GET).
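
As a rough illustration of the mapping above, the following sketch picks an operation from an HTTP method. The Op constants are a local copy of those listed in the package index; the opForRequest helper and its hasKey parameter are hypothetical and not part of the package.

```go
package main

import (
	"fmt"
	"net/http"
)

// Op is a local copy of the operation constants from the package index,
// declared here so the sketch is self-contained.
type Op uint

const (
	None Op = iota
	Get
	Insert
	Update
	Delete
	List
)

// opForRequest is a hypothetical helper. It picks an operation from the HTTP
// method, using hasKey to tell a single-row GET from a list GET.
func opForRequest(method string, hasKey bool) Op {
	switch method {
	case http.MethodPost, http.MethodPut:
		return Insert
	case http.MethodPatch:
		return Update
	case http.MethodDelete:
		return Delete
	case http.MethodGet:
		if hasKey {
			return Get
		}
		return List
	}
	return None
}

func main() {
	fmt.Println(opForRequest(http.MethodGet, true) == Get)
	fmt.Println(opForRequest(http.MethodGet, false) == List)
	fmt.Println(opForRequest(http.MethodPatch, true) == Update)
}
```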

In order to support database operations on Go types, those types need to implement one or more of the following interfaces:

Selector
type Selector interface {
  // Bind row selection variables, returning the SQL statement required for the operation
  // The operation can be Get, Update, Delete or List
  Select(*Bind, Op) (string, error)
}

A type which implements the Selector interface can be used to select rows from a table for get, list, update and delete operations.

Reader
type Reader interface {
  // Scan a row into the receiver
  Scan(Row) error
}

A type which implements the Reader interface can be used to translate SQL types into the type instance. If multiple rows are returned, then the Scan method is called repeatedly until no more rows are returned.

type ListReader interface {
  // Scan a count of returned rows into the receiver
  ScanCount(Row) error
}

A type which implements a ListReader interface can be used to scan the count of rows returned.
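
To make the calling pattern concrete, here is a minimal sketch that drives a Reader and ListReader by hand. It does not use the package: Row is a local stand-in for pg.Row, and fakeRow, NameList and collect are hypothetical names. Calling ScanCount once before the row scans is an assumption about ordering, not something the documentation states.

```go
package main

import "fmt"

// Row is a local stand-in for pg.Row: anything that can scan columns
// into destination pointers.
type Row interface {
	Scan(dest ...any) error
}

// fakeRow is a hypothetical in-memory row holding a single column.
type fakeRow struct{ value any }

func (r fakeRow) Scan(dest ...any) error {
	if len(dest) != 1 {
		return fmt.Errorf("expected 1 destination, got %d", len(dest))
	}
	switch d := dest[0].(type) {
	case *string:
		s, ok := r.value.(string)
		if !ok {
			return fmt.Errorf("column is %T, not string", r.value)
		}
		*d = s
	case *uint64:
		n, ok := r.value.(uint64)
		if !ok {
			return fmt.Errorf("column is %T, not uint64", r.value)
		}
		*d = n
	default:
		return fmt.Errorf("unsupported destination %T", d)
	}
	return nil
}

// NameList collects one name per row, plus the total row count.
type NameList struct {
	Count uint64
	Names []string
}

// Scan is called once per returned row (Reader).
func (l *NameList) Scan(row Row) error {
	var name string
	if err := row.Scan(&name); err != nil {
		return err
	}
	l.Names = append(l.Names, name)
	return nil
}

// ScanCount receives the count of rows (ListReader).
func (l *NameList) ScanCount(row Row) error {
	return row.Scan(&l.Count)
}

// collect imitates what a list call might do: scan the count once,
// then scan each row in turn (the ordering is an assumption).
func collect(list *NameList, count uint64, names ...string) error {
	if err := list.ScanCount(fakeRow{value: count}); err != nil {
		return err
	}
	for _, name := range names {
		if err := list.Scan(fakeRow{value: name}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var list NameList
	if err := collect(&list, 3, "alice", "bob"); err != nil {
		panic(err)
	}
	fmt.Println(list.Count, list.Names)
}
```

Both methods use pointer receivers so that the scanned values are stored on the caller's instance rather than on a copy.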

Writer
type Writer interface {
  // Bind insert parameters, returning the SQL statement required for the insert
  Insert(*Bind) (string, error)

  // Bind update parameters
  Update(*Bind) error
}

A type which implements a Writer interface can be used to bind object instance variables to SQL parameters. An example of how to implement an API gateway using this package is shown below.
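
Because Writer has two methods, a type must define both Insert and Update before it can be passed as a Writer. The sketch below shows a compile-time check for this, using local stand-ins for Bind and Writer rather than the package itself. The Note type, the "note" table and the "@key" string returned by Set are assumptions for illustration.

```go
package main

import "fmt"

// Bind is a simplified local stand-in for pg.Bind.
type Bind struct{ args map[string]any }

// Set stores a named argument and returns a parameter name
// (the "@key" format is an assumption).
func (b *Bind) Set(key string, value any) string {
	if b.args == nil {
		b.args = make(map[string]any)
	}
	b.args[key] = value
	return "@" + key
}

// Writer is a local copy of the interface shown above.
type Writer interface {
	Insert(*Bind) (string, error)
	Update(*Bind) error
}

// Note is a hypothetical type stored in a hypothetical "note" table.
type Note struct{ Text string }

func (n Note) Insert(bind *Bind) (string, error) {
	bind.Set("text", n.Text)
	return `INSERT INTO note (text) VALUES (@text) RETURNING text`, nil
}

func (n Note) Update(bind *Bind) error {
	bind.Set("text", n.Text)
	return nil
}

// Compile-time check: the build fails if Note lacks either method.
var _ Writer = Note{}

func main() {
	bind := new(Bind)
	sql, err := Note{Text: "hello"}.Insert(bind)
	if err != nil {
		panic(err)
	}
	fmt.Println(sql, bind.args)
}
```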

Database Server Connection Pool

You can create a connection pool to a database server using the pg.NewPool function:

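import (
  "context"

  pg "github.com/mutablelogic/go-pg"
)

func main() {
  ctx := context.Background()

  // Connection parameters (placeholder values)
  host, port, name := "localhost", "5432", "postgres"

  pool, err := pg.NewPool(ctx,
    pg.WithHostPort(host, port),
    pg.WithCredentials("postgres", "password"),
    pg.WithDatabase(name),
  )
  if err != nil {
    panic(err)
  }
  defer pool.Close()

  // ...
}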

The options that can be passed to pg.NewPool are:

  • WithURL(string) - Set connection parameters from a PostgreSQL URL in the format postgres://user:password@host:port/database?sslmode=disable. Query parameters are passed as additional connection options.
  • WithCredentials(string,string) - Set connection pool username and password. If the database name is not set, then the username will be used as the default database name.
  • WithDatabase(string) - Set the database name for the connection. If the user name is not set, then the database name will be used as the user name.
  • WithAddr(string) - Set the address (host) or (host:port) for the connection.
  • WithHostPort(string, string) - Set the hostname and port for the connection. If the port is not set, then the default port 5432 will be used.
  • WithSSLMode(string) - Set the SSL connection mode. Valid values are "disable", "allow", "prefer", "require", "verify-ca", "verify-full". See https://www.postgresql.org/docs/current/libpq-ssl.html for more information.
  • WithTrace(pg.TraceFn) - Set the trace function for the connection pool. The signature of the trace function is func(ctx context.Context, sql string, args any, err error) and it is called for every query executed by the connection pool.
  • WithBind(string, any) - Set a bind variable to a value for the lifetime of the connection.
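
The options follow the functional options pattern (the index declares Opt as func(*opt) error). The sketch below is a simplified, hypothetical re-implementation of that pattern, not the package's code. It illustrates the defaulting rules described in the list: the port falls back to 5432, and the user and database names fall back to each other. The lower-case names and the apply function are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

const defaultPort = "5432"

// opt holds the collected settings; Opt mutates it.
type opt struct {
	host, port, user, password, database string
}

type Opt func(*opt) error

func withCredentials(user, password string) Opt {
	return func(o *opt) error {
		o.user, o.password = user, password
		return nil
	}
}

func withDatabase(name string) Opt {
	return func(o *opt) error {
		o.database = name
		return nil
	}
}

func withHostPort(host, port string) Opt {
	return func(o *opt) error {
		if host == "" {
			return errors.New("missing host")
		}
		o.host, o.port = host, port
		return nil
	}
}

// apply runs each option in order, then fills in the defaults.
func apply(opts ...Opt) (*opt, error) {
	o := new(opt)
	for _, fn := range opts {
		if err := fn(o); err != nil {
			return nil, err
		}
	}
	if o.port == "" {
		o.port = defaultPort
	}
	if o.database == "" {
		o.database = o.user
	}
	if o.user == "" {
		o.user = o.database
	}
	return o, nil
}

func main() {
	o, err := apply(
		withHostPort("localhost", ""),
		withCredentials("postgres", "password"),
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(net.JoinHostPort(o.host, o.port), o.user, o.database)
}
```

Applying defaults after all options have run means the result does not depend on the order in which the options are passed.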

Executing Statements

To simply execute a statement, use the Exec call:

  if err := pool.Exec(ctx, `CREATE TABLE test (id SERIAL PRIMARY KEY, name TEXT)`); err != nil {
    panic(err)
  }

You can use bind variables to bind named arguments to a statement using the With function. Within the statement, the following formats are replaced with bound values:

  • ${"name"} - Replace with the value of the named argument "name", double-quoted string
  • ${'name'} - Replace with the value of the named argument "name", single-quoted string
  • ${name} - Replace with the value of the named argument "name", unquoted string
  • $$ - Pass a literal dollar sign
  • @name - Pass by bound variable parameter

For example,

  var name string
  // ...
  if err := pool.With("table", "test", "name", name).Exec(ctx, `INSERT INTO ${"table"} (name) VALUES (@name)`); err != nil {
    panic(err)
  }

This will reuse an existing connection from the connection pool (or create a new one), bind the named arguments, replace the named arguments in the statement, and execute the statement.

Implementing Get

If you have an HTTP handler which needs to get a row from a table, you can implement the Selector interface to select the row and the Reader interface to scan it. For example,

type MyObject struct {
  Id int
  Name string
}

// Reader - bind to object
func (obj *MyObject) Scan(row pg.Row) error {
  return row.Scan(&obj.Id, &obj.Name)
}

// Selector - select rows from database
func (obj MyObject) Select(bind *pg.Bind, op pg.Op) (string, error) {
  switch op {
  case pg.Get:
    bind.Set("id", obj.Id)
    return `SELECT id, name FROM mytable WHERE id=@id`, nil
  }
  // Any other operation is not supported by this selector
  return "", pg.ErrNotImplemented
}

// Select the row from the database
func main() {
  // ...
  var obj MyObject
  if err := conn.Get(ctx, &obj, MyObject{Id: 1}); err != nil {
    panic(err)
  }
  // ...
}

Implementing List

You may wish to use paging to list rows from a table. The List operation lists rows from a table, with offset and limit parameters. The HTTP handler may look like this:

func ListHandler(w http.ResponseWriter, r *http.Request) {
  var conn pg.Conn

  // ....Set conn from the pool....

  // Get up to 10 rows, using the request context
  var response MyList
  if err := conn.List(r.Context(), &response, MyListRequest{Offset: 0, Limit: 10}); err != nil {
    http.Error(w, err.Error(), http.StatusInternalServerError)
    return
  }

  // Write the list to the response as JSON
  w.Header().Set("Content-Type", "application/json")
  json.NewEncoder(w).Encode(response)
}

The implementation of MyList and MyListRequest may look like this:

type MyListRequest struct {
  Offset uint64
  Limit uint64
}

type MyList struct {
  Count uint64
  Names []string
}

// Reader - note this needs to be a pointer receiver
func (obj *MyList) Scan(row pg.Row) error {
  var name string
  if err := row.Scan(&name); err != nil {
    return err
  }
  obj.Names = append(obj.Names, name)
  return nil
}

// ListReader - optional interface to scan count of all rows
// (also a pointer receiver, so that the count is stored)
func (obj *MyList) ScanCount(row pg.Row) error {
  return row.Scan(&obj.Count)
}

// Selector - select rows from database. Use bind variables
// offsetlimit, groupby and orderby to filter the selected rows.
func (obj MyListRequest) Select(bind *pg.Bind, op pg.Op) (string, error) {
  bind.Set("offsetlimit", fmt.Sprintf("OFFSET %v LIMIT %v", obj.Offset, obj.Limit))
  switch op {
  case pg.List:
    return `SELECT name FROM mytable`, nil
  default:
    return "", fmt.Errorf("unsupported operation: %v", op)
  }
}

You can of course use a WHERE clause in your query to filter the rows returned from the table. Always implement the offsetlimit as a bind variable.

Implementing Insert

To insert a row into a table, implement the Writer interface:

type MyObject struct {
  Id   int
  Name string
}

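// Writer - bind insert parameters. To satisfy the Writer interface the type
// also needs an Update method, as shown under "Implementing Patch"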
func (obj MyObject) Insert(bind *pg.Bind) (string, error) {
  bind.Set("name", obj.Name)
  return `INSERT INTO mytable (name) VALUES (@name) RETURNING id, name`, nil
}

// Reader - scan the returned row
func (obj *MyObject) Scan(row pg.Row) error {
  return row.Scan(&obj.Id, &obj.Name)
}

// Insert a row
func main() {
  // ...
  obj := MyObject{Name: "hello"}
  if err := conn.Insert(ctx, &obj, obj); err != nil {
    panic(err)
  }
  fmt.Println("Inserted with ID:", obj.Id)
}

The RETURNING clause allows you to get the inserted row back, including any auto-generated values like serial IDs.

Implementing Patch

To update rows in a table, implement both Selector (to identify rows) and Writer (for update values):

type MyObject struct {
  Id   int
  Name string
}

// Selector - identify rows to update
func (obj MyObject) Select(bind *pg.Bind, op pg.Op) (string, error) {
  switch op {
  case pg.Update:
    bind.Set("id", obj.Id)
    return `UPDATE mytable SET name = @name WHERE id = @id RETURNING id, name`, nil
  }
  return "", pg.ErrNotImplemented
}

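// Writer - bind update parameters. To satisfy the Writer interface the type
// also needs an Insert method, as shown under "Implementing Insert"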
func (obj MyObject) Update(bind *pg.Bind) error {
  bind.Set("name", obj.Name)
  return nil
}

// Reader - scan the returned row
func (obj *MyObject) Scan(row pg.Row) error {
  return row.Scan(&obj.Id, &obj.Name)
}

// Update a row: obj acts as the Reader, the Selector and the Writer
func main() {
  // ...
  obj := MyObject{Id: 1, Name: "updated"}
  if err := conn.Update(ctx, &obj, obj, obj); err != nil {
    panic(err)
  }
}

Implementing Delete

To delete rows from a table, implement the Selector interface:

type MyObject struct {
  Id   int
  Name string
}

// Selector - identify rows to delete
func (obj MyObject) Select(bind *pg.Bind, op pg.Op) (string, error) {
  switch op {
  case pg.Delete:
    bind.Set("id", obj.Id)
    return `DELETE FROM mytable WHERE id = @id RETURNING id, name`, nil
  }
  return "", pg.ErrNotImplemented
}

// Reader - optionally scan deleted rows
func (obj *MyObject) Scan(row pg.Row) error {
  return row.Scan(&obj.Id, &obj.Name)
}

// Delete a row
func main() {
  // ...
  var deleted MyObject
  if err := conn.Delete(ctx, &deleted, MyObject{Id: 1}); err != nil {
    panic(err)
  }
  fmt.Println("Deleted:", deleted.Name)
}

The RETURNING clause is optional but useful for confirming what was deleted.

Transactions

Transactions are executed within a function passed to Tx. The function receives a connection which is bound to the transaction. For example,

  if err := pool.Tx(ctx, func(tx pg.Conn) error {
    if err := tx.Exec(ctx, `CREATE TABLE test (id SERIAL PRIMARY KEY, name TEXT)`); err != nil {
      return err
    }
    if err := tx.Exec(ctx, `INSERT INTO test (name) VALUES ('hello')`); err != nil {
      return err
    }
    return nil
  }); err != nil {
    panic(err)
  }

Any error returned from the function will cause the transaction to be rolled back. If the function returns nil, then the transaction will be committed. Transactions can be nested.
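
The commit and rollback rule can be sketched without a database. The code below is a hypothetical stand-in, not the package's implementation: the txn interface, fakeTxn and runTx are assumed names, and the fake only records which of Commit or Rollback was called.

```go
package main

import (
	"errors"
	"fmt"
)

// txn is a hypothetical minimal transaction interface.
type txn interface {
	Commit() error
	Rollback() error
}

// fakeTxn records which method was called.
type fakeTxn struct{ committed, rolledBack bool }

func (t *fakeTxn) Commit() error   { t.committed = true; return nil }
func (t *fakeTxn) Rollback() error { t.rolledBack = true; return nil }

// errFailed is a sample error returned from a transaction function.
var errFailed = errors.New("insert failed")

// runTx calls fn, rolling back if it returns an error and committing otherwise.
// A rollback failure is joined to the original error rather than replacing it.
func runTx(t txn, fn func() error) error {
	if err := fn(); err != nil {
		return errors.Join(err, t.Rollback())
	}
	return t.Commit()
}

func main() {
	ok := &fakeTxn{}
	err := runTx(ok, func() error { return nil })
	fmt.Println(err == nil, ok.committed, ok.rolledBack)

	bad := &fakeTxn{}
	err = runTx(bad, func() error { return errFailed })
	fmt.Println(errors.Is(err, errFailed), bad.committed, bad.rolledBack)
}
```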

Notify and Listen

PostgreSQL supports asynchronous notifications via NOTIFY and LISTEN. Obtain a Listener from the connection pool with the Listener method, then subscribe to channels with Listen:

import pg "github.com/mutablelogic/go-pg"

// Obtain a listener from the pool and subscribe to a channel
listener := pool.Listener()
defer listener.Close(ctx)
if err := listener.Listen(ctx, "my_channel"); err != nil {
  panic(err)
}

// Wait for notifications until the context is cancelled or an error occurs
for {
  notification, err := listener.WaitForNotification(ctx)
  if err != nil {
    return
  }
  fmt.Printf("Channel: %s, Payload: %s\n", notification.Channel, notification.Payload)
}

To send a notification from another connection:

if err := pool.Exec(ctx, `NOTIFY my_channel, 'hello world'`); err != nil {
  panic(err)
}

Schema Support

The package provides convenience functions for managing PostgreSQL schemas:

import pg "github.com/mutablelogic/go-pg"

// Check if a schema exists
exists, err := pg.SchemaExists(ctx, conn, "myschema")

// Create a schema (IF NOT EXISTS)
err = pg.SchemaCreate(ctx, conn, "myschema")

// Drop a schema (IF EXISTS, CASCADE)
err = pg.SchemaDrop(ctx, conn, "myschema")

Error Handling and Tracing

The package provides typed errors for common PostgreSQL conditions:

import pg "github.com/mutablelogic/go-pg"

if err := conn.Get(ctx, &obj, req); err != nil {
  if errors.Is(err, pg.ErrNotFound) {
    // Row not found
  } else if errors.Is(err, pg.ErrNotImplemented) {
    // Operation not implemented by the selector
  } else if errors.Is(err, pg.ErrBadParameter) {
    // Invalid parameter
  } else {
    // Other error
  }
}

To enable query tracing, pass a trace function when creating the pool:

pool, err := pg.NewPool(ctx,
  pg.WithHostPort(host, port),
  pg.WithCredentials(user, pass),
  pg.WithTrace(func(ctx context.Context, sql string, args any, err error) {
    if err != nil {
      log.Printf("ERROR: %s: %v", sql, err)
    } else {
      log.Printf("SQL: %s args=%v", sql, args)
    }
  }),
)

The trace function is called for every query executed through the connection pool.
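
A trace function does not have to log. As a minimal sketch, the recorder below collects statements and counts failures, which can be handy in tests. TraceFn is a local copy of the signature from the index; recorder and demo are hypothetical names, and the calls in demo stand in for what the pool would do.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// TraceFn is a local copy of the trace function signature.
type TraceFn func(ctx context.Context, sql string, args any, err error)

// recorder collects traced statements; the mutex guards against
// concurrent calls from several connections.
type recorder struct {
	mu      sync.Mutex
	queries []string
	failed  int
}

// trace matches the TraceFn signature, so it can be passed as one.
func (r *recorder) trace(ctx context.Context, sql string, args any, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.queries = append(r.queries, sql)
	if err != nil {
		r.failed++
	}
}

// demo simulates two traced queries, one of which fails.
func demo() *recorder {
	rec := new(recorder)
	var fn TraceFn = rec.trace
	ctx := context.Background()
	fn(ctx, "SELECT 1", nil, nil)
	fn(ctx, "SELECT missing", nil, errors.New("column does not exist"))
	return rec
}

func main() {
	rec := demo()
	fmt.Println(len(rec.queries), rec.failed)
}
```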

Testing Support

The pkg/test package provides utilities for integration testing with PostgreSQL using testcontainers.

See pkg/test/README.md for documentation.

PostgreSQL Manager

The pkg/manager package provides a comprehensive API for managing PostgreSQL server resources including roles, databases, schemas, tables, connections, replication slots, and more. It includes a REST API with Prometheus metrics.

See pkg/manager/README.md for documentation.

Documentation

Overview

Package pg provides PostgreSQL support for Go, built on top of pgx.

It provides binding SQL statements to named arguments, mapping Go structures to SQL tables, and easy semantics for Insert, Delete, Update, Get and List operations. The package also supports bulk insert operations, transactions, and tracing for observability.

Connection Pool

Create a connection pool using NewPool:

pool, err := pg.NewPool(ctx,
    pg.WithURL("postgres://user:pass@localhost:5432/dbname"),
)
if err != nil {
    panic(err)
}
defer pool.Close()

Executing Queries

Use bind variables with the With method:

err := pool.With("table", "users").Exec(ctx, `SELECT * FROM ${"table"}`)

CRUD Operations

Implement the Reader, Writer, and Selector interfaces on your types to enable Insert, Update, Delete, Get, and List operations:

err := pool.Insert(ctx, &obj, obj)    // Insert
err := pool.Get(ctx, &obj, selector)  // Get
err := pool.List(ctx, &list, request) // List
err := pool.Update(ctx, &obj, selector, writer) // Update
err := pool.Delete(ctx, &obj, selector) // Delete

Index

Constants

const (
	DefaultPort = "5432"
)

Variables

This section is empty.

Functions

func NewTracer

func NewTracer(fn TraceFn) *tracer

NewTracer creates a new query tracer.

func SchemaCreate

func SchemaCreate(ctx context.Context, conn Conn, name string) error

SchemaCreate creates a schema with the given name if it does not exist.

func SchemaDrop

func SchemaDrop(ctx context.Context, conn Conn, name string) error

SchemaDrop drops a schema with the given name if it exists.

func SchemaExists

func SchemaExists(ctx context.Context, conn Conn, name string) (bool, error)

SchemaExists returns true if a schema with the given name exists.

Types

type Bind

type Bind struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Bind represents a set of variables and arguments to be used in a query. The vars are substituted in the query string itself, while the args are passed as arguments to the query.

func NewBind

func NewBind(pairs ...any) *Bind

NewBind creates a new Bind object with the given name/value pairs. Returns nil if the number of arguments is not even.

func (*Bind) Append

func (bind *Bind) Append(key string, value any) bool

Append appends a bind var to a list. Returns false if the key is not a list, or the value is not a list.

func (*Bind) Copy

func (bind *Bind) Copy(pairs ...any) *Bind

Copy creates a copy of the bind object with additional name/value pairs.

func (*Bind) Del

func (bind *Bind) Del(key string)

Del deletes a bind var.

func (*Bind) Exec

func (bind *Bind) Exec(ctx context.Context, conn pgx.Tx, query string) error

Exec executes a query.

func (*Bind) Get

func (bind *Bind) Get(key string) any

Get returns a bind var by key.

func (*Bind) Has

func (bind *Bind) Has(key string) bool

Has returns true if there is a bind var with the given key.

func (*Bind) Join

func (bind *Bind) Join(key, sep string) string

Join joins a bind var with a separator when it is a []any and returns the result as a string. Returns an empty string if the key does not exist.

func (*Bind) MarshalJSON

func (bind *Bind) MarshalJSON() ([]byte, error)

func (*Bind) Query

func (bind *Bind) Query(ctx context.Context, conn pgx.Tx, query string) (pgx.Rows, error)

Query queries a set of rows and returns the result.

func (*Bind) QueryRow

func (bind *Bind) QueryRow(ctx context.Context, conn pgx.Tx, query string) pgx.Row

QueryRow queries a single row and returns the result.

func (*Bind) Replace

func (bind *Bind) Replace(query string) string

Replace returns a query string with ${substitution} replaced by the values:

  • ${key} => value
  • ${'key'} => 'value'
  • ${"key"} => "value"
  • $1 => $1
  • $$ => $$
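
The substitution rules above can be approximated with a regular expression. This is a hedged sketch and not the package's implementation: the replace function, the map of string values, leaving unknown keys untouched, and doubling embedded quote characters are all assumptions.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// reSub matches either "$$" or ${key}, ${'key'} and ${"key"}.
// "$1" style parameters do not match and are therefore left alone.
var reSub = regexp.MustCompile(`\$\$|\$\{(['"]?)(\w+)(['"]?)\}`)

// replace substitutes ${...} placeholders with values from vars.
func replace(query string, vars map[string]string) string {
	return reSub.ReplaceAllStringFunc(query, func(m string) string {
		if m == "$$" {
			return m
		}
		parts := reSub.FindStringSubmatch(m)
		openQ, key, closeQ := parts[1], parts[2], parts[3]
		if openQ != closeQ {
			return m // mismatched quotes: leave untouched
		}
		value, ok := vars[key]
		if !ok {
			return m // unknown key: leave untouched (assumption)
		}
		switch openQ {
		case `"`:
			return `"` + strings.ReplaceAll(value, `"`, `""`) + `"`
		case `'`:
			return `'` + strings.ReplaceAll(value, `'`, `''`) + `'`
		default:
			return value
		}
	})
}

func main() {
	vars := map[string]string{"cols": "id, name", "table": "users", "name": "o'neil"}
	fmt.Println(replace(`SELECT ${cols} FROM ${"table"} WHERE name = ${'name'} AND id = $1`, vars))
}
```

Values substituted this way become part of the SQL text, so they suit identifiers and fragments; user-supplied values are better passed as @name parameters.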

func (*Bind) Set

func (bind *Bind) Set(key string, value any) string

Set sets a bind var and returns the parameter name.

func (*Bind) String

func (bind *Bind) String() string

type Conn

type Conn interface {
	// Return a new connection with bound parameters
	With(...any) Conn

	// Return a connection to a remote database
	Remote(database string) Conn

	// Perform a transaction within a function
	Tx(context.Context, func(Conn) error) error

	// Perform a bulk operation within a function (and indicate whether this
	// should be in a transaction)
	Bulk(context.Context, func(Conn) error) error

	// Execute a query
	Exec(context.Context, string) error

	// Perform an insert
	Insert(context.Context, Reader, Writer) error

	// Perform an update
	Update(context.Context, Reader, Selector, Writer) error

	// Perform a delete
	Delete(context.Context, Reader, Selector) error

	// Perform a get
	Get(context.Context, Reader, Selector) error

	// Perform a list. If the reader is a ListReader, then the
	// count of items is also calculated
	List(context.Context, Reader, Selector) error
}

type Err

type Err int

const (
	ErrSuccess Err = iota
	ErrNotFound
	ErrNotImplemented
	ErrBadParameter
	ErrNotAvailable
)

func (Err) Error

func (e Err) Error() string

Error returns the string representation of the error.

func (Err) With

func (e Err) With(a ...any) error

With returns the error with additional context appended.

func (Err) Withf

func (e Err) Withf(format string, a ...any) error

Withf returns the error with formatted context appended.
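
An integer error type like Err works with errors.Is as long as added context wraps the original value. The sketch below re-declares a local Err to show this; the message strings, the %w based wrapping in With, and the httpStatus helper are assumptions and may differ from the package.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Err is a local copy of the error codes from the index.
type Err int

const (
	ErrSuccess Err = iota
	ErrNotFound
	ErrNotImplemented
	ErrBadParameter
	ErrNotAvailable
)

// Error returns a message for the code (the wording is an assumption).
func (e Err) Error() string {
	switch e {
	case ErrSuccess:
		return "success"
	case ErrNotFound:
		return "not found"
	case ErrNotImplemented:
		return "not implemented"
	case ErrBadParameter:
		return "bad parameter"
	case ErrNotAvailable:
		return "not available"
	}
	return fmt.Sprintf("error code %d", int(e))
}

// With wraps the code with extra context, keeping it visible to errors.Is.
func (e Err) With(a ...any) error {
	return fmt.Errorf("%w: %s", e, fmt.Sprint(a...))
}

// httpStatus is a hypothetical helper mapping errors to HTTP status codes.
func httpStatus(err error) int {
	switch {
	case err == nil:
		return http.StatusOK
	case errors.Is(err, ErrNotFound):
		return http.StatusNotFound
	case errors.Is(err, ErrBadParameter):
		return http.StatusBadRequest
	case errors.Is(err, ErrNotImplemented):
		return http.StatusNotImplemented
	case errors.Is(err, ErrNotAvailable):
		return http.StatusServiceUnavailable
	}
	return http.StatusInternalServerError
}

func main() {
	err := ErrNotFound.With("row id=", 42)
	fmt.Println(err, httpStatus(err))
}
```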

type ListReader

type ListReader interface {
	Reader

	// Scan count into the result
	ScanCount(Row) error
}

ListReader scans database rows and counts total results.

type Listener

type Listener interface {
	// Listen to a topic
	Listen(context.Context, string) error

	// Unlisten from a topic
	Unlisten(context.Context, string) error

	// Wait for a notification and return it
	WaitForNotification(context.Context) (*Notification, error)

	// Free resources
	Close(context.Context) error
}

Listener is an interface for listening to notifications

type Notification

type Notification struct {
	Channel string
	Payload []byte
}

type OffsetLimit

type OffsetLimit struct {
	Offset uint64  `json:"offset,omitempty"`
	Limit  *uint64 `json:"limit,omitempty"`
}

func (*OffsetLimit) Bind

func (r *OffsetLimit) Bind(bind *Bind, max uint64)

Bind sets the offset and limit SQL fragment on the bind object.

func (*OffsetLimit) Clamp

func (r *OffsetLimit) Clamp(len uint64)

Clamp restricts the limit to the maximum length.
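
One plausible reading of these two methods is sketched below with a local copy of the struct. The clamp and fragment methods are hypothetical: treating a nil limit as "use the maximum" and omitting OFFSET when it is zero are assumptions, and the package's Bind and Clamp may behave differently.

```go
package main

import "fmt"

// OffsetLimit is a local copy of the struct from the index.
type OffsetLimit struct {
	Offset uint64
	Limit  *uint64
}

// clamp caps the limit at max. A nil limit is treated as max (assumption).
func (r *OffsetLimit) clamp(max uint64) {
	if r.Limit == nil || *r.Limit > max {
		limit := max
		r.Limit = &limit
	}
}

// fragment returns the SQL fragment for the clamped offset and limit.
func (r *OffsetLimit) fragment(max uint64) string {
	r.clamp(max)
	if r.Offset > 0 {
		return fmt.Sprintf("OFFSET %d LIMIT %d", r.Offset, *r.Limit)
	}
	return fmt.Sprintf("LIMIT %d", *r.Limit)
}

func main() {
	limit := uint64(500)
	r := OffsetLimit{Offset: 20, Limit: &limit}
	fmt.Println(r.fragment(100))
}
```

Capping the limit on the server side keeps a client from requesting an unbounded number of rows in a single list call.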

type Op

type Op uint

Op represents a database operation type.

const (
	None Op = iota
	Get
	Insert
	Update
	Delete
	List
)

Operations

func (Op) String

func (o Op) String() string

type Opt

type Opt func(*opt) error

Opt is a function which applies options for a connection pool

func WithAddr

func WithAddr(addr string) Opt

WithAddr sets the address (host) or (host:port) for the connection.

func WithBind

func WithBind(k string, v any) Opt

WithBind sets a bind variable for the connection pool.

func WithCredentials

func WithCredentials(user, password string) Opt

WithCredentials sets the connection pool username and password. If the database name is not set, then the username will be used as the default database name.

func WithDatabase

func WithDatabase(name string) Opt

WithDatabase sets the database name for the connection. If the user name is not set, then the database name will be used as the user name.

func WithHostPort

func WithHostPort(host, port string) Opt

WithHostPort sets the hostname and port for the connection. If the port is not set, then the default port 5432 will be used.

func WithSSLMode

func WithSSLMode(mode string) Opt

WithSSLMode sets the PostgreSQL SSL mode. Valid values are "disable", "allow", "prefer", "require", "verify-ca", "verify-full".

func WithTrace

func WithTrace(fn TraceFn) Opt

WithTrace sets the trace function for the connection pool.

func WithURL

func WithURL(value string) Opt

WithURL sets connection parameters from a PostgreSQL URL.

type PoolConn

type PoolConn interface {
	Conn

	// Acquire a connection and ping it
	Ping(context.Context) error

	// Release resources
	Close()

	// Reset the connection pool
	Reset()

	// Return a listener for the connection pool
	Listener() Listener
}

func NewPool

func NewPool(ctx context.Context, opts ...Opt) (PoolConn, error)

NewPool creates a new connection pool to a PostgreSQL server.

type Reader

type Reader interface {
	// Scan row into a result
	Scan(Row) error
}

Reader scans a database row into an object.

type Row

type Row pgx.Row

Row is a pgx.Row for scanning query results.

type Selector

type Selector interface {
	// Set bind parameters for getting, updating or deleting
	Select(*Bind, Op) (string, error)
}

Selector binds parameters for get, update, or delete operations.

type TraceFn

type TraceFn func(context.Context, string, any, error)

TraceFn is a function which is called when a query is executed, with the execution context, the SQL and arguments, and the error if any was generated

type Writer

type Writer interface {
	// Set bind parameters for an insert
	Insert(*Bind) (string, error)

	// Set bind parameters for an update
	Update(*Bind) error
}

Writer binds object fields for insert or update operations.

Directories

Path Synopsis
cmd
example command
example2 command
pgmanager command
pkg
manager
Package manager provides a comprehensive API for managing PostgreSQL server resources including roles, databases, schemas, tables, connections, and more.
manager/httpclient
Package httpclient provides a typed Go client for consuming the PostgreSQL management REST API.
manager/httphandler
Package httphandler provides REST API endpoints for PostgreSQL management operations.
manager/schema
Package schema defines all data types, request/response structures, and SQL queries for PostgreSQL management resources.
test
Package test provides utilities for integration testing with PostgreSQL using testcontainers.
types
Package types provides utility functions for pointer conversions and string formatting.
version
Package version provides build version information.
wasm
pgmanager command
