clickhouse

package
v1.9.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 7, 2026 | License: GPL-3.0 | Imports: 25 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsLimiterRejected

func IsLimiterRejected(err error) bool

IsLimiterRejected reports whether err was caused by adaptive concurrency limiter rejection.

func IsPermanentWriteError

func IsPermanentWriteError(err error) bool

IsPermanentWriteError returns true for errors that will never succeed on retry: data-quality problems (bad values, parse failures) and code bugs (syntax errors, bad arguments). Schema mismatches (unknown table, missing column, column count) are intentionally excluded because they can be transient during rolling deployments when migrations haven't been applied yet — NAK + Kafka redelivery lets them self-resolve.

For joined errors (from multi-table concurrent flushes), all constituent errors must be permanent for the result to be permanent. A single transient failure in any table means the group should be NAK'd for retry.
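
The "all constituents must be permanent" rule for joined errors can be sketched as follows. This is a minimal illustration, not the package's implementation: the sentinel errors and the isPermanentLeaf classifier are hypothetical stand-ins for the real ClickHouse error checks, and only the standard errors.Join and `Unwrap() []error` conventions are assumed.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors standing in for real ClickHouse failures.
var (
	errBadValue = errors.New("bad value")          // permanent: a retry cannot help
	errTimeout  = errors.New("connection timeout") // transient: a retry may help
)

// Example joined errors, shaped like the result of a multi-table flush.
var (
	allBad = errors.Join(errBadValue, fmt.Errorf("table b: %w", errBadValue))
	mixed  = errors.Join(errBadValue, errTimeout)
)

// isPermanentLeaf classifies a single (non-joined) error.
func isPermanentLeaf(err error) bool {
	return errors.Is(err, errBadValue)
}

// isPermanent reports whether every constituent of err is permanent.
// A joined error is permanent only if all of its parts are.
func isPermanent(err error) bool {
	if err == nil {
		return false
	}

	// errors.Join returns a value exposing Unwrap() []error.
	if joined, ok := err.(interface{ Unwrap() []error }); ok {
		parts := joined.Unwrap()
		if len(parts) == 0 {
			return false
		}

		for _, part := range parts {
			if !isPermanent(part) {
				return false
			}
		}

		return true
	}

	return isPermanentLeaf(err)
}

func main() {
	fmt.Println(isPermanent(allBad)) // true
	fmt.Println(isPermanent(mixed))  // false
}
```

Checking the joined shape before calling errors.Is matters here: errors.Is on a joined error matches if any constituent matches, which would wrongly classify the mixed case as permanent.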

Types

type AdaptiveLimiterConfig

type AdaptiveLimiterConfig struct {
	// Enabled turns on adaptive concurrency limiting.
	//
	// Pointer type so that an explicit YAML "false" survives the
	// defaults.Set pass — see FailOnMissingTables for the same rationale.
	Enabled *bool `yaml:"enabled" default:"true"`
	// MinLimit is the minimum concurrent INSERTs the limiter allows.
	MinLimit uint `yaml:"minLimit" default:"1"`
	// MaxLimit caps the maximum concurrent INSERTs the limiter allows.
	MaxLimit uint `yaml:"maxLimit" default:"50"`
	// InitialLimit is the starting concurrency before adaptation.
	InitialLimit uint `yaml:"initialLimit" default:"8"`
	// QueueInitialRejectionFactor controls the queue size above which
	// requests are rejected during the initial learning phase.
	QueueInitialRejectionFactor float64 `yaml:"queueInitialRejectionFactor" default:"2"`
	// QueueMaxRejectionFactor controls the queue size above which
	// requests are rejected after the initial learning phase.
	QueueMaxRejectionFactor float64 `yaml:"queueMaxRejectionFactor" default:"3"`
}

AdaptiveLimiterConfig configures per-table adaptive concurrency limiting. When enabled, each table writer independently adjusts its INSERT concurrency based on observed ClickHouse latency using an AIMD algorithm.
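
As a rough illustration of AIMD (additive increase, multiplicative decrease), the sketch below adjusts a concurrency limit from latency samples. The documentation only states that AIMD is driven by observed latency; the latency threshold, the +1 increment, and the 0.5 backoff ratio are assumptions made for this example, while the limit bounds reuse the documented defaults (MinLimit 1, MaxLimit 50, InitialLimit 8).

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical threshold and sample latencies used only for illustration.
const (
	latencyThreshold = 500 * time.Millisecond
	fastInsert       = 100 * time.Millisecond
	slowInsert       = 2 * time.Second
)

// aimdLimiter is a toy additive-increase/multiplicative-decrease limiter.
type aimdLimiter struct {
	limit, minLimit, maxLimit uint
	backoffRatio              float64 // assumed multiplicative decrease factor
}

// newDefaultLimiter starts from the documented default limits.
func newDefaultLimiter() *aimdLimiter {
	return &aimdLimiter{limit: 8, minLimit: 1, maxLimit: 50, backoffRatio: 0.5}
}

// observe adjusts the limit from one INSERT latency sample and returns it.
func (l *aimdLimiter) observe(latency time.Duration) uint {
	if latency > latencyThreshold {
		// Multiplicative decrease when ClickHouse looks overloaded.
		l.limit = uint(float64(l.limit) * l.backoffRatio)
	} else {
		// Additive increase while latency stays healthy.
		l.limit++
	}

	if l.limit < l.minLimit {
		l.limit = l.minLimit
	}

	if l.limit > l.maxLimit {
		l.limit = l.maxLimit
	}

	return l.limit
}

func main() {
	l := newDefaultLimiter()
	fmt.Println(l.observe(fastInsert)) // 9
	fmt.Println(l.observe(slowInsert)) // 4
}
```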

func (*AdaptiveLimiterConfig) IsEnabled

func (c *AdaptiveLimiterConfig) IsEnabled() bool

IsEnabled resolves the Enabled pointer.

Returns false when Enabled is nil (unset); the rationale given for Config.ShouldFailOnMissingTables applies here too. Configurations loaded from YAML pick up the documented `default:"true"` via creasty/defaults during load.

func (*AdaptiveLimiterConfig) Validate

func (c *AdaptiveLimiterConfig) Validate() error

Validate checks the adaptive limiter configuration for errors.

type ChGoConfig

type ChGoConfig struct {
	// DialTimeout is the timeout for establishing a connection to ClickHouse.
	DialTimeout time.Duration `yaml:"dialTimeout" default:"5s"`
	// ReadTimeout is the timeout for reading responses from ClickHouse.
	ReadTimeout time.Duration `yaml:"readTimeout" default:"30s"`

	// QueryTimeout is the per-attempt timeout for ch-go operations.
	// Set to 0 to disable timeout wrapping.
	QueryTimeout time.Duration `yaml:"queryTimeout" default:"30s"`

	// MaxRetries is the number of retry attempts after the initial try.
	MaxRetries int `yaml:"maxRetries" default:"3"`
	// RetryBaseDelay is the initial delay before retry attempt 1.
	RetryBaseDelay time.Duration `yaml:"retryBaseDelay" default:"100ms"`
	// RetryMaxDelay caps exponential retry backoff.
	RetryMaxDelay time.Duration `yaml:"retryMaxDelay" default:"2s"`

	// MaxConns is the maximum number of pooled ClickHouse connections.
	MaxConns int32 `yaml:"maxConns" default:"64"`
	// MinConns is the minimum number of pooled ClickHouse connections.
	MinConns int32 `yaml:"minConns" default:"1"`
	// ConnMaxLifetime is the maximum lifetime for pooled connections.
	ConnMaxLifetime time.Duration `yaml:"connMaxLifetime" default:"1h"`
	// ConnMaxIdleTime is the maximum idle time for pooled connections.
	ConnMaxIdleTime time.Duration `yaml:"connMaxIdleTime" default:"10m"`
	// HealthCheckPeriod is the interval for pool health checks.
	HealthCheckPeriod time.Duration `yaml:"healthCheckPeriod" default:"30s"`

	// PoolMetricsInterval controls how often pool stats are sampled.
	// Set to 0 to disable pool metrics collection.
	PoolMetricsInterval time.Duration `yaml:"poolMetricsInterval" default:"15s"`

	// GroupRetryMaxAttempts is the number of retry attempts at the
	// processGroup level for partial table failures (e.g. fanout where
	// some tables succeed and others fail transiently). Only failed tables
	// are retried, preventing duplicate writes to already-succeeded tables.
	GroupRetryMaxAttempts int `yaml:"groupRetryMaxAttempts" default:"3"`
	// GroupRetryBaseDelay is the initial delay before the first group retry.
	GroupRetryBaseDelay time.Duration `yaml:"groupRetryBaseDelay" default:"1s"`
	// GroupRetryMaxDelay caps exponential backoff for group retries.
	GroupRetryMaxDelay time.Duration `yaml:"groupRetryMaxDelay" default:"30s"`

	// AdaptiveLimiter configures per-table adaptive concurrency limiting.
	AdaptiveLimiter AdaptiveLimiterConfig `yaml:"adaptiveLimiter"`
}

ChGoConfig configures query retries and connection pooling for the ch-go backend.
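
The interaction of RetryBaseDelay and RetryMaxDelay can be sketched as a capped exponential backoff. The doubling factor and the absence of jitter are assumptions for this example; the field comments only say that the base delay applies before retry attempt 1 and that the maximum caps exponential growth.

```go
package main

import (
	"fmt"
	"time"
)

// Documented defaults for RetryBaseDelay and RetryMaxDelay.
const (
	retryBaseDelay = 100 * time.Millisecond
	retryMaxDelay  = 2 * time.Second
)

// retryDelay returns the wait before the given retry attempt (1-based),
// doubling the base delay on each attempt and capping it at maxDelay.
func retryDelay(attempt int, base, maxDelay time.Duration) time.Duration {
	delay := base

	for i := 1; i < attempt; i++ {
		delay *= 2
		if delay >= maxDelay {
			return maxDelay
		}
	}

	if delay > maxDelay {
		return maxDelay
	}

	return delay
}

func main() {
	// With MaxRetries = 3, the three retries would wait as follows.
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Println(attempt, retryDelay(attempt, retryBaseDelay, retryMaxDelay))
	}
}
```

Under these assumptions the group-level settings (GroupRetryBaseDelay, GroupRetryMaxDelay) would follow the same shape with larger values.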

func (*ChGoConfig) Validate

func (c *ChGoConfig) Validate() error

Validate checks the ch-go backend configuration for errors.

type Config

type Config struct {
	// DSN is the ClickHouse connection string.
	DSN string `yaml:"dsn"`

	// TLS configures TLS for the ClickHouse connection. The DSN scheme
	// "clickhouses://" still triggers TLS, but this allows custom CA and
	// client certificate settings.
	TLS xtls.Config `yaml:"tls"`

	// TableSuffix is appended to every table name before writing.
	// For example, set to "_local" to bypass Distributed tables and write
	// directly to ReplicatedMergeTree tables in a clustered setup.
	TableSuffix string `yaml:"tableSuffix"`

	// FailOnMissingTables controls whether missing ClickHouse tables cause
	// a fatal startup error. When true (default), startup is aborted if any
	// registered route table does not exist in the target database. Set to
	// false to downgrade to warnings and allow startup to proceed.
	//
	// Pointer type so that an explicit YAML "false" survives the
	// defaults.Set pass — a plain bool would be indistinguishable from
	// "unset" and silently overridden back to the default.
	FailOnMissingTables *bool `yaml:"failOnMissingTables" default:"true"`

	// Defaults are the default table settings.
	Defaults TableConfig `yaml:"defaults"`

	// Tables contains per-table overrides for batch settings.
	Tables map[string]TableConfig `yaml:"tables"`

	// ChGo configures ch-go backend retry/pooling behavior.
	ChGo ChGoConfig `yaml:"chgo"`
}

Config configures the ClickHouse writer.

func (*Config) ShouldFailOnMissingTables

func (c *Config) ShouldFailOnMissingTables() bool

ShouldFailOnMissingTables resolves the FailOnMissingTables pointer.

Returns false when the field is nil (unset). The documented default is `true`, but that default is applied by creasty/defaults during YAML load via defaults.Set — production code paths run that pass and arrive here with FailOnMissingTables explicitly set. A nil pointer therefore means "code constructed Config without going through defaults.Set", for which the safe behaviour is "off" (don't fail startup on a missing table that the caller never opted in to caring about).
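
The three states of the pointer (unset, explicit true, explicit false) can be illustrated with a small stand-alone sketch. The config type below mirrors only the one field, and applyDefaults is a hypothetical stand-in for the defaults.Set pass that fills unset fields; neither is the package's actual code.

```go
package main

import "fmt"

// config mirrors only the field relevant here; it is not the package's Config.
type config struct {
	FailOnMissingTables *bool
}

// shouldFail resolves the pointer: nil (unset) reads as false.
func (c *config) shouldFail() bool {
	return c.FailOnMissingTables != nil && *c.FailOnMissingTables
}

// applyDefaults is a hypothetical stand-in for the defaults pass:
// it fills the field only when it is unset.
func applyDefaults(c *config) {
	if c.FailOnMissingTables == nil {
		v := true
		c.FailOnMissingTables = &v
	}
}

// boolPtr returns a pointer to v.
func boolPtr(v bool) *bool { return &v }

func main() {
	unset := &config{}
	fmt.Println(unset.shouldFail()) // false: constructed without defaults

	applyDefaults(unset)
	fmt.Println(unset.shouldFail()) // true: default applied

	explicit := &config{FailOnMissingTables: boolPtr(false)}
	applyDefaults(explicit)
	fmt.Println(explicit.shouldFail()) // false: explicit value survives
}
```

With a plain bool field, the explicit false in the last case would be indistinguishable from "unset" and would be overwritten by the default.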

func (*Config) TableConfigFor

func (c *Config) TableConfigFor(table string) TableConfig

TableConfigFor returns the merged table config for a given table name, using per-table overrides on top of defaults.
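
One plausible shape for this merge is sketched below. The exact merge semantics are not documented, so several choices here are assumptions: InsertSettings are merged key by key with the override winning, SkipFlattenErrors is true if either level sets it, and the `canonical_` default of insert_quorum=auto (mentioned in the TableConfig field comments) is applied at merge time. The table names are made up for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// tableConfig mirrors the documented TableConfig fields.
type tableConfig struct {
	SkipFlattenErrors bool
	InsertSettings    map[string]any
}

// mergeTableConfig layers the per-table override on top of the defaults.
func mergeTableConfig(table string, defaults tableConfig, overrides map[string]tableConfig) tableConfig {
	merged := tableConfig{
		SkipFlattenErrors: defaults.SkipFlattenErrors,
		InsertSettings:    make(map[string]any, len(defaults.InsertSettings)),
	}

	for k, v := range defaults.InsertSettings {
		merged.InsertSettings[k] = v
	}

	if o, ok := overrides[table]; ok {
		// Assumption: either level can switch skipping on.
		merged.SkipFlattenErrors = merged.SkipFlattenErrors || o.SkipFlattenErrors

		for k, v := range o.InsertSettings {
			merged.InsertSettings[k] = v // override wins per key
		}
	}

	// Canonical tables default to insert_quorum=auto unless overridden.
	if strings.HasPrefix(table, "canonical_") {
		if _, set := merged.InsertSettings["insert_quorum"]; !set {
			merged.InsertSettings["insert_quorum"] = "auto"
		}
	}

	return merged
}

func main() {
	defaults := tableConfig{InsertSettings: map[string]any{"insert_quorum_timeout": 60000}}
	overrides := map[string]tableConfig{
		"canonical_a": {InsertSettings: map[string]any{"insert_quorum": 2}},
	}

	fmt.Println(mergeTableConfig("canonical_a", defaults, overrides).InsertSettings)
	fmt.Println(mergeTableConfig("canonical_b", defaults, overrides).InsertSettings)
}
```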

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the ClickHouse configuration for errors.

type FlushResult

type FlushResult struct {
	// InvalidEvents contains events that failed FlattenTo with
	// route.ErrInvalidEvent. These are permanently unflattenable and
	// should be sent to the DLQ by the caller.
	InvalidEvents []*xatu.DecoratedEvent

	// TableErrors maps base table names to their flush errors.
	// Tables that succeeded are absent from the map.
	TableErrors map[string]error
}

FlushResult holds the structured outcome of a FlushTableEvents call.

func (*FlushResult) Err

func (r *FlushResult) Err() error

Err returns a joined error of all table failures, or nil if all succeeded.
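
A minimal sketch of how such an Err method could be built on errors.Join is shown below. The flushResult type mirrors only the TableErrors field; sorting the table names and prefixing each error with its table are choices made for this example so that the message is deterministic, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// errInsertFailed is a hypothetical per-table failure.
var errInsertFailed = errors.New("insert failed")

// flushResult mirrors only the TableErrors field of FlushResult.
type flushResult struct {
	TableErrors map[string]error
}

// Err joins all table failures into one error, or returns nil if none failed.
func (r *flushResult) Err() error {
	if len(r.TableErrors) == 0 {
		return nil
	}

	// Sort table names so the joined message is stable across runs.
	tables := make([]string, 0, len(r.TableErrors))
	for table := range r.TableErrors {
		tables = append(tables, table)
	}

	sort.Strings(tables)

	errs := make([]error, 0, len(tables))
	for _, table := range tables {
		errs = append(errs, fmt.Errorf("table %s: %w", table, r.TableErrors[table]))
	}

	return errors.Join(errs...)
}

func main() {
	ok := &flushResult{}
	fmt.Println(ok.Err() == nil) // true

	failed := &flushResult{TableErrors: map[string]error{"events": errInsertFailed}}
	fmt.Println(failed.Err()) // table events: insert failed
}
```

Because each part is wrapped with %w, errors.Is on the joined result still matches the underlying per-table errors.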

type TableConfig

type TableConfig struct {
	// SkipFlattenErrors when true skips events that fail FlattenTo
	// instead of failing the entire batch. Default false = fail-fast.
	SkipFlattenErrors bool `yaml:"skipFlattenErrors"`
	// InsertSettings appends ClickHouse SETTINGS to INSERT statements.
	// Canonical tables (name prefix "canonical_") default to
	// insert_quorum=auto unless explicitly overridden.
	// Example:
	// insertSettings:
	//   insert_quorum: 2
	//   insert_quorum_timeout: 60000
	InsertSettings map[string]any `yaml:"insertSettings"`
}

TableConfig holds per-table settings for the ClickHouse writer.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer manages batched inserts using the ch-go client.

func NewWriter

func NewWriter(
	log logrus.FieldLogger,
	config *Config,
	metrics *telemetry.Metrics,
) (*Writer, error)

NewWriter creates a new ch-go writer.

func (*Writer) FlushTableEvents

func (w *Writer) FlushTableEvents(
	ctx context.Context,
	tableEvents map[string][]*xatu.DecoratedEvent,
) *FlushResult

FlushTableEvents writes the given events directly to their respective ClickHouse tables concurrently. The map keys are base table names (without suffix). Returns a FlushResult containing per-table errors and any invalid events that should be sent to the DLQ.

Concurrency model: each table flush runs in its own goroutine. The effective global concurrency cap is the ch-go pool's MaxConns, since each flush must acquire a pool connection. The per-table adaptive limiter further constrains concurrency per table. Goroutines blocked on pool.Do hold their batch data in memory; the aggregate footprint is bounded by (streams × maxInFlight × avg_batch_size).
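
The concurrency model described above can be approximated with one goroutine per table and a shared semaphore. In this sketch the buffered channel stands in for the ch-go pool's MaxConns, the flush callback stands in for the real INSERT, and the per-table adaptive limiter is omitted; all names are illustrative rather than the package's own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errFlushFailed is a hypothetical per-table failure.
var errFlushFailed = errors.New("flush failed")

// flushAll runs one goroutine per table. The semaphore plays the role of the
// connection pool: at most maxConns flushes run at once. It returns the
// per-table errors and the peak number of concurrent flushes observed.
func flushAll(tables []string, maxConns int, flush func(table string) error) (map[string]error, int) {
	var (
		wg             sync.WaitGroup
		mu             sync.Mutex
		inFlight, peak int
	)

	sem := make(chan struct{}, maxConns)
	tableErrors := make(map[string]error)

	for _, table := range tables {
		wg.Add(1)

		go func(table string) {
			defer wg.Done()

			sem <- struct{}{} // blocks here while the "pool" is exhausted
			defer func() { <-sem }()

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			err := flush(table)

			mu.Lock()
			inFlight--
			if err != nil {
				tableErrors[table] = err // succeeded tables stay absent
			}
			mu.Unlock()
		}(table)
	}

	wg.Wait()

	return tableErrors, peak
}

func main() {
	errs, peak := flushAll([]string{"a", "b", "c", "d"}, 2, func(table string) error {
		if table == "b" {
			return errFlushFailed
		}
		return nil
	})

	fmt.Println(len(errs), peak <= 2)
}
```

Goroutines waiting on the semaphore already hold their batch in memory, which is the source of the memory bound mentioned above.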

func (*Writer) Ping

func (w *Writer) Ping(ctx context.Context) error

Ping checks connectivity to the ClickHouse pool.

func (*Writer) RegisterBatchFactories

func (w *Writer) RegisterBatchFactories(routes []route.Route)

RegisterBatchFactories extracts ColumnarBatch factories from routes and stores them keyed by base table name. This must be called before FlushTableEvents.

func (*Writer) Start

func (w *Writer) Start(ctx context.Context) error

Start dials the pool and verifies connectivity.

func (*Writer) Stop

func (w *Writer) Stop(_ context.Context) error

Stop closes the connection pool. It is safe to call multiple times; only the first call performs cleanup.
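
An idempotent Stop of this kind is commonly built on sync.Once. The sketch below assumes that approach; whether the Writer actually uses sync.Once is not stated in the documentation, and the pool type here is a counting stand-in for the real connection pool.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a stand-in for the connection pool; it counts Close calls.
type pool struct {
	closes int
}

func (p *pool) Close() { p.closes++ }

// writer mirrors only what is needed to show the idempotent Stop.
type writer struct {
	pool     *pool
	stopOnce sync.Once
}

// Stop closes the pool on the first call and is a no-op afterwards.
func (w *writer) Stop() error {
	w.stopOnce.Do(func() {
		w.pool.Close()
	})

	return nil
}

func main() {
	w := &writer{pool: &pool{}}

	for i := 0; i < 3; i++ {
		_ = w.Stop()
	}

	fmt.Println(w.pool.closes) // 1
}
```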

func (*Writer) ValidateColumns

func (w *Writer) ValidateColumns(ctx context.Context) error

ValidateColumns asserts that every column written by a registered route exists in that route's target ClickHouse table. This catches schema-vs-code skew that would otherwise surface only at first-INSERT time as `DB::Exception: There is no column 'X' in table` and stall the deriver's checkpoint while it retries forever.

Direction asymmetry is intentional: only "route writes a column the table is missing" is fatal. The other direction (table has a column the route doesn't write) is fine — those columns are simply not listed in the INSERT and ClickHouse fills them with DEFAULT. That is the supported "cannon writes to legacy schema, leaves geo columns empty" semantic.
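
The asymmetry amounts to a one-directional set difference: only columns the route writes but the table lacks are reported. A minimal sketch, with made-up column names and a helper that is not part of the package:

```go
package main

import (
	"fmt"
	"sort"
)

// missingColumns returns the columns a route writes that the table does not
// have. Columns present only in the table are deliberately ignored.
func missingColumns(routeColumns, tableColumns []string) []string {
	have := make(map[string]struct{}, len(tableColumns))
	for _, col := range tableColumns {
		have[col] = struct{}{}
	}

	var missing []string

	for _, col := range routeColumns {
		if _, ok := have[col]; !ok {
			missing = append(missing, col)
		}
	}

	sort.Strings(missing)

	return missing
}

func main() {
	// Route writes a column the table lacks: reported.
	fmt.Println(missingColumns([]string{"slot", "geo_city"}, []string{"slot", "epoch"}))

	// Table has an extra column the route never writes: nothing reported.
	fmt.Println(missingColumns([]string{"slot"}, []string{"slot", "geo_city"}))
}
```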

Honors the same `failOnMissingTables` toggle as ValidateTables: set it to false to downgrade failures to warnings.

func (*Writer) ValidateTables

func (w *Writer) ValidateTables(ctx context.Context, routeTableNames []string) error

ValidateTables queries ClickHouse system.tables and checks that every registered route table (with TableSuffix applied) exists. By default, missing tables cause a fatal startup error. Set FailOnMissingTables to false to downgrade to warnings.
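
The check can be sketched as a lookup of each suffixed table name against the set of existing tables, with the toggle deciding between an error and warnings. The existing-table set below stands in for the result of the system.tables query, and the function, its signature, and the table names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// validateTables applies the suffix to each route table and checks it against
// the existing tables. Missing tables produce an error when failOnMissing is
// true, and are returned as warnings otherwise.
func validateTables(
	routeTables []string,
	suffix string,
	existing map[string]bool,
	failOnMissing bool,
) (warnings []string, err error) {
	var missing []string

	for _, table := range routeTables {
		if name := table + suffix; !existing[name] {
			missing = append(missing, name)
		}
	}

	if len(missing) == 0 {
		return nil, nil
	}

	if failOnMissing {
		return nil, fmt.Errorf("missing ClickHouse tables: %s", strings.Join(missing, ", "))
	}

	return missing, nil
}

func main() {
	existing := map[string]bool{"beacon_block_local": true}
	routes := []string{"beacon_block", "validators"}

	_, err := validateTables(routes, "_local", existing, true)
	fmt.Println(err)

	warnings, err := validateTables(routes, "_local", existing, false)
	fmt.Println(warnings, err)
}
```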

Directories

Path Synopsis
all
cmd/chgo-rowgen command
cmd/generate command
Command generate spins up a ClickHouse container via testcontainers, applies all migrations, and regenerates every .gen.go file using chgo-rowgen.
mev
testfixture
Package testfixture provides shared test helpers for per-table snapshot tests across the flattener domain packages.
Package tls provides shared TLS configuration for consumoor sinks and sources.