database

package
v0.0.0-...-04478d6
Published: May 10, 2026 License: AGPL-3.0 Imports: 24 Imported by: 0

Documentation

Constants

const (
	// ErrCodeUniqueViolation is the PostgreSQL error code for unique constraint violations
	ErrCodeUniqueViolation = "23505"
	// ErrCodeForeignKeyViolation is the PostgreSQL error code for foreign key violations
	ErrCodeForeignKeyViolation = "23503"
	// ErrCodeCheckViolation is the PostgreSQL error code for check constraint violations
	ErrCodeCheckViolation = "23514"
)

PostgreSQL error codes

Variables

This section is empty.

Functions

func ContextWithAuth

func ContextWithAuth(ctx context.Context, userID, userRole string, isAdmin bool) context.Context

ContextWithAuth returns a context with auth information for audit logging

func ContextWithTenant

func ContextWithTenant(ctx context.Context, tenantID string) context.Context

ContextWithTenant returns a context with tenant ID set

func ContextWithTenantID

func ContextWithTenantID(ctx context.Context, tenantID string) context.Context

ContextWithTenantID returns a context with the tenant ID set, for multi-tenancy support. It is an alias for ContextWithTenant, kept for API consistency.

func ExtractDDLMetadata

func ExtractDDLMetadata(sql string) string

ExtractDDLMetadata extracts the operation type and target from a DDL query for logging. It returns a safe, redacted string such as "CREATE TABLE users" or "DROP INDEX idx_name".

func ExtractOperation

func ExtractOperation(sql string) string

ExtractOperation extracts the SQL operation type from a query

func ExtractTableName

func ExtractTableName(sql string) string

ExtractTableName attempts to extract the table name from a SQL query. It returns "unknown" if the table cannot be determined.
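The package's parsing rules are not shown on this page, so the following is only a rough sketch of how an operation keyword and a table name could be pulled out of a query for logging. The names operationOf and tableOf, the regular expression, and the "unknown" fallback for an empty query are all assumptions made for illustration; only the "unknown" fallback for the table name comes from the documentation above.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// tableRe is a deliberately naive pattern (an assumption, not the package's
// own): it captures the identifier that follows FROM, INTO, or UPDATE.
var tableRe = regexp.MustCompile(`(?i)\b(?:from|into|update)\s+([a-z_][a-z0-9_.]*)`)

// operationOf returns the first keyword of the query in upper case.
func operationOf(sql string) string {
	fields := strings.Fields(sql)
	if len(fields) == 0 {
		return "unknown" // hypothetical fallback for an empty query
	}
	return strings.ToUpper(fields[0])
}

// tableOf returns the first table-like identifier, or "unknown".
func tableOf(sql string) string {
	m := tableRe.FindStringSubmatch(sql)
	if m == nil {
		return "unknown"
	}
	return m[1]
}

func main() {
	for _, q := range []string{
		"SELECT id FROM users WHERE id = $1",
		"insert into public.orders (id) values ($1)",
		"SELECT 1",
	} {
		fmt.Println(operationOf(q), tableOf(q))
	}
	// Prints:
	// SELECT users
	// INSERT public.orders
	// SELECT unknown
}
```

A keyword scan like this does not handle CTEs, subqueries, or quoted identifiers, which is presumably why the documented function only "attempts" the extraction.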

func GetConstraintName

func GetConstraintName(err error) string

GetConstraintName returns the constraint name from a PostgreSQL error

func IsCheckViolation

func IsCheckViolation(err error) bool

IsCheckViolation checks if an error is a check constraint violation

func IsForeignKeyViolation

func IsForeignKeyViolation(err error) bool

IsForeignKeyViolation checks if an error is a foreign key violation

func IsUniqueViolation

func IsUniqueViolation(err error) bool

IsUniqueViolation checks if an error is a unique constraint violation
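The violation helpers above most likely compare the SQLSTATE code carried by the driver error with the package constants. The sketch below illustrates that idea with a local stand-in type, pgError, so that it runs without a database; in real pgx code the equivalent type is *pgconn.PgError, which exposes Code and ConstraintName fields. The helper names hasCode and constraintOf are hypothetical, and the package's actual implementation is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// SQLSTATE codes, copied from the package constants.
const (
	errCodeUniqueViolation     = "23505"
	errCodeForeignKeyViolation = "23503"
	errCodeCheckViolation      = "23514"
)

// pgError is a hypothetical stand-in for the driver's error type.
type pgError struct {
	Code           string
	ConstraintName string
}

func (e *pgError) Error() string { return "SQLSTATE " + e.Code }

// hasCode reports whether any error in err's chain is a *pgError with code.
func hasCode(err error, code string) bool {
	var pgErr *pgError
	return errors.As(err, &pgErr) && pgErr.Code == code
}

// constraintOf returns the constraint name, or "" if err is not a *pgError.
func constraintOf(err error) string {
	var pgErr *pgError
	if errors.As(err, &pgErr) {
		return pgErr.ConstraintName
	}
	return ""
}

func main() {
	base := &pgError{Code: errCodeUniqueViolation, ConstraintName: "users_email_key"}
	wrapped := fmt.Errorf("insert user: %w", base)

	fmt.Println(hasCode(wrapped, errCodeUniqueViolation))     // true
	fmt.Println(hasCode(wrapped, errCodeForeignKeyViolation)) // false
	fmt.Println(hasCode(errors.New("plain"), errCodeCheckViolation)) // false
	fmt.Println(constraintOf(wrapped))                        // users_email_key
}
```

Using errors.As rather than a type assertion means the check still works when the driver error has been wrapped with %w further up the call stack.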

func LogSchemaIntrospection

func LogSchemaIntrospection(ctx context.Context, operation string, details map[string]interface{})

LogSchemaIntrospection logs schema introspection for audit purposes

func TenantFromContext

func TenantFromContext(ctx context.Context) string

TenantFromContext extracts tenant ID from context. Returns empty string if no tenant context is set

func TenantIDFromContext

func TenantIDFromContext(ctx context.Context) string

TenantIDFromContext extracts the tenant ID from the context. It returns an empty string if no tenant context is set. It is an alias for TenantFromContext, kept for API consistency.
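The tenant helpers follow the usual Go pattern of storing a value in a context under a dedicated struct key type. The sketch below shows that pattern in isolation using only the standard library. The names tenantKey, withTenant, and tenantFrom are hypothetical stand-ins for TenantContextKey, ContextWithTenant, and TenantFromContext, and storing the ID as a plain string is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// tenantKey is a zero-size key type; using a dedicated type avoids
// collisions with context keys defined by other packages.
type tenantKey struct{}

// root is the base context used by this example.
var root = context.Background()

// withTenant returns a child context carrying the tenant ID.
func withTenant(ctx context.Context, tenantID string) context.Context {
	return context.WithValue(ctx, tenantKey{}, tenantID)
}

// tenantFrom returns the tenant ID, or "" when none has been set.
func tenantFrom(ctx context.Context) string {
	id, _ := ctx.Value(tenantKey{}).(string)
	return id
}

func main() {
	ctx := withTenant(root, "tenant-a")
	fmt.Printf("%q\n", tenantFrom(ctx))  // "tenant-a"
	fmt.Printf("%q\n", tenantFrom(root)) // ""
}
```

The comma-ok type assertion is what yields the documented empty-string result when no tenant has been stored.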

func TenantOrNil

func TenantOrNil(tenantID string) interface{}

TenantOrNil converts an empty tenant string to nil for UUID column compatibility. PostgreSQL UUID columns accept NULL but reject empty strings, so this helper is used when passing tenant IDs as query parameters.
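The behaviour described above is small enough to sketch directly. The function below is a local re-implementation for illustration (named tenantOrNil to make clear it is not the package's own code) and assumes that non-empty IDs are passed through unchanged.

```go
package main

import "fmt"

// tenantOrNil maps "" to nil so the driver sends SQL NULL instead of an
// empty string, which a UUID column would reject.
func tenantOrNil(tenantID string) interface{} {
	if tenantID == "" {
		return nil
	}
	return tenantID
}

func main() {
	fmt.Println(tenantOrNil("") == nil)                                // true
	fmt.Println(tenantOrNil("123e4567-e89b-12d3-a456-426614174000")) // the ID itself
}
```

The result would typically be passed as a query argument, for example as the value bound to a tenant_id parameter in an INSERT.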

func WithCaller

func WithCaller(ctx context.Context, caller string) context.Context

func WrapWithServiceRole

func WrapWithServiceRole(ctx context.Context, conn *Connection, fn func(tx pgx.Tx) error) error

WrapWithServiceRole wraps a database operation with service_role context. It is used for privileged operations such as auth, admin tasks, and webhooks.

func WrapWithServiceRoleAndTenant

func WrapWithServiceRoleAndTenant(ctx context.Context, conn *Connection, tenantID string, fn func(tx pgx.Tx) error) error

WrapWithServiceRoleAndTenant wraps a database operation with both service_role and tenant context. This bypasses RLS but still sets tenant_id for new records via the set_tenant_id trigger. Use this for privileged operations that still need to associate records with a tenant.

func WrapWithTenantAwareRole

func WrapWithTenantAwareRole(ctx context.Context, conn *Connection, tenantID string, fn func(tx pgx.Tx) error) error

WrapWithTenantAwareRole wraps a database operation with the appropriate role based on tenant context. When a tenant context is active, it uses tenant_service (NOBYPASSRLS) so RLS policies enforce tenant isolation. When no tenant context, it uses service_role (BYPASSRLS) for full instance-admin access.
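The role choice described above reduces to a single branch on the tenant ID. The sketch below shows only that decision. The function names are hypothetical, and the use of SET LOCAL ROLE to apply the role inside a transaction is an assumption about how such a wrapper might work, not something this page confirms.

```go
package main

import "fmt"

// roleFor picks the database role as documented: tenant_service (subject to
// RLS) when a tenant is active, service_role (bypasses RLS) otherwise.
func roleFor(tenantID string) string {
	if tenantID != "" {
		return "tenant_service"
	}
	return "service_role"
}

// setRoleSQL builds the statement a wrapper might run at the start of the
// transaction (assumed mechanism). Role names cannot be bound as query
// parameters, so they come from the fixed set above rather than user input.
func setRoleSQL(tenantID string) string {
	return "SET LOCAL ROLE " + roleFor(tenantID)
}

func main() {
	fmt.Println(setRoleSQL("tenant-a")) // SET LOCAL ROLE tenant_service
	fmt.Println(setRoleSQL(""))         // SET LOCAL ROLE service_role
}
```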

func WrapWithTenantContext

func WrapWithTenantContext(ctx context.Context, conn *Connection, tenantID string, fn func(tx pgx.Tx) error) error

WrapWithTenantContext wraps a database operation with tenant context for multi-tenancy. This sets the app.current_tenant_id session variable so that RLS policies and triggers can enforce tenant isolation. Use this for storage operations on tenant-scoped tables.
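To make the session-variable step concrete, here is a sketch that runs against a recording fake instead of PostgreSQL. The execer interface is a simplified, hypothetical stand-in for a transaction handle (pgx.Tx has a different Exec signature), and using set_config with is_local set to true is an assumption about how app.current_tenant_id might be set; the package may do this differently.

```go
package main

import (
	"context"
	"fmt"
)

// execer is a simplified stand-in for a transaction handle.
type execer interface {
	Exec(ctx context.Context, sql string, args ...interface{}) error
}

// recorder is a fake that records statements instead of executing them.
type recorder struct{ stmts []string }

func (r *recorder) Exec(_ context.Context, sql string, _ ...interface{}) error {
	r.stmts = append(r.stmts, sql)
	return nil
}

// withTenantVar sets app.current_tenant_id for the transaction, then runs fn.
func withTenantVar(ctx context.Context, tx execer, tenantID string, fn func(tx execer) error) error {
	// The third argument (is_local = true) limits the setting to the
	// current transaction, so it cannot leak to other pool users.
	const q = "SELECT set_config('app.current_tenant_id', $1, true)"
	if err := tx.Exec(ctx, q, tenantID); err != nil {
		return fmt.Errorf("set tenant context: %w", err)
	}
	return fn(tx)
}

func main() {
	tx := &recorder{}
	err := withTenantVar(context.Background(), tx, "tenant-a", func(tx execer) error {
		return tx.Exec(context.Background(), "INSERT INTO notes (body) VALUES ($1)", "hello")
	})
	fmt.Println(err, len(tx.stmts)) // <nil> 2
}
```

RLS policies can then read the value with current_setting('app.current_tenant_id') when filtering rows.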

Types

type AdminExecutor

type AdminExecutor interface {
	Executor

	// ExecuteWithAdminRole executes a database operation using admin credentials
	// inside a transaction. Used for migrations that require DDL privileges
	// (CREATE TABLE, ALTER, etc.). The callback receives the transaction, not a
	// raw connection, so all statements are properly transactional.
	ExecuteWithAdminRole(ctx context.Context, fn func(tx pgx.Tx) error) error

	// Inspector returns the schema inspector for introspecting database structure
	Inspector() *SchemaInspector
}

AdminExecutor extends Executor with privileged operations that require admin database credentials (e.g., for DDL operations, migrations).

type AuthContext

type AuthContext struct {
	UserID    string
	UserRole  string // "authenticated", "anon", "service_role", "admin", etc.
	IsAdmin   bool
	ClientKey string // For service role access via client keys
}

AuthContext represents authentication and authorization context for schema introspection

func AuthFromContext

func AuthFromContext(ctx context.Context) *AuthContext

AuthFromContext extracts the auth context from the context for audit logging. It returns nil if no auth context is set.

type AuthContextKey

type AuthContextKey struct{}

AuthContextKey is the context key for storing authorization information

type ColumnInfo

type ColumnInfo struct {
	Name         string           `json:"name"`
	DataType     string           `json:"data_type"`
	IsNullable   bool             `json:"is_nullable"`
	DefaultValue *string          `json:"default_value"`
	IsPrimaryKey bool             `json:"is_primary_key"`
	IsForeignKey bool             `json:"is_foreign_key"`
	IsUnique     bool             `json:"is_unique"`
	MaxLength    *int             `json:"max_length"`
	Position     int              `json:"position"`
	Description  string           `json:"description,omitempty"`
	JSONBSchema  *JSONBSchemaInfo `json:"jsonb_schema,omitempty"`
}

ColumnInfo represents metadata about a table column

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection represents a database connection pool

func ConnectWithRetry

func ConnectWithRetry(cfg config.DatabaseConfig, maxAttempts int) (*Connection, error)

ConnectWithRetry attempts to connect to the database with exponential backoff.
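The retry schedule used by ConnectWithRetry is not documented here, so the following is a generic sketch of exponential backoff rather than the package's implementation. The function retryWithBackoff, its base delay parameter, and the doubling factor are all assumptions chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRefused simulates a failed connection attempt in this example.
var errRefused = errors.New("connection refused")

// retryWithBackoff calls connect up to maxAttempts times, doubling the wait
// after each failure. It returns nil on the first success.
func retryWithBackoff(maxAttempts int, base time.Duration, connect func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1 // always try at least once
	}
	var err error
	delay := base
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = connect(); err == nil {
			return nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay) // no sleep after the final attempt
			delay *= 2
		}
	}
	return fmt.Errorf("connect failed after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := retryWithBackoff(5, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errRefused
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```

Production code often adds a maximum delay and random jitter so that many instances restarting together do not retry in lockstep.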

func NewConnection

func NewConnection(cfg config.DatabaseConfig) (*Connection, error)

NewConnection creates a new database connection pool. The connection pool uses the runtime user, while migrations use the admin user.

func NewConnectionWithPool

func NewConnectionWithPool(pool *pgxpool.Pool) *Connection

NewConnectionWithPool creates a new Connection wrapper around an existing pgxpool.Pool. This is useful for tests where you have a pre-configured pool.

func (*Connection) BeginTx

func (c *Connection) BeginTx(ctx context.Context) (pgx.Tx, error)

BeginTx starts a new transaction

func (*Connection) Close

func (c *Connection) Close()

Close closes the database connection pool

func (*Connection) Exec

func (c *Connection) Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

Exec executes a query that doesn't return rows

func (*Connection) ExecuteWithAdminRole

func (c *Connection) ExecuteWithAdminRole(ctx context.Context, fn func(tx pgx.Tx) error) error

ExecuteWithAdminRole executes a database operation using admin credentials. It is used for migrations that require DDL privileges (CREATE TABLE, ALTER, etc.) and creates a temporary admin connection that is closed after execution.

func (*Connection) ExecuteWithAdminRoleForDB

func (c *Connection) ExecuteWithAdminRoleForDB(ctx context.Context, dbName string, fn func(tx pgx.Tx) error) error

ExecuteWithAdminRoleForDB executes a function with admin privileges against a specific database (for tenant DDL operations). It replaces the database name in the admin connection string with the provided dbName.

func (*Connection) Health

func (c *Connection) Health(ctx context.Context) error

Health checks the health of the database connection

func (*Connection) Inspector

func (c *Connection) Inspector() *SchemaInspector

Inspector returns the schema inspector

func (*Connection) Migrate

func (c *Connection) Migrate() error

Migrate runs database migrations from user sources. Note: the internal Fluxbase schema is now managed declaratively (see bootstrap + pgschema).

func (*Connection) Pool

func (c *Connection) Pool() *pgxpool.Pool

Pool returns the underlying connection pool

func (*Connection) Query

func (c *Connection) Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

Query executes a query that returns rows

func (*Connection) QueryRow

func (c *Connection) QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

QueryRow executes a query that returns a single row

func (*Connection) RecreatePool

func (c *Connection) RecreatePool() error

RecreatePool closes the current pool and creates a new one. This is safer than Reset() as it ensures a completely fresh pool state. Use this after schema changes (migrations) to avoid prepared statement cache issues.

func (*Connection) SetMetrics

func (c *Connection) SetMetrics(m *observability.Metrics)

SetMetrics sets the metrics instance for recording database metrics

func (*Connection) Stats

func (c *Connection) Stats() *pgxpool.Stat

Stats returns database connection pool statistics

type Executor

type Executor interface {
	// Query executes a query that returns rows
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)

	// QueryRow executes a query that returns a single row
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row

	// Exec executes a query that doesn't return rows
	Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)

	// BeginTx starts a new transaction
	BeginTx(ctx context.Context) (pgx.Tx, error)

	// Pool returns the underlying connection pool (for advanced operations)
	Pool() *pgxpool.Pool

	// Health checks the health of the database connection
	Health(ctx context.Context) error
}

Executor defines the interface for database query operations. This interface abstracts database access for easier testing via mocks.
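As an illustration of the testing benefit mentioned above, the sketch below has a caller depend on a narrow interface and substitutes a fake in place of a real connection. healthChecker is a hypothetical one-method subset of Executor (its Health method has the same signature as the one listed above), and fakeDB and readiness are invented for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// healthChecker is a one-method subset of Executor; any type with a matching
// Health method satisfies it.
type healthChecker interface {
	Health(ctx context.Context) error
}

// errDown is the failure returned by the fake in this example.
var errDown = errors.New("database down")

// fakeDB is a test double that returns a preset error.
type fakeDB struct{ err error }

func (f fakeDB) Health(_ context.Context) error { return f.err }

// readiness is a hypothetical caller that only needs the interface.
func readiness(ctx context.Context, db healthChecker) string {
	if err := db.Health(ctx); err != nil {
		return "unavailable: " + err.Error()
	}
	return "ok"
}

func main() {
	ctx := context.Background()
	fmt.Println(readiness(ctx, fakeDB{}))             // ok
	fmt.Println(readiness(ctx, fakeDB{err: errDown})) // unavailable: database down
}
```

Accepting the smallest interface a function needs keeps fakes short; a full Executor mock would also have to implement the query and transaction methods.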

type ForeignKey

type ForeignKey struct {
	Name             string `json:"name"`
	ColumnName       string `json:"column_name"`
	ReferencedTable  string `json:"referenced_table"`
	ReferencedColumn string `json:"referenced_column"`
	OnDelete         string `json:"on_delete"`
	OnUpdate         string `json:"on_update"`
}

ForeignKey represents a foreign key relationship

type FunctionInfo

type FunctionInfo struct {
	Schema      string          `json:"schema"`
	Name        string          `json:"name"`
	Description string          `json:"description"`
	Parameters  []FunctionParam `json:"parameters"`
	ReturnType  string          `json:"return_type"`
	IsSetOf     bool            `json:"is_set_of"`
	Volatility  string          `json:"volatility"` // VOLATILE, STABLE, IMMUTABLE
	Language    string          `json:"language"`
}

FunctionInfo represents metadata about a database function

type FunctionParam

type FunctionParam struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	Mode       string `json:"mode"` // IN, OUT, INOUT
	HasDefault bool   `json:"has_default"`
	Position   int    `json:"position"`
}

FunctionParam represents a function parameter

type IndexInfo

type IndexInfo struct {
	Name      string   `json:"name"`
	Columns   []string `json:"columns"`
	IsUnique  bool     `json:"is_unique"`
	IsPrimary bool     `json:"is_primary"`
}

IndexInfo represents an index on a table

type JSONBProperty

type JSONBProperty struct {
	Type        string                   `json:"type"`
	Description string                   `json:"description,omitempty"`
	Properties  map[string]JSONBProperty `json:"properties,omitempty"` // For nested objects
	Items       *JSONBProperty           `json:"items,omitempty"`      // For arrays
}

JSONBProperty represents a single property in a JSONB schema

type JSONBSchemaInfo

type JSONBSchemaInfo struct {
	Properties map[string]JSONBProperty `json:"properties,omitempty"`
	Required   []string                 `json:"required,omitempty"`
}

JSONBSchemaInfo represents the schema of a JSONB column
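To show how the two JSONB types nest and how the omitempty tags affect output, the sketch below copies the struct definitions from above and marshals a small schema. The sample schema (an array of strings called "tags") and the encode helper are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Type definitions copied from the package documentation.
type JSONBProperty struct {
	Type        string                   `json:"type"`
	Description string                   `json:"description,omitempty"`
	Properties  map[string]JSONBProperty `json:"properties,omitempty"` // For nested objects
	Items       *JSONBProperty           `json:"items,omitempty"`      // For arrays
}

type JSONBSchemaInfo struct {
	Properties map[string]JSONBProperty `json:"properties,omitempty"`
	Required   []string                 `json:"required,omitempty"`
}

// encode is a hypothetical helper that renders a schema as compact JSON.
func encode(s JSONBSchemaInfo) string {
	b, err := json.Marshal(s)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	schema := JSONBSchemaInfo{
		Properties: map[string]JSONBProperty{
			"tags": {Type: "array", Items: &JSONBProperty{Type: "string"}},
		},
		Required: []string{"tags"},
	}
	fmt.Println(encode(schema))
	// {"properties":{"tags":{"type":"array","items":{"type":"string"}}},"required":["tags"]}
}
```

Empty descriptions, nil property maps, and nil Items pointers are dropped from the output, which keeps the serialized schema compact for scalar properties.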

type Querier

type Querier interface{}

These aliases allow the middleware and handlers to use simpler type names

type SchemaCache

type SchemaCache struct {
	// contains filtered or unexported fields
}

SchemaCache provides a thread-safe cache for database schema information with TTL-based expiration and manual invalidation support. When PubSub is configured, invalidation is broadcast to all instances.
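The combination of TTL expiry and manual invalidation can be sketched with a mutex-protected struct. The example below is a simplified, single-process model and not the SchemaCache implementation: ttlCache, its fields, and the loader callback are hypothetical, and PubSub broadcasting is left out entirely.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ttlCache caches a list of table names and reloads it when the entry is
// older than ttl or has been invalidated.
type ttlCache struct {
	mu       sync.RWMutex
	ttl      time.Duration
	loadedAt time.Time
	stale    bool
	tables   []string
	load     func() ([]string, error)
}

func newTTLCache(ttl time.Duration, load func() ([]string, error)) *ttlCache {
	return &ttlCache{ttl: ttl, stale: true, load: load}
}

// Invalidate marks the cache stale; the next Tables call reloads.
func (c *ttlCache) Invalidate() {
	c.mu.Lock()
	c.stale = true
	c.mu.Unlock()
}

// fresh must be called with c.mu held.
func (c *ttlCache) fresh() bool {
	return !c.stale && time.Since(c.loadedAt) < c.ttl
}

// Tables returns the cached list, refreshing it first if necessary.
func (c *ttlCache) Tables() ([]string, error) {
	c.mu.RLock()
	if c.fresh() {
		defer c.mu.RUnlock()
		return c.tables, nil
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	// Re-check under the write lock: another goroutine may have refreshed.
	if c.fresh() {
		return c.tables, nil
	}
	loaded, err := c.load()
	if err != nil {
		return nil, err
	}
	c.tables, c.loadedAt, c.stale = loaded, time.Now(), false
	return c.tables, nil
}

func main() {
	loads := 0
	cache := newTTLCache(time.Minute, func() ([]string, error) {
		loads++
		return []string{"users", "orders"}, nil
	})
	cache.Tables()
	cache.Tables()
	fmt.Println(loads) // 1: the second call was served from the cache
	cache.Invalidate()
	cache.Tables()
	fmt.Println(loads) // 2: invalidation forced a reload
}
```

In a multi-instance deployment, the broadcast variant would additionally publish a message so that every other instance calls its own local Invalidate.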

func NewSchemaCache

func NewSchemaCache(inspector *SchemaInspector, ttl time.Duration) *SchemaCache

NewSchemaCache creates a new schema cache with the given TTL

func (*SchemaCache) Close

func (c *SchemaCache) Close()

Close stops the invalidation listener if running

func (*SchemaCache) GetAllMaterializedViews

func (c *SchemaCache) GetAllMaterializedViews(ctx context.Context) ([]TableInfo, error)

GetAllMaterializedViews returns all cached materialized views, refreshing if necessary

func (*SchemaCache) GetAllTables

func (c *SchemaCache) GetAllTables(ctx context.Context) ([]TableInfo, error)

GetAllTables returns all cached tables, refreshing if necessary

func (*SchemaCache) GetAllViews

func (c *SchemaCache) GetAllViews(ctx context.Context) ([]TableInfo, error)

GetAllViews returns all cached views, refreshing if necessary

func (*SchemaCache) GetSchemas

func (c *SchemaCache) GetSchemas(ctx context.Context) ([]string, error)

GetSchemas returns cached schemas

func (*SchemaCache) GetTable

func (c *SchemaCache) GetTable(ctx context.Context, schema, table string) (*TableInfo, bool, error)

GetTable retrieves table info from the cache, refreshing if necessary. Returns (*TableInfo, exists, error)

func (*SchemaCache) Invalidate

func (c *SchemaCache) Invalidate()

Invalidate marks the cache as stale, forcing a refresh on next access. This only invalidates the local cache. Use InvalidateAll to broadcast invalidation to all instances.

func (*SchemaCache) InvalidateAll

func (c *SchemaCache) InvalidateAll(ctx context.Context)

InvalidateAll marks the cache as stale and broadcasts the invalidation to all other instances via PubSub. Use this when schema changes occur (e.g., after migrations) to ensure all instances refresh their caches.

func (*SchemaCache) IsTableWritable

func (c *SchemaCache) IsTableWritable(ctx context.Context, schema, table string) (bool, error)

IsTableWritable checks if a table is writable (not a view or materialized view)

func (*SchemaCache) Refresh

func (c *SchemaCache) Refresh(ctx context.Context) error

Refresh forces an immediate cache refresh

func (*SchemaCache) SetPubSub

func (c *SchemaCache) SetPubSub(ps pubsub.PubSub)

SetPubSub configures the PubSub backend for cross-instance cache invalidation. When set, InvalidateAll will broadcast invalidation messages to all instances, and this instance will listen for invalidation messages from others.

func (*SchemaCache) TableCount

func (c *SchemaCache) TableCount() int

TableCount returns the number of cached tables

func (*SchemaCache) ViewCount

func (c *SchemaCache) ViewCount() int

ViewCount returns the number of cached views

type SchemaInspector

type SchemaInspector struct {
	// contains filtered or unexported fields
}

SchemaInspector provides PostgreSQL schema introspection capabilities

func NewSchemaInspector

func NewSchemaInspector(conn *Connection) *SchemaInspector

NewSchemaInspector creates a new schema inspector

func (*SchemaInspector) BuildRESTPath

func (si *SchemaInspector) BuildRESTPath(table TableInfo) string

BuildRESTPath builds a REST API path for a table

func (*SchemaInspector) GetAllFunctions

func (si *SchemaInspector) GetAllFunctions(ctx context.Context, schemas ...string) ([]FunctionInfo, error)

GetAllFunctions retrieves information about all functions in the specified schemas

func (*SchemaInspector) GetAllMaterializedViews

func (si *SchemaInspector) GetAllMaterializedViews(ctx context.Context, schemas ...string) ([]TableInfo, error)

GetAllMaterializedViews retrieves information about all materialized views in the specified schemas. This uses batched queries to avoid N+1 query patterns.

func (*SchemaInspector) GetAllMaterializedViewsFromPool

func (si *SchemaInspector) GetAllMaterializedViewsFromPool(ctx context.Context, pool *pgxpool.Pool, schemas ...string) ([]TableInfo, error)

GetAllMaterializedViewsFromPool retrieves materialized view info using a tenant-specific pool.

func (*SchemaInspector) GetAllTables

func (si *SchemaInspector) GetAllTables(ctx context.Context, schemas ...string) ([]TableInfo, error)

GetAllTables retrieves information about all tables in the specified schemas. This uses batched queries to avoid N+1 query patterns.

func (*SchemaInspector) GetAllTablesFromPool

func (si *SchemaInspector) GetAllTablesFromPool(ctx context.Context, pool *pgxpool.Pool, schemas ...string) ([]TableInfo, error)

GetAllTablesFromPool retrieves table info using a tenant-specific pool.

func (*SchemaInspector) GetAllViews

func (si *SchemaInspector) GetAllViews(ctx context.Context, schemas ...string) ([]TableInfo, error)

GetAllViews retrieves information about all views in the specified schemas. This uses batched queries to avoid N+1 query patterns.

func (*SchemaInspector) GetAllViewsFromPool

func (si *SchemaInspector) GetAllViewsFromPool(ctx context.Context, pool *pgxpool.Pool, schemas ...string) ([]TableInfo, error)

GetAllViewsFromPool retrieves view info using a tenant-specific pool.

func (*SchemaInspector) GetSchemas

func (si *SchemaInspector) GetSchemas(ctx context.Context) ([]string, error)

GetSchemas retrieves all available schemas

func (*SchemaInspector) GetSchemasFromPool

func (si *SchemaInspector) GetSchemasFromPool(ctx context.Context, pool *pgxpool.Pool) ([]string, error)

GetSchemasFromPool retrieves all schemas using a tenant-specific pool.

func (*SchemaInspector) GetTableInfo

func (si *SchemaInspector) GetTableInfo(ctx context.Context, schema, table string) (*TableInfo, error)

GetTableInfo retrieves detailed information about a specific table

func (*SchemaInspector) GetTableInfoFromPool

func (si *SchemaInspector) GetTableInfoFromPool(ctx context.Context, pool *pgxpool.Pool, schema, table string) (*TableInfo, error)

GetTableInfoFromPool retrieves detailed table info using a tenant-specific pool.

func (*SchemaInspector) GetVectorColumns

func (si *SchemaInspector) GetVectorColumns(ctx context.Context, schema, table string) ([]VectorColumnInfo, error)

GetVectorColumns retrieves all vector columns in the specified schema and table. If table is empty, it returns all vector columns in the schema. If both schema and table are empty, it returns all vector columns in the public schema.

func (*SchemaInspector) IsPgVectorInstalled

func (si *SchemaInspector) IsPgVectorInstalled(ctx context.Context) (bool, string, error)

IsPgVectorInstalled checks if the pgvector extension is installed

type TableInfo

type TableInfo struct {
	Schema      string       `json:"schema"`
	Name        string       `json:"name"`
	Type        string       `json:"type"`                // "table", "view", or "materialized_view"
	RESTPath    string       `json:"rest_path,omitempty"` // The REST API path for this table (e.g., "/auth/users")
	Columns     []ColumnInfo `json:"columns"`
	PrimaryKey  []string     `json:"primary_key"`
	ForeignKeys []ForeignKey `json:"foreign_keys"`
	Indexes     []IndexInfo  `json:"indexes"`
	RLSEnabled  bool         `json:"rls_enabled"`

	// ColumnMap provides O(1) column lookup by name (populated lazily or by BuildColumnMap)
	ColumnMap map[string]*ColumnInfo `json:"-"`
}

TableInfo represents metadata about a database table, view, or materialized view

func (*TableInfo) BuildColumnMap

func (t *TableInfo) BuildColumnMap()

BuildColumnMap populates the ColumnMap for O(1) column lookups. This is called automatically during schema cache refresh.

func (*TableInfo) GetColumn

func (t *TableInfo) GetColumn(name string) *ColumnInfo

GetColumn returns the column info for the given column name, or nil if not found. Uses ColumnMap for O(1) lookup if available, otherwise falls back to O(n) search.

func (*TableInfo) HasColumn

func (t *TableInfo) HasColumn(name string) bool

HasColumn checks if a column exists in the table using O(1) lookup.

type TenantAware

type TenantAware struct {
	DB *Connection
}

TenantAware provides a reusable embedded struct for tenant-scoped database operations. Storage types embed this to get the WithTenant helper method.

func (*TenantAware) WithTenant

func (t *TenantAware) WithTenant(ctx context.Context, fn func(tx pgx.Tx) error) error

WithTenant wraps a database operation with tenant-aware role selection. When a tenant context is active, uses tenant_service (respects RLS). When no tenant context, uses service_role (bypasses RLS).

type TenantContextKey

type TenantContextKey struct{}

TenantContextKey is the context key for storing tenant information

type TxConnection

type TxConnection = pgx.Tx

These aliases allow the middleware and handlers to use simpler type names

type VectorColumnInfo

type VectorColumnInfo struct {
	SchemaName string `json:"schema_name"`
	TableName  string `json:"table_name"`
	ColumnName string `json:"column_name"`
	Dimensions int    `json:"dimensions"` // -1 if variable/unspecified
}

VectorColumnInfo represents metadata about a vector column (pgvector)
