Documentation
¶
Overview ¶
Package helix provides a high-availability dual-database client library designed to support "Shared Nothing" architecture.
Helix provides robustness through active-active dual writes, sticky reads, and asynchronous reconciliation for independent Cassandra clusters.
Key Features ¶
- Dual Active-Active Writes: Concurrent writes to two independent clusters
- Sticky Read Routing: Per-client sticky reads to maximize cache hits
- Active Failover: Immediate failover to secondary cluster on read failures
- Replay System: Asynchronous reconciliation via in-memory queue or NATS JetStream
- Drop-in Replacement: Interface-based design mirrors gocql API
Basic Usage ¶
// Create sessions for both clusters
sessionA := v1.NewSession(clusterA.CreateSession())
sessionB := v1.NewSession(clusterB.CreateSession())
// Create Helix client
client, err := helix.NewCQLClient(sessionA, sessionB,
helix.WithReadStrategy(policy.NewStickyRead()),
helix.WithWriteStrategy(policy.NewConcurrentDualWrite()),
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Use like a normal gocql session
err = client.Query("INSERT INTO users (id, name) VALUES (?, ?)", id, name).Exec()
Error Handling ¶
Helix uses standard Go errors with clear semantics for dual-cluster operations.
Write Operation Errors ¶
Write operations (Exec, ExecContext, batch.Exec) follow this error model:
- nil: At least one cluster succeeded (partial writes are queued for replay)
- error: Both clusters failed (operation completely failed)
When both clusters fail, a types.DualClusterError is returned:
err := client.Query("INSERT INTO ...").Exec()
if err != nil {
var dualErr *types.DualClusterError
if errors.As(err, &dualErr) {
// Both clusters failed, inspect individual errors
log.Printf("Cluster A: %v", dualErr.ErrorA)
log.Printf("Cluster B: %v", dualErr.ErrorB)
}
}
Sentinel Errors ¶
Helix defines several sentinel errors for specific scenarios:
- types.ErrSessionClosed: Operation attempted on closed client
- types.ErrReplayQueueFull: Replay queue at capacity, cannot enqueue failed write
- types.ErrNoAvailableCluster: No cluster available for reads (both down/draining)
- types.ErrBothClustersDraining: Both clusters in drain mode, writes rejected
- types.ErrWriteAsync: Write sent async to degraded cluster (AdaptiveDualWrite only)
- types.ErrWriteDropped: Write dropped due to concurrency limit (AdaptiveDualWrite only)
Check for sentinel errors using errors.Is:
if errors.Is(err, types.ErrSessionClosed) {
// Handle closed session
}
Wrapped Errors ¶
Cluster-specific errors are wrapped in types.ClusterError:
var clusterErr *types.ClusterError
if errors.As(err, &clusterErr) {
log.Printf("Cluster %s failed during %s: %v",
clusterErr.Cluster, clusterErr.Operation, clusterErr.Cause)
}
Context and Timeouts ¶
Context usage follows Go idioms. Two equivalent patterns are supported:
// Pattern 1: Direct context method (preferred for simple cases)
err := client.Query("SELECT ...").ExecContext(ctx)
// Pattern 2: Method chaining (useful with multiple options)
err := client.Query("SELECT ...").
Consistency(helix.Quorum).
WithContext(ctx).
Exec()
Both patterns respect context cancellation and deadlines. For dual-writes, each cluster operation gets an independent timeout context to prevent one slow cluster from blocking the other.
Idempotency and Timestamps ¶
Helix uses client-generated timestamps to ensure replay operations are idempotent. Without proper timestamps:
- Replayed writes may overwrite newer data
- Last-write-wins semantics may be violated
Always configure a timestamp provider or use WithTimestamp():
helix.WithTimestampProvider(func() int64 {
return time.Now().UnixMicro()
})
CAS/LWT Warning ¶
Lightweight Transactions (INSERT ... IF NOT EXISTS, ScanCAS, etc.) are NOT safe in a shared-nothing dual-cluster architecture. Helix does not guarantee correctness for CAS/LWT operations.
Example (ContextUsage) ¶
Example_contextUsage demonstrates two patterns for context handling in queries.
var client *CQLClient
ctx := context.Background()
// Pattern 1: Direct context method (simple cases)
err := client.Query("INSERT INTO users (id, name) VALUES (?, ?)", "user-1", "Alice").
ExecContext(ctx)
_ = err
// Pattern 2: Method chaining (multiple options)
err = client.Query("SELECT name FROM users WHERE id = ?", "user-1").
Consistency(Quorum).
WithContext(ctx).
Exec()
_ = err
Example (ErrorHandling) ¶
Example_errorHandling demonstrates inspecting DualClusterError when both clusters fail.
var client *CQLClient
// Perform a write operation
err := client.Query("INSERT INTO users (id, name) VALUES (?, ?)", "user-1", "Alice").Exec()
// Check if both clusters failed
if err != nil {
var dualErr *types.DualClusterError
if errors.As(err, &dualErr) {
// Inspect individual cluster errors
_ = dualErr.ErrorA // Error from cluster A
_ = dualErr.ErrorB // Error from cluster B
}
// Check for specific sentinel errors
if errors.Is(err, types.ErrSessionClosed) {
// Session was closed, recreate client or handle gracefully
return
}
}
Index ¶
- Constants
- func DefaultTimestampProvider() int64
- type Batch
- type BatchStatement
- type BatchType
- type CQLClient
- func (c *CQLClient) Batch(kind BatchType) Batch
- func (c *CQLClient) Close()
- func (c *CQLClient) Config() *ClientConfig
- func (c *CQLClient) DefaultExecuteFunc() replay.ExecuteFunc
- func (c *CQLClient) ExecuteBatch(batch Batch) errordeprecated
- func (c *CQLClient) ExecuteBatchCAS(batch Batch, dest ...any) (applied bool, iter Iter, err error)deprecated
- func (c *CQLClient) IsDraining(cluster ClusterID) bool
- func (c *CQLClient) IsSingleCluster() bool
- func (c *CQLClient) MapExecuteBatchCAS(batch Batch, dest map[string]any) (applied bool, iter Iter, err error)deprecated
- func (c *CQLClient) NewBatch(kind BatchType) Batchdeprecated
- func (c *CQLClient) Query(stmt string, values ...any) Query
- func (c *CQLClient) Session() CQLSession
- func (c *CQLClient) SessionA() cql.Session
- func (c *CQLClient) SessionB() cql.Session
- type CQLSession
- type ClientConfig
- type ClusterID
- type ClusterNames
- type ColumnInfo
- type Consistency
- type FailoverPolicy
- type Iter
- type LatencyRecorder
- type Logger
- type MetricsCollector
- type Option
- func WithAutoMemoryWorker(queueCapacity int, workerOpts ...replay.WorkerOption) Option
- func WithClusterNames(nameA, nameB string) Option
- func WithFailoverPolicy(policy FailoverPolicy) Option
- func WithLogger(logger types.Logger) Option
- func WithMetrics(collector MetricsCollector) Option
- func WithOnReplayDropped(handler ReplayDroppedHandler) Option
- func WithReadStrategy(strategy ReadStrategy) Option
- func WithReplayWorker(worker ReplayWorker) Option
- func WithReplayer(replayer Replayer) Option
- func WithTimestampProvider(fn TimestampProvider) Option
- func WithTopologyWatcher(watcher TopologyWatcher) Option
- func WithWriteStrategy(strategy WriteStrategy) Option
- type PriorityLevel
- type Query
- type ReadStrategy
- type ReplayDroppedHandler
- type ReplayPayload
- type ReplayWorker
- type Replayer
- type SQLClient
- func (c *SQLClient) Close() error
- func (c *SQLClient) Exec(query string, args ...any) (sql.Result, error)
- func (c *SQLClient) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (c *SQLClient) IsSingleCluster() bool
- func (c *SQLClient) Ping() error
- func (c *SQLClient) PingContext(ctx context.Context) error
- func (c *SQLClient) Query(query string, args ...any) (*sql.Rows, error)
- func (c *SQLClient) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (c *SQLClient) QueryRow(query string, args ...any) *sql.Row
- func (c *SQLClient) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type Scanner
- type TimestampProvider
- type TopologyOperator
- type TopologyUpdate
- type TopologyWatcher
- type WriteStrategy
Examples ¶
Constants ¶
const ( ClusterA = types.ClusterA ClusterB = types.ClusterB )
Re-export cluster ID constants for convenience.
const ( Any = types.Any One = types.One Two = types.Two Three = types.Three Quorum = types.Quorum All = types.All LocalQuorum = types.LocalQuorum EachQuorum = types.EachQuorum Serial = types.Serial LocalSerial = types.LocalSerial LocalOne = types.LocalOne )
Re-export consistency level constants for convenience.
const ( LoggedBatch = types.LoggedBatch UnloggedBatch = types.UnloggedBatch CounterBatch = types.CounterBatch )
Re-export batch type constants for convenience.
const ( PriorityHigh = types.PriorityHigh PriorityLow = types.PriorityLow )
Re-export priority level constants for convenience.
Variables ¶
This section is empty.
Functions ¶
func DefaultTimestampProvider ¶
func DefaultTimestampProvider() int64
DefaultTimestampProvider returns the current time in microseconds.
Types ¶
type Batch ¶
type Batch interface {
// Query adds a statement to the batch.
//
// Parameters:
// - stmt: CQL statement with ? placeholders
// - args: Values to bind to placeholders
//
// Returns:
// - Batch: The same batch for chaining
Query(stmt string, args ...any) Batch
// Consistency sets the consistency level for the batch.
//
// Parameters:
// - c: Consistency level
//
// Returns:
// - Batch: The same batch for chaining
Consistency(c Consistency) Batch
// SetConsistency sets the consistency level for the batch.
//
// Deprecated: Use Consistency() instead. This method exists for gocql v1 compatibility.
//
// Parameters:
// - c: Consistency level
SetConsistency(c Consistency)
// WithContext associates a context with the batch.
//
// Parameters:
// - ctx: Context for cancellation and timeout
//
// Returns:
// - Batch: The same batch for chaining
WithContext(ctx context.Context) Batch
// WithTimestamp sets a specific timestamp for all statements in the batch.
//
// Parameters:
// - ts: Timestamp in microseconds since Unix epoch
//
// Returns:
// - Batch: The same batch for chaining
WithTimestamp(ts int64) Batch
// WithPriority sets the replay priority level for this batch operation.
//
// Priority affects replay processing order when a write fails on one cluster.
// High priority writes are processed before low priority writes.
// If not set, defaults to PriorityHigh.
//
// Parameters:
// - p: Priority level (PriorityHigh or PriorityLow)
//
// Returns:
// - Batch: The same batch for chaining
WithPriority(p PriorityLevel) Batch
// SerialConsistency sets the consistency level for the serial phase of CAS operations.
//
// This only applies to batch CAS operations.
// Valid values are Serial or LocalSerial.
//
// Parameters:
// - c: Serial consistency level (Serial or LocalSerial)
//
// Returns:
// - Batch: The same batch for chaining
SerialConsistency(c Consistency) Batch
// Size returns the number of statements in the batch.
//
// Returns:
// - int: Number of statements added to the batch
Size() int
// Exec executes the batch using the Write Strategy.
//
// This triggers concurrent dual-write to both clusters.
//
// Returns:
// - error: nil on success, error if both clusters fail
Exec() error
// ExecContext executes the batch with the given context.
//
// Parameters:
// - ctx: Context for cancellation and timeout
//
// Returns:
// - error: nil on success, error if both clusters fail
ExecContext(ctx context.Context) error
// IterContext executes the batch with context and returns an iterator.
//
// This is useful for accessing execution metadata like latency and attempts.
// The iterator reads from the selected cluster based on sticky routing.
//
// Parameters:
// - ctx: Context for cancellation and timeout
//
// Returns:
// - Iter: Iterator for accessing batch execution results
IterContext(ctx context.Context) Iter
// ExecCAS executes a batch with lightweight transactions.
//
// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
//
// Parameters:
// - dest: Pointers to variables to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - iter: Iterator for scanning additional CAS result rows
// - err: error if the operation failed
ExecCAS(dest ...any) (applied bool, iter Iter, err error)
// ExecCASContext executes a batch with lightweight transactions and context.
//
// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
//
// Parameters:
// - ctx: Context for cancellation and timeout
// - dest: Pointers to variables to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - iter: Iterator for scanning additional CAS result rows
// - err: error if the operation failed
ExecCASContext(ctx context.Context, dest ...any) (applied bool, iter Iter, err error)
// MapExecCAS executes a batch with lightweight transactions and returns previous values as a map.
//
// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
//
// Parameters:
// - dest: Map to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - iter: Iterator for scanning additional CAS result rows
// - err: error if the operation failed
MapExecCAS(dest map[string]any) (applied bool, iter Iter, err error)
// MapExecCASContext executes a batch with lightweight transactions, context, and map result.
//
// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
//
// Parameters:
// - ctx: Context for cancellation and timeout
// - dest: Map to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - iter: Iterator for scanning additional CAS result rows
// - err: error if the operation failed
MapExecCASContext(ctx context.Context, dest map[string]any) (applied bool, iter Iter, err error)
}
Batch mirrors gocql.Batch for grouping multiple mutations.
All statements in a batch are executed atomically and use the Write Strategy.
type BatchStatement ¶
type BatchStatement = types.BatchStatement
Type aliases for convenience - re-export from types package.
type CQLClient ¶
type CQLClient struct {
// contains filtered or unexported fields
}
CQLClient is the main Helix CQL client for single or dual-cluster operations.
It wraps one or two CQL sessions and orchestrates reads and writes according to configured strategies. When only one session is provided (sessionB is nil), the client operates in single-cluster mode with pass-through behavior.
Single-cluster mode is ideal for:
- Migrating existing applications to Helix incrementally
- Development and testing environments
- Applications that don't need dual-cluster redundancy yet
Thread Safety ¶
CQLClient is safe for concurrent use from multiple goroutines. A single client instance can be shared across your application:
// Create once, share everywhere
client, err := helix.NewCQLClient(sessionA, sessionB, ...)
defer client.Close()
// Use from multiple goroutines safely
go func() { client.Query("INSERT ...").Exec() }()
go func() { client.Query("SELECT ...").Scan(&result) }()
All internal state is protected by atomic operations or appropriate locking.
Lifecycle ¶
Create a client with NewCQLClient() and clean up resources with Close():
client, err := helix.NewCQLClient(sessionA, sessionB, opts...)
if err != nil {
log.Fatal(err)
}
defer client.Close() // Always close to release resources
After Close() is called:
- All ongoing operations complete or are cancelled
- Replay worker is stopped (enqueued replays are lost if using MemoryReplayer)
- Topology watcher is stopped
- Underlying sessions are closed
- The client cannot be reused (operations return ErrSessionClosed)
func NewCQLClient ¶
NewCQLClient creates a new Helix CQL client.
The client supports two modes:
- Single-cluster mode: Pass sessionB as nil. Operations are executed directly on sessionA without dual-write or failover logic. This provides a drop-in replacement for existing single-cluster applications.
- Dual-cluster mode: Pass both sessions. Operations use configured strategies for dual writes, sticky reads, and failover.
If a ReplayWorker is configured, it will be started automatically. The worker will be stopped when Close() is called.
If a TopologyWatcher is configured, it will be started automatically to monitor drain mode signals. Writes to draining clusters are skipped and enqueued for replay. Reads are failed over away from draining clusters.
Parameters:
- sessionA: CQL session for cluster A (required)
- sessionB: CQL session for cluster B (optional, nil for single-cluster mode)
- opts: Optional configuration options
Returns:
- *CQLClient: A new CQL client
- error: ErrNilSession if sessionA is nil, or error from worker start
Example ¶
ExampleNewCQLClient demonstrates creating a Helix CQL client with custom strategies.
// Note: This example uses nil sessions for illustration purposes.
// In real code, create proper gocql sessions wrapped with v1.NewSession() or v2.NewSession().
// Simulating session creation (replace with actual gocql sessions in real code)
var sessionA, sessionB cql.Session // would be v1.NewSession(gocqlSession)
// Create Helix client with custom strategies
client, err := NewCQLClient(sessionA, sessionB,
WithReadStrategy(nil), // e.g., policy.NewStickyRead()
WithWriteStrategy(nil), // e.g., policy.NewConcurrentDualWrite()
WithFailoverPolicy(nil), // e.g., policy.NewActiveFailover()
)
if err != nil {
// Handle error
return
}
defer client.Close()
// Client is now ready for dual-cluster operations
_ = client
func (*CQLClient) Batch ¶
Batch creates a new Batch for grouping multiple mutations.
WARNING: CounterBatch operations are NOT idempotent. If a counter update partially fails (succeeds on one cluster, fails on another), the Replay System will re-apply the operation, causing double-counting. Avoid using CounterBatch with dual-cluster mode if you require exactly-once semantics.
Parameters:
- kind: Type of batch (Logged, Unlogged, or Counter)
Returns:
- Batch: A batch builder for adding statements
func (*CQLClient) Close ¶
func (c *CQLClient) Close()
Close terminates connections to cluster(s) and stops the replay worker.
The topology watcher and replay worker are stopped first. After Close is called, the client cannot be reused.
func (*CQLClient) Config ¶
func (c *CQLClient) Config() *ClientConfig
Config returns the current client configuration.
Returns:
- *ClientConfig: The client's configuration
func (*CQLClient) DefaultExecuteFunc ¶ added in v0.5.0
func (c *CQLClient) DefaultExecuteFunc() replay.ExecuteFunc
DefaultExecuteFunc returns an ExecuteFunc for use with replay workers.
This is a convenience method that creates an executor which routes replay payloads to the appropriate cluster session. It handles both single queries and batch operations, preserving the original timestamp for idempotency.
The returned function:
- Routes to sessionA or sessionB based on payload.TargetCluster
- Handles batch operations (IsBatch=true) with proper BatchType
- Preserves the original write timestamp for idempotent replays
- Respects context cancellation and timeouts via ExecContext
Example:
client, _ := helix.NewCQLClient(sessionA, sessionB, helix.WithReplayer(replayer))
// Create worker with the default executor
worker := replay.NewMemoryWorker(replayer, client.DefaultExecuteFunc(),
replay.WithOnSuccess(func(p types.ReplayPayload) {
log.Printf("Replay succeeded for cluster %s", p.TargetCluster)
}),
)
Returns:
- replay.ExecuteFunc: A function that executes replay payloads
func (*CQLClient) ExecuteBatch
deprecated
func (*CQLClient) ExecuteBatchCAS
deprecated
ExecuteBatchCAS executes a batch with lightweight transaction semantics.
Deprecated: Use batch.ExecCAS() instead for the modern fluent API. This method is provided for compatibility with gocql v1 users.
Parameters:
- batch: The batch to execute
- dest: Optional destination for result columns
Returns:
- applied: true if the transaction was applied
- iter: Iterator for result rows if not applied
- err: Any execution error
func (*CQLClient) IsDraining ¶
IsDraining returns whether the specified cluster is currently in drain mode.
When a cluster is draining:
- Writes to the cluster are skipped and enqueued for replay
- Reads are failed over to the non-draining cluster
Parameters:
- cluster: The cluster to check
Returns:
- bool: true if the cluster is being drained
func (*CQLClient) IsSingleCluster ¶
IsSingleCluster returns true if the client is operating in single-cluster mode.
In single-cluster mode, all operations are executed directly on the primary session without dual-write or failover logic.
func (*CQLClient) MapExecuteBatchCAS
deprecated
func (c *CQLClient) MapExecuteBatchCAS(batch Batch, dest map[string]any) (applied bool, iter Iter, err error)
MapExecuteBatchCAS executes a batch CAS operation and maps results to a map.
Deprecated: Use batch.MapExecCAS() instead for the modern fluent API. This method is provided for compatibility with gocql v1 users.
Parameters:
- batch: The batch to execute
- dest: Map to receive result columns
Returns:
- applied: true if the transaction was applied
- iter: Iterator for result rows if not applied
- err: Any execution error
func (*CQLClient) NewBatch
deprecated
NewBatch creates a new Batch for grouping multiple mutations.
Deprecated: Use Batch() instead for the modern fluent API. This method is provided for compatibility with gocql v1 users.
Parameters:
- kind: Type of batch (Logged, Unlogged, or Counter)
Returns:
- Batch: A batch builder for adding statements
func (*CQLClient) Query ¶
Query creates a new Query for the given statement.
The method called on the returned Query determines the strategy:
- Exec/ExecContext → Write Strategy (Dual Write)
- Scan/Iter/MapScan → Read Strategy (Sticky Read)
Parameters:
- stmt: CQL statement with ? placeholders
- values: Values to bind to placeholders
Returns:
- Query: A query builder for further configuration
Example ¶
ExampleCQLClient_Query demonstrates basic query operations with dual-cluster client.
// Note: Assumes client is already created (see ExampleNewCQLClient)
var client *CQLClient
// Write operation - triggers dual-write to both clusters
err := client.Query("INSERT INTO users (id, name) VALUES (?, ?)", "user-1", "Alice").Exec()
if err != nil {
// Both clusters failed
return
}
// Read operation - uses sticky read strategy
var name string
err = client.Query("SELECT name FROM users WHERE id = ?", "user-1").Scan(&name)
if err != nil {
// Read failed (with failover attempted)
return
}
_ = name
func (*CQLClient) Session ¶ added in v0.3.0
func (c *CQLClient) Session() CQLSession
Session returns the CQLClient as a CQLSession interface.
This allows the CQLClient to be used as a drop-in replacement for gocql.Session in code that expects a session-like interface.
Example:
client, _ := helix.NewCQLClient(sessionA, sessionB)
session := client.Session()
// Use session like a regular gocql.Session
err := session.Query("INSERT INTO ...").Exec()
Returns:
- CQLSession: The client as a CQLSession interface
type CQLSession ¶
type CQLSession interface {
// Query creates a new Query instance for the given statement.
//
// The method called on the returned Query determines the strategy:
// - Exec/ExecContext: Triggers Write Strategy (Dual Write)
// - Scan/Iter/MapScan: Triggers Read Strategy (Sticky Read)
//
// Parameters:
// - stmt: CQL statement with ? placeholders
// - values: Values to bind to placeholders
//
// Returns:
// - Query: A query builder for further configuration
Query(stmt string, values ...any) Query
// Batch creates a new Batch instance for grouping multiple mutations.
//
// All statements in a batch are executed atomically against both clusters
// using the Write Strategy.
//
// Parameters:
// - kind: Type of batch (Logged, Unlogged, or Counter)
//
// Returns:
// - Batch: A batch builder for adding statements
Batch(kind BatchType) Batch
// NewBatch creates a new Batch instance for grouping multiple mutations.
//
// Deprecated: Use Batch() instead. This method exists for gocql v1 compatibility.
//
// Parameters:
// - kind: Type of batch (Logged, Unlogged, or Counter)
//
// Returns:
// - Batch: A batch builder for adding statements
NewBatch(kind BatchType) Batch
// ExecuteBatch executes a batch of statements.
//
// Deprecated: Use Batch().Exec() instead. This method exists for gocql v1 compatibility.
//
// Parameters:
// - batch: The batch to execute
//
// Returns:
// - error: nil on success, error if execution fails
ExecuteBatch(batch Batch) error
// ExecuteBatchCAS executes a batch lightweight transaction.
//
// Deprecated: Use Batch().ExecCAS() instead. This method exists for gocql v1 compatibility.
//
// Parameters:
// - batch: The batch to execute
// - dest: Pointers to variables to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - iter: Iterator for scanning additional CAS result rows
// - err: error if the operation failed
ExecuteBatchCAS(batch Batch, dest ...any) (applied bool, iter Iter, err error)
// MapExecuteBatchCAS executes a batch lightweight transaction and scans into a map.
//
// Deprecated: Use Batch().MapExecCAS() instead. This method exists for gocql v1 compatibility.
//
// Parameters:
// - batch: The batch to execute
// - dest: Map to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - iter: Iterator for scanning additional CAS result rows
// - err: error if the operation failed
MapExecuteBatchCAS(batch Batch, dest map[string]any) (applied bool, iter Iter, err error)
// Close terminates connections to both clusters.
//
// After Close is called, the session cannot be reused.
Close()
}
CQLSession mirrors gocql.Session and provides a drop-in replacement interface.
This interface abstracts over different gocql versions (v1 and v2) and allows Helix to perform dual-cluster operations transparently.
type ClientConfig ¶
type ClientConfig struct {
ReadStrategy ReadStrategy
WriteStrategy WriteStrategy
FailoverPolicy FailoverPolicy
Replayer Replayer
ReplayWorker ReplayWorker
TimestampProvider TimestampProvider
TopologyWatcher TopologyWatcher
Metrics MetricsCollector
Logger types.Logger
ClusterNames types.ClusterNames
OnReplayDropped ReplayDroppedHandler
// AutoMemoryWorker enables automatic in-process replay with MemoryReplayer.
// When true, a MemoryReplayer and Worker are created automatically.
AutoMemoryWorker bool
// AutoMemoryCapacity is the queue capacity for auto-created MemoryReplayer.
// Default: 10000
AutoMemoryCapacity int
// AutoMemoryWorkerOpts are options passed to the auto-created Worker.
AutoMemoryWorkerOpts []replay.WorkerOption
}
ClientConfig holds configuration for Helix clients.
func DefaultConfig ¶
func DefaultConfig() *ClientConfig
DefaultConfig returns a ClientConfig with sensible defaults.
The default configuration provides minimal, non-nil infrastructure:
- TimestampProvider: Uses time.Now().UnixMicro() for idempotent writes
- Metrics: No-op collector (silent, no overhead)
- Logger: No-op logger (silent, no overhead)
- ClusterNames: "ClusterA" and "ClusterB"
Strategy and policy defaults (all nil):
- ReadStrategy: nil - Falls back to ClusterA only (no load balancing)
- WriteStrategy: nil - Concurrent dual-write to both clusters
- FailoverPolicy: nil - Always attempts failover on read failure
- Replayer: nil - CAUTION: Partial write failures will be lost!
Production recommendations:
- ReadStrategy: policy.NewStickyRead() for cache-efficient reads
- WriteStrategy: policy.NewAdaptiveDualWrite() for latency-aware writes
- FailoverPolicy: policy.NewActiveFailover() for immediate failover
- Replayer: replay.NewNATSReplayer() for durable failure recovery
A warning is logged during client creation if dual-cluster mode is used without a Replayer configured.
Returns:
- *ClientConfig: Configuration with default settings
type ClusterNames ¶
type ClusterNames = types.ClusterNames
Type aliases for convenience - re-export from types package.
type ColumnInfo ¶
type ColumnInfo struct {
// Keyspace is the keyspace containing the table.
Keyspace string
// Table is the table name.
Table string
// Name is the column name.
Name string
// TypeInfo describes the CQL type (implementation-specific).
TypeInfo any
}
ColumnInfo holds metadata about a column in query results.
type Consistency ¶
type Consistency = types.Consistency
Type aliases for convenience - re-export from types package.
type FailoverPolicy ¶
type FailoverPolicy interface {
// ShouldFailover determines if failover should occur.
//
// Parameters:
// - cluster: The cluster that failed
// - err: The error that occurred
//
// Returns:
// - bool: true if failover should be attempted
ShouldFailover(cluster ClusterID, err error) bool
// RecordFailure records a failure for circuit breaker logic.
//
// Parameters:
// - cluster: The cluster that failed
RecordFailure(cluster ClusterID)
// RecordSuccess records a success to reset failure counters.
//
// Parameters:
// - cluster: The cluster that succeeded
RecordSuccess(cluster ClusterID)
}
FailoverPolicy controls when and how failover occurs.
Implementations MUST be safe for concurrent use from multiple goroutines. RecordFailure/RecordSuccess may be called concurrently while ShouldFailover is evaluating the current state.
type Iter ¶
type Iter interface {
// Scan reads the next row into dest.
//
// Parameters:
// - dest: Pointers to variables to receive column values
//
// Returns:
// - bool: true if a row was read, false if no more rows
Scan(dest ...any) bool
// Close closes the iterator and returns any error.
//
// Returns:
// - error: Any error that occurred during iteration
Close() error
// MapScan reads the next row into the map.
//
// Parameters:
// - m: Map to receive column name → value pairs
//
// Returns:
// - bool: true if a row was read, false if no more rows
MapScan(m map[string]any) bool
// SliceMap reads all remaining rows into a slice of maps.
//
// Returns:
// - []map[string]any: All remaining rows
// - error: Any error that occurred
SliceMap() ([]map[string]any, error)
// PageState returns the pagination token for resuming iteration.
//
// Returns:
// - []byte: Opaque pagination token
PageState() []byte
// NumRows returns the number of rows in the current page.
//
// Note: This only reflects rows in the current page, not total results.
// The value updates when new pages are fetched.
//
// Returns:
// - int: Number of rows in current page
NumRows() int
// Columns returns metadata about the columns in the result set.
//
// Returns:
// - []ColumnInfo: Slice of column metadata
Columns() []ColumnInfo
// Scanner returns a database/sql-style scanner for the iterator.
//
// After calling Scanner, the iterator should not be used directly.
//
// Returns:
// - Scanner: Scanner for row-by-row processing
Scanner() Scanner
// Warnings returns any warnings from the Cassandra server.
//
// Available in CQL protocol v4 and above.
//
// Returns:
// - []string: Server warning messages
Warnings() []string
}
Iter mirrors gocql.Iter for iterating over query results.
type LatencyRecorder ¶
type LatencyRecorder interface {
// RecordLatency records the latency of a successful operation.
//
// Implementations may treat slow responses (above a threshold) as failures
// for circuit breaker purposes.
//
// Parameters:
// - cluster: The cluster that was accessed
// - latency: The operation duration
RecordLatency(cluster ClusterID, latency time.Duration)
}
LatencyRecorder is an optional interface for failover policies that track latency.
Policies implementing this interface will have RecordLatency called automatically by the client after successful operations. This enables latency-aware circuit breaking where slow responses are treated as "soft failures".
Implementations MUST be safe for concurrent use from multiple goroutines.
Example implementation: policy.LatencyCircuitBreaker
type MetricsCollector ¶
type MetricsCollector = types.MetricsCollector
Type aliases for convenience - re-export from types package.
type Option ¶
type Option func(*ClientConfig)
Option configures a ClientConfig.
func WithAutoMemoryWorker ¶ added in v0.5.0
func WithAutoMemoryWorker(queueCapacity int, workerOpts ...replay.WorkerOption) Option
WithAutoMemoryWorker enables automatic in-process replay with MemoryReplayer.
This is a convenience option that creates both a MemoryReplayer and a Worker automatically, eliminating boilerplate setup code. The worker uses the client's DefaultExecuteFunc() to route replays to the correct cluster.
Use this for:
- Development and testing environments
- Simple deployments where in-process replay is acceptable
For production with durable replay, use WithReplayer() with NATSReplayer instead. NATS workers typically run as separate consumer services.
Parameters:
- queueCapacity: Maximum pending replays (0 uses default of 10000)
- workerOpts: Optional worker configuration (poll interval, callbacks, etc.)
Returns:
- Option: Configuration option
Example:
// Simple setup with defaults
client, _ := helix.NewCQLClient(sessionA, sessionB,
helix.WithAutoMemoryWorker(0),
)
// Custom capacity and callbacks
client, _ := helix.NewCQLClient(sessionA, sessionB,
helix.WithAutoMemoryWorker(50000,
replay.WithPollInterval(50*time.Millisecond),
replay.WithOnSuccess(func(p types.ReplayPayload) {
log.Printf("Replay succeeded: cluster=%s", p.TargetCluster)
}),
),
)
func WithClusterNames ¶
WithClusterNames sets custom display names for the clusters.
These names are used in metrics labels and log messages instead of the default "A" and "B". Names must be Prometheus-compatible (alphanumeric with underscores, starting with letter or underscore, max 32 chars).
If not set, defaults to "A" and "B".
Parameters:
- nameA: Display name for cluster A (e.g., "us_east", "primary", "dc1")
- nameB: Display name for cluster B (e.g., "us_west", "secondary", "dc2")
Returns:
- Option: Configuration option
Example:
client, _ := helix.NewCQLClient(sessionA, sessionB,
helix.WithClusterNames("us_east", "us_west"),
)
This will produce metrics like:
helix_read_total{cluster="us_east"} 100
helix_read_total{cluster="us_west"} 95
func WithFailoverPolicy ¶
func WithFailoverPolicy(policy FailoverPolicy) Option
WithFailoverPolicy sets the failover policy for reads.
Parameters:
- policy: The failover policy to use
Returns:
- Option: Configuration option
func WithLogger ¶
WithLogger sets the structured logger.
If not set, a no-op logger is used that discards all messages. The logger interface is compatible with zap.SugaredLogger.
Parameters:
- logger: The logger implementation
Returns:
- Option: Configuration option
Example:
logger, _ := zap.NewProduction()
client, _ := helix.NewCQLClient(sessionA, sessionB,
helix.WithLogger(logger.Sugar()),
)
func WithMetrics ¶
func WithMetrics(collector MetricsCollector) Option
WithMetrics sets the metrics collector.
If not set, a no-op collector is used that discards all metrics. Use contrib/metrics/vm.New() for VictoriaMetrics integration.
Parameters:
- collector: The metrics collector implementation
Returns:
- Option: Configuration option
Example:
import vmmetrics "github.com/arloliu/helix/contrib/metrics/vm"
collector := vmmetrics.New(vmmetrics.WithPrefix("myapp"))
client, _ := helix.NewCQLClient(sessionA, sessionB,
helix.WithMetrics(collector),
)
func WithOnReplayDropped ¶
func WithOnReplayDropped(handler ReplayDroppedHandler) Option
WithOnReplayDropped sets a callback for when replay payloads cannot be enqueued.
This callback is invoked when a write succeeds on one cluster but fails on the other, and the failed write cannot be enqueued for replay (e.g., queue is full). Use this to implement custom alerting or fallback persistence strategies.
Parameters:
- handler: Function called with the dropped payload and the enqueue error
Returns:
- Option: Configuration option
Example:
helix.WithOnReplayDropped(func(payload types.ReplayPayload, err error) {
log.Error("replay queue full, potential data loss",
"cluster", payload.TargetCluster,
"query", payload.Query,
"error", err,
)
// Optionally persist to a fallback store
fallbackStore.Save(payload)
})
func WithReadStrategy ¶
func WithReadStrategy(strategy ReadStrategy) Option
WithReadStrategy sets the read routing strategy.
Parameters:
- strategy: The read strategy to use (e.g., StickyRead)
Returns:
- Option: Configuration option
func WithReplayWorker ¶
func WithReplayWorker(worker ReplayWorker) Option
WithReplayWorker sets the replay worker for processing failed writes.
The worker will be started automatically when the client is created and stopped when the client is closed.
Parameters:
- worker: The replay worker implementation (e.g., MemoryWorker, NATSWorker)
Returns:
- Option: Configuration option
func WithReplayer ¶
WithReplayer sets the replayer for failed writes.
Parameters:
- replayer: The replayer implementation
Returns:
- Option: Configuration option
func WithTimestampProvider ¶
func WithTimestampProvider(fn TimestampProvider) Option
WithTimestampProvider sets the timestamp generator.
Parameters:
- fn: Function that returns current timestamp in microseconds
Returns:
- Option: Configuration option
func WithTopologyWatcher ¶
func WithTopologyWatcher(watcher TopologyWatcher) Option
WithTopologyWatcher sets the topology watcher for drain mode support.
Parameters:
- watcher: The topology watcher implementation
Returns:
- Option: Configuration option
func WithWriteStrategy ¶
func WithWriteStrategy(strategy WriteStrategy) Option
WithWriteStrategy sets the write execution strategy.
Parameters:
- strategy: The write strategy to use (e.g., ConcurrentDualWrite)
Returns:
- Option: Configuration option
type PriorityLevel ¶
type PriorityLevel = types.PriorityLevel
Type aliases for convenience - re-export from types package.
type Query ¶
type Query interface {
// WithContext associates a context with the query.
//
// Parameters:
// - ctx: Context for cancellation and timeout
//
// Returns:
// - Query: The same query for chaining
WithContext(ctx context.Context) Query
// Consistency sets the consistency level for the query.
//
// Parameters:
// - c: Consistency level
//
// Returns:
// - Query: The same query for chaining
Consistency(c Consistency) Query
// SetConsistency sets the consistency level for the query.
//
// Deprecated: Use Consistency() instead. This method exists for gocql v1 compatibility.
//
// Parameters:
// - c: Consistency level
SetConsistency(c Consistency)
// PageSize sets the number of rows to fetch per page.
//
// Parameters:
// - n: Number of rows per page
//
// Returns:
// - Query: The same query for chaining
PageSize(n int) Query
// PageState sets the pagination state for resuming iteration.
//
// Parameters:
// - state: Opaque pagination token from a previous Iter.PageState()
//
// Returns:
// - Query: The same query for chaining
PageState(state []byte) Query
// WithTimestamp sets a specific timestamp for the write operation.
//
// This is critical for idempotency in dual-write scenarios.
// If not set, the client's TimestampProvider is used.
//
// Parameters:
// - ts: Timestamp in microseconds since Unix epoch
//
// Returns:
// - Query: The same query for chaining
WithTimestamp(ts int64) Query
// WithPriority sets the replay priority level for this write operation.
//
// Priority affects replay processing order when a write fails on one cluster.
// High priority writes are processed before low priority writes.
// If not set, defaults to PriorityHigh.
//
// Parameters:
// - p: Priority level (PriorityHigh or PriorityLow)
//
// Returns:
// - Query: The same query for chaining
WithPriority(p PriorityLevel) Query
// SerialConsistency sets the consistency level for the serial phase of CAS operations.
//
// This only applies to conditional updates (INSERT/UPDATE with IF clause).
// Valid values are Serial or LocalSerial.
//
// Parameters:
// - c: Serial consistency level (Serial or LocalSerial)
//
// Returns:
// - Query: The same query for chaining
SerialConsistency(c Consistency) Query
// Exec executes a write query using the Write Strategy.
//
// This triggers concurrent dual-write to both clusters.
// Returns nil if at least one cluster succeeds (partial writes
// are handled via the Replayer).
//
// Returns:
// - error: nil on success (at least one cluster), error if both fail
Exec() error
// ExecContext executes a write query with the given context.
//
// Parameters:
// - ctx: Context for cancellation and timeout
//
// Returns:
// - error: nil on success, error if both clusters fail
ExecContext(ctx context.Context) error
// Scan executes a read query and scans a single row into dest.
//
// This triggers the Read Strategy (Sticky Read with failover).
//
// Parameters:
// - dest: Pointers to variables to receive column values
//
// Returns:
// - error: nil on success, error if read fails on all clusters
Scan(dest ...any) error
// ScanContext executes a read query with context.
//
// Parameters:
// - ctx: Context for cancellation and timeout
// - dest: Pointers to variables to receive column values
//
// Returns:
// - error: nil on success, error if read fails
ScanContext(ctx context.Context, dest ...any) error
// Iter returns an iterator for reading multiple rows.
//
// This triggers the Read Strategy. The iterator reads from
// the selected cluster based on sticky routing and failover.
//
// Returns:
// - Iter: Iterator for scanning rows
Iter() Iter
// IterContext returns an iterator with the given context.
//
// Parameters:
// - ctx: Context for cancellation and timeout
//
// Returns:
// - Iter: Iterator for scanning rows
IterContext(ctx context.Context) Iter
// MapScan executes a read and scans a single row into the map.
//
// Parameters:
// - m: Map to receive column name → value pairs
//
// Returns:
// - error: nil on success, error if read fails
MapScan(m map[string]any) error
// MapScanContext executes a read with context and scans a single row into the map.
//
// Parameters:
// - ctx: Context for cancellation and timeout
// - m: Map to receive column name → value pairs
//
// Returns:
// - error: nil on success, error if read fails
MapScanContext(ctx context.Context, m map[string]any) error
// ScanCAS executes a lightweight transaction (INSERT/UPDATE with IF clause).
//
// If the transaction fails because conditions weren't met, the previous
// values are stored in dest.
//
// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
//
// Parameters:
// - dest: Pointers to variables to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - err: error if the operation failed
ScanCAS(dest ...any) (applied bool, err error)
// ScanCASContext executes a lightweight transaction with context.
//
// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
//
// Parameters:
// - ctx: Context for cancellation and timeout
// - dest: Pointers to variables to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - err: error if the operation failed
ScanCASContext(ctx context.Context, dest ...any) (applied bool, err error)
// MapScanCAS executes a lightweight transaction and returns previous values as a map.
//
// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
//
// Parameters:
// - dest: Map to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - err: error if the operation failed
MapScanCAS(dest map[string]any) (applied bool, err error)
// MapScanCASContext executes a lightweight transaction with context.
//
// NOTE: CAS operations are executed on a single cluster and are NOT replicated.
//
// Parameters:
// - ctx: Context for cancellation and timeout
// - dest: Map to receive previous values if not applied
//
// Returns:
// - applied: true if the CAS operation was applied
// - err: error if the operation failed
MapScanCASContext(ctx context.Context, dest map[string]any) (applied bool, err error)
}
Query mirrors gocql.Query and provides chainable configuration.
The method called to execute the query determines whether it's a read or write:
- Exec/ExecContext → Write Strategy (Dual Write)
- Scan/Iter/MapScan → Read Strategy (Sticky Read)
Context Usage Patterns ¶
Two equivalent patterns are supported for context handling:
// Pattern 1: Direct context method (recommended for simple cases) err := query.ExecContext(ctx) result := query.ScanContext(ctx, &dest) // Pattern 2: Method chaining (useful with multiple options) err := query.Consistency(helix.Quorum).WithContext(ctx).Exec()
Both patterns respect context cancellation and timeouts. Choose Pattern 1 when you only need to set a context. Choose Pattern 2 when chaining multiple configuration calls (consistency, page size, timestamps, etc.).
type ReadStrategy ¶
type ReadStrategy interface {
// Select chooses which cluster to read from.
//
// Parameters:
// - ctx: Context for the operation
//
// Returns:
// - ClusterID: The cluster to read from
Select(ctx context.Context) ClusterID
// OnSuccess is called when a read succeeds.
//
// Parameters:
// - cluster: The cluster that succeeded
OnSuccess(cluster ClusterID)
// OnFailure is called when a read fails.
//
// Parameters:
// - cluster: The cluster that failed
// - err: The error that occurred
//
// Returns:
// - ClusterID: Alternative cluster to try, or empty if no failover
// - bool: true if failover should be attempted
OnFailure(cluster ClusterID, err error) (ClusterID, bool)
}
ReadStrategy defines how reads are routed to clusters.
Implementations MUST be safe for concurrent use from multiple goroutines. All methods may be called concurrently from different operations.
type ReplayDroppedHandler ¶
type ReplayDroppedHandler func(payload types.ReplayPayload, err error)
ReplayDroppedHandler is called when a replay payload cannot be enqueued. This callback allows applications to handle potential data loss scenarios.
Parameters:
- payload: The payload that could not be enqueued
- err: The error from the enqueue attempt
type ReplayPayload ¶
type ReplayPayload = types.ReplayPayload
Type aliases for convenience - re-export from types package.
type ReplayWorker ¶
type ReplayWorker interface {
// Start begins processing replay messages in background goroutines.
//
// Returns:
// - error: ErrWorkerAlreadyRunning if already started
Start() error
// Stop gracefully stops the worker and waits for pending work to complete.
Stop()
// IsRunning returns whether the worker is currently running.
IsRunning() bool
}
ReplayWorker processes failed writes from a replay queue.
Implementations include MemoryWorker and NATSWorker from the replay package.
type Replayer ¶
type Replayer interface {
// Enqueue adds a failed write to the replay queue.
//
// Parameters:
// - ctx: Context for cancellation
// - payload: The write operation to replay
//
// Returns:
// - error: nil on success, error if enqueue fails
Enqueue(ctx context.Context, payload ReplayPayload) error
}
Replayer handles asynchronous reconciliation of failed writes.
When a dual-write partially fails (one cluster succeeds, one fails), the failed write is enqueued for later replay.
Implementations MUST be safe for concurrent use from multiple goroutines. Enqueue may be called concurrently from different write operations.
type SQLClient ¶
type SQLClient struct {
// contains filtered or unexported fields
}
SQLClient is the Helix SQL client for single or dual-database operations.
It wraps one or two SQL databases and orchestrates writes according to configured strategies. When only one database is provided (secondary is nil), the client operates in single-database mode with pass-through behavior.
Single-database mode is ideal for:
- Migrating existing applications to Helix incrementally
- Development and testing environments
- Applications that don't need dual-database redundancy yet
Limitations (dual-database mode):
- Single-statement non-transactional operations only
- No transaction support across databases
- Best-effort semantics (no strong consistency guarantees)
- Write order determined by arrival time (no USING TIMESTAMP like Cassandra)
func NewSQLClient ¶
func NewSQLClient(primary, secondary sqladapter.DB, opts ...Option) (*SQLClient, error)
NewSQLClient creates a new Helix SQL client.
The client supports two modes:
- Single-database mode: Pass secondary as nil. Operations are executed directly on primary without dual-write or failover logic. This provides a drop-in replacement for existing single-database applications.
- Dual-database mode: Pass both databases. Operations use configured strategies for dual writes, sticky reads, and failover.
If a ReplayWorker is configured, it will be started automatically. The worker will be stopped when Close() is called.
Parameters:
- primary: Primary database connection (required)
- secondary: Secondary database connection (optional, nil for single-database mode)
- opts: Optional configuration options
Returns:
- *SQLClient: A new SQL client
- error: ErrNilSession if primary is nil, or error from worker start
func NewSQLClientFromDB ¶
NewSQLClientFromDB creates a new Helix SQL client from standard *sql.DB connections.
This is a convenience constructor that wraps the databases in adapters. Pass secondary as nil for single-database mode.
Parameters:
- primary: Primary *sql.DB connection (required)
- secondary: Secondary *sql.DB connection (optional, nil for single-database mode)
- opts: Optional configuration options
Returns:
- *SQLClient: A new SQL client
- error: ErrNilSession if primary is nil
func (*SQLClient) Close ¶
Close closes database connection(s) and stops the replay worker.
The replay worker is stopped first to allow any pending replays to complete. After Close is called, the client cannot be reused.
func (*SQLClient) Exec ¶
Exec executes a write query using a background context.
Parameters:
- query: SQL statement to execute
- args: Arguments for the query
Returns:
- sql.Result: Result from the successful write
- error: nil if at least one succeeds, error if both fail
func (*SQLClient) ExecContext ¶
ExecContext executes a write query.
In single-database mode, the query is executed directly on the primary database. In dual-database mode, the query is executed on both databases concurrently. Success is defined as at least one database succeeding. Failed writes are enqueued for replay if a Replayer is configured.
Parameters:
- ctx: Context for cancellation and timeout
- query: SQL statement to execute
- args: Arguments for the query
Returns:
- sql.Result: Result from the successful write (primary preferred)
- error: nil if at least one succeeds, error if both fail
func (*SQLClient) IsSingleCluster ¶
IsSingleCluster returns true if the client is operating in single-database mode.
In single-database mode, all operations are executed directly on the primary database without dual-write or failover logic.
func (*SQLClient) Ping ¶
Ping checks if the database(s) are reachable.
Returns:
- error: nil if at least one database responds, error if all fail
func (*SQLClient) PingContext ¶
PingContext checks if the database(s) are reachable.
In single-database mode, only the primary database is pinged. In dual-database mode, both databases are pinged concurrently.
Parameters:
- ctx: Context for cancellation and timeout
Returns:
- error: nil if at least one database responds, error if all fail
func (*SQLClient) Query ¶
Query executes a read query using a background context.
Parameters:
- query: SQL statement to execute
- args: Arguments for the query
Returns:
- *sql.Rows: Result rows from the query
- error: Error if the query fails on all databases
func (*SQLClient) QueryContext ¶
QueryContext executes a read query.
In single-database mode, the query is executed directly on the primary database. In dual-database mode, the query is routed to the preferred database based on the ReadStrategy. On failure, it may failover to the secondary.
Parameters:
- ctx: Context for cancellation and timeout
- query: SQL statement to execute
- args: Arguments for the query
Returns:
- *sql.Rows: Result rows from the query (caller must close)
- error: Error if the query fails on all databases
func (*SQLClient) QueryRow ¶
QueryRow executes a read query expecting at most one row.
Parameters:
- query: SQL statement to execute
- args: Arguments for the query
Returns:
- *sql.Row: Single row result
func (*SQLClient) QueryRowContext ¶
QueryRowContext executes a read query expecting at most one row.
In single-database mode, the query is executed directly on the primary database. In dual-database mode, the query is routed based on the ReadStrategy.
If the client is closed, returns a Row that will return sql.ErrNoRows on Scan. Callers should check IsClosed() before calling if they need to distinguish between "no rows" and "client closed" scenarios.
Parameters:
- ctx: Context for cancellation and timeout
- query: SQL statement to execute
- args: Arguments for the query
Returns:
- *sql.Row: Single row result (never nil)
type Scanner ¶
type Scanner interface {
// Next advances to the next row, returning true if a row is available.
Next() bool
// Scan reads the current row into dest.
Scan(dest ...any) error
// Err returns any error from iteration and releases resources.
Err() error
}
Scanner provides database/sql-style row scanning.
type TimestampProvider ¶
type TimestampProvider func() int64
TimestampProvider generates timestamps for write operations.
The default provider uses time.Now().UnixMicro().
type TopologyOperator ¶
type TopologyOperator interface {
// SetDrain sets the drain state for a cluster.
//
// Parameters:
// - ctx: Context for cancellation/timeout
// - cluster: The cluster to update
// - draining: true to enable drain mode, false to disable
// - reason: Human-readable reason for the drain (only used when draining=true)
//
// Returns:
// - error: nil on success, error if the operation fails
SetDrain(ctx context.Context, cluster ClusterID, draining bool, reason string) error
}
TopologyOperator allows setting cluster drain states.
This interface is typically used by operations tools and tests to control cluster availability. Implementations include topology.Local (in-memory).
type TopologyUpdate ¶
type TopologyUpdate struct {
// Cluster that was updated.
Cluster ClusterID
// Available indicates if the cluster is available.
Available bool
// DrainMode indicates if the cluster is in drain mode.
DrainMode bool
}
TopologyUpdate represents a change in cluster topology.
type TopologyWatcher ¶
type TopologyWatcher interface {
// Watch returns a channel that receives topology updates.
//
// Parameters:
// - ctx: Context for cancellation
//
// Returns:
// - <-chan TopologyUpdate: Channel of topology changes
Watch(ctx context.Context) <-chan TopologyUpdate
}
TopologyWatcher monitors cluster topology changes.
Implementations include topology.Local (in-memory) and topology.NATS (NATS KV backed).
type WriteStrategy ¶
type WriteStrategy interface {
// Execute performs the write operation on both clusters.
//
// Parameters:
// - ctx: Context for the operation
// - writeA: Function to write to cluster A
// - writeB: Function to write to cluster B
//
// Returns:
// - resultA: Error from cluster A (nil if successful)
// - resultB: Error from cluster B (nil if successful)
Execute(
ctx context.Context,
writeA func(context.Context) error,
writeB func(context.Context) error,
) (resultA, resultB error)
}
WriteStrategy defines how writes are executed across clusters.
Implementations MUST be safe for concurrent use from multiple goroutines. Execute may be called concurrently from different write operations.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
adapter
|
|
|
cql
Package cql provides CQL-specific adapter interfaces for different gocql versions.
|
Package cql provides CQL-specific adapter interfaces for different gocql versions. |
|
cql/v1
Package v1 provides an adapter for gocql v1 (github.com/gocql/gocql).
|
Package v1 provides an adapter for gocql v1 (github.com/gocql/gocql). |
|
cql/v2
Package v2 provides an adapter for gocql v2 (github.com/apache/cassandra-gocql-driver).
|
Package v2 provides an adapter for gocql v2 (github.com/apache/cassandra-gocql-driver). |
|
sql
Package sql provides SQL-specific adapter interfaces for database/sql.
|
Package sql provides SQL-specific adapter interfaces for database/sql. |
|
contrib
|
|
|
metrics/vm
Package vm provides a VictoriaMetrics-based implementation of the MetricsCollector interface.
|
Package vm provides a VictoriaMetrics-based implementation of the MetricsCollector interface. |
|
examples
|
|
|
basic
command
Package main demonstrates basic usage of the Helix dual-database client library.
|
Package main demonstrates basic usage of the Helix dual-database client library. |
|
custom-strategy
command
Package main demonstrates how to implement custom strategies for Helix.
|
Package main demonstrates how to implement custom strategies for Helix. |
|
failover
command
Package main demonstrates failover behavior in the Helix dual-database client.
|
Package main demonstrates failover behavior in the Helix dual-database client. |
|
replay
command
Package main demonstrates the Helix replay system for handling partial write failures.
|
Package main demonstrates the Helix replay system for handling partial write failures. |
|
internal
|
|
|
logging
Package logging provides internal logging utilities for Helix.
|
Package logging provides internal logging utilities for Helix. |
|
metrics
Package metrics provides internal metrics utilities for Helix.
|
Package metrics provides internal metrics utilities for Helix. |
|
Package policy provides read strategies, write strategies, and failover policies for the helix dual-database client.
|
Package policy provides read strategies, write strategies, and failover policies for the helix dual-database client. |
|
Package replay provides asynchronous reconciliation mechanisms for failed writes in the helix dual-database client.
|
Package replay provides asynchronous reconciliation mechanisms for failed writes in the helix dual-database client. |
|
test
|
|
|
simulation/cmd
command
|
|
|
testutil
Package testutil provides testing utilities for the helix project.
|
Package testutil provides testing utilities for the helix project. |
|
Package topology provides cluster topology monitoring for drain mode support.
|
Package topology provides cluster topology monitoring for drain mode support. |
|
Package types provides shared types and error definitions for the helix library.
|
Package types provides shared types and error definitions for the helix library. |