pgutil

package module
v0.0.0-...-93ba6c1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2024 License: MIT Imports: 30 Imported by: 4

README

Nacelle Postgres Utilities

PkgGoDev Build status Latest release

Postgres utilities for use with nacelle.


Usage

This library creates a Postgres connection wrapped in a nacelle logger. The supplied initializer adds this connection into the nacelle service container under the key db. The initializer will block until a ping succeeds.

func setup(processes nacelle.ProcessContainer, services nacelle.ServiceContainer) error {
    processes.RegisterInitializer(pgutil.NewInitializer())

    // additional setup
    return nil
}
Configuration

The default service behavior can be configured by the following environment variables.

Environment Variable Required Default Description
DATABASE_URL yes The connection string of the remote database.
LOG_SQL_QUERIES false Whether or not to log parameterized SQL queries.

Documentation

Index

Constants

View Source
const MaxPingAttempts = 15
View Source
const ServiceName = "db"

Variables

View Source
var (
	ErrDoesNotExist  = fmt.Errorf("record does not exist")
	ErrAlreadyExists = fmt.Errorf("record already exists")
)
View Source
var (
	ScanAny           = NewFirstScanner(NewAnyValueScanner[any]())
	ScanAnys          = NewSliceScanner(NewAnyValueScanner[any]())
	ScanBool          = NewFirstScanner(NewAnyValueScanner[bool]())
	ScanBools         = NewSliceScanner(NewAnyValueScanner[bool]())
	ScanFloat32       = NewFirstScanner(NewAnyValueScanner[float32]())
	ScanFloat32s      = NewSliceScanner(NewAnyValueScanner[float32]())
	ScanFloat64       = NewFirstScanner(NewAnyValueScanner[float64]())
	ScanFloat64s      = NewSliceScanner(NewAnyValueScanner[float64]())
	ScanInt           = NewFirstScanner(NewAnyValueScanner[int]())
	ScanInts          = NewSliceScanner(NewAnyValueScanner[int]())
	ScanInt16         = NewFirstScanner(NewAnyValueScanner[int16]())
	ScanInt16s        = NewSliceScanner(NewAnyValueScanner[int16]())
	ScanInt32         = NewFirstScanner(NewAnyValueScanner[int32]())
	ScanInt32s        = NewSliceScanner(NewAnyValueScanner[int32]())
	ScanInt64         = NewFirstScanner(NewAnyValueScanner[int64]())
	ScanInt64s        = NewSliceScanner(NewAnyValueScanner[int64]())
	ScanInt8          = NewFirstScanner(NewAnyValueScanner[int8]())
	ScanInt8s         = NewSliceScanner(NewAnyValueScanner[int8]())
	ScanString        = NewFirstScanner(NewAnyValueScanner[string]())
	ScanStrings       = NewSliceScanner(NewAnyValueScanner[string]())
	ScanUint          = NewFirstScanner(NewAnyValueScanner[uint]())
	ScanUints         = NewSliceScanner(NewAnyValueScanner[uint]())
	ScanUint16        = NewFirstScanner(NewAnyValueScanner[uint16]())
	ScanUint16s       = NewSliceScanner(NewAnyValueScanner[uint16]())
	ScanUint32        = NewFirstScanner(NewAnyValueScanner[uint32]())
	ScanUint32s       = NewSliceScanner(NewAnyValueScanner[uint32]())
	ScanUint64        = NewFirstScanner(NewAnyValueScanner[uint64]())
	ScanUint64s       = NewSliceScanner(NewAnyValueScanner[uint64]())
	ScanUint8         = NewFirstScanner(NewAnyValueScanner[uint8]())
	ScanUint8s        = NewSliceScanner(NewAnyValueScanner[uint8]())
	ScanTimestamp     = NewFirstScanner(NewAnyValueScanner[time.Time]())
	ScanTimestamps    = NewSliceScanner(NewAnyValueScanner[time.Time]())
	ScanNilBool       = NewFirstScanner(NewAnyValueScanner[*bool]())
	ScanNilBools      = NewSliceScanner(NewAnyValueScanner[*bool]())
	ScanNilFloat32    = NewFirstScanner(NewAnyValueScanner[*float32]())
	ScanNilFloat32s   = NewSliceScanner(NewAnyValueScanner[*float32]())
	ScanNilFloat64    = NewFirstScanner(NewAnyValueScanner[*float64]())
	ScanNilFloat64s   = NewSliceScanner(NewAnyValueScanner[*float64]())
	ScanNilInt        = NewFirstScanner(NewAnyValueScanner[*int]())
	ScanNilInts       = NewSliceScanner(NewAnyValueScanner[*int]())
	ScanNilInt16      = NewFirstScanner(NewAnyValueScanner[*int16]())
	ScanNilInt16s     = NewSliceScanner(NewAnyValueScanner[*int16]())
	ScanNilInt32      = NewFirstScanner(NewAnyValueScanner[*int32]())
	ScanNilInt32s     = NewSliceScanner(NewAnyValueScanner[*int32]())
	ScanNilInt64      = NewFirstScanner(NewAnyValueScanner[*int64]())
	ScanNilInt64s     = NewSliceScanner(NewAnyValueScanner[*int64]())
	ScanNilInt8       = NewFirstScanner(NewAnyValueScanner[*int8]())
	ScanNilInt8s      = NewSliceScanner(NewAnyValueScanner[*int8]())
	ScanNilString     = NewFirstScanner(NewAnyValueScanner[*string]())
	ScanNilStrings    = NewSliceScanner(NewAnyValueScanner[*string]())
	ScanNilUint       = NewFirstScanner(NewAnyValueScanner[*uint]())
	ScanNilUints      = NewSliceScanner(NewAnyValueScanner[*uint]())
	ScanNilUint16     = NewFirstScanner(NewAnyValueScanner[*uint16]())
	ScanNilUint16s    = NewSliceScanner(NewAnyValueScanner[*uint16]())
	ScanNilUint32     = NewFirstScanner(NewAnyValueScanner[*uint32]())
	ScanNilUint32s    = NewSliceScanner(NewAnyValueScanner[*uint32]())
	ScanNilUint64     = NewFirstScanner(NewAnyValueScanner[*uint64]())
	ScanNilUint64s    = NewSliceScanner(NewAnyValueScanner[*uint64]())
	ScanNilUint8      = NewFirstScanner(NewAnyValueScanner[*uint8]())
	ScanNilUint8s     = NewSliceScanner(NewAnyValueScanner[*uint8]())
	ScanNilTimestamp  = NewFirstScanner(NewAnyValueScanner[*time.Time]())
	ScanNilTimestamps = NewSliceScanner(NewAnyValueScanner[*time.Time]())
)
View Source
var ErrInTransaction = errors.New("locker database must not be in transaction")
View Source
var ErrNotInTransaction = fmt.Errorf("not in a transaction")
View Source
var ErrPanicDuringTransaction = fmt.Errorf("encountered panic during transaction")

Functions

func BuildDatabaseURL

func BuildDatabaseURL() string

func Compare

func Compare(a, b SchemaDescription) (statements []string)

func HandleError

func HandleError(err error, description string) error

func StringKey

func StringKey(key string) int32

Types

type Args

type Args map[string]any

type BatchInserter

type BatchInserter struct {
	// contains filtered or unexported fields
}

func NewBatchInserter

func NewBatchInserter(db DB, tableName string, columnNames []string, configs ...BatchInserterConfigFunc) *BatchInserter

func (*BatchInserter) Flush

func (i *BatchInserter) Flush(ctx context.Context) error

func (*BatchInserter) Insert

func (i *BatchInserter) Insert(ctx context.Context, values ...any) error

type BatchInserterConfigFunc

type BatchInserterConfigFunc func(*batchInserterOptions)

func WithBatchInserterOnConflict

func WithBatchInserterOnConflict(clause string) BatchInserterConfigFunc

func WithBatchInserterReturn

func WithBatchInserterReturn(columns []string, scanner ScanFunc) BatchInserterConfigFunc

type Collector

type Collector[T any] struct {
	// contains filtered or unexported fields
}

func NewCollector

func NewCollector[T any](scanner ScanValueFunc[T]) *Collector[T]

func (*Collector[T]) Scanner

func (c *Collector[T]) Scanner() ScanFunc

func (*Collector[T]) Slice

func (c *Collector[T]) Slice() []T

type ColumnDependency

type ColumnDependency struct {
	SourceNamespace       string
	SourceTableOrViewName string
	SourceColumnName      string
	UsedNamespace         string
	UsedTableOrView       string
}

func DescribeColumnDependencies

func DescribeColumnDependencies(ctx context.Context, db DB) ([]ColumnDependency, error)

type ColumnDescription

type ColumnDescription struct {
	Name                   string
	Type                   string
	IsNullable             bool
	Default                string
	CharacterMaximumLength int
	IsIdentity             bool
	IdentityGeneration     string
	IsGenerated            bool
	GenerationExpression   string
}

func (ColumnDescription) Equals

func (d ColumnDescription) Equals(other ColumnDescription) bool

type ColumnModifier

type ColumnModifier struct {
	// contains filtered or unexported fields
}

func (ColumnModifier) AlterExisting

func (m ColumnModifier) AlterExisting(existingSchema SchemaDescription, existingObject ColumnDescription) ([]ddlStatement, bool)

func (ColumnModifier) Create

func (m ColumnModifier) Create() string

func (ColumnModifier) Description

func (m ColumnModifier) Description() ColumnDescription

func (ColumnModifier) Drop

func (m ColumnModifier) Drop() string

func (ColumnModifier) Key

func (m ColumnModifier) Key() string

func (ColumnModifier) ObjectType

func (m ColumnModifier) ObjectType() string

type Config

type Config struct {
	DatabaseURL   string `env:"database_url" required:"true"`
	LogSQLQueries bool   `env:"log_sql_queries" default:"false"`
}

type ConfigFunc

type ConfigFunc func(*options)

ConfigFunc is a function used to configure an initializer.

type ConstraintDescription

type ConstraintDescription struct {
	Name                string
	Type                string
	IsDeferrable        bool
	ReferencedTableName string
	Definition          string
}

func (ConstraintDescription) Equals

type ConstraintModifier

type ConstraintModifier struct {
	// contains filtered or unexported fields
}

func (ConstraintModifier) Create

func (m ConstraintModifier) Create() string

func (ConstraintModifier) Description

func (m ConstraintModifier) Description() ConstraintDescription

func (ConstraintModifier) Drop

func (m ConstraintModifier) Drop() string

func (ConstraintModifier) Key

func (m ConstraintModifier) Key() string

func (ConstraintModifier) ObjectType

func (m ConstraintModifier) ObjectType() string

type DB

type DB interface {
	Query(ctx context.Context, query Q) (*sql.Rows, error)
	Exec(ctx context.Context, query Q) error
	WithTransaction(ctx context.Context, f func(tx DB) error) error

	IsInTransaction() bool
	Transact(ctx context.Context) (DB, error)
	Done(err error) error
}

func Dial

func Dial(url string, logger nacelle.Logger) (DB, error)

func NewTestDB

func NewTestDB(t testing.TB) DB

func NewTestDBWithLogger

func NewTestDBWithLogger(t testing.TB, logger log.Logger) DB

type Definition

type Definition struct {
	ID            int
	Name          string
	UpQuery       Q
	DownQuery     Q
	IndexMetadata *IndexMetadata
}

func ReadMigrations

func ReadMigrations(reader MigrationReader) (definitions []Definition, _ error)

type EnumDependency

type EnumDependency struct {
	EnumNamespace  string
	EnumName       string
	TableNamespace string
	TableName      string
	ColumnName     string
}

func DescribeEnumDependencies

func DescribeEnumDependencies(ctx context.Context, db DB) ([]EnumDependency, error)

type EnumDescription

type EnumDescription struct {
	Namespace string
	Name      string
	Labels    []string
}

func DescribeEnums

func DescribeEnums(ctx context.Context, db DB) ([]EnumDescription, error)

func (EnumDescription) Equals

func (d EnumDescription) Equals(other EnumDescription) bool

type EnumModifier

type EnumModifier struct {
	// contains filtered or unexported fields
}

func (EnumModifier) AlterExisting

func (m EnumModifier) AlterExisting(existingSchema SchemaDescription, existingObject EnumDescription) ([]ddlStatement, bool)

NOTE: This depends on the order of the schema being modified. We must be certain that the order of drop/apply/create ensures that the columns here in the existing schema are still valid within the schema being altered.

func (EnumModifier) Create

func (m EnumModifier) Create() string

func (EnumModifier) Description

func (m EnumModifier) Description() EnumDescription

func (EnumModifier) Drop

func (m EnumModifier) Drop() string

func (EnumModifier) Key

func (m EnumModifier) Key() string

func (EnumModifier) ObjectType

func (m EnumModifier) ObjectType() string

type ExtensionDescription

type ExtensionDescription struct {
	Namespace string
	Name      string
}

func DescribeExtensions

func DescribeExtensions(ctx context.Context, db DB) ([]ExtensionDescription, error)

func (ExtensionDescription) Equals

type ExtensionModifier

type ExtensionModifier struct {
	// contains filtered or unexported fields
}

func (ExtensionModifier) Create

func (m ExtensionModifier) Create() string

func (ExtensionModifier) Description

func (m ExtensionModifier) Description() ExtensionDescription

func (ExtensionModifier) Drop

func (m ExtensionModifier) Drop() string

func (ExtensionModifier) Key

func (m ExtensionModifier) Key() string

func (ExtensionModifier) ObjectType

func (m ExtensionModifier) ObjectType() string

type FilesystemMigrationReader

type FilesystemMigrationReader struct {
	// contains filtered or unexported fields
}

func (*FilesystemMigrationReader) ReadAll

func (r *FilesystemMigrationReader) ReadAll() (definitions []RawDefinition, _ error)

type FirstScannerFunc

type FirstScannerFunc[T any] func(rows Rows, queryErr error) (T, bool, error)

func NewFirstScanner

func NewFirstScanner[T any](f ScanValueFunc[T]) FirstScannerFunc[T]

func NewMaybeFirstScanner

func NewMaybeFirstScanner[T any](f MaybeScanValueFunc[T]) FirstScannerFunc[T]

type FunctionDescription

type FunctionDescription struct {
	Namespace  string
	Name       string
	Definition string
	ArgTypes   []string
}

func DescribeFunctions

func DescribeFunctions(ctx context.Context, db DB) ([]FunctionDescription, error)

func (FunctionDescription) Equals

type FunctionModifier

type FunctionModifier struct {
	// contains filtered or unexported fields
}

func (FunctionModifier) AlterExisting

func (m FunctionModifier) AlterExisting(_ SchemaDescription, _ FunctionDescription) ([]ddlStatement, bool)

func (FunctionModifier) Create

func (m FunctionModifier) Create() string

func (FunctionModifier) Description

func (m FunctionModifier) Description() FunctionDescription

func (FunctionModifier) Drop

func (m FunctionModifier) Drop() string

func (FunctionModifier) Key

func (m FunctionModifier) Key() string

func (FunctionModifier) ObjectType

func (m FunctionModifier) ObjectType() string

type IndexDescription

type IndexDescription struct {
	Name                 string
	IsPrimaryKey         bool
	IsUnique             bool
	IsExclusion          bool
	IsDeferrable         bool
	IndexDefinition      string
	ConstraintType       string
	ConstraintDefinition string
}

func (IndexDescription) Equals

func (d IndexDescription) Equals(other IndexDescription) bool

type IndexMetadata

type IndexMetadata struct {
	TableName string
	IndexName string
}

type IndexModifier

type IndexModifier struct {
	// contains filtered or unexported fields
}

func (IndexModifier) Create

func (m IndexModifier) Create() string

func (IndexModifier) Description

func (m IndexModifier) Description() IndexDescription

func (IndexModifier) Drop

func (m IndexModifier) Drop() string

func (IndexModifier) Key

func (m IndexModifier) Key() string

func (IndexModifier) ObjectType

func (m IndexModifier) ObjectType() string

type Initializer

type Initializer struct {
	Logger   nacelle.Logger           `service:"logger"`
	Services nacelle.ServiceContainer `service:"services"`
}

func NewInitializer

func NewInitializer(configs ...ConfigFunc) *Initializer

func (*Initializer) Init

func (i *Initializer) Init(config nacelle.Config) error

type MaybeScanFunc

type MaybeScanFunc func(Scanner) (bool, error)

type MaybeScanValueFunc

type MaybeScanValueFunc[T any] func(Scanner) (T, bool, error)

type MigrationLog

type MigrationLog struct {
	MigrationID  int
	Reverse      bool
	Success      *bool
	ErrorMessage *string
}

type MigrationReader

type MigrationReader interface {
	ReadAll() ([]RawDefinition, error)
}

func NewEmbedMigrationReader

func NewEmbedMigrationReader(fs embed.FS) MigrationReader

func NewFilesystemMigrationReader

func NewFilesystemMigrationReader(dirname string) MigrationReader

type MigrationReaderFunc

type MigrationReaderFunc func() ([]RawDefinition, error)

func (MigrationReaderFunc) ReadAll

func (f MigrationReaderFunc) ReadAll() ([]RawDefinition, error)

type Q

type Q struct {
	// contains filtered or unexported fields
}

func Query

func Query(format string, args Args) Q

func Quote

func Quote(format string) Q

func RawQuery

func RawQuery(format string, args ...any) Q

func (Q) Format

func (q Q) Format() (string, []any)

type RawDefinition

type RawDefinition struct {
	ID           int
	Name         string
	RawUpQuery   string
	RawDownQuery string
}

type RowScannerFunc

type RowScannerFunc func(rows Rows, queryErr error) error

func NewMaybeRowScanner

func NewMaybeRowScanner(f MaybeScanFunc) RowScannerFunc

func NewRowScanner

func NewRowScanner(f ScanFunc) RowScannerFunc

type Rows

type Rows interface {
	Scanner

	Next() bool
	Close() error
	Err() error
}

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

func NewMigrationRunner

func NewMigrationRunner(db DB, reader MigrationReader, logger nacelle.Logger) (*Runner, error)

func (*Runner) Apply

func (r *Runner) Apply(ctx context.Context, id int) error

func (*Runner) ApplyAll

func (r *Runner) ApplyAll(ctx context.Context) error

func (*Runner) Definitions

func (r *Runner) Definitions() []Definition

func (*Runner) MigrationLogs

func (r *Runner) MigrationLogs(ctx context.Context) (map[int]MigrationLog, error)

func (*Runner) Undo

func (r *Runner) Undo(ctx context.Context, id int) error

func (*Runner) WriteMigrationLog

func (r *Runner) WriteMigrationLog(ctx context.Context, id int) error

type ScanFunc

type ScanFunc func(Scanner) error

type ScanValueFunc

type ScanValueFunc[T any] func(Scanner) (T, error)

func NewAnyValueScanner

func NewAnyValueScanner[T any]() ScanValueFunc[T]

type Scanner

type Scanner interface {
	Scan(dst ...any) error
}

type SchemaDescription

type SchemaDescription struct {
	Extensions         []ExtensionDescription
	Enums              []EnumDescription
	Functions          []FunctionDescription
	Tables             []TableDescription
	Sequences          []SequenceDescription
	Views              []ViewDescription
	Triggers           []TriggerDescription
	EnumDependencies   []EnumDependency
	ColumnDependencies []ColumnDependency
}

func DescribeSchema

func DescribeSchema(ctx context.Context, db DB) (SchemaDescription, error)

type SequenceDescription

type SequenceDescription struct {
	Namespace    string
	Name         string
	Type         string
	StartValue   int
	MinimumValue int
	MaximumValue int
	Increment    int
	CycleOption  string
}

func DescribeSequences

func DescribeSequences(ctx context.Context, db DB) ([]SequenceDescription, error)

func (SequenceDescription) Equals

type SequenceModifier

type SequenceModifier struct {
	// contains filtered or unexported fields
}

func (SequenceModifier) AlterExisting

func (m SequenceModifier) AlterExisting(existingSchema SchemaDescription, existingObject SequenceDescription) ([]ddlStatement, bool)

func (SequenceModifier) Create

func (m SequenceModifier) Create() string

func (SequenceModifier) Description

func (m SequenceModifier) Description() SequenceDescription

func (SequenceModifier) Drop

func (m SequenceModifier) Drop() string

func (SequenceModifier) Key

func (m SequenceModifier) Key() string

func (SequenceModifier) ObjectType

func (m SequenceModifier) ObjectType() string

type SliceScannerFunc

type SliceScannerFunc[T any] func(rows Rows, queryErr error) ([]T, error)

func NewMaybeSliceScanner

func NewMaybeSliceScanner[T any](f MaybeScanValueFunc[T]) SliceScannerFunc[T]

func NewSliceScanner

func NewSliceScanner[T any](f ScanValueFunc[T]) SliceScannerFunc[T]

type TableComponentModifiers

type TableComponentModifiers struct {
	Columns     []ColumnModifier
	Constraints []ConstraintModifier
	Indexes     []IndexModifier
}

func NewTableComponentModifiers

func NewTableComponentModifiers(schema SchemaDescription, tables []TableDescription) TableComponentModifiers

type TableDescription

type TableDescription struct {
	Namespace   string
	Name        string
	Columns     []ColumnDescription
	Constraints []ConstraintDescription
	Indexes     []IndexDescription
}

func DescribeTables

func DescribeTables(ctx context.Context, db DB) ([]TableDescription, error)

func (TableDescription) Equals

func (d TableDescription) Equals(other TableDescription) bool

Note: not a deep comparison

type TableModifier

type TableModifier struct {
	// contains filtered or unexported fields
}

func (TableModifier) Create

func (m TableModifier) Create() string

func (TableModifier) Description

func (m TableModifier) Description() TableDescription

func (TableModifier) Drop

func (m TableModifier) Drop() string

func (TableModifier) Key

func (m TableModifier) Key() string

func (TableModifier) ObjectType

func (m TableModifier) ObjectType() string

type TransactionalLocker

type TransactionalLocker struct {
	// contains filtered or unexported fields
}

func NewTransactionalLocker

func NewTransactionalLocker(db DB, namespace int32) (*TransactionalLocker, error)

func (*TransactionalLocker) TryWithLock

func (l *TransactionalLocker) TryWithLock(ctx context.Context, key int32, f func(tx DB) error) (acquired bool, _ error)

func (*TransactionalLocker) WithLock

func (l *TransactionalLocker) WithLock(ctx context.Context, key int32, f func(tx DB) error) error

type TriggerDescription

type TriggerDescription struct {
	Namespace         string
	Name              string
	TableName         string
	FunctionNamespace string
	Definition        string
}

func DescribeTriggers

func DescribeTriggers(ctx context.Context, db DB) ([]TriggerDescription, error)

func (TriggerDescription) Equals

func (d TriggerDescription) Equals(other TriggerDescription) bool

type TriggerModifier

type TriggerModifier struct {
	// contains filtered or unexported fields
}

func (TriggerModifier) Create

func (m TriggerModifier) Create() string

func (TriggerModifier) Description

func (m TriggerModifier) Description() TriggerDescription

func (TriggerModifier) Drop

func (m TriggerModifier) Drop() string

func (TriggerModifier) Key

func (m TriggerModifier) Key() string

func (TriggerModifier) ObjectType

func (m TriggerModifier) ObjectType() string

type ViewDescription

type ViewDescription struct {
	Namespace  string
	Name       string
	Definition string
}

func DescribeViews

func DescribeViews(ctx context.Context, db DB) ([]ViewDescription, error)

func (ViewDescription) Equals

func (d ViewDescription) Equals(other ViewDescription) bool

type ViewModifier

type ViewModifier struct {
	// contains filtered or unexported fields
}

func (ViewModifier) Create

func (m ViewModifier) Create() string

func (ViewModifier) Description

func (m ViewModifier) Description() ViewDescription

func (ViewModifier) Drop

func (m ViewModifier) Drop() string

func (ViewModifier) Key

func (m ViewModifier) Key() string

func (ViewModifier) ObjectType

func (m ViewModifier) ObjectType() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL