Documentation ¶
Index ¶
- func IsLimiterRejected(err error) bool
- func IsPermanentWriteError(err error) bool
- type AdaptiveLimiterConfig
- type ChGoConfig
- type Config
- type FlushResult
- type TableConfig
- type Writer
- func (w *Writer) FlushTableEvents(ctx context.Context, tableEvents map[string][]*xatu.DecoratedEvent) *FlushResult
- func (w *Writer) Ping(ctx context.Context) error
- func (w *Writer) RegisterBatchFactories(routes []route.Route)
- func (w *Writer) Start(ctx context.Context) error
- func (w *Writer) Stop(_ context.Context) error
- func (w *Writer) ValidateColumns(ctx context.Context) error
- func (w *Writer) ValidateTables(ctx context.Context, routeTableNames []string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsLimiterRejected ¶
func IsLimiterRejected(err error) bool
IsLimiterRejected reports whether err was caused by adaptive concurrency limiter rejection.
func IsPermanentWriteError ¶
func IsPermanentWriteError(err error) bool
IsPermanentWriteError returns true for errors that will never succeed on retry: data-quality problems (bad values, parse failures) and code bugs (syntax errors, bad arguments). Schema mismatches (unknown table, missing column, column count) are intentionally excluded because they can be transient during rolling deployments when migrations haven't been applied yet — NAK + Kafka redelivery lets them self-resolve.
For joined errors (from multi-table concurrent flushes), all constituent errors must be permanent for the result to be permanent. A single transient failure in any table means the group should be NAK'd for retry.
Types ¶
type AdaptiveLimiterConfig ¶
type AdaptiveLimiterConfig struct {
// Enabled turns on adaptive concurrency limiting.
//
// Pointer type so that an explicit YAML "false" survives the
// defaults.Set pass — see FailOnMissingTables for the same rationale.
Enabled *bool `yaml:"enabled" default:"true"`
// MinLimit is the minimum concurrent INSERTs the limiter allows.
MinLimit uint `yaml:"minLimit" default:"1"`
// MaxLimit caps the maximum concurrent INSERTs the limiter allows.
MaxLimit uint `yaml:"maxLimit" default:"50"`
// InitialLimit is the starting concurrency before adaptation.
InitialLimit uint `yaml:"initialLimit" default:"8"`
// QueueInitialRejectionFactor controls the queue size below which
// requests are rejected during the initial learning phase.
QueueInitialRejectionFactor float64 `yaml:"queueInitialRejectionFactor" default:"2"`
// QueueMaxRejectionFactor controls the queue size below which
// requests are rejected after the initial learning phase.
QueueMaxRejectionFactor float64 `yaml:"queueMaxRejectionFactor" default:"3"`
}
AdaptiveLimiterConfig configures per-table adaptive concurrency limiting. When enabled, each table writer independently adjusts its INSERT concurrency based on observed ClickHouse latency using an AIMD algorithm.
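As a rough illustration of AIMD (additive increase, multiplicative decrease) bounded by MinLimit and MaxLimit, consider the sketch below. The type and method names are hypothetical, the halving factor is an assumption, and the real limiter's latency signal and tuning are not shown in this documentation.

```go
package main

import "fmt"

// aimdLimit is a hypothetical limiter state; field names are illustrative.
type aimdLimit struct {
	limit, min, max uint
}

// onFast models an INSERT that completed with acceptable latency:
// additive increase by one slot, capped at max.
func (a *aimdLimit) onFast() {
	if a.limit < a.max {
		a.limit++
	}
}

// onSlow models latency that suggests overload: multiplicative decrease
// (halving is an assumed factor), floored at min.
func (a *aimdLimit) onSlow() {
	a.limit /= 2
	if a.limit < a.min {
		a.limit = a.min
	}
}

func main() {
	// Values taken from the documented defaults: initial 8, min 1, max 50.
	l := &aimdLimit{limit: 8, min: 1, max: 50}
	l.onSlow()
	fmt.Println(l.limit) // 4
	l.onFast()
	fmt.Println(l.limit) // 5
}
```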
func (*AdaptiveLimiterConfig) IsEnabled ¶
func (c *AdaptiveLimiterConfig) IsEnabled() bool
IsEnabled resolves the Enabled pointer.
IsEnabled returns false when Enabled is nil (unset); the rationale documented on Config.ShouldFailOnMissingTables applies here as well. YAML callers pick up the documented `default:"true"` via creasty/defaults during load.
func (*AdaptiveLimiterConfig) Validate ¶
func (c *AdaptiveLimiterConfig) Validate() error
Validate checks the adaptive limiter configuration for errors.
type ChGoConfig ¶
type ChGoConfig struct {
// DialTimeout is the timeout for establishing a connection to ClickHouse.
DialTimeout time.Duration `yaml:"dialTimeout" default:"5s"`
// ReadTimeout is the timeout for reading responses from ClickHouse.
ReadTimeout time.Duration `yaml:"readTimeout" default:"30s"`
// QueryTimeout is the per-attempt timeout for ch-go operations.
// Set to 0 to disable timeout wrapping.
QueryTimeout time.Duration `yaml:"queryTimeout" default:"30s"`
// MaxRetries is the number of retry attempts after the initial try.
MaxRetries int `yaml:"maxRetries" default:"3"`
// RetryBaseDelay is the initial delay before retry attempt 1.
RetryBaseDelay time.Duration `yaml:"retryBaseDelay" default:"100ms"`
// RetryMaxDelay caps exponential retry backoff.
RetryMaxDelay time.Duration `yaml:"retryMaxDelay" default:"2s"`
// MaxConns is the maximum number of pooled ClickHouse connections.
MaxConns int32 `yaml:"maxConns" default:"64"`
// MinConns is the minimum number of pooled ClickHouse connections.
MinConns int32 `yaml:"minConns" default:"1"`
// ConnMaxLifetime is the maximum lifetime for pooled connections.
ConnMaxLifetime time.Duration `yaml:"connMaxLifetime" default:"1h"`
// ConnMaxIdleTime is the maximum idle time for pooled connections.
ConnMaxIdleTime time.Duration `yaml:"connMaxIdleTime" default:"10m"`
// HealthCheckPeriod is the interval for pool health checks.
HealthCheckPeriod time.Duration `yaml:"healthCheckPeriod" default:"30s"`
// PoolMetricsInterval controls how often pool stats are sampled.
// Set to 0 to disable pool metrics collection.
PoolMetricsInterval time.Duration `yaml:"poolMetricsInterval" default:"15s"`
// GroupRetryMaxAttempts is the number of retry attempts at the
// processGroup level for partial table failures (e.g. fanout where
// some tables succeed and others fail transiently). Only failed tables
// are retried, preventing duplicate writes to already-succeeded tables.
GroupRetryMaxAttempts int `yaml:"groupRetryMaxAttempts" default:"3"`
// GroupRetryBaseDelay is the initial delay before the first group retry.
GroupRetryBaseDelay time.Duration `yaml:"groupRetryBaseDelay" default:"1s"`
// GroupRetryMaxDelay caps exponential backoff for group retries.
GroupRetryMaxDelay time.Duration `yaml:"groupRetryMaxDelay" default:"30s"`
// AdaptiveLimiter configures per-table adaptive concurrency limiting.
AdaptiveLimiter AdaptiveLimiterConfig `yaml:"adaptiveLimiter"`
}
ChGoConfig configures the ch-go backend query retries and connection pooling.
func (*ChGoConfig) Validate ¶
func (c *ChGoConfig) Validate() error
Validate checks the ch-go backend configuration for errors.
type Config ¶
type Config struct {
// DSN is the ClickHouse connection string.
DSN string `yaml:"dsn"`
// TLS configures TLS for the ClickHouse connection. The DSN scheme
// "clickhouses://" still triggers TLS, but this allows custom CA and
// client certificate settings.
TLS xtls.Config `yaml:"tls"`
// TableSuffix is appended to every table name before writing.
// For example, set to "_local" to bypass Distributed tables and write
// directly to ReplicatedMergeTree tables in a clustered setup.
TableSuffix string `yaml:"tableSuffix"`
// FailOnMissingTables controls whether missing ClickHouse tables cause
// a fatal startup error. When true (default), startup is aborted if any
// registered route table does not exist in the target database. Set to
// false to downgrade to warnings and allow startup to proceed.
//
// Pointer type so that an explicit YAML "false" survives the
// defaults.Set pass — a plain bool would be indistinguishable from
// "unset" and silently overridden back to the default.
FailOnMissingTables *bool `yaml:"failOnMissingTables" default:"true"`
// Defaults are the default table settings.
Defaults TableConfig `yaml:"defaults"`
// Tables contains per-table overrides for batch settings.
Tables map[string]TableConfig `yaml:"tables"`
// ChGo configures ch-go backend retry/pooling behavior.
ChGo ChGoConfig `yaml:"chgo"`
}
Config configures the ClickHouse writer.
func (*Config) ShouldFailOnMissingTables ¶
ShouldFailOnMissingTables resolves the FailOnMissingTables pointer.
Returns false when the field is nil (unset). The documented default is `true`, but that default is applied by creasty/defaults during YAML load via defaults.Set — production code paths run that pass and arrive here with FailOnMissingTables explicitly set. A nil pointer therefore means "code constructed Config without going through defaults.Set", for which the safe behaviour is "off" (don't fail startup on a missing table that the caller never opted in to caring about).
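The nil-means-off resolution described above can be sketched like this. The `config` type and `shouldFail` method are hypothetical stand-ins that carry only the relevant field; they are not the package's actual code.

```go
package main

import "fmt"

// config is a hypothetical stand-in for Config with only the relevant field.
type config struct {
	FailOnMissingTables *bool
}

// shouldFail resolves the pointer: nil (defaults never applied) means false,
// otherwise the explicit value wins.
func (c *config) shouldFail() bool {
	return c.FailOnMissingTables != nil && *c.FailOnMissingTables
}

func main() {
	yes, no := true, false
	fmt.Println((&config{}).shouldFail())                          // false
	fmt.Println((&config{FailOnMissingTables: &no}).shouldFail())  // false
	fmt.Println((&config{FailOnMissingTables: &yes}).shouldFail()) // true
}
```

The three states (nil, explicit false, explicit true) are what a plain bool cannot express, which is why the field is a pointer.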
func (*Config) TableConfigFor ¶
func (c *Config) TableConfigFor(table string) TableConfig
TableConfigFor returns the merged table config for a given table name, using per-table overrides on top of defaults.
type FlushResult ¶
type FlushResult struct {
// InvalidEvents contains events that failed FlattenTo with
// route.ErrInvalidEvent. These are permanently unflattenable and
// should be sent to the DLQ by the caller.
InvalidEvents []*xatu.DecoratedEvent
// TableErrors maps base table names to their flush errors.
// Tables that succeeded are absent from the map.
TableErrors map[string]error
}
FlushResult holds the structured outcome of a FlushTableEvents call.
func (*FlushResult) Err ¶
func (r *FlushResult) Err() error
Err returns a joined error of all table failures, or nil if all succeeded.
type TableConfig ¶
type TableConfig struct {
// SkipFlattenErrors when true skips events that fail FlattenTo
// instead of failing the entire batch. Default false = fail-fast.
SkipFlattenErrors bool `yaml:"skipFlattenErrors"`
// InsertSettings appends ClickHouse SETTINGS to INSERT statements.
// Canonical tables (name prefix "canonical_") default to
// insert_quorum=auto unless explicitly overridden.
// Example:
//   insertSettings:
//     insert_quorum: 2
//     insert_quorum_timeout: 60000
InsertSettings map[string]any `yaml:"insertSettings"`
}
TableConfig holds per-table settings for the ClickHouse writer.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer manages batched inserts using the ch-go client.
func NewWriter ¶
func NewWriter(
	log logrus.FieldLogger,
	config *Config,
	metrics *telemetry.Metrics,
) (*Writer, error)
NewWriter creates a new ch-go writer.
func (*Writer) FlushTableEvents ¶
func (w *Writer) FlushTableEvents(
	ctx context.Context,
	tableEvents map[string][]*xatu.DecoratedEvent,
) *FlushResult
FlushTableEvents writes the given events directly to their respective ClickHouse tables concurrently. The map keys are base table names (without suffix). Returns a FlushResult containing per-table errors and any invalid events that should be sent to the DLQ.
Concurrency model: each table flush runs in its own goroutine. The effective global concurrency cap is the ch-go pool's MaxConns, since each flush must acquire a pool connection. The per-table adaptive limiter further constrains concurrency per table. Goroutines blocked on pool.Do hold their batch data in memory; the aggregate footprint is bounded by (streams × maxInFlight × avg_batch_size).
func (*Writer) RegisterBatchFactories ¶
func (w *Writer) RegisterBatchFactories(routes []route.Route)
RegisterBatchFactories extracts ColumnarBatch factories from routes and stores them keyed by base table name. It must be called before FlushTableEvents.
func (*Writer) Stop ¶
func (w *Writer) Stop(_ context.Context) error
Stop closes the connection pool. It is safe to call multiple times; only the first call performs cleanup.
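A common way to get "safe to call multiple times" behaviour in Go is sync.Once. The sketch below assumes that approach; the `writer` type and its `closes` counter are hypothetical and exist only to show that cleanup runs once.

```go
package main

import (
	"fmt"
	"sync"
)

// writer is a hypothetical stand-in; closes counts how often cleanup ran.
type writer struct {
	stopOnce sync.Once
	closes   int
}

// Stop performs cleanup on the first call only; later calls are no-ops.
func (w *writer) Stop() error {
	w.stopOnce.Do(func() {
		w.closes++ // the real code would close the connection pool here
	})
	return nil
}

func main() {
	w := &writer{}
	_ = w.Stop()
	_ = w.Stop()
	fmt.Println(w.closes) // 1
}
```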
func (*Writer) ValidateColumns ¶
func (w *Writer) ValidateColumns(ctx context.Context) error
ValidateColumns asserts that every column each registered route writes exists in its target ClickHouse table. Catches the schema-vs-code skew that otherwise surfaces only at first-INSERT time as `DB::Exception: There is no column 'X' in table` and stalls the deriver's checkpoint while it retries forever.
Direction asymmetry is intentional: only "route writes a column the table is missing" is fatal. The other direction (table has a column the route doesn't write) is fine — those columns are simply not listed in the INSERT and ClickHouse fills them with DEFAULT. That is the supported "cannon writes to legacy schema, leaves geo columns empty" semantic.
Honors the same `failOnMissingTables` toggle as ValidateTables — set it false to downgrade to warnings.
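The one-directional check amounts to a set difference: columns the route writes minus columns the table has. A minimal sketch follows, with a hypothetical `missingColumns` helper and made-up column names.

```go
package main

import "fmt"

// missingColumns returns the columns a route writes that the table lacks.
// Extra table columns are ignored on purpose: ClickHouse fills them with
// DEFAULT when they are not listed in the INSERT.
func missingColumns(routeCols, tableCols []string) []string {
	have := make(map[string]struct{}, len(tableCols))
	for _, c := range tableCols {
		have[c] = struct{}{}
	}
	var missing []string
	for _, c := range routeCols {
		if _, ok := have[c]; !ok {
			missing = append(missing, c)
		}
	}
	return missing
}

func main() {
	// Column names are illustrative only.
	route := []string{"slot", "geo_city"}
	table := []string{"slot", "updated_date_time"}
	fmt.Println(missingColumns(route, table)) // [geo_city]
}
```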
func (*Writer) ValidateTables ¶
func (w *Writer) ValidateTables(ctx context.Context, routeTableNames []string) error
ValidateTables queries ClickHouse system.tables and checks that every registered route table (with TableSuffix applied) exists. By default, missing tables cause a fatal startup error. Set FailOnMissingTables to false to downgrade to warnings.
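The existence check with TableSuffix applied, and the fatal-versus-warning toggle, could look roughly like this. `checkTables` is a hypothetical helper, the `existing` set stands in for the result of querying system.tables, and the table names are invented.

```go
package main

import (
	"fmt"
	"strings"
)

// checkTables applies the suffix to each route table and reports the ones
// absent from existing. It returns an error only when failOnMissing is set;
// otherwise the caller is expected to log the missing names as warnings.
func checkTables(
	routeTables []string,
	suffix string,
	existing map[string]bool,
	failOnMissing bool,
) ([]string, error) {
	var missing []string
	for _, t := range routeTables {
		if name := t + suffix; !existing[name] {
			missing = append(missing, name)
		}
	}
	if failOnMissing && len(missing) > 0 {
		return missing, fmt.Errorf("missing ClickHouse tables: %s", strings.Join(missing, ", "))
	}
	return missing, nil
}

func main() {
	existing := map[string]bool{"table_a_local": true}
	missing, err := checkTables([]string{"table_a", "table_b"}, "_local", existing, true)
	fmt.Println(missing, err != nil) // [table_b_local] true
}
```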
Directories ¶
| Path | Synopsis |
|---|---|
| beacon/cmd/chrowgen (command) | |
| cmd/chgo-rowgen (command) | |
| cmd/generate (command) | Command generate spins up a ClickHouse container via testcontainers, applies all migrations, and regenerates every .gen.go file using chgo-rowgen. |
| testfixture | Package testfixture provides shared test helpers for per-table snapshot tests across the flattener domain packages. |
| | Package tls provides shared TLS configuration for consumoor sinks and sources. |