dbqcore

package module
v0.5.2
Published: Aug 25, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

README

dbqcore

The DataBridge Quality Core library is part of dbqctl.

Available check functions:

  • Schema:
    • expect_columns_ordered: Validate that the table's columns match an ordered list
    • expect_columns: Validate that the table contains the expected columns, in any order
    • columns_not_present: Validate that the table has no columns from a stop-list or matching a pattern
  • Table:
    • row_count: Count of rows in the table
    • raw_query: Custom SQL query for complex validations
  • Column:
    • not_null: Check for null values in a column
    • freshness: Check data recency based on timestamp column
    • uniqueness: Check for unique values in a column
    • min/max: Minimum and maximum values for numeric columns
    • sum: Sum of values in a column
    • avg: Average of values in a column
    • stddev: Standard deviation of values in a column
Operators supported:
  • Comparison: <, >, <=, >=, ==, !=
  • Range: between X and Y
  • No operator: function-only checks (such as not_null or uniqueness); see the example checks file below
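
A hypothetical checks file tying these functions and operators together. The keys follow the yaml tags on ChecksFileConfig, ValidationRule, and DataQualityCheck documented below; whether a check may be written as a bare expression string is decided by DataQualityCheck.UnmarshalYAML, so treat the exact shape as an assumption:

version: "1.0"
rules:
  - dataset: analytics.events
    where: "event_date >= '2025-01-01'"
    checks:
      - row_count > 1000                     # table-level, with comparison operator
      - not_null(user_id)                    # function-only check
      - avg(duration_ms) between 10 and 5000 # range operator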

Changelog

v0.5.0
Added
  • Schema Validation: New schema_check support for validating database table schemas
    • expect_columns check to validate required column presence and types
    • expect_columns_ordered check to validate required column presence and types in a specific order
    • columns_not_present check to ensure specific columns are not present, by stop-list or by pattern
  • Enhanced Check Configuration: New flexible checks format with improved YAML configuration
  • Database Adapter Architecture: Refactored to use adapter pattern for better database abstraction
  • Comprehensive Test Coverage: Added extensive test suites for all adapters and validation logic
  • CI/CD Pipeline: GitHub Actions workflow for automated testing
Improved
  • Performance: Enhanced query execution with optimized adapter interfaces
  • Configuration: More flexible check expression parsing and validation
  • Error Handling: Better validation and error reporting for check results
  • Code Quality: Comprehensive refactoring with improved maintainability

Documentation

Index

Constants

const (
	CheckTypeSchemaCheck = "schema_check"
	CheckTypeRawQuery    = "raw_query"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BetweenRange added in v0.5.0

type BetweenRange struct {
	Min interface{}
	Max interface{}
}

type CheckExpression added in v0.5.0

type CheckExpression struct {
	FunctionName       string
	FunctionParameters []string
	Scope              CheckScope
	Operator           string
	ThresholdValue     interface{}
}

func ParseCheckExpression added in v0.5.0

func ParseCheckExpression(expression string) (*CheckExpression, error)
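
A minimal parsing sketch, assuming "row_count > 1000" is a valid expression (the breakdown in the comment is an assumption):

expr, err := dbqcore.ParseCheckExpression("row_count > 1000")
if err != nil {
	log.Fatal(err)
}
// Assumed result: FunctionName "row_count", Scope ScopeTable,
// Operator ">", ThresholdValue 1000.
fmt.Printf("%s %s %v\n", expr.FunctionName, expr.Operator, expr.ThresholdValue)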

type CheckScope added in v0.5.0

type CheckScope string
const (
	ScopeSchema CheckScope = "schema"
	ScopeTable  CheckScope = "table"
	ScopeColumn CheckScope = "column"
)

type ChecksFileConfig added in v0.1.0

type ChecksFileConfig struct {
	Version string           `yaml:"version"`
	Rules   []ValidationRule `yaml:"rules"`
}

func LoadChecksFileConfig added in v0.1.0

func LoadChecksFileConfig(fileName string) (*ChecksFileConfig, error)
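
A sketch of loading a checks file and walking its rules; the file name is hypothetical:

cfg, err := dbqcore.LoadChecksFileConfig("checks.yaml")
if err != nil {
	log.Fatal(err)
}
for _, rule := range cfg.Rules {
	// Each ValidationRule targets one dataset and carries its checks.
	fmt.Printf("%s: %d checks\n", rule.Dataset, len(rule.Checks))
}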

type ColumnInfo

type ColumnInfo struct {
	Name     string
	Type     string
	Comment  string
	Position uint
}

ColumnInfo represents the basic information of a column.

type ColumnMetrics

type ColumnMetrics struct {
	ColumnName          string   `json:"col_name"`
	ColumnComment       string   `json:"col_comment"`
	ColumnPosition      uint     `json:"col_position"`
	DataType            string   `json:"data_type"`
	NullCount           uint64   `json:"null_count"`
	BlankCount          *int64   `json:"blank_count,omitempty"`         // string only
	MinValue            *float64 `json:"min_value,omitempty"`           // numeric only
	MaxValue            *float64 `json:"max_value,omitempty"`           // numeric only
	AvgValue            *float64 `json:"avg_value,omitempty"`           // numeric only
	StddevValue         *float64 `json:"stddev_value,omitempty"`        // numeric only (Population StdDev)
	MostFrequentValue   *string  `json:"most_frequent_value,omitempty"` // pointer to handle NULL as most frequent
	ProfilingDurationMs int64    `json:"profiling_duration_ms"`
}

ColumnMetrics represents the metrics of a column.

type ColumnsNotPresentConfig added in v0.5.0

type ColumnsNotPresentConfig struct {
	Columns []string `yaml:"columns,omitempty"`
	Pattern string   `yaml:"pattern,omitempty"`
}

type ConnectionConfig added in v0.1.0

type ConnectionConfig struct {
	Host     string `yaml:"host"`
	Port     int    `yaml:"port"`
	Username string `yaml:"username"`
	Password string `yaml:"password"`
	Database string `yaml:"database,omitempty"`
}

type DataQualityCheck added in v0.1.0

type DataQualityCheck struct {
	Expression  string       `yaml:"-"`
	Description string       `yaml:"desc,omitempty"`
	OnFail      OnFailAction `yaml:"on_fail,omitempty"`
	Query       string       `yaml:"query,omitempty"`

	// Schema check fields
	SchemaCheck *SchemaCheckConfig `yaml:"schema_check,omitempty"`
	ParsedCheck *CheckExpression   `yaml:"-"`
}

func (*DataQualityCheck) UnmarshalYAML added in v0.5.0

func (c *DataQualityCheck) UnmarshalYAML(node *yaml.Node) error

type DataQualityCheckType added in v0.1.0

type DataQualityCheckType string

DataQualityCheckType represents the type of data quality check.

type DataSource

type DataSource struct {
	ID            string           `yaml:"id"`
	Type          DataSourceType   `yaml:"type"`
	Configuration ConnectionConfig `yaml:"configuration"`
	Datasets      []string         `yaml:"datasets"`
}

type DataSourceType added in v0.0.8

type DataSourceType string
const (
	DataSourceTypeClickhouse DataSourceType = "clickhouse"
	DataSourceTypePostgresql DataSourceType = "postgresql"
	DataSourceTypeMysql      DataSourceType = "mysql"
)

type DbqConfig

type DbqConfig struct {
	Version     string       `yaml:"version"`
	DataSources []DataSource `yaml:"datasources"`
}
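
A hypothetical dbq configuration matching the yaml tags above; the host, credentials, and dataset names are placeholders:

version: "1.0"
datasources:
  - id: local-clickhouse
    type: clickhouse
    configuration:
      host: localhost
      port: 9000
      username: default
      password: ""
      database: analytics
    datasets:
      - analytics.events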

type DbqConnector

type DbqConnector interface {
	// Ping checks if the connection to the data source is alive.
	Ping(ctx context.Context) (string, error)

	// ImportDatasets imports datasets from the data source, with an optional filter.
	ImportDatasets(ctx context.Context, filter string) ([]string, error)
}

DbqConnector is the interface that wraps the basic connector methods.
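
A usage sketch. The package exports no concrete connector, so conn is assumed to be a DbqConnector implementation obtained elsewhere, and the empty-filter semantics are an assumption:

func listDatasets(ctx context.Context, conn dbqcore.DbqConnector) ([]string, error) {
	// Check connectivity first; Ping returns a server-info string on success.
	if _, err := conn.Ping(ctx); err != nil {
		return nil, err
	}
	// Import datasets with no filter (assumed to mean "all").
	return conn.ImportDatasets(ctx, "")
}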

type DbqDataProfiler added in v0.1.0

type DbqDataProfiler interface {
	// ProfileDataset is the entry point that runs the profiling process by tying all the specific profiling calls together.
	// TODO: consider extracting it into a separate entity.
	ProfileDataset(ctx context.Context, dataset string, sample bool, maxConcurrent int, collectErrors bool) (*TableMetrics, error)

	GetColumns(ctx context.Context, databaseName string, tableName string) ([]*ColumnInfo, error)
	GetTotalRows(ctx context.Context, dataset string) (uint64, error)
	GetNullCount(ctx context.Context, dataset string, column *ColumnInfo) (uint64, error)
	GetBlankCount(ctx context.Context, dataset string, column *ColumnInfo) (int64, error)
	GetNumericStats(ctx context.Context, dataset string, column *ColumnInfo) (*NumericStats, error)
	GetMostFrequentValue(ctx context.Context, dataset string, column *ColumnInfo) (*string, error)
	GetSampleData(ctx context.Context, dataset string) ([]map[string]interface{}, error)
	IsNumericType(dataType string) bool
	IsStringType(dataType string) bool
}

DbqDataProfiler is the interface that wraps the basic data profiling methods.
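
A sketch of driving a profiler, where profiler is some DbqDataProfiler implementation; the arguments (sampling on, 4 concurrent column tasks, collect errors rather than abort) are illustrative, and the collectErrors semantics are an assumption:

metrics, err := profiler.ProfileDataset(ctx, "analytics.events", true, 4, true)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("%s.%s: %d rows, %d columns profiled\n",
	metrics.DatabaseName, metrics.TableName, metrics.TotalRows, len(metrics.ColumnsMetrics))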

type DbqDataSourceAdapter added in v0.5.0

type DbqDataSourceAdapter interface {
	// InterpretDataQualityCheck generates a datasource-specific SQL query for a data quality check
	InterpretDataQualityCheck(check *DataQualityCheck, dataset string, defaultWhere string) (string, error)

	// ExecuteQuery executes the SQL query and returns the query result
	ExecuteQuery(ctx context.Context, query string) (interface{}, error)
}

type DbqDataValidator added in v0.1.0

type DbqDataValidator interface {
	// RunCheck runs a data quality check and returns the result.
	RunCheck(ctx context.Context, adapter DbqDataSourceAdapter, check *DataQualityCheck, dataset string, defaultWhere string) *ValidationResult
}

DbqDataValidator is the interface that wraps the basic data validation methods.

func NewDbqDataValidator added in v0.5.0

func NewDbqDataValidator(logger *slog.Logger) DbqDataValidator
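
A sketch of running a single check, where adapter is some DbqDataSourceAdapter implementation and check would typically come from a loaded ChecksFileConfig:

validator := dbqcore.NewDbqDataValidator(slog.Default())
result := validator.RunCheck(ctx, adapter, check, "analytics.events", "")
if !result.Pass {
	log.Printf("check %s failed: value=%q error=%q",
		result.CheckID, result.QueryResultValue, result.Error)
}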

type DbqDataValidatorImpl added in v0.5.0

type DbqDataValidatorImpl struct {
	// contains filtered or unexported fields
}

func (DbqDataValidatorImpl) RunCheck added in v0.5.0

func (d DbqDataValidatorImpl) RunCheck(ctx context.Context, adapter DbqDataSourceAdapter, check *DataQualityCheck, dataset string, defaultWhere string) *ValidationResult

type ExpectColumnsConfig added in v0.5.0

type ExpectColumnsConfig struct {
	Columns []string `yaml:"columns"`
}

type ExpectColumnsOrderedConfig added in v0.5.0

type ExpectColumnsOrderedConfig struct {
	ColumnsOrder []string `yaml:"columns_order"`
}

type NumericStats added in v0.2.0

type NumericStats struct {
	MinValue    *float64
	MaxValue    *float64
	AvgValue    *float64
	StddevValue *float64
}

NumericStats represents the numeric statistics of a column.

type OnFailAction

type OnFailAction string
const (
	OnFailActionWarn  OnFailAction = "warn"
	OnFailActionError OnFailAction = "error"
)

type SchemaCheckConfig added in v0.5.0

type SchemaCheckConfig struct {
	ExpectColumnsOrdered *ExpectColumnsOrderedConfig `yaml:"expect_columns_ordered,omitempty"`
	ExpectColumns        *ExpectColumnsConfig        `yaml:"expect_columns,omitempty"`
	ColumnsNotPresent    *ColumnsNotPresentConfig    `yaml:"columns_not_present,omitempty"`
}
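
A hypothetical schema_check block using these yaml tags, as it would appear under a check in a rules file; the column names and pattern are placeholders:

schema_check:
  expect_columns_ordered:
    columns_order: [id, user_id, created_at]
  columns_not_present:
    pattern: "^tmp_"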

type TableMetrics

type TableMetrics struct {
	ProfiledAt          int64                     `json:"profiled_at"`
	TableName           string                    `json:"table_name"`
	DatabaseName        string                    `json:"database_name"`
	TotalRows           uint64                    `json:"total_rows"`
	ColumnsMetrics      map[string]*ColumnMetrics `json:"columns_metrics"`
	RowsSample          []map[string]interface{}  `json:"rows_sample"`
	ProfilingDurationMs int64                     `json:"profiling_duration_ms"`
	DbqErrors           []error                   `json:"__dbq_errors"`
}

TableMetrics represents the metrics of a table.

type TaskPool added in v0.0.5

type TaskPool struct {
	// contains filtered or unexported fields
}

func NewTaskPool added in v0.0.5

func NewTaskPool(poolSize int, logger *slog.Logger) *TaskPool

func (*TaskPool) Enqueue added in v0.0.5

func (tp *TaskPool) Enqueue(id string, task func() error)

func (*TaskPool) Errors added in v0.0.6

func (tp *TaskPool) Errors() []error

func (*TaskPool) Join added in v0.0.5

func (tp *TaskPool) Join()
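
A usage sketch built from the signatures above; the pool size and task bodies are illustrative:

pool := dbqcore.NewTaskPool(4, slog.Default())
for _, col := range []string{"col_a", "col_b", "col_c"} {
	col := col // capture the loop variable for the closure
	pool.Enqueue(col, func() error {
		// per-task work goes here
		return nil
	})
}
pool.Join() // block until all enqueued tasks have finished
for _, err := range pool.Errors() {
	log.Println(err)
}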

type ValidationResult added in v0.0.7

type ValidationResult struct {
	CheckID          string `json:"check_id"`
	Pass             bool   `json:"pass"`
	QueryResultValue string `json:"query_result_value,omitempty"`
	Error            string `json:"error,omitempty"`
}

ValidationResult represents the result of a data quality check.

type ValidationRule added in v0.1.0

type ValidationRule struct {
	Dataset string             `yaml:"dataset"`
	Where   string             `yaml:"where,omitempty"`
	Checks  []DataQualityCheck `yaml:"checks"`
}

