Documentation
Overview
Package schema manages the physical database schema for metadata tables.
It provisions tables from a template (EnsureTable), manages RANGE partitions by timestamp (PartitionManager), maintains secondary indexes (IndexManager), and tracks dynamic column allocation via the ColumnRegistry.
The column registry maps user-facing field names to physical dim_fNN/agg_fNN columns, supporting thread-safe slot allocation and atomic snapshots for concurrent readers.
Index
- Variables
- func AggCacheKey(aggKey, aggValue, aggType string) string
- func EnsureTable(ctx context.Context, database *sql.DB, tableName string, isMariaDB bool, ...) error
- type AggRegistryEntry
- type AggRequest
- type BaseSchemaValidator
- type ColumnRegistry
- func (cr *ColumnRegistry) ActiveAggColumns() []string
- func (cr *ColumnRegistry) ActiveDimColumns() []string
- func (cr *ColumnRegistry) AllAggEntries() []*AggRegistryEntry
- func (cr *ColumnRegistry) AllDimEntries() []*DimRegistryEntry
- func (cr *ColumnRegistry) EntryCount() int
- func (cr *ColumnRegistry) FloatAggColumns() map[string]bool
- func (cr *ColumnRegistry) LookupAggByColumn(colName string) *AggRegistryEntry
- func (cr *ColumnRegistry) LookupDimByColumn(colName string) *DimRegistryEntry
- func (cr *ColumnRegistry) RefreshAliases(ctx context.Context) error
- func (cr *ColumnRegistry) ResolveAgg(aggKey, aggValue, aggType string) string
- func (cr *ColumnRegistry) ResolveDim(dimKey string) string
- func (cr *ColumnRegistry) ResolveOrAllocateAgg(ctx context.Context, aggKey, aggValue, aggType, valueType string) (string, error)
- func (cr *ColumnRegistry) ResolveOrAllocateAggs(ctx context.Context, reqs []AggRequest) (map[string]string, error)
- func (cr *ColumnRegistry) ResolveOrAllocateDim(ctx context.Context, dimKey, baseType string, width int) (string, error)
- func (cr *ColumnRegistry) ResolveOrAllocateDims(ctx context.Context, reqs []DimRequest) (map[string]string, error)
- func (cr *ColumnRegistry) ResolveOrAllocateSketches(ctx context.Context, keys []string) (map[string]string, error)
- func (cr *ColumnRegistry) ResolveSketch(sketchKey string) *SketchRegistryEntry
- func (cr *ColumnRegistry) RunRecycler(ctx context.Context)
- func (cr *ColumnRegistry) SetMeter(m metric.Meter)
- func (cr *ColumnRegistry) Snapshot() *RegistrySnapshot
- type DimRegistryEntry
- type DimRequest
- type Evolver
- type IndexConfig
- type IndexManager
- type PartitionInfo
- type PartitionManager
- type RegistrySnapshot
- type SketchRegistryEntry
Constants
This section is empty.
Variables
var ErrSlotExhausted = errors.New("column slot exhausted (max 99)")
ErrSlotExhausted is returned when all 99 dim or agg slots are consumed. Callers should log a warning and skip the affected dimension or aggregation; ingestion continues without the exhausted columns rather than stalling.
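A minimal sketch of the skip-and-continue handling described above. The allocate helper, the slot limit passed to it, and the key names are hypothetical stand-ins for a ColumnRegistry allocation call; the sketch matches the sentinel with errors.Is rather than ==, which works whether or not the error arrives wrapped.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// errSlotExhausted mirrors schema.ErrSlotExhausted for this standalone sketch.
var errSlotExhausted = errors.New("column slot exhausted (max 99)")

// allocate is a hypothetical stand-in for a ColumnRegistry allocation call.
// It hands out dim_fNN names until maxSlots is reached, then returns a
// wrapped errSlotExhausted.
func allocate(used *int, maxSlots int, key string) (string, error) {
	if *used >= maxSlots {
		return "", fmt.Errorf("allocate %q: %w", key, errSlotExhausted)
	}
	*used++
	return fmt.Sprintf("dim_f%02d", *used), nil
}

// resolveAll skips keys that hit slot exhaustion instead of failing the
// whole batch; any other error is still returned to the caller.
func resolveAll(keys []string, maxSlots int) (map[string]string, []string, error) {
	used := 0
	cols := make(map[string]string, len(keys))
	var skipped []string
	for _, k := range keys {
		col, err := allocate(&used, maxSlots, k)
		switch {
		case errors.Is(err, errSlotExhausted):
			log.Printf("warning: skipping %q: %v", k, err)
			skipped = append(skipped, k)
		case err != nil:
			return nil, nil, err
		default:
			cols[k] = col
		}
	}
	return cols, skipped, nil
}

func main() {
	// A limit of 2 only makes exhaustion visible; the documented limit is 99.
	cols, skipped, err := resolveAll([]string{"host", "region", "pod"}, 2)
	fmt.Println(cols, skipped, err)
}
```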
var SchemaSQL string
SchemaSQL contains the full DDL for system tables (_table, _table_config, _table_assignment, _node_registry, _dim_registry, _agg_registry, _sketch_registry, _task_queue) and the _clp_template table.
Functions
func AggCacheKey
func AggCacheKey(aggKey, aggValue, aggType string) string
AggCacheKey builds the composite key for agg cache lookups.
func EnsureTable
func EnsureTable(ctx context.Context, database *sql.DB, tableName string, isMariaDB bool, compressionOverride string, log *zap.Logger) error
EnsureTable idempotently provisions a metadata table and all registry rows. Steps: create physical table, insert registry rows, pre-populate sketch slots, create lookahead partitions. compressionOverride can be "lz4", "none", or "" (auto-detect from isMariaDB).
SQL safety: tableName is validated via db.ValidateSQLIdentifier before any interpolation. Compression values come from a closed switch statement, not user input.
Types
type AggRegistryEntry
type AggRegistryEntry struct {
TableName string
ColumnName string
AggKey string
AggValue string
AggregationType string // EQ, GTE, GT, LTE, LT, SUM, AVG, MIN, MAX
ValueType string // INT, FLOAT
AliasCol string
Status string
}
AggRegistryEntry represents an active aggregation column mapping.
type AggRequest
type AggRequest struct {
AggKey string
AggValue string
AggType string
ValueType string
AliasCol string // optional human-readable alias
}
AggRequest describes an aggregation column to resolve or allocate.
type BaseSchemaValidator
type BaseSchemaValidator struct {
// contains filtered or unexported fields
}
BaseSchemaValidator verifies that system and template tables have the expected columns, types, and indexes. Run at startup after EnsureSystemTables to catch stale schemas before any data operations.
func NewBaseSchemaValidator
func NewBaseSchemaValidator(db *sql.DB, log *zap.Logger) *BaseSchemaValidator
NewBaseSchemaValidator creates a validator.
type ColumnRegistry
type ColumnRegistry struct {
// contains filtered or unexported fields
}
ColumnRegistry maps physical placeholder columns (dim_fNN, agg_fNN) to field metadata. Thread-safe: RWMutex guards map reads/writes; a separate Mutex serializes slot allocation.
SQL safety: DDL and DML in this type use fmt.Sprintf with interpolated identifiers. This is safe because all table/column names are validated via db.ValidateSQLIdentifier (restricted to ^[a-z_][a-z0-9_]{0,63}$) and quoted via db.QuoteIdentifier before interpolation. SQL types come from internal [dimSQLType] / hardcoded strings, never from user input.
func NewColumnRegistry
func NewColumnRegistry(ctx context.Context, db *sql.DB, tableName string, isMariaDB bool, log *zap.Logger) (*ColumnRegistry, error)
NewColumnRegistry creates a ColumnRegistry and loads all ACTIVE entries from the DB.
func (*ColumnRegistry) ActiveAggColumns
func (cr *ColumnRegistry) ActiveAggColumns() []string
ActiveAggColumns returns the column names of all active aggregation entries. The result is sorted for deterministic SQL generation.
func (*ColumnRegistry) ActiveDimColumns
func (cr *ColumnRegistry) ActiveDimColumns() []string
ActiveDimColumns returns the column names of all active dimension entries. The result is sorted for deterministic SQL generation.
func (*ColumnRegistry) AllAggEntries
func (cr *ColumnRegistry) AllAggEntries() []*AggRegistryEntry
AllAggEntries returns a snapshot of all active aggregation registry entries.
func (*ColumnRegistry) AllDimEntries
func (cr *ColumnRegistry) AllDimEntries() []*DimRegistryEntry
AllDimEntries returns a snapshot of all active dimension registry entries.
func (*ColumnRegistry) EntryCount
func (cr *ColumnRegistry) EntryCount() int
EntryCount returns the total number of active dim + agg entries. Used as a cache-busting version token: when new columns are provisioned, the count changes and cached filter rewrites are invalidated.
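A minimal sketch of how a consumer might use EntryCount as a version token, assuming a hypothetical rewriteCache that stores each rewritten filter alongside the count it was computed under. A cached entry is reused only while the count is unchanged.

```go
package main

import (
	"fmt"
	"sync"
)

// cachedRewrite pairs a rewritten filter with the EntryCount it was built under.
type cachedRewrite struct {
	version int
	sql     string
}

// rewriteCache is a hypothetical consumer of ColumnRegistry.EntryCount.
type rewriteCache struct {
	mu      sync.Mutex
	entries map[string]cachedRewrite
	rewrite func(filter string) string // the expensive field-to-column rewrite
}

// get reuses a cached rewrite only if it was built under the current version.
func (c *rewriteCache) get(filter string, version int) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if e, ok := c.entries[filter]; ok && e.version == version {
		return e.sql
	}
	s := c.rewrite(filter)
	c.entries[filter] = cachedRewrite{version: version, sql: s}
	return s
}

func main() {
	calls := 0
	c := &rewriteCache{
		entries: map[string]cachedRewrite{},
		rewrite: func(f string) string { calls++; return "rewritten(" + f + ")" },
	}
	c.get("host = 'a'", 10)         // miss: computes
	c.get("host = 'a'", 10)         // hit: same version
	c.get("host = 'a'", 11)         // count changed: recomputes
	fmt.Println("rewrites:", calls) // prints "rewrites: 2"
}
```

One caveat of a pure count: if one entry is invalidated and another allocated between two reads, the count is unchanged and a stale rewrite could survive. A monotonically increasing generation counter would not have that blind spot.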
func (*ColumnRegistry) FloatAggColumns
func (cr *ColumnRegistry) FloatAggColumns() map[string]bool
FloatAggColumns returns a set of agg column names that hold DOUBLE values.
func (*ColumnRegistry) LookupAggByColumn
func (cr *ColumnRegistry) LookupAggByColumn(colName string) *AggRegistryEntry
LookupAggByColumn returns the AggRegistryEntry for a physical column name, or nil.
func (*ColumnRegistry) LookupDimByColumn
func (cr *ColumnRegistry) LookupDimByColumn(colName string) *DimRegistryEntry
LookupDimByColumn returns the DimRegistryEntry for a physical column name, or nil.
func (*ColumnRegistry) RefreshAliases
func (cr *ColumnRegistry) RefreshAliases(ctx context.Context) error
RefreshAliases re-reads alias_column values from the database for all active entries and updates the in-memory cache. Call periodically so that alias changes made via the admin API (possibly on a different node) propagate.
func (*ColumnRegistry) ResolveAgg
func (cr *ColumnRegistry) ResolveAgg(aggKey, aggValue, aggType string) string
ResolveAgg returns the column name for an aggregation, or empty string if not found.
func (*ColumnRegistry) ResolveDim
func (cr *ColumnRegistry) ResolveDim(dimKey string) string
ResolveDim returns the column name for a dimension key, or empty string if not found.
func (*ColumnRegistry) ResolveOrAllocateAgg
func (cr *ColumnRegistry) ResolveOrAllocateAgg(ctx context.Context, aggKey, aggValue, aggType, valueType string) (string, error)
ResolveOrAllocateAgg resolves an existing agg mapping or allocates a new slot. It uses the same fast-path/slow-path pattern as ColumnRegistry.ResolveOrAllocateDim.
func (*ColumnRegistry) ResolveOrAllocateAggs
func (cr *ColumnRegistry) ResolveOrAllocateAggs(ctx context.Context, reqs []AggRequest) (map[string]string, error)
ResolveOrAllocateAggs resolves existing agg mappings and batch-allocates new slots. Returns a map from composite key (aggKey+aggValue+aggType) to column name.
func (*ColumnRegistry) ResolveOrAllocateDim
func (cr *ColumnRegistry) ResolveOrAllocateDim(ctx context.Context, dimKey, baseType string, width int) (string, error)
ResolveOrAllocateDim resolves an existing dim mapping or allocates a new slot. If a new slot is needed, it inserts into the registry and issues ALTER TABLE ADD COLUMN. If the existing slot is narrower than width, the column is widened via ALTER TABLE MODIFY.
Uses a fast-path/slow-path pattern: the fast path (RLock) handles the common case where the dim already exists. The slow path (allocMu) serializes DDL operations and double-checks under the lock to handle concurrent allocations.
func (*ColumnRegistry) ResolveOrAllocateDims
func (cr *ColumnRegistry) ResolveOrAllocateDims(ctx context.Context, reqs []DimRequest) (map[string]string, error)
ResolveOrAllocateDims resolves existing dim mappings and batch-allocates new slots. Returns a map from dimKey to column name. Width expansion for existing columns is handled individually (MODIFY COLUMN), while new columns are added via a single multi-column ALTER TABLE.
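A sketch of the two statement shapes, assuming column names are generated internally (dim_fNN) and SQL types come from an internal mapper; newCol, batchAddSQL, widenSQL, and the example types are hypothetical. MySQL and MariaDB apply all clauses of one ALTER TABLE as a single operation, so a batch pays the metadata-lock and any table-rebuild cost once instead of once per column.

```go
package main

import (
	"fmt"
	"strings"
)

// newCol is a freshly allocated column. Name is generated internally
// (dim_fNN) and SQLType comes from an internal mapper, never from user input.
type newCol struct {
	Name    string
	SQLType string
}

// batchAddSQL folds every new column into one ALTER TABLE statement.
func batchAddSQL(quotedTable string, cols []newCol) string {
	if len(cols) == 0 {
		return "" // nothing to add, so no DDL at all
	}
	clauses := make([]string, len(cols))
	for i, c := range cols {
		clauses[i] = fmt.Sprintf("ADD COLUMN `%s` %s", c.Name, c.SQLType)
	}
	return "ALTER TABLE " + quotedTable + " " + strings.Join(clauses, ", ")
}

// widenSQL is issued separately for each existing column that must grow.
func widenSQL(quotedTable, col, sqlType string) string {
	return fmt.Sprintf("ALTER TABLE %s MODIFY COLUMN `%s` %s", quotedTable, col, sqlType)
}

func main() {
	fmt.Println(batchAddSQL("`metrics_meta`", []newCol{
		{Name: "dim_f12", SQLType: "VARCHAR(64) NULL"},
		{Name: "dim_f13", SQLType: "BIGINT NULL"},
	}))
	fmt.Println(widenSQL("`metrics_meta`", "dim_f03", "VARCHAR(256) NULL"))
}
```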
func (*ColumnRegistry) ResolveOrAllocateSketches
func (cr *ColumnRegistry) ResolveOrAllocateSketches(ctx context.Context, keys []string) (map[string]string, error)
ResolveOrAllocateSketches resolves logical sketch keys to SET member names (s01..s64). Unlike dims/aggs, sketch slots are pre-allocated in the table schema — no ALTER TABLE is needed. Resolution claims AVAILABLE rows in _sketch_registry via UPDATE.
Returns a map from sketch key (e.g. "uuid") to SET member name (e.g. "s03").
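The s01..s64 range matches the limit of a MySQL or MariaDB SET column, which can hold at most 64 distinct members. Below is an in-memory model of the claim step, assuming claims take the lowest AVAILABLE member; sketchPool and its fields are hypothetical. In the database, it is the conditional UPDATE on an AVAILABLE row that stops two nodes from claiming the same member, which this single-threaded model does not attempt to show.

```go
package main

import (
	"errors"
	"fmt"
)

// maxSketchSlots matches the 64-member limit of a SET column.
const maxSketchSlots = 64

var errNoSketchSlot = errors.New("no AVAILABLE sketch slot")

// memberName formats 1-based slot n as s01..s64.
func memberName(n int) string { return fmt.Sprintf("s%02d", n) }

// sketchPool is a hypothetical in-memory model of _sketch_registry rows.
type sketchPool struct {
	byKey map[string]string // claimed rows: sketch key -> member
	free  []string          // AVAILABLE members, lowest first
}

func newSketchPool() *sketchPool {
	p := &sketchPool{byKey: map[string]string{}}
	for n := 1; n <= maxSketchSlots; n++ {
		p.free = append(p.free, memberName(n))
	}
	return p
}

// resolve returns the member already bound to each key or claims the next
// AVAILABLE one. No DDL is involved: the members already exist in the SET.
func (p *sketchPool) resolve(keys []string) (map[string]string, error) {
	out := make(map[string]string, len(keys))
	for _, k := range keys {
		if m, ok := p.byKey[k]; ok {
			out[k] = m
			continue
		}
		if len(p.free) == 0 {
			return out, errNoSketchSlot
		}
		m := p.free[0]
		p.free = p.free[1:]
		p.byKey[k] = m
		out[k] = m
	}
	return out, nil
}

func main() {
	p := newSketchPool()
	fmt.Println(p.resolve([]string{"uuid", "trace_id"}))
	fmt.Println(p.resolve([]string{"trace_id", "user"}))
}
```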
func (*ColumnRegistry) ResolveSketch
func (cr *ColumnRegistry) ResolveSketch(sketchKey string) *SketchRegistryEntry
ResolveSketch returns the registry entry for a sketch key, or nil if not found.
func (*ColumnRegistry) RunRecycler
func (cr *ColumnRegistry) RunRecycler(ctx context.Context)
RunRecycler periodically scans for INVALIDATED columns that have aged past retention and recycles them to AVAILABLE. The flow:
- Find INVALIDATED entries where invalidated_at + recyclerMinAge < now.
- Check if the physical column has any non-NULL rows remaining.
- If few remain, NULL them out in batches.
- Mark the registry entry as AVAILABLE for reuse.
func (*ColumnRegistry) SetMeter
func (cr *ColumnRegistry) SetMeter(m metric.Meter)
SetMeter configures OpenTelemetry metrics. Must be called before serving.
func (*ColumnRegistry) Snapshot
func (cr *ColumnRegistry) Snapshot() *RegistrySnapshot
Snapshot creates a read-only snapshot of the current registry state. The snapshot is safe to use concurrently without locks.
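A minimal copy-on-snapshot sketch, assuming the snapshot copies the lookup maps under the read lock; registry and snapshot here are hypothetical simplifications holding only dimension mappings. With at most 99 dim and 99 agg slots per table the copy stays small, and once built the snapshot needs no locking because nothing writes to it.

```go
package main

import (
	"fmt"
	"sync"
)

// snapshot is never written after construction, so readers need no locks.
type snapshot struct {
	dims map[string]string
}

func (s *snapshot) ResolveDim(key string) string { return s.dims[key] }

type registry struct {
	mu   sync.RWMutex
	dims map[string]string
}

// Snapshot copies the live map under a read lock; later writes to the
// registry do not affect snapshots already handed out.
func (r *registry) Snapshot() *snapshot {
	r.mu.RLock()
	defer r.mu.RUnlock()
	cp := make(map[string]string, len(r.dims))
	for k, v := range r.dims {
		cp[k] = v
	}
	return &snapshot{dims: cp}
}

func (r *registry) set(key, col string) {
	r.mu.Lock()
	r.dims[key] = col
	r.mu.Unlock()
}

func main() {
	r := &registry{dims: map[string]string{"host": "dim_f01"}}
	snap := r.Snapshot()
	r.set("region", "dim_f02")
	fmt.Printf("%q %q\n", snap.ResolveDim("host"), snap.ResolveDim("region")) // prints "dim_f01" ""
}
```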
type DimRegistryEntry
type DimRegistryEntry struct {
TableName string
ColumnName string
BaseType string // str, str_utf8, bool, int, float
Width int
DimKey string
AliasCol string
Status string
}
DimRegistryEntry represents an active dimension column mapping.
type DimRequest
type DimRequest struct {
DimKey string
BaseType string
Width int
AliasCol string // optional human-readable alias
}
DimRequest describes a dimension column to resolve or allocate.
type Evolver
type Evolver struct {
// contains filtered or unexported fields
}
Evolver handles online DDL for adding new columns to metadata tables. Internally, ColumnRegistry uses an Evolver to add dim_fNN/agg_fNN columns on the fly. The type is exported so enterprise deployments can substitute a custom DDL strategy (e.g., pt-online-schema-change or an approval workflow) by providing their own Evolver to the registry.
func NewEvolver
NewEvolver creates an Evolver.
func (*Evolver) AddColumn
AddColumn adds a new column to the table using online DDL (ALGORITHM=INPLACE, LOCK=NONE).
SQL safety: tableName and colName are validated via db.ValidateSQLIdentifier and quoted via db.QuoteIdentifier before interpolation. sqlType must come from internal callers (e.g., [dimSQLType]), never from user input.
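The statement shape AddColumn describes can be sketched as below; onlineAddColumnSQL is hypothetical and expects identifiers that were already validated and quoted. If the server cannot honor ALGORITHM=INPLACE or LOCK=NONE for a given change, MySQL and MariaDB reject the statement with an error instead of silently taking a stronger lock, which is what makes these clauses a safe default for online DDL.

```go
package main

import "fmt"

// onlineAddColumnSQL builds the statement shape that Evolver.AddColumn
// describes. Identifiers must already be validated and quoted, and sqlType
// must come from internal code.
func onlineAddColumnSQL(quotedTable, quotedCol, sqlType string) string {
	return fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s, ALGORITHM=INPLACE, LOCK=NONE",
		quotedTable, quotedCol, sqlType)
}

func main() {
	fmt.Println(onlineAddColumnSQL("`metrics_meta`", "`agg_f04`", "DOUBLE NULL"))
}
```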
type IndexConfig
IndexConfig defines a desired index from index.yaml.
type IndexManager
type IndexManager struct {
// contains filtered or unexported fields
}
IndexManager reconciles dynamic indexes on dimension columns.
func NewIndexManager
NewIndexManager creates an IndexManager.
func (*IndexManager) DropIndex
func (im *IndexManager) DropIndex(ctx context.Context, tableName, colName string) error
DropIndex removes a dynamic index if it exists.
func (*IndexManager) EnsureIndex
func (im *IndexManager) EnsureIndex(ctx context.Context, tableName, colName string) error
EnsureIndex creates an index on a dimension column if it doesn't exist.
func (*IndexManager) Reconcile
func (im *IndexManager) Reconcile(ctx context.Context, desired []IndexConfig) error
Reconcile ensures all desired indexes exist and removes stale ones. desired lists the indexes, as loaded from index.yaml, that should exist; dynamic indexes not listed are treated as stale.
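The reconciliation itself reduces to two set differences. A minimal sketch, assuming each index is identified by a (table, column) pair; indexKey and diffIndexes are hypothetical because the fields of IndexConfig are not shown here. The existing set would presumably come from information_schema.STATISTICS, restricted to the manager's own dynamic indexes so that primary and template indexes are never dropped; create would then feed EnsureIndex and drop would feed DropIndex.

```go
package main

import (
	"fmt"
	"sort"
)

// indexKey identifies one dynamic index. Table/Column are assumed fields;
// the real IndexConfig layout is not documented here.
type indexKey struct{ Table, Column string }

// sortKeys orders keys by table, then column, for deterministic DDL.
func sortKeys(s []indexKey) {
	sort.Slice(s, func(i, j int) bool {
		if s[i].Table != s[j].Table {
			return s[i].Table < s[j].Table
		}
		return s[i].Column < s[j].Column
	})
}

// diffIndexes returns indexes to create (desired but missing) and to drop
// (present but no longer desired).
func diffIndexes(desired, existing []indexKey) (create, drop []indexKey) {
	want := make(map[indexKey]bool, len(desired))
	for _, k := range desired {
		want[k] = true
	}
	have := make(map[indexKey]bool, len(existing))
	for _, k := range existing {
		if have[k] {
			continue // ignore duplicates in the input
		}
		have[k] = true
		if !want[k] {
			drop = append(drop, k)
		}
	}
	for k := range want {
		if !have[k] {
			create = append(create, k)
		}
	}
	sortKeys(create)
	sortKeys(drop)
	return create, drop
}

func main() {
	create, drop := diffIndexes(
		[]indexKey{{"metrics_meta", "dim_f01"}, {"metrics_meta", "dim_f03"}},
		[]indexKey{{"metrics_meta", "dim_f01"}, {"metrics_meta", "dim_f02"}},
	)
	fmt.Println("create:", create, "drop:", drop) // prints "create: [{metrics_meta dim_f03}] drop: [{metrics_meta dim_f02}]"
}
```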
type PartitionInfo
PartitionInfo holds metadata about a single partition.
type PartitionManager
type PartitionManager struct {
// contains filtered or unexported fields
}
PartitionManager manages MySQL RANGE partitions on the metadata table. Partitions are daily, keyed on min_timestamp (epoch nanoseconds).
func NewPartitionManager
func NewPartitionManager(db *sql.DB, tableName string, lookaheadDays, cleanupAgeDays int, log *zap.Logger) *PartitionManager
NewPartitionManager creates a PartitionManager.
func (*PartitionManager) EnsureLookaheadPartitions
func (pm *PartitionManager) EnsureLookaheadPartitions(ctx context.Context) (int, error)
EnsureLookaheadPartitions creates partitions ahead of today. It attempts to acquire an advisory lock and proceeds without it after a timeout.
func (*PartitionManager) RunMaintenance
func (pm *PartitionManager) RunMaintenance(ctx context.Context) error
RunMaintenance acquires an advisory lock and runs partition maintenance. Returns immediately without waiting if another node holds the lock.
type RegistrySnapshot
type RegistrySnapshot struct {
// contains filtered or unexported fields
}
RegistrySnapshot is an immutable point-in-time copy of a ColumnRegistry. Used by the query path to avoid lock contention with the ingestion path.
func (*RegistrySnapshot) AllAggEntries
func (r *RegistrySnapshot) AllAggEntries() []*AggRegistryEntry
AllAggEntries returns all agg entries in the snapshot.
func (*RegistrySnapshot) AllDimEntries
func (r *RegistrySnapshot) AllDimEntries() []*DimRegistryEntry
AllDimEntries returns all dim entries in the snapshot.
func (*RegistrySnapshot) ResolveAgg
func (r *RegistrySnapshot) ResolveAgg(aggKey, aggValue, aggType string) string
ResolveAgg returns the column name for an aggregation, or empty string if not found.
func (*RegistrySnapshot) ResolveDim
func (r *RegistrySnapshot) ResolveDim(dimKey string) string
ResolveDim returns the column name for a dimension key, or empty string if not found.