schema

package
v0.0.0-...-ae8c3b3 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 27, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package schema manages the physical database schema for metadata tables.

It provisions tables from a template (EnsureTable), manages RANGE partitions by timestamp (PartitionManager), maintains secondary indexes (IndexManager), and tracks dynamic column allocation via the ColumnRegistry.

The column registry maps user-facing field names to physical dim_fNN/agg_fNN columns, supporting thread-safe slot allocation and atomic snapshots for concurrent readers.

Constants

This section is empty.

Variables

var ErrSlotExhausted = errors.New("column slot exhausted (max 99)")

ErrSlotExhausted is returned when all 99 dim or agg slots are consumed. Callers should log a warning and skip the dimension or aggregation; ingestion continues without the exhausted columns rather than stalling.
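The skip-and-continue handling described above might look like the following minimal sketch. It does not import this package: errSlotExhausted stands in for ErrSlotExhausted, and allocateFunc and fakeAllocator are hypothetical stand-ins for a registry allocation call.

```go
package main

import (
	"errors"
	"fmt"
)

// errSlotExhausted stands in for schema.ErrSlotExhausted in this sketch.
var errSlotExhausted = errors.New("column slot exhausted (max 99)")

// allocateFunc is a hypothetical stand-in for a registry allocation call.
type allocateFunc func(key string) (string, error)

// resolveColumns maps each key to a column. Keys whose allocation fails with
// slot exhaustion are skipped; any other error aborts the whole call.
func resolveColumns(keys []string, allocate allocateFunc) (map[string]string, []string, error) {
	cols := make(map[string]string, len(keys))
	var skipped []string
	for _, k := range keys {
		col, err := allocate(k)
		switch {
		case errors.Is(err, errSlotExhausted):
			skipped = append(skipped, k) // real code would log a warning here
		case err != nil:
			return nil, nil, fmt.Errorf("allocate %q: %w", k, err)
		default:
			cols[k] = col
		}
	}
	return cols, skipped, nil
}

// fakeAllocator hands out at most limit slots, then reports exhaustion.
func fakeAllocator(limit int) allocateFunc {
	next := 0
	return func(key string) (string, error) {
		if next >= limit {
			return "", fmt.Errorf("dim %q: %w", key, errSlotExhausted)
		}
		next++
		return fmt.Sprintf("dim_f%02d", next), nil
	}
}

func main() {
	cols, skipped, err := resolveColumns([]string{"host", "region", "zone"}, fakeAllocator(2))
	fmt.Println(cols, skipped, err)
}
```

Using errors.Is rather than == keeps the check correct whether or not the error has been wrapped on its way up.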

var SchemaSQL string

SchemaSQL contains the full DDL for system tables (_table, _table_config, _table_assignment, _node_registry, _dim_registry, _agg_registry, _sketch_registry, _task_queue) and the _clp_template table.

Functions

func AggCacheKey

func AggCacheKey(aggKey, aggValue, aggType string) string

AggCacheKey builds the composite key for agg cache lookups.
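The exact key format is not documented here. As an illustration only, a composite key can be built by joining the three parts with a separator that cannot occur in the parts themselves; the NUL separator and the lowercase aggCacheKey name below are assumptions of this sketch, not the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// keySep is an assumed separator; a byte that never appears in keys avoids
// collisions such as ("a", "bc") versus ("ab", "c").
const keySep = "\x00"

// aggCacheKey is a hypothetical equivalent of AggCacheKey.
func aggCacheKey(aggKey, aggValue, aggType string) string {
	return strings.Join([]string{aggKey, aggValue, aggType}, keySep)
}

func main() {
	cache := map[string]string{
		aggCacheKey("level", "error", "EQ"): "agg_f01",
	}
	fmt.Println(cache[aggCacheKey("level", "error", "EQ")])
	fmt.Println(aggCacheKey("a", "bc", "EQ") == aggCacheKey("ab", "c", "EQ"))
}
```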

func EnsureTable

func EnsureTable(ctx context.Context, database *sql.DB, tableName string, isMariaDB bool, compressionOverride string, log *zap.Logger) error

EnsureTable idempotently provisions a metadata table and all registry rows. Steps: create physical table, insert registry rows, pre-populate sketch slots, create lookahead partitions. compressionOverride can be "lz4", "none", or "" (auto-detect from isMariaDB).

SQL safety: tableName is validated via db.ValidateSQLIdentifier before any interpolation. Compression values come from a closed switch statement, not user input.

Types

type AggRegistryEntry

type AggRegistryEntry struct {
	TableName       string
	ColumnName      string
	AggKey          string
	AggValue        string
	AggregationType string // EQ, GTE, GT, LTE, LT, SUM, AVG, MIN, MAX
	ValueType       string // INT, FLOAT
	AliasCol        string
	Status          string
}

AggRegistryEntry represents an active aggregation column mapping.

type AggRequest

type AggRequest struct {
	AggKey    string
	AggValue  string
	AggType   string
	ValueType string
	AliasCol  string // optional human-readable alias
}

AggRequest describes an aggregation column to resolve or allocate.

type BaseSchemaValidator

type BaseSchemaValidator struct {
	// contains filtered or unexported fields
}

BaseSchemaValidator verifies that system and template tables have the expected columns, types, and indexes. Run at startup after EnsureSystemTables to catch stale schemas before any data operations.

func NewBaseSchemaValidator

func NewBaseSchemaValidator(db *sql.DB, log *zap.Logger) *BaseSchemaValidator

NewBaseSchemaValidator creates a validator.

func (*BaseSchemaValidator) Validate

func (v *BaseSchemaValidator) Validate(ctx context.Context) error

Validate checks all system tables and the template table.

type ColumnRegistry

type ColumnRegistry struct {
	// contains filtered or unexported fields
}

ColumnRegistry maps physical placeholder columns (dim_fNN, agg_fNN) to field metadata. Thread-safe: RWMutex guards map reads/writes; a separate Mutex serializes slot allocation.

SQL safety: DDL and DML in this type use fmt.Sprintf with interpolated identifiers. This is safe because all table/column names are validated via db.ValidateSQLIdentifier (restricted to ^[a-z_][a-z0-9_]{0,63}$) and quoted via db.QuoteIdentifier before interpolation. SQL types come from internal [dimSQLType] / hardcoded strings, never from user input.
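The validate-then-quote pattern described above can be sketched as follows. The regular expression is the one quoted in this section; validateIdentifier, quoteIdentifier, and countNonNullSQL are hypothetical stand-ins, and the real db.ValidateSQLIdentifier and db.QuoteIdentifier may behave differently.

```go
package main

import (
	"fmt"
	"regexp"
)

// identRe is the identifier pattern quoted in the documentation above.
var identRe = regexp.MustCompile(`^[a-z_][a-z0-9_]{0,63}$`)

// validateIdentifier rejects anything that is not a plain lowercase identifier.
func validateIdentifier(name string) error {
	if !identRe.MatchString(name) {
		return fmt.Errorf("invalid SQL identifier %q", name)
	}
	return nil
}

// quoteIdentifier wraps a validated name in backticks (MySQL/MariaDB style).
// Validation already excludes backticks, so no escaping is needed here.
func quoteIdentifier(name string) string {
	return "`" + name + "`"
}

// countNonNullSQL interpolates identifiers only after both have been validated.
func countNonNullSQL(table, column string) (string, error) {
	for _, id := range []string{table, column} {
		if err := validateIdentifier(id); err != nil {
			return "", err
		}
	}
	return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s IS NOT NULL",
		quoteIdentifier(table), quoteIdentifier(column)), nil
}

func main() {
	fmt.Println(countNonNullSQL("meta", "dim_f01"))
	fmt.Println(countNonNullSQL("meta; DROP TABLE x", "dim_f01"))
}
```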

func NewColumnRegistry

func NewColumnRegistry(ctx context.Context, db *sql.DB, tableName string, isMariaDB bool, log *zap.Logger) (*ColumnRegistry, error)

NewColumnRegistry creates a ColumnRegistry and loads all ACTIVE entries from the DB.

func (*ColumnRegistry) ActiveAggColumns

func (cr *ColumnRegistry) ActiveAggColumns() []string

ActiveAggColumns returns the column names of all active aggregation entries. The result is sorted for deterministic SQL generation.

func (*ColumnRegistry) ActiveDimColumns

func (cr *ColumnRegistry) ActiveDimColumns() []string

ActiveDimColumns returns the column names of all active dimension entries. The result is sorted for deterministic SQL generation.

func (*ColumnRegistry) AllAggEntries

func (cr *ColumnRegistry) AllAggEntries() []*AggRegistryEntry

AllAggEntries returns a snapshot of all active aggregation registry entries.

func (*ColumnRegistry) AllDimEntries

func (cr *ColumnRegistry) AllDimEntries() []*DimRegistryEntry

AllDimEntries returns a snapshot of all active dimension registry entries.

func (*ColumnRegistry) EntryCount

func (cr *ColumnRegistry) EntryCount() int

EntryCount returns the total number of active dim + agg entries. Used as a cache-busting version token: when new columns are provisioned, the count changes and cached filter rewrites are invalidated.
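One way a caller might use such a version token is sketched below. The rewriteCache type and its get method are hypothetical; the version argument is where a caller would pass the value returned by EntryCount. The sketch is not safe for concurrent use.

```go
package main

import "fmt"

// rewriteCache is a hypothetical cache of filter rewrites that is discarded
// whenever the version token changes.
type rewriteCache struct {
	version int
	entries map[string]string
}

// get returns the cached rewrite for filter, recomputing it when the cache
// was built under a different version.
func (c *rewriteCache) get(version int, filter string, rewrite func(string) string) string {
	if c.entries == nil || c.version != version {
		c.entries = make(map[string]string) // version changed: drop everything
		c.version = version
	}
	if out, ok := c.entries[filter]; ok {
		return out
	}
	out := rewrite(filter)
	c.entries[filter] = out
	return out
}

func main() {
	calls := 0
	rewrite := func(f string) string { calls++; return "rewritten: " + f }

	c := &rewriteCache{}
	c.get(3, "host = 'a'", rewrite) // miss
	c.get(3, "host = 'a'", rewrite) // hit
	c.get(4, "host = 'a'", rewrite) // version bumped, miss again
	fmt.Println(calls)
}
```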

func (*ColumnRegistry) FloatAggColumns

func (cr *ColumnRegistry) FloatAggColumns() map[string]bool

FloatAggColumns returns a set of agg column names that hold DOUBLE values.

func (*ColumnRegistry) LookupAggByColumn

func (cr *ColumnRegistry) LookupAggByColumn(colName string) *AggRegistryEntry

LookupAggByColumn returns the AggRegistryEntry for a physical column name, or nil.

func (*ColumnRegistry) LookupDimByColumn

func (cr *ColumnRegistry) LookupDimByColumn(colName string) *DimRegistryEntry

LookupDimByColumn returns the DimRegistryEntry for a physical column name, or nil.

func (*ColumnRegistry) RefreshAliases

func (cr *ColumnRegistry) RefreshAliases(ctx context.Context) error

RefreshAliases re-reads alias_column values from the database for all active entries and updates the in-memory cache. Call periodically so that alias changes made via the admin API (possibly on a different node) propagate.

func (*ColumnRegistry) ResolveAgg

func (cr *ColumnRegistry) ResolveAgg(aggKey, aggValue, aggType string) string

ResolveAgg returns the column name for an aggregation, or empty string if not found.

func (*ColumnRegistry) ResolveDim

func (cr *ColumnRegistry) ResolveDim(dimKey string) string

ResolveDim returns the column name for a dimension key, or empty string if not found.

func (*ColumnRegistry) ResolveOrAllocateAgg

func (cr *ColumnRegistry) ResolveOrAllocateAgg(ctx context.Context, aggKey, aggValue, aggType, valueType string) (string, error)

ResolveOrAllocateAgg resolves an existing agg mapping or allocates a new slot. Same fast-path/slow-path pattern as [ResolveOrAllocateDim].

func (*ColumnRegistry) ResolveOrAllocateAggs

func (cr *ColumnRegistry) ResolveOrAllocateAggs(ctx context.Context, reqs []AggRequest) (map[string]string, error)

ResolveOrAllocateAggs resolves existing agg mappings and batch-allocates new slots. Returns a map from composite key (aggKey+aggValue+aggType) to column name.

func (*ColumnRegistry) ResolveOrAllocateDim

func (cr *ColumnRegistry) ResolveOrAllocateDim(ctx context.Context, dimKey, baseType string, width int) (string, error)

ResolveOrAllocateDim resolves an existing dim mapping or allocates a new slot. If a new slot is needed, it inserts into the registry and issues ALTER TABLE ADD COLUMN. If the existing slot is narrower than width, the column is widened via ALTER TABLE MODIFY.

Uses a fast-path/slow-path pattern: the fast path (RLock) handles the common case where the dim already exists. The slow path (allocMu) serializes DDL operations and double-checks under the lock to handle concurrent allocations.
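The fast-path/slow-path pattern can be illustrated with an in-memory sketch. The registry type here is hypothetical and does no SQL; the comment marks where the real code would insert the registry row and issue the ALTER TABLE.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical in-memory stand-in for ColumnRegistry.
type registry struct {
	mu      sync.RWMutex // guards cols
	allocMu sync.Mutex   // serializes slot allocation
	cols    map[string]string
	next    int // last slot handed out; only touched under allocMu
}

func newRegistry() *registry { return &registry{cols: make(map[string]string)} }

func (r *registry) lookup(key string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	col, ok := r.cols[key]
	return col, ok
}

func (r *registry) resolveOrAllocate(key string) string {
	// Fast path: the key already has a column, only a read lock is taken.
	if col, ok := r.lookup(key); ok {
		return col
	}
	// Slow path: one allocator at a time.
	r.allocMu.Lock()
	defer r.allocMu.Unlock()
	// Double-check: another goroutine may have allocated while we waited.
	if col, ok := r.lookup(key); ok {
		return col
	}
	r.next++
	col := fmt.Sprintf("dim_f%02d", r.next)
	// Real code would INSERT the registry row and run ALTER TABLE here.
	r.mu.Lock()
	r.cols[key] = col
	r.mu.Unlock()
	return col
}

// allocateConcurrently resolves the same key from n goroutines at once.
func allocateConcurrently(r *registry, key string, n int) []string {
	out := make([]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			out[i] = r.resolveOrAllocate(key)
		}(i)
	}
	wg.Wait()
	return out
}

func main() {
	r := newRegistry()
	fmt.Println(allocateConcurrently(r, "host", 8), r.next)
}
```

Without the second lookup under allocMu, two goroutines that both miss on the fast path would each consume a slot for the same key.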

func (*ColumnRegistry) ResolveOrAllocateDims

func (cr *ColumnRegistry) ResolveOrAllocateDims(ctx context.Context, reqs []DimRequest) (map[string]string, error)

ResolveOrAllocateDims resolves existing dim mappings and batch-allocates new slots. Returns a map from dimKey to column name. Width expansion for existing columns is handled individually (MODIFY COLUMN), while new columns are added via a single multi-column ALTER TABLE.
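A single multi-column ALTER TABLE of the kind mentioned above could be assembled as in this sketch. The newColumn type and batchAddColumnsSQL helper are hypothetical, the SQL types are placeholders, and identifiers are assumed to have been validated before this point.

```go
package main

import (
	"fmt"
	"strings"
)

// newColumn is a hypothetical description of a column to add.
type newColumn struct {
	Name    string
	SQLType string // must come from trusted internal code, never user input
}

// batchAddColumnsSQL builds one ALTER TABLE with an ADD COLUMN clause per
// column. Identifiers are assumed to be validated already.
func batchAddColumnsSQL(table string, cols []newColumn) string {
	clauses := make([]string, 0, len(cols))
	for _, c := range cols {
		clauses = append(clauses, fmt.Sprintf("ADD COLUMN `%s` %s", c.Name, c.SQLType))
	}
	return fmt.Sprintf("ALTER TABLE `%s` %s", table, strings.Join(clauses, ", "))
}

func main() {
	fmt.Println(batchAddColumnsSQL("meta", []newColumn{
		{Name: "dim_f01", SQLType: "VARCHAR(64) NULL"},
		{Name: "dim_f02", SQLType: "BIGINT NULL"},
	}))
}
```

Batching the additions means the table is altered once per batch rather than once per new dimension.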

func (*ColumnRegistry) ResolveOrAllocateSketches

func (cr *ColumnRegistry) ResolveOrAllocateSketches(ctx context.Context, keys []string) (map[string]string, error)

ResolveOrAllocateSketches resolves logical sketch keys to SET member names (s01..s64). Unlike dims/aggs, sketch slots are pre-allocated in the table schema, so no ALTER TABLE is needed. Resolution claims AVAILABLE rows in _sketch_registry via UPDATE.

Returns a map from sketch key (e.g. "uuid") to SET member name (e.g. "s03").

func (*ColumnRegistry) ResolveSketch

func (cr *ColumnRegistry) ResolveSketch(sketchKey string) *SketchRegistryEntry

ResolveSketch returns the registry entry for a sketch key, or nil if not found.

func (*ColumnRegistry) RunRecycler

func (cr *ColumnRegistry) RunRecycler(ctx context.Context)

RunRecycler periodically scans for INVALIDATED columns that have aged past retention and recycles them to AVAILABLE. The flow:

  1. Find INVALIDATED entries where invalidated_at + recyclerMinAge < now.
  2. Check if the physical column has any non-NULL rows remaining.
  3. If few remain, NULL them out in batches.
  4. Mark the registry entry as AVAILABLE for reuse.
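The flow above can be sketched in memory as follows. The entry type, the status strings as Go values, and the clearColumn callback (standing in for steps 2 and 3, which need a database) are assumptions of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical in-memory view of a registry row.
type entry struct {
	Column        string
	Status        string
	InvalidatedAt time.Time
}

// recyclable implements step 1: indexes of INVALIDATED entries for which
// invalidated_at + minAge < now.
func recyclable(entries []entry, minAge time.Duration, now time.Time) []int {
	var idx []int
	for i, e := range entries {
		if e.Status == "INVALIDATED" && e.InvalidatedAt.Add(minAge).Before(now) {
			idx = append(idx, i)
		}
	}
	return idx
}

// recycle runs the flow over entries. clearColumn stands in for steps 2-3
// (checking for and NULLing out remaining rows); on error the entry stays
// INVALIDATED so a later pass can retry.
func recycle(entries []entry, minAge time.Duration, now time.Time, clearColumn func(col string) error) []string {
	var recycled []string
	for _, i := range recyclable(entries, minAge, now) {
		if err := clearColumn(entries[i].Column); err != nil {
			continue
		}
		entries[i].Status = "AVAILABLE" // step 4
		recycled = append(recycled, entries[i].Column)
	}
	return recycled
}

// runDemo recycles a fixed set of entries with a 24h minimum age.
func runDemo() []string {
	now := time.Date(2026, 3, 27, 0, 0, 0, 0, time.UTC)
	entries := []entry{
		{"dim_f01", "ACTIVE", time.Time{}},
		{"dim_f02", "INVALIDATED", now.Add(-48 * time.Hour)},
		{"dim_f03", "INVALIDATED", now.Add(-1 * time.Hour)},
	}
	return recycle(entries, 24*time.Hour, now, func(string) error { return nil })
}

func main() {
	fmt.Println(runDemo())
}
```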

func (*ColumnRegistry) SetMeter

func (cr *ColumnRegistry) SetMeter(m metric.Meter)

SetMeter configures OpenTelemetry metrics. Must be called before serving.

func (*ColumnRegistry) Snapshot

func (cr *ColumnRegistry) Snapshot() *RegistrySnapshot

Snapshot creates a read-only snapshot of the current registry state. The snapshot is safe to use concurrently without locks.
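A copy-on-snapshot approach is one way to get this property. The sketch below uses a hypothetical registry with a single map; the point is that the snapshot owns its own copy and therefore needs no lock afterwards.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, simplified stand-in for ColumnRegistry.
type registry struct {
	mu   sync.RWMutex
	dims map[string]string // dimKey -> column name
}

// snapshot is never mutated after creation, so readers need no locking.
type snapshot struct {
	dims map[string]string
}

func (r *registry) set(key, col string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.dims[key] = col
}

// snapshot copies the map under the read lock and hands out the copy.
func (r *registry) snapshot() *snapshot {
	r.mu.RLock()
	defer r.mu.RUnlock()
	cp := make(map[string]string, len(r.dims))
	for k, v := range r.dims {
		cp[k] = v
	}
	return &snapshot{dims: cp}
}

// resolveDim returns the column for key, or "" if the key is unknown.
func (s *snapshot) resolveDim(key string) string { return s.dims[key] }

func main() {
	r := &registry{dims: make(map[string]string)}
	r.set("host", "dim_f01")
	s := r.snapshot()
	r.set("zone", "dim_f02") // not visible through s
	fmt.Printf("%q %q\n", s.resolveDim("host"), s.resolveDim("zone"))
}
```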

type DimRegistryEntry

type DimRegistryEntry struct {
	TableName  string
	ColumnName string
	BaseType   string // str, str_utf8, bool, int, float
	Width      int
	DimKey     string
	AliasCol   string
	Status     string
}

DimRegistryEntry represents an active dimension column mapping.

type DimRequest

type DimRequest struct {
	DimKey   string
	BaseType string
	Width    int
	AliasCol string // optional human-readable alias
}

DimRequest describes a dimension column to resolve or allocate.

type Evolver

type Evolver struct {
	// contains filtered or unexported fields
}

Evolver handles online DDL for adding new columns to metadata tables. Internally, ColumnRegistry uses an Evolver to add dim_fNN/agg_fNN columns on the fly. The type is exported so enterprise deployments can substitute a custom DDL strategy (e.g., pt-online-schema-change or an approval workflow) by providing their own Evolver to the registry.

func NewEvolver

func NewEvolver(db *sql.DB, isMariaDB bool, log *zap.Logger) *Evolver

NewEvolver creates an Evolver.

func (*Evolver) AddColumn

func (e *Evolver) AddColumn(ctx context.Context, tableName, colName, sqlType string) error

AddColumn adds a new column to the table using online DDL (ALGORITHM=INPLACE, LOCK=NONE).

SQL safety: tableName and colName are validated via db.ValidateSQLIdentifier and quoted via db.QuoteIdentifier before interpolation. sqlType must come from internal callers (e.g., [dimSQLType]), never from user input.

type IndexConfig

type IndexConfig struct {
	Table   string   `yaml:"table"`
	Columns []string `yaml:"columns"`
}

IndexConfig defines a desired index from index.yaml.

type IndexManager

type IndexManager struct {
	// contains filtered or unexported fields
}

IndexManager reconciles dynamic indexes on dimension columns.

func NewIndexManager

func NewIndexManager(db *sql.DB, isMariaDB bool, log *zap.Logger) *IndexManager

NewIndexManager creates an IndexManager.

func (*IndexManager) DropIndex

func (im *IndexManager) DropIndex(ctx context.Context, tableName, colName string) error

DropIndex removes a dynamic index if it exists.

func (*IndexManager) EnsureIndex

func (im *IndexManager) EnsureIndex(ctx context.Context, tableName, colName string) error

EnsureIndex creates an index on a dimension column if it doesn't exist.

func (*IndexManager) Reconcile

func (im *IndexManager) Reconcile(ctx context.Context, desired []IndexConfig) error

Reconcile ensures all desired indexes exist and removes stale ones. Each entry in desired names a table and the columns on it that should be indexed.
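Reconciliation of this kind reduces to a set difference between desired and existing indexes. The sketch below mirrors IndexConfig locally as indexConfig and takes the existing indexes as a plain map; how the real IndexManager discovers existing indexes is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sort"
)

// indexConfig mirrors the fields of IndexConfig for this sketch.
type indexConfig struct {
	Table   string
	Columns []string
}

// diffIndexes returns "table.column" pairs to create and to drop, sorted for
// deterministic output. existing maps table name to indexed columns.
func diffIndexes(desired []indexConfig, existing map[string][]string) (create, drop []string) {
	want := make(map[string]bool)
	for _, d := range desired {
		for _, c := range d.Columns {
			want[d.Table+"."+c] = true
		}
	}
	have := make(map[string]bool)
	for t, cols := range existing {
		for _, c := range cols {
			have[t+"."+c] = true
		}
	}
	for k := range want {
		if !have[k] {
			create = append(create, k)
		}
	}
	for k := range have {
		if !want[k] {
			drop = append(drop, k)
		}
	}
	sort.Strings(create)
	sort.Strings(drop)
	return create, drop
}

func main() {
	desired := []indexConfig{{Table: "meta", Columns: []string{"dim_f01", "dim_f02"}}}
	existing := map[string][]string{"meta": {"dim_f02", "dim_f03"}}
	create, drop := diffIndexes(desired, existing)
	fmt.Println("create:", create, "drop:", drop)
}
```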

type PartitionInfo

type PartitionInfo struct {
	Name        string
	Description string
	Rows        int64
	DataLength  int64
}

PartitionInfo holds metadata about a single partition.

type PartitionManager

type PartitionManager struct {
	// contains filtered or unexported fields
}

PartitionManager manages MySQL RANGE partitions on the metadata table. Partitions are daily, keyed on min_timestamp (epoch nanoseconds).
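For daily RANGE partitions on an epoch-nanosecond column, each partition's upper bound is midnight of the following day expressed in nanoseconds. The sketch below assumes UTC day boundaries and a pYYYYMMDD naming scheme; neither is confirmed by this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// dailyPartition returns an assumed partition name and the exclusive upper
// bound (start of the next UTC day, in epoch nanoseconds) for the given date.
func dailyPartition(year int, month time.Month, day int) (string, int64) {
	start := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
	end := start.AddDate(0, 0, 1)
	return "p" + start.Format("20060102"), end.UnixNano()
}

// lookaheadClauses builds one PARTITION clause per day, starting at the given
// date. time.Date normalizes day overflow across month boundaries.
func lookaheadClauses(year int, month time.Month, day, days int) []string {
	clauses := make([]string, 0, days)
	for i := 0; i < days; i++ {
		d := time.Date(year, month, day+i, 0, 0, 0, 0, time.UTC)
		name, bound := dailyPartition(d.Year(), d.Month(), d.Day())
		clauses = append(clauses, fmt.Sprintf("PARTITION %s VALUES LESS THAN (%d)", name, bound))
	}
	return clauses
}

func main() {
	for _, c := range lookaheadClauses(2026, time.March, 30, 3) {
		fmt.Println(c)
	}
}
```

Each clause could then be placed inside an ALTER TABLE ... ADD PARTITION (...) statement.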

func NewPartitionManager

func NewPartitionManager(db *sql.DB, tableName string, lookaheadDays, cleanupAgeDays int, log *zap.Logger) *PartitionManager

NewPartitionManager creates a PartitionManager.

func (*PartitionManager) EnsureLookaheadPartitions

func (pm *PartitionManager) EnsureLookaheadPartitions(ctx context.Context) (int, error)

EnsureLookaheadPartitions creates partitions ahead of today. It attempts to acquire an advisory lock and proceeds without it after a timeout.

func (*PartitionManager) RunMaintenance

func (pm *PartitionManager) RunMaintenance(ctx context.Context) error

RunMaintenance tries to acquire an advisory lock and, if it succeeds, runs partition maintenance. If another node holds the lock, it returns immediately without waiting.

type RegistrySnapshot

type RegistrySnapshot struct {
	// contains filtered or unexported fields
}

RegistrySnapshot is an immutable point-in-time copy of a ColumnRegistry. Used by the query path to avoid lock contention with the ingestion path.

func (*RegistrySnapshot) AllAggEntries

func (r *RegistrySnapshot) AllAggEntries() []*AggRegistryEntry

AllAggEntries returns all agg entries in the snapshot.

func (*RegistrySnapshot) AllDimEntries

func (r *RegistrySnapshot) AllDimEntries() []*DimRegistryEntry

AllDimEntries returns all dim entries in the snapshot.

func (*RegistrySnapshot) ResolveAgg

func (r *RegistrySnapshot) ResolveAgg(aggKey, aggValue, aggType string) string

ResolveAgg returns the column name for an aggregation, or empty string if not found.

func (*RegistrySnapshot) ResolveDim

func (r *RegistrySnapshot) ResolveDim(dimKey string) string

ResolveDim returns the column name for a dimension key, or empty string if not found.

type SketchRegistryEntry

type SketchRegistryEntry struct {
	TableName  string
	SketchName string // SET member name (e.g. "s01")
	SketchKey  string // logical field name (e.g. "uuid")
	Status     string
}

SketchRegistryEntry represents an active sketch SET member mapping.
